# ASR: Experiment 3, Whisper vs Wav2Vec2 vs Vosk

In [None]:
# shared configuration flags, read by the cells further down

# number of dataset rows an experiment loop processes before it stops (max for the drive is 3994)
LIMIT = 20
# set to True to download and unzip the vosk model (it takes a bit, the archive is 1.8 GB)
INSTALL = False
# set to True to convert the dataset files from .mp3 to .wav for Vosk
CONVERT = False
# set to True to perform a clean run (gets rid of everything installed for Vosk)
CLEAN = False

In [None]:
# clean-up for a clean run: removes the Vosk folders and the Colab sample data
import shutil

if CLEAN:
  # every folder is handled on its own so that one missing folder does not
  # stop the remaining folders from being removed
  for folder in ('/content/vosk-model-en-us-0.22', '/content/vosk-api', '/content/sample_data'):
    try:
      shutil.rmtree(folder)
    except FileNotFoundError:
      print("No folder to clean, continuing..")
    except OSError as err:
      # best effort: report anything else (e.g. permissions) and keep going
      print("Could not remove {}: {}".format(folder, err))

In [None]:
# shared install
!apt install ffmpeg
!pip install tqdm
!pip install transformers

In [None]:
# shared imports
from os import path
from tqdm import tqdm
from collections import Counter
from google.colab import drive
import sys
import string
import re
import math
import pandas as pd
import numpy as np
import torch
import torchaudio

In [None]:
# mounting the drive with the dataset at /content/drive;
# the csv and audio paths used below all start with /content/drive/MyDrive/
drive.mount('/content/drive')

In [None]:
# loading in the dataset csv from the mounted drive
# data keys are: filename, text, up_votes, down_votes, age, gender, accent and duration
# (the experiments below only read the filename and text columns)
data = pd.read_csv("/content/drive/MyDrive/cv-valid-test.csv")

In [None]:
# shared utility functions

# gets short, average and long keywords in a sentence
def grab_keywords(sentence):
    """Split a sentence into short, average and long words.

    A word's bucket is chosen from its length after leading and trailing
    punctuation is ignored, so "Hello," is measured as the 5 letters of
    "Hello". The word itself is returned unchanged.

    Args:
        sentence: text to split; any run of whitespace separates words.

    Returns:
        A list of three space-joined strings, in this order: words of
        length <= 3, words of length 4 or 5, words of length > 5. A bucket
        with no words is the empty string.
    """
    buckets = [[], [], []]
    # split() without an argument drops the empty tokens that repeated,
    # leading or trailing spaces would otherwise produce
    for token in sentence.split():
        length = len(token.strip(string.punctuation))
        if length <= 3:
            buckets[0].append(token)
        elif length <= 5:
            buckets[1].append(token)
        else:
            buckets[2].append(token)
    return [" ".join(bucket) for bucket in buckets]

# cosine similarity scoring helpers
def cosinesimularity(vec1, vec2):
    """Return the cosine similarity of two term -> count mappings.

    Args:
        vec1: mapping from term to numeric weight (e.g. a Counter).
        vec2: mapping from term to numeric weight (e.g. a Counter).

    Returns:
        The dot product over the shared terms divided by the product of
        both vector lengths, or 0.0 when that product is zero.
    """
    shared_terms = set(vec1.keys()) & set(vec2.keys())
    dot_product = sum(vec1[term] * vec2[term] for term in shared_terms)

    squares1 = sum(weight ** 2 for weight in vec1.values())
    squares2 = sum(weight ** 2 for weight in vec2.values())
    norm_product = math.sqrt(squares1) * math.sqrt(squares2)

    # an empty (or all-zero) vector has no direction to compare
    if norm_product:
        return float(dot_product) / norm_product
    return 0.0

def text_to_vector(text):
    """Count how often each word occurs in text.

    A word is a maximal run of regex word characters (the pattern \\w+).

    Returns:
        A Counter mapping each word to its number of occurrences.
    """
    return Counter(re.findall(r"\w+", text))

def _normalise_for_scoring(text):
    """Lower-case text and delete every character listed in string.punctuation."""
    return text.lower().translate(str.maketrans('', '', string.punctuation))

def get_cosinesimularity(text1, text2):
    """Score how similar two texts are, ignoring case and punctuation.

    Args:
        text1: first text.
        text2: second text.

    Returns:
        -1 when neither text contains a word once it is normalised (there is
        nothing to compare); otherwise the cosine similarity of the two
        word-count vectors.
    """
    vector1 = text_to_vector(_normalise_for_scoring(text1))
    vector2 = text_to_vector(_normalise_for_scoring(text2))
    # the emptiness check runs on the normalised vectors so that inputs made
    # only of spaces or punctuation are treated like empty strings
    if not vector1 and not vector2:
        return -1
    return cosinesimularity(vector1, vector2)

