In [1]:
import numpy as np
import os
import pandas as pd
import deepspeech
import sox
import wave
import logging
import pydub 
from pydub.playback import play
import time

In [2]:
from IPython.display import clear_output

In [3]:
# TODO(MMAZ) lower() and remove punctuation, etc
# pasted from https://stackoverflow.com/a/32558749
def levenshteinDistance(s1, s2):
    """Return the Levenshtein (edit) distance between two sequences.

    Elements are compared with ``==``, so for strings the comparison is
    case- and punctuation-sensitive. Only one row of the dynamic-programming
    table is kept at a time.
    """
    # Keep the shorter sequence on the row axis so each row stays small.
    if len(s1) <= len(s2):
        shorter, longer = s1, s2
    else:
        shorter, longer = s2, s1

    previous_row = list(range(len(shorter) + 1))
    for row_number, long_item in enumerate(longer, start=1):
        current_row = [row_number]
        for col, short_item in enumerate(shorter):
            if short_item == long_item:
                # Match: carry the diagonal value over unchanged.
                cost = previous_row[col]
            else:
                # Cheapest of substitution, deletion and insertion, plus one.
                cost = 1 + min(
                    previous_row[col], previous_row[col + 1], current_row[-1]
                )
            current_row.append(cost)
        previous_row = current_row
    return previous_row[-1]

In [4]:
# Load the DeepSpeech acoustic model once; it is passed to the helper functions below.
model_path = "../deepspeech/deepspeech-0.7.0-models.pbmm"
model = deepspeech.Model(model_path)

In [5]:
def deepspeech_inference(
    dataset_ix, df, clips_path, model, tmp_wav_filepath="./tmp/tmp.wav", rate=16000
):
    """Convert one clip of the dataset to WAV and transcribe it.

    Args:
        dataset_ix: positional row index into ``df``.
        df: DataFrame whose ``path`` column holds the clip's file name.
        clips_path: directory that contains the clips.
        model: object exposing ``sttWithMetadata(audio, num_results)``.
        tmp_wav_filepath: scratch WAV file; it is overwritten on every call.
        rate: sample rate the clip is converted to before inference.

    Returns:
        A ``(mp3_path, metadata)`` tuple, where ``metadata`` is the value
        returned by ``model.sttWithMetadata`` when one candidate is requested.
    """
    # os.path.join gives the same result as concatenation when clips_path ends
    # in a separator, and also works when it does not.
    mp3_path = os.path.join(clips_path, df.iloc[dataset_ix].path)

    # Make sure the scratch directory exists so the conversion can write to it.
    tmp_dir = os.path.dirname(tmp_wav_filepath)
    if tmp_dir:
        os.makedirs(tmp_dir, exist_ok=True)

    # convert to wav (will overwrite)
    transformer = sox.Transformer()
    transformer.convert(samplerate=rate)
    transformer.build(mp3_path, tmp_wav_filepath)

    # Close the WAV handle deterministically instead of leaking it per call.
    with wave.open(tmp_wav_filepath, "rb") as wav:
        # NOTE(review): frames are interpreted as 16-bit integers; this assumes
        # sox wrote 16-bit mono PCM -- confirm for non-Common-Voice inputs.
        audio = np.frombuffer(wav.readframes(wav.getnframes()), np.int16)

    # choose the likeliest inference
    return mp3_path, model.sttWithMetadata(audio, 1)


def metadata_to_string(metadata):
    """Concatenate the ``text`` of every token in ``metadata.tokens``."""
    pieces = [token.text for token in metadata.tokens]
    return "".join(pieces)


