In [1]:
import librosa
import torch
import torchaudio
from datasets import load_dataset
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor
import audio2numpy as an


test_dataset = load_dataset("common_voice", "fi", split="test[:2%]")

processor = Wav2Vec2Processor.from_pretrained("aapot/wav2vec2-large-xlsr-53-finnish")
model = Wav2Vec2ForCTC.from_pretrained("aapot/wav2vec2-large-xlsr-53-finnish")

resampler = lambda sr, y: librosa.resample(y.numpy().squeeze(), sr, 16_000)

# Preprocessing the datasets.
# We need to read the audio files as arrays
def speech_file_to_array_fn(batch):
    speech_array, sampling_rate = an.open_audio(batch["path"])
    speech_array = torch.tensor(speech_array)
    batch["speech"] = resampler(sampling_rate, speech_array).squeeze()
    return batch

test_dataset = test_dataset.map(speech_file_to_array_fn)
inputs = processor(test_dataset["speech"][:2], sampling_rate=16_000, return_tensors="pt", padding=True)

with torch.no_grad():
    logits = model(inputs.input_values, attention_mask=inputs.attention_mask).logits


    
predicted_ids = torch.argmax(logits, dim=-1)

print("Prediction:", processor.batch_decode(predicted_ids))
print("Reference:", test_dataset["sentence"][:2])

Reusing dataset common_voice (/home/sampo/.cache/huggingface/datasets/common_voice/fi/6.1.0/0041e06ab061b91d0a23234a2221e87970a19cf3a81b20901474cffffeb7869f)
Special tokens have been added in the vocabulary, make sure the associated word embedding are fine-tuned or trained.
Loading cached processed dataset at /home/sampo/.cache/huggingface/datasets/common_voice/fi/6.1.0/0041e06ab061b91d0a23234a2221e87970a19cf3a81b20901474cffffeb7869f/cache-b1022a402f3c42fb.arrow


Prediction: ['mysteerimies oli oppinut morallinsa taruista elokuvista ja peleistä', 'äänestin mietinnön puolesta n']
Reference: ['Mysteerimies oli oppinut moraalinsa taruista, elokuvista ja peleistä.', 'Äänestin mietinnön puolesta!']


In [2]:
class SpeechRecognizer:
    
    def __init__(self, model, processor, device="cuda"):
        self.model = model
        model.to(device)
        self.processor = processor
        self.device = device
        
    def _prepareaudio(self, path : str):
        # load audio
        # regex
        audio, sr = an.open_audio(path)
        try:
            audio = audio[:, 1]
        except:
            pass
        audio = torch.tensor(audio)
        audio = librosa.resample(audio.numpy().squeeze(), sr, 16_000)
        return audio
    
    def decode(self, output: torch.tensor, mode: str = "argmax"):
        
        if mode=="argmax":
            pred_ids = torch.argmax(output, dim=-1)
            
        return pred_ids
    
    
    @torch.no_grad()
    def __call__(self, path: str):
        audio = self._prepareaudio(path)
        inputs = processor(audio, sampling_rate=16_000, return_tensors="pt", padding=True)
        logits = model(inputs.input_values.to(self.device), attention_mask=inputs.attention_mask.to(self.device)).logits
        pred_ids = self.decode(logits)
        prediction = processor.batch_decode(pred_ids)
        return prediction, logits
    
recog = SpeechRecognizer(model, processor, device="cuda")

pred, logits = recog("data/testi.mp3")
logits = logits.squeeze(0)
#pred = torch.argmax(logits[:,:, :], dim=-1)
#pred = processor.batch_decode(pred)
print(pred)

['saattaavatta riipasen kovan kännintäneän']


In [4]:
#help(processor.tokenizer)
vocab = processor.tokenizer.get_vocab()
print(vocab)

{'b': 0, 'y': 1, 'w': 2, 'u': 3, 'a': 4, 'r': 5, 'k': 6, 'e': 7, 'ö': 8, 'j': 9, 'm': 11, 'ä': 12, 'i': 13, 'h': 14, 'x': 15, 't': 16, 'z': 17, 'l': 18, 'c': 19, 'v': 20, 'f': 21, 'p': 22, 'q': 23, 'o': 24, 'n': 25, 'd': 26, 's': 27, 'g': 28, '|': 10, '[UNK]': 29, '[PAD]': 30, '<s>': 31, '</s>': 32}


# argmax decoder
1. first take argmax
2. merge repeats 
3. remove blanks
4. map ids to letters
5. change word separation to space

In [3]:
from itertools import groupby
import numpy as np
def invert_dict(dict):
    return {v: k for k, v in dict.items()}

dd = invert_dict(vocab)

def argmax_decode(probs, vocab, blank_id = 30):
    """
    argmax decoder for ctc
    uses a specific vocab
    
    inputs: probs: torch tensor of shape T,V
            vocab: vocab mapping ids to characters
            blank_id: the id of a blank token
            
    output: decoded sequence of characters
    
    """
    #argmax
    pred_ids = torch.argmax(probs, dim=-1)
    pred_ids = pred_ids.numpy()
    #merge repeats
    merged = np.array([k for k,_ in groupby(pred_ids)])
    #remove blanks
    result = merged[merged!=30]
    #map with dictionary
    result = [dd[i] for i in result]
    #remove extra spaces
    result = result
    
    return "".join(result).replace('|', ' ')
    
    
#argmax_decode(logits, dd)

NameError: name 'vocab' is not defined

In [20]:
vals = sorted(vocab.items(), key = lambda x:x[1])
labels = "".join([x[0] for x in vals[:-2]])
print(labels)

bywuarkeöj|mäihxtzlcvfpqondsg[UNK][PAD]


In [7]:
import collections
NEG_INF = -float("inf")


def map_to_chars(ids, vocab):
    result = [vocab[i] for i in ids]
    return "".join(result).replace('|', ' ')

In [8]:
import IPython.display as ipd
ipd.Audio("data/testi.mp3")

In [10]:
from sample_decode import decode
import time

t1 = time.time()
pred, logits = recog("data/testi.mp3")
t2 = time.time()
logits = logits.squeeze(0)
print(logits.cpu().numpy().shape)
t3 = time.time()
ids = decode(logits.softmax(dim=1).cpu().numpy(), beam_size=2048, blank=30, prune_t = 0.00001)
t4 = time.time()
print(map_to_chars(ids[0], dd))
print(argmax_decode(logits.cpu(), dd))
print(t2-t1)
print(t4-t3)

(273, 31)
saattaavatta riipasen kovan kännintäneän 
saattaavatta riipasen kovan kännintäneän 
0.2827317714691162
3.308340311050415


In [21]:
from ctcdecode import CTCBeamDecoder

pred, logits = recog("data/testi.mp3")

probs = logits.softmax(dim=2).cpu()

decoder = CTCBeamDecoder(
    labels,
    model_path=None,
    alpha=0,
    beta=0,
    cutoff_top_n=10,
    cutoff_prob=1.0,
    beam_width=100,
    num_processes=4,
    blank_id=30,
    log_probs_input=False
)

beam_results, beam_scores, timesteps, out_lens = decoder.decode(output)