# names the keyword bucket for the final prints
def get_label(index):
    """Return 'short', 'average' or 'long' for index 0, 1 or 2.

    Any other index yields None.
    """
    for position, label in enumerate(('short', 'average', 'long')):
        if index == position:
            return label
    return None

**Whisper**

In [None]:
# whisper imports
from transformers import WhisperProcessor, WhisperForConditionalGeneration

In [None]:
# load model and processor
# NOTE: despite the variable name whisper_med_model, the checkpoint loaded
# here is "openai/whisper-base.en"
whisper_processor = WhisperProcessor.from_pretrained("openai/whisper-base.en")
whisper_med_model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-base.en")

In [None]:
# WHISPER FULL TRANSCRIPT SCORE
# Transcribes up to LIMIT samples with Whisper and scores every transcript
# against its reference text with cosine similarity.

scores = []

print("[!] Starting experiment")

for index, (_, row) in enumerate(tqdm(data.iterrows())):
  # limiter for testing; uses the shared LIMIT like the other experiments
  if index == LIMIT:
    break
  # columns are read by label: integer keys on a label-indexed row are
  # deprecated positional access in pandas
  file_path = "/content/drive/MyDrive/" + row["filename"]

  # load the sample
  waveform, sample_rate = torchaudio.load(file_path)

  # whisper only works with 16k sample rate so if we have others we convert it
  if sample_rate != 16000:
    waveform = torchaudio.functional.resample(waveform, sample_rate, 16000)

  # tokenize (only the first channel of the waveform is passed on)
  input_features = whisper_processor(waveform.numpy()[0], sampling_rate=16000, return_tensors="pt").input_features

  # generate token ids
  predicted_ids = whisper_med_model.generate(input_features)

  # decode token ids to text
  transcript = whisper_processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]
  text = row["text"]

  # debug print
  print()
  # print(text)
  # print(transcript)

  # filling scores; -1 is the "both texts empty" sentinel and is not a
  # similarity value, so it is kept out of the mean and median
  score = get_cosinesimularity(text, transcript)
  if score != -1:
    scores.append(score)

print("[!] End of experiment, aggregating scores")

print("[*] Mean scores")
print("Mean score for full sentences: {}".format(np.mean(scores)))
print("[*] Median scores")
print("Median score for full sentences: {}".format(np.median(scores)))

print("[!] Complete")

In [None]:
# WHISPER KEYWORDS SCORE (<=3 && 4,5 && >5)
# Transcribes up to LIMIT samples with Whisper and scores the short, average
# and long words of every transcript separately against the reference text.

scores = [[],[],[]]

print("[!] Starting experiment")

for index, (_, row) in enumerate(tqdm(data.iterrows())):
  # limiter for testing
  if index == LIMIT:
    break
  # columns are read by label: integer keys on a label-indexed row are
  # deprecated positional access in pandas
  file_path = "/content/drive/MyDrive/" + row["filename"]

  # load the sample
  waveform, sample_rate = torchaudio.load(file_path)

  # whisper only works with 16k sample rate so if we have others we convert it
  if sample_rate != 16000:
    waveform = torchaudio.functional.resample(waveform, sample_rate, 16000)

  # tokenize (only the first channel of the waveform is passed on)
  input_features = whisper_processor(waveform.numpy()[0], sampling_rate=16000, return_tensors="pt").input_features

  # generate token ids
  predicted_ids = whisper_med_model.generate(input_features)

  # decode token ids to text
  transcript = whisper_processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]
  text = row["text"]

  # get keywords of different length
  keywords_tr = grab_keywords(transcript)
  keywords_te = grab_keywords(text)

  # debug print
  print()
  # print(keywords_tr)
  # print(keywords_te)

  # filling scores; -1 marks a bucket that is empty on both sides and is skipped
  for i in range(3):
    score = get_cosinesimularity(keywords_tr[i], keywords_te[i])
    if score != -1:
      scores[i].append(score)

print("[!] End of experiment, aggregating scores")

print("[*] Mean scores")
for i in range(3):
    print("Mean score for {} words: {}".format(get_label(i), np.mean(scores[i])))
print("[*] Median scores")
for i in range(3):
    print("Median score for {} words: {}".format(get_label(i), np.median(scores[i])))

print("[!] Complete")

**Wav2vec**

In [None]:
# imports
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC

In [None]:
# load processor and model (both from the "facebook/wav2vec2-base-960h" checkpoint)
wav2vec_processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base-960h")
wav2vec_model = Wav2Vec2ForCTC.from_pretrained("facebook/wav2vec2-base-960h")

In [None]:
# WAV2VEC FULL TRANSCRIPT SCORE
# Transcribes up to LIMIT samples with wav2vec2 and scores every transcript
# against its reference text with cosine similarity.

scores = []