def listen_to_split_on_boundaries(inference, clip, extent_ms=0):
    """Play ``clip`` one word at a time, cutting at the spaces of the transcript.

    Args:
        inference: object whose ``transcripts[0].tokens`` is a sequence of
            tokens exposing ``text`` and ``start_time``; ``start_time`` is
            multiplied by 1000 and used as a millisecond offset into ``clip``.
        clip: audio object that can be sliced by millisecond offsets and
            passed to ``play``.
        extent_ms: extra audio, in milliseconds, kept on each side of a word.

    Side effects:
        Prints every word, plays its audio, and sleeps one second after each
        word except the last. Logs a warning and returns without playing
        anything when the transcript has no tokens.
    """
    tokens = inference.transcripts[0].tokens
    if len(tokens) == 0:
        logging.warning("deepspeech.stt returned an empty inference")
        return

    texts = [token.text for token in tokens]
    spaces = [ix for ix, piece in enumerate(texts) if piece == " "]
    # Token index at which each word starts: 0, then one past every space.
    word_boundaries = [0] + [ix + 1 for ix in spaces]
    if word_boundaries[-1] >= len(tokens):
        # A trailing space yields a boundary past the last token; drop it so
        # the final word is the one that precedes that space.
        logging.warning("deepspeech.stt returned an inference ending in a space")
        word_boundaries = word_boundaries[:-1]

    last_start = word_boundaries[-1]

    # everything but the last word: each of these words is followed by a space
    for ix, (start_token, end_token) in enumerate(zip(word_boundaries[:-1], spaces)):
        print("word", ix, ": ", "".join(texts[start_token:end_token]))
        # include the space before the word boundary
        start_token = max(0, start_token - 1)
        # don't include audio before the start
        start_ms = max(0, tokens[start_token].start_time * 1000 - extent_ms)
        end_ms = tokens[end_token].start_time * 1000 + extent_ms
        play(clip[start_ms:end_ms])
        time.sleep(1)

    # the last word: starts at its own boundary and runs to the end of the clip
    last_word = "".join(texts[last_start:]).rstrip(" ")
    print("word", len(word_boundaries) - 1, ": ", last_word)
    start_ms = max(0, tokens[last_start].start_time * 1000 - extent_ms)
    play(clip[start_ms:])


def listen_to_sample(df, ix, clips_path, model):
    """Transcribe one dataset clip, print a comparison with its label, and play it.

    Args:
        df: DataFrame with ``sentence`` (ground-truth text) and ``path`` columns.
        ix: positional row index into ``df``.
        clips_path: directory that contains the clips.
        model: speech-to-text model forwarded to ``deepspeech_inference``.

    Returns:
        ``(inference, clip)``: the metadata returned by ``deepspeech_inference``
        and the audio segment loaded from the clip's mp3 file.
    """
    print("Sample Index:", ix)
    gt = df.iloc[ix].sentence
    print("Groundtruth:", gt)
    # split() ignores leading, trailing and repeated whitespace and gives 0
    # for an empty string, unlike counting spaces and adding one.
    gt_wc = len(gt.split())
    print("Groundtruth # words:", gt_wc)

    mp3_path, inference = deepspeech_inference(ix, df, clips_path, model)
    text = metadata_to_string(inference.transcripts[0])
    print("Inference:", text)
    text_wc = len(text.split())
    print("Inference # words", text_wc)
    # The distance is computed on the raw strings, so differences in case and
    # punctuation count as edits.
    print("Levenshtein Distance:", levenshteinDistance(gt, text))
    clip = pydub.AudioSegment.from_mp3(mp3_path)
    play(clip)
    return inference, clip


In [6]:
# Directory holding the English-corpus clips. Keep the trailing slash: the
# example cell below builds paths with plain string concatenation.
clips_path = "../mozilla_common_voice/clips/"
# train.tsv is tab-separated; the `path` and `sentence` columns are used below.
df = pd.read_csv("../mozilla_common_voice/train.tsv", delimiter="\t")
print("number of mp3s in training set: ", len(df))

number of mp3s in training set:  232975


Listen to an example clip:

In [7]:
# Row of the training set to listen to (3829 was an earlier choice).
ix = 194633
example_row = df.iloc[ix]
print(ix, '---', example_row.sentence)
mp3_path = clips_path + example_row.path
# Leave the audio segment as the cell's last expression so the notebook displays it.
pydub.AudioSegment.from_mp3(mp3_path)

194633 --- All work was done by robots.


In [None]:
# Draw random training clips. For each one: play the whole recording with its
# transcript comparison, replay it word by word, then clear the cell output.
num_examples = 1
for example in range(num_examples):
    print("Example", example)
    sample = np.random.randint(len(df))
    inference, clip = listen_to_sample(df, sample, clips_path, model)
    listen_to_split_on_boundaries(inference, clip)
    print(inference)
    time.sleep(1)
    clear_output(wait=True)