print("[!] Starting experiment")

for index, (_, row) in enumerate(tqdm(data.iterrows())):
  # limiter for testing
  if index == LIMIT:
    break
  # columns are read by label: integer keys on a label-indexed row are
  # deprecated positional access in pandas
  file_path = "/content/drive/MyDrive/" + row["filename"]

  # load the sample
  waveform, sample_rate = torchaudio.load(file_path)

  # wav2vec only works with 16k sample rate so if we have others we convert it
  if sample_rate != 16000:
    waveform = torchaudio.functional.resample(waveform, sample_rate, 16000)

  # tokenize (only the first channel of the waveform is passed on)
  input_values = wav2vec_processor(waveform.numpy()[0], sampling_rate=16000, return_tensors="pt", padding="longest").input_values

  # retrieve logits; this is inference only, so gradient tracking is switched off
  with torch.no_grad():
    logits = wav2vec_model(input_values).logits

  # take argmax and decode
  predicted_ids = torch.argmax(logits, dim=-1)
  transcript = wav2vec_processor.batch_decode(predicted_ids)[0]

  text = row["text"]

  # debug print
  print()
  # print(text)
  # print(transcript)

  # filling scores; -1 is the "both texts empty" sentinel and is not a
  # similarity value, so it is kept out of the mean and median
  score = get_cosinesimularity(text, transcript)
  if score != -1:
    scores.append(score)

print("[!] End of experiment, aggregating scores")

print("[*] Mean scores")
print("Mean score for full sentences: {}".format(np.mean(scores)))
print("[*] Median scores")
print("Median score for full sentences: {}".format(np.median(scores)))

print("[!] Complete")

In [None]:
# WAV2VEC KEYWORDS SCORE (<=3 && 4,5 && >5)
# Transcribes up to LIMIT samples with wav2vec2 and scores the short, average
# and long words of every transcript separately against the reference text.

scores = [[],[],[]]

print("[!] Starting experiment")

for index, (_, row) in enumerate(tqdm(data.iterrows())):
  # limiter for testing
  if index == LIMIT:
    break
  # columns are read by label: integer keys on a label-indexed row are
  # deprecated positional access in pandas
  file_path = "/content/drive/MyDrive/" + row["filename"]
  # load the sample
  waveform, sample_rate = torchaudio.load(file_path)

  # wav2vec only works with 16k sample rate so if we have others we convert it
  if sample_rate != 16000:
    waveform = torchaudio.functional.resample(waveform, sample_rate, 16000)

  # tokenize (only the first channel of the waveform is passed on)
  input_values = wav2vec_processor(waveform.numpy()[0], sampling_rate=16000, return_tensors="pt", padding="longest").input_values

  # retrieve logits; this is inference only, so gradient tracking is switched off
  with torch.no_grad():
    logits = wav2vec_model(input_values).logits

  # take argmax and decode
  predicted_ids = torch.argmax(logits, dim=-1)
  transcript = wav2vec_processor.batch_decode(predicted_ids)[0]

  text = row["text"]

  # get keywords of different length
  keywords_tr = grab_keywords(transcript)
  keywords_te = grab_keywords(text)

  # debug print
  print()
  # print(keywords_tr)
  # print(keywords_te)

  # filling scores; -1 marks a bucket that is empty on both sides and is skipped
  for i in range(3):
    score = get_cosinesimularity(keywords_tr[i], keywords_te[i])
    if score != -1:
      scores[i].append(score)

print("[!] End of experiment, aggregating scores")

print("[*] Mean scores")
for i in range(3):
    print("Mean score for {} words: {}".format(get_label(i), np.mean(scores[i])))
print("[*] Median scores")
for i in range(3):
    print("Median score for {} words: {}".format(get_label(i), np.median(scores[i])))

print("[!] Complete")

**Vosk**

In [None]:
# install the vosk and pydub packages with pip [once per session]
!pip install vosk
# cloning the vosk-api repository is currently disabled
# !git clone https://github.com/alphacep/vosk-api
!pip install pydub

In [None]:
# get model [once per session]
if INSTALL:
  !wget https://alphacephei.com/kaldi/models/vosk-model-en-us-0.22.zip
  !unzip vosk-model-en-us-0.22.zip
  %mv vosk-model-en-us-0.22

In [None]:
# imports
from vosk import Model, KaldiRecognizer
from pydub import AudioSegment
import wave
import json

In [None]:
# convert files for experiments [one time thing]
# Writes a .wav copy next to every .mp3 listed in the dataset; files that
# already have a .wav copy are skipped.
if CONVERT:
  for _, row in tqdm(data.iterrows()):
    # get full path of sample (column read by label, positional integer keys
    # on a label-indexed row are deprecated in pandas)
    file_path = "/content/drive/MyDrive/" + row["filename"]
    # destination path for sample
    dest = file_path.replace('.mp3', '.wav')
    # we skip the ones already converted
    if path.isfile(dest):
      continue
    # convert mp3 to wav; the Vosk cells only accept mono 16-bit PCM,
    # so that layout is forced before exporting
    sound = AudioSegment.from_mp3(file_path)
    sound = sound.set_channels(1).set_sample_width(2)
    sound.export(dest, format="wav")
    print()
    print("Converted {} to {}".format(file_path, dest))

In [None]:
# load the Vosk model from the folder /content/vosk-model-en-us-0.22
# (the folder must already exist, see the INSTALL flag)
vosk_model = Model("/content/vosk-model-en-us-0.22")

In [None]:
# VOSK FULL TRANSCRIPT SCORE
# Transcribes the .wav copy of up to LIMIT samples with Vosk and scores every
# transcript against its reference text with cosine similarity.
scores = []

print("[!] Starting experiment")

for index, (_, row) in enumerate(tqdm(data.iterrows())):
  # limiter for testing
  if index == LIMIT:
    break

  # get full path of sample (column read by label, positional integer keys
  # on a label-indexed row are deprecated in pandas)
  file_path = "/content/drive/MyDrive/" + row["filename"]

  # destination path for sample
  dest = file_path.replace('.mp3', '.wav')

  # open file; the with-block closes it again even when recognition raises
  with wave.open(dest, "rb") as wf:
    # check if the conversion is proper
    if wf.getnchannels() != 1 or wf.getsampwidth() != 2 or wf.getcomptype() != "NONE":
      # raising names the offending file in the cell output
      raise ValueError("{} must be an uncompressed mono 16-bit PCM WAV file".format(dest))

    # initialise recogniser
    rec = KaldiRecognizer(vosk_model, wf.getframerate())

    # recognition loop: feed the audio in chunks of 4000 frames until it is exhausted
    while True:
      d = wf.readframes(4000)
      if len(d) == 0:
        break
      rec.AcceptWaveform(d)

  text = row["text"]
  transcript = json.loads(rec.FinalResult())["text"]

  # debug print
  print()
  # print(text)
  # print(transcript)

  # filling scores; -1 is the "both texts empty" sentinel and is not a
  # similarity value, so it is kept out of the mean and median
  score = get_cosinesimularity(transcript, text)
  if score != -1:
    scores.append(score)

print("[!] End of experiment, aggregating scores")

print("[*] Mean scores")
print("Mean score for full sentences: {}".format(np.mean(scores)))
print("[*] Median scores")
print("Median score for full sentences: {}".format(np.median(scores)))

print("[!] Complete")

In [None]:
# VOSK KEYWORDS SCORE (<=3 && 4,5 && >5)
# Transcribes the .wav copy of up to LIMIT samples with Vosk and scores the
# short, average and long words of every transcript separately against the
# reference text.

scores = [[],[],[]]

print("[!] Starting experiment")

for index, (_, row) in enumerate(tqdm(data.iterrows())):

  # limiter for testing
  if index == LIMIT:
    break

  # get full path of sample (column read by label, positional integer keys
  # on a label-indexed row are deprecated in pandas)
  file_path = "/content/drive/MyDrive/" + row["filename"]

  # destination path for sample
  dest = file_path.replace('.mp3', '.wav')

  # open file; the with-block closes it again even when recognition raises
  with wave.open(dest, "rb") as wf:
    # check if the conversion is proper
    if wf.getnchannels() != 1 or wf.getsampwidth() != 2 or wf.getcomptype() != "NONE":
      # raising names the offending file in the cell output
      raise ValueError("{} must be an uncompressed mono 16-bit PCM WAV file".format(dest))

    # initialise recogniser
    rec = KaldiRecognizer(vosk_model, wf.getframerate())

    # recognition loop: feed the audio in chunks of 4000 frames until it is exhausted
    while True:
      d = wf.readframes(4000)
      if len(d) == 0:
        break
      rec.AcceptWaveform(d)

  text = row["text"]
  transcript = json.loads(rec.FinalResult())["text"]

  # get keywords of different length
  keywords_tr = grab_keywords(transcript)
  keywords_te = grab_keywords(text)

  # debug print
  print()
  # print(keywords_tr)
  # print(keywords_te)

  # filling scores; -1 marks a bucket that is empty on both sides and is skipped
  for i in range(3):
    score = get_cosinesimularity(keywords_tr[i], keywords_te[i])
    if score != -1:
      scores[i].append(score)

print("[!] End of experiment, aggregating scores")

print("[*] Mean scores")
for i in range(3):
    print("Mean score for {} words: {}".format(get_label(i), np.mean(scores[i])))
print("[*] Median scores")
for i in range(3):
    print("Median score for {} words: {}".format(get_label(i), np.median(scores[i])))

print("[!] Complete")