# Live Colab Example


## Dependencies and Imports

In [1]:
# #@title Install dependencies

# !pip install -q omegaconf
# !pip install -q torchaudio
# !pip install -q soundfile
# !pip install -q pydub

In [2]:
import soundfile

In [3]:
import os
import torch
import random
import base64
import tempfile
import warnings
import torchaudio
import soundfile

from os.path import exists
from glob import glob
from omegaconf import OmegaConf
from typing import List, Optional
from itertools import groupby
from pydub import AudioSegment

  '"sox" backend is being deprecated. '


In [4]:
# Utils

# Turn off torchaudio's legacy soundfile interface flag before picking a backend.
torchaudio.USE_SOUNDFILE_LEGACY_INTERFACE = False

# Select the "soundfile" backend for the whole session; read_audio() asserts
# that this backend is active before it loads any file.
torchaudio.set_audio_backend("soundfile")  # switch backend

def read_batch(audio_paths: List[str]):
    """Load every path in ``audio_paths`` with ``read_audio``.

    Returns a list holding one ``read_audio`` result per path, in the same
    order as the input. An empty input yields an empty list.
    """
    waveforms = []
    for audio_path in audio_paths:
        waveforms.append(read_audio(audio_path))
    return waveforms


def split_into_batches(lst: List[str],
                       batch_size: int = 10):
    """Cut ``lst`` into consecutive slices of at most ``batch_size`` items.

    The final slice may be shorter than ``batch_size``; an empty ``lst``
    produces an empty list of batches.
    """
    batches = []
    for start in range(0, len(lst), batch_size):
        stop = start + batch_size
        batches.append(lst[start:stop])
    return batches


def read_audio(path: str,
               target_sr: int = 16000):
    """Load an audio file as a mono waveform sampled at ``target_sr``.

    Args:
        path: File to load with ``torchaudio.load``.
        target_sr: Sample rate the returned waveform is resampled to.

    Returns:
        The loaded tensor after channel averaging / resampling, with its
        leading dimension squeezed away.

    Raises:
        AssertionError: If the active torchaudio backend is not 'soundfile'.
    """
    assert torchaudio.get_audio_backend() == 'soundfile'

    waveform, source_sr = torchaudio.load(path)

    # More than one row in the first dimension: average the rows into a single
    # one (presumably channels-first audio being down-mixed to mono).
    has_multiple_channels = waveform.size(0) > 1
    if has_multiple_channels:
        waveform = waveform.mean(dim=0, keepdim=True)

    if source_sr != target_sr:
        resampler = torchaudio.transforms.Resample(orig_freq=source_sr,
                                                   new_freq=target_sr)
        waveform = resampler(waveform)
        source_sr = target_sr

    assert source_sr == target_sr
    return waveform.squeeze(0)


def prepare_model_input(batch: List[torch.Tensor],
                        device=torch.device('cpu')):
    """Zero-pad a list of 1-D waveforms into one ``(len(batch), width)`` tensor.

    ``width`` is the longest waveform in the batch, but never less than 12800
    samples. Each waveform is left-aligned in its row; the remainder stays 0.
    The padded tensor is moved to ``device`` before it is returned.
    """
    lengths = [len(wav) for wav in batch]
    width = max(max(lengths), 12800)
    padded = torch.zeros(len(batch), width)
    for row, (wav, length) in enumerate(zip(batch, lengths)):
        padded[row, :length].copy_(wav)
    return padded.to(device)


class Decoder():
    """Greedy decoder that turns per-frame label scores into a string.

    Two labels have special meaning:
      * ``'_'`` is the blank label and is never emitted.
      * ``'2'`` (optional) means "repeat the previous character". It is needed
        because consecutive identical characters are collapsed into one.
    """

    def __init__(self,
                 labels: List[str]):
        """Store ``labels`` and look up the special label indices once.

        Raises:
            ValueError: If ``labels`` has no blank label ``'_'``.
        """
        self.labels = labels
        self.blank_idx = self.labels.index('_')
        # Resolved once instead of on every frame; None when the label set has
        # no repeat token, in which case no frame is treated as a repeat.
        self.repeat_idx = self.labels.index('2') if '2' in self.labels else None

    def process(self,
                probs):
        """Decode a ``(frames, len(labels))`` score matrix into text.

        The arg-max label of every frame is taken, blanks are dropped, runs of
        identical characters are collapsed, and the result is stripped of
        surrounding whitespace.

        Raises:
            AssertionError: If ``probs.shape[1]`` differs from ``len(labels)``.
        """
        assert len(self.labels) == probs.shape[1]
        for_string = []
        for idx in torch.argmax(probs, dim=1).tolist():
            if idx == self.repeat_idx:
                if for_string:
                    prev = for_string[-1]
                    # '$' separates the two copies so that the run-collapsing
                    # step below keeps both; it is removed afterwards.
                    for_string.append('$')
                    for_string.append(prev)
                else:
                    for_string.append(' ')
                    warnings.warn('Token "2" detected a the beginning of sentence, omitting')
                # The repeat token itself is never emitted as a character.
                continue
            if idx != self.blank_idx:
                for_string.append(self.labels[idx])
        collapsed = ''.join(char for char, _ in groupby(for_string))
        return collapsed.replace('$', '').strip()

    def __call__(self,
                 probs: torch.Tensor):
        """Shorthand for :meth:`process`."""
        return self.process(probs)


def init_jit_model(model_url: str,
                   device: torch.device = torch.device('cpu')):
    """Download a TorchScript model and build the matching ``Decoder``.

    Args:
        model_url: URL of the serialized model to download.
        device: Device the model is mapped to when it is loaded.

    Returns:
        ``(model, decoder)``: the loaded model switched to eval mode, and a
        ``Decoder`` built from ``model.labels``.

    Note:
        Gradient tracking is disabled globally via ``torch.set_grad_enabled``.
    """
    torch.set_grad_enabled(False)

    # The download goes to a temporary file that is removed when the `with`
    # block exits; the model is loaded from it before that happens.
    with tempfile.NamedTemporaryFile('wb', suffix='.model') as checkpoint:
        checkpoint_path = checkpoint.name
        torch.hub.download_url_to_file(model_url,
                                       checkpoint_path,
                                       progress=True)
        model = torch.jit.load(checkpoint_path, map_location=device)
    model.eval()
    decoder = Decoder(model.labels)
    return model, decoder

In [5]:
#
# a modified version of this script https://github.com/magenta/ddsp/blob/master/ddsp/colab/colab_utils.py
# modified in line with the rest of examples code
#
# from google.colab import files
# from google.colab import output

from IPython import display as _display

# from IPython.display import Audio, display, clear_output


def record_audio(seconds: int = 3,
                 normalize_db: float = 0.1):
    """Record microphone audio in the browser and return it as a waveform.

    Injects a Javascript recorder into the notebook output, asks the Colab
    front-end to run it for ``seconds`` seconds, decodes the returned base64
    data URL and hands the bytes to ``audio_bytes_to_np``.

    Args:
        seconds: Recording length in seconds.
        normalize_db: Forwarded to ``audio_bytes_to_np``.

    Returns:
        Whatever ``audio_bytes_to_np`` returns for the recorded bytes.

    Raises:
        RuntimeError: If the ``google.colab`` runtime is not available.
    """
    # The notebook's module-level `google.colab` imports are commented out, so
    # `output` was undefined here (and later cells rebind a global `output` to
    # a model result). Import the Colab helper locally under a distinct name.
    try:
        from google.colab import output as colab_output
    except ImportError as exc:
        raise RuntimeError('record_audio() requires the google.colab runtime '
                           'to access the browser microphone') from exc

    # Use Javascript to record audio.
    record_js_code = """
      const sleep  = time => new Promise(resolve => setTimeout(resolve, time))
      const b2text = blob => new Promise(resolve => {
        const reader = new FileReader()
        reader.onloadend = e => resolve(e.srcElement.result)
        reader.readAsDataURL(blob)
      })
      var record = time => new Promise(async resolve => {
        stream = await navigator.mediaDevices.getUserMedia({ audio: true })
        recorder = new MediaRecorder(stream)
        chunks = []
        recorder.ondataavailable = e => chunks.push(e.data)
        recorder.start()
        await sleep(time)
        recorder.onstop = async ()=>{
          blob = new Blob(chunks)
          text = await b2text(blob)
          resolve(text)
        }
        recorder.stop()
      })
      """
    print(f'Starting recording for {seconds} seconds...')
    _display.display(_display.Javascript(record_js_code))
    # The JS `record` function takes its duration in milliseconds.
    audio_string = colab_output.eval_js('record(%d)' % (seconds * 1000.0))
    print('Finished recording!')
    # The result is a data URL ("<header>,<base64 payload>"); keep the payload.
    audio_bytes = base64.b64decode(audio_string.split(',')[1])
    return audio_bytes_to_np(audio_bytes,
                             normalize_db=normalize_db)


def audio_bytes_to_np(wav_data: bytes,
                      normalize_db: float = 0.1):
    """Decode encoded audio bytes and load them through ``read_audio``.

    Args:
        wav_data: Encoded audio file contents (any format pydub can parse).
        normalize_db: Headroom passed to pydub's ``normalize``; ``None`` skips
            normalization.

    Returns:
        The result of ``read_audio`` on a temporary WAV export of the audio.
    """
    # Local import: the notebook's import cell does not bring `io` into scope.
    import io

    # Parse and normalize the audio. pydub effects return a new AudioSegment
    # instead of modifying the segment in place, so the results are re-bound;
    # discarding them made both steps no-ops.
    audio = AudioSegment.from_file(io.BytesIO(wav_data))
    audio = audio.remove_dc_offset()
    if normalize_db is not None:
        audio = audio.normalize(headroom=normalize_db)
    # Save to a temporary WAV file and load it with read_audio; the file is
    # deleted when the `with` block exits.
    with tempfile.NamedTemporaryFile(suffix='.wav') as temp_wav_file:
        fname = temp_wav_file.name
        audio.export(fname, format='wav')
        wav = read_audio(fname)
    return wav


def upload_audio(normalize_db: Optional[float] = None):
    """Load a fixed local WAV file in place of an interactive upload.

    The Colab ``files.upload()`` flow is disabled in this notebook, so a
    hard-coded path is read with ``read_audio`` instead. ``normalize_db`` is
    accepted but not used.
    """
    sample_path = "./1183-124566-0005.wav"
    waveform = read_audio(sample_path)
    return waveform

In [6]:
# import torchaudio.functional

In [7]:
# from utils import (init_jit_model, 
#                    split_into_batches,
#                    read_audio,
#                    read_batch,
#                    prepare_model_input)

# from colab_utils import (record_audio,
#                          audio_bytes_to_np,
#                          upload_audio)

# Device that later cells pass to init_jit_model() and prepare_model_input().
device = torch.device('cpu')   # you can use any pytorch device
# Model catalogue, read from a `models.yml` path relative to the working directory.
models = OmegaConf.load('models.yml')

# imports for uploading/recording
import numpy as np
import ipywidgets as widgets
from scipy.io import wavfile
from IPython.display import Audio, display, clear_output
# from torchaudio.functional import vad


# wav to text method
def wav_to_text(f='test.wav'):
    """Transcribe a single audio file with the notebook-level model.

    Relies on the globals ``model``, ``decoder`` and ``device`` defined by
    other cells. Returns the decoded text of the first (only) batch item.
    """
    waveforms = read_batch([f])
    model_input = prepare_model_input(waveforms, device=device)
    predictions = model(model_input)
    first_prediction = predictions[0]
    return decoder(first_prediction.cpu())

## Transcribe

In [8]:
# model, decoder = init_jit_model(models.stt_models.en.latest.jit, device=device)

In [9]:
#@markdown { run: "auto" }

# Colab form field. NOTE(review): the value is only printed; the per-language
# branches below are commented out, so the English model is loaded regardless.
language = "English" #@param ["English", "German", "Spanish"]

print(language)

# if language == 'German':
#     model, decoder = init_jit_model(models.stt_models.de.latest.jit, device=device)
# elif language == "Spanish":
#     model, decoder = init_jit_model(models.stt_models.es.latest.jit, device=device)
# else:

# Downloads the English JIT model and binds the globals `model` and `decoder`
# that wav_to_text() uses.
model, decoder = init_jit_model(models.stt_models.en.latest.jit, device=device)

English


HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=182379714.0), HTML(value='')))




In [10]:
#@markdown { run: "auto" }

# Read by _recognize(): VAD trimming runs only when this is exactly "Yes".
use_VAD = "No" #@param ["Yes", "No"]
# use_VAD = "Yes" #@param ["Yes", "No"]

In [11]:
#@markdown Either record audio from microphone or upload audio from file (.mp3 or .wav) { run: "auto" }

# Only the exact value "Record" selects the recording path at the end of this
# cell; any other value (including the current "Upload", which is not one of
# the listed form options) takes the upload path.
record_or_upload = "Upload" #@param ["Record", "Upload (.mp3 or .wav)"]
# Recording length in seconds, passed to record_audio().
record_seconds = 5 #@param {type:"number", min:1, max:10, step:1}
# Sample rate used for playback, VAD and the WAV files written by this cell.
sample_rate = 16000

def _apply_vad(audio, boot_time=0, trigger_level=9, **kwargs):
    """Trim both ends of ``audio`` with torchaudio's voice activity detector.

    The detector is run twice: first on the reversed signal, then on that
    result reversed back again, so the returned audio is in its original
    orientation.

    Args:
        audio: Waveform tensor; it is flipped along dimension 0.
        boot_time: Forwarded to ``torchaudio.functional.vad``.
        trigger_level: Forwarded to ``torchaudio.functional.vad``.
        **kwargs: Extra keyword arguments for ``torchaudio.functional.vad``.
            A ``sample_rate`` entry is overridden by the notebook-level
            ``sample_rate``.

    Returns:
        The tensor returned by the second ``vad`` pass.
    """
    # `vad` was never imported at notebook level (that import is commented
    # out), so bring it into scope here.
    from torchaudio.functional import vad

    print('\nVAD applied\n')
    # Build the keyword arguments explicitly rather than from locals().
    vad_kwargs = dict(kwargs, boot_time=boot_time, trigger_level=trigger_level)
    vad_kwargs['sample_rate'] = sample_rate
    reversed_trimmed = vad(torch.flip(audio, [0]), **vad_kwargs)
    return vad(torch.flip(reversed_trimmed, [0]), **vad_kwargs)

def _recognize(audio):
    """Play ``audio``, optionally trim it with VAD, and transcribe it.

    The (possibly trimmed) audio is scaled by 32767, cast to int16 and written
    to 'test.wav', which ``wav_to_text`` then reads by default. The
    transcription is printed and returned.
    """
    display(Audio(audio, rate=sample_rate, autoplay=True))
    speech = _apply_vad(audio) if use_VAD == "Yes" else audio
    pcm16 = (32767*speech).numpy().astype(np.int16)
    wavfile.write('test.wav', sample_rate, pcm16)
    transcription = wav_to_text()
    print('\n\nTRANSCRIPTION:\n')
    print(transcription)
    return transcription

def _record_audio(b):
    """Button callback: record speech, save it as 'recorded.wav', transcribe it.

    ``b`` is the argument supplied by the widget callback and is not used.
    """
    clear_output()
    recording = record_audio(record_seconds)
    pcm16 = (32767*recording).numpy().astype(np.int16)
    wavfile.write('recorded.wav', sample_rate, pcm16)
    _recognize(recording)

def _upload_audio(b):
    """Load audio via ``upload_audio`` and transcribe it.

    ``b`` is unused. Returns ``(audio, transcription)``.
    """
    clear_output()
    uploaded = upload_audio()
    text = _recognize(uploaded)
    return uploaded, text

# "Record": show a button whose click handler records and transcribes speech.
# Anything else: transcribe the uploaded/loaded file immediately.
if record_or_upload == "Record":
    button = widgets.Button(description="Record Speech")
    button.on_click(_record_audio)
    display(button)
else:
    # _upload_audio ignores its argument; "" is just a placeholder.
    # `audio` and `transcription` are only bound on this branch.
    audio, transcription = _upload_audio("")



TRANSCRIPTION:

twenty nine


  result = self.forward(*input, **kwargs)
  result = self.forward(*input, **kwargs)


In [28]:
transcription

'twenty nine'

In [1]:
# #@markdown Check audio after applying VAD { run: "auto" }

# if record_or_upload == "Record":
#     audio = read_audio('recorded.wav', sample_rate)
    
# display(Audio(_apply_vad(audio), rate=sample_rate, autoplay=True))

# PyTorch Example


In [None]:
#@title Install Dependencies

# this assumes that you have a relevant version of PyTorch installed
# !pip install -q torchaudio
# !pip install -q omegaconf
# !pip install -q soundfile

# import os
# from os.path import exists

# if not exists('silero-models'):
#     !git clone -q --depth 1 https://github.com/snakers4/silero-models

# %cd silero-models

# import torch
# import random
# from glob import glob
# from omegaconf import OmegaConf
# from utils import (init_jit_model, 
#                    split_into_batches,
#                    read_batch,
#                    prepare_model_input)
# from IPython.display import display, Audio

In [None]:
#@title Random English Validation Dataset (optional)

# if not exists('scottish_english_female'):
#   !wget http://www.openslr.org/resources/83/scottish_english_female.zip
#   !unzip -qq scottish_english_female.zip -d scottish_english_female

In [None]:
#@title Random Spanish Validation Dataset (optional)

# if not exists('es_pr_female'):
#   !wget http://www.openslr.org/resources/74/es_pr_female.zip
#   !unzip -qq es_pr_female.zip -d es_pr_female

## Example cells

In [None]:
# Load the model catalogue and print, in order: the available languages, the
# keys under `en`, the keys under `en.latest`, and the English JIT model entry.
models = OmegaConf.load('models.yml')  # all available models are listed in the yml file
print(list(models.stt_models.keys()),
      list(models.stt_models.en.keys()),
      list(models.stt_models.en.latest.keys()),
      models.stt_models.en.latest.jit)
device = torch.device('cpu')   # you can use any pytorch device
# Downloads the English JIT model and returns it together with its decoder.
model, decoder = init_jit_model(models.stt_models.en.latest.jit, device=device)

In [None]:
# NOTE(review): these two statements repeat the end of the previous cell, so
# running both cells downloads and loads the English model twice.
device = torch.device('cpu')   # you can use any pytorch device
model, decoder = init_jit_model(models.stt_models.en.latest.jit, device=device)

In [None]:
# test_files = glob('path/to/your/file/*.opus')
# Collect the WAV files to transcribe and group them into batches of up to 10.
# If the pattern matches nothing, `batches` is empty and the random.sample()
# calls in the following cells raise ValueError.
test_files = glob('scottish_english_female/*.wav')  # replace with your data
batches = split_into_batches(test_files, batch_size=10)

In [None]:
# transcribe a set of files
# Pick one batch at random, load and zero-pad its files, run the model once,
# then decode and print each item of the output separately.
# NOTE: `input` shadows the Python builtin of the same name in this notebook.
input = prepare_model_input(read_batch(random.sample(batches, k=1)[0]),
                            device=device)
output = model(input)
for example in output:
    print(decoder(example.cpu()))

In [None]:
# listen to one file
# Same flow as the previous cell, but the loaded waveforms are kept in `batch`
# so that the audio can be played next to its transcription.
batch = read_batch(random.sample(batches, k=1)[0])
input = prepare_model_input(batch,
                            device=device)
output = model(input)

# Only the first item is shown: the loop breaks after one iteration.
# `display` and `Audio` must already be in scope (IPython.display).
for i, example in enumerate(output):
    print(decoder(example.cpu()))
    display(Audio(batch[i], rate=16000))  # audio was resampled to 16kHz
    break

# ONNX Example

In [None]:
#@title Install and Import Dependencies

# this assumes that you have a relevant version of PyTorch installed
!pip install -q torchaudio omegaconf soundfile onnx onnxruntime

import onnx
import torch
import onnxruntime
from omegaconf import OmegaConf

## Example Cells

In [None]:
language = 'en' # also available 'de', 'es'

# load provided utils
# The repository and entry point are passed positionally: the first parameter
# of torch.hub.load was renamed from `github` to `repo_or_dir`, so the keyword
# form `github=...` fails on newer PyTorch releases.
# The first returned item (the PyTorch model) is not needed for ONNX inference.
_, decoder, utils = torch.hub.load('snakers4/silero-models', 'silero_stt', language=language)
(read_batch, split_into_batches,
 read_audio, prepare_model_input) = utils

In [None]:
# see available models
# Fetch the model catalogue from the silero-models repository into a local
# `models.yml`, parse it, and check that the chosen `language` has an entry.
torch.hub.download_url_to_file('https://raw.githubusercontent.com/snakers4/silero-models/master/models.yml', 'models.yml')
models = OmegaConf.load('models.yml')
available_languages = list(models.stt_models.keys())
assert language in available_languages

In [None]:
# load the actual ONNX model
# NOTE(review): the `en` entry is hard-coded here, independent of `language`.
torch.hub.download_url_to_file(models.stt_models.en.latest.onnx, 'model.onnx', progress=True)
# Load the downloaded file and run ONNX's model checker on it before creating
# the onnxruntime session used for inference.
onnx_model = onnx.load('model.onnx')
onnx.checker.check_model(onnx_model)
ort_session = onnxruntime.InferenceSession('model.onnx')

In [None]:
# download a single file, any format compatible with TorchAudio (soundfile backend)
torch.hub.download_url_to_file('https://opus-codec.org/static/examples/samples/speech_orig.wav', dst ='speech_orig.wav', progress=True)
# Wrap the single file in a one-item batch and build the padded model input
# (prepare_model_input is called with its default device here).
test_files = ['speech_orig.wav']
batches = split_into_batches(test_files, batch_size=10)
input = prepare_model_input(read_batch(batches[0]))

In [None]:
# actual onnx inference and decoding
# Convert the input to NumPy and keep only its first batch item
# (presumably the exported graph expects a single un-batched waveform; TODO confirm).
onnx_input = input.detach().cpu().numpy()[0]
# The session is fed through an input named 'input'; passing None as the first
# argument of run() requests all model outputs.
ort_inputs = {'input': onnx_input}
ort_outs = ort_session.run(None, ort_inputs)
# Decode the first output with the decoder obtained from torch.hub.
decoded = decoder(torch.Tensor(ort_outs[0]))
print(decoded)

# TensorFlow Example

In [None]:
#@title Install and Import Dependencies

# this assumes that you have a relevant version of PyTorch installed
!pip install -q torchaudio omegaconf soundfile

import os
import torch
import tensorflow as tf
import tensorflow_hub as tf_hub
from omegaconf import OmegaConf

## Example cells

In [None]:
language = 'en' # also available 'de', 'es'

# load provided utils using torch.hub for brevity
# The repository and entry point are passed positionally: the first parameter
# of torch.hub.load was renamed from `github` to `repo_or_dir`, so the keyword
# form `github=...` fails on newer PyTorch releases.
# The first returned item (the PyTorch model) is not needed for TF inference.
_, decoder, utils = torch.hub.load('snakers4/silero-models', 'silero_stt', language=language)
(read_batch, split_into_batches,
 read_audio, prepare_model_input) = utils

In [None]:
# see available models
# Fetch the model catalogue from the silero-models repository into a local
# `models.yml`, parse it, and check that the chosen `language` has an entry.
torch.hub.download_url_to_file('https://raw.githubusercontent.com/snakers4/silero-models/master/models.yml', 'models.yml')
models = OmegaConf.load('models.yml')
available_languages = list(models.stt_models.keys())
assert language in available_languages

# load the actual tf model
# NOTE(review): the `en` entry is hard-coded here, independent of `language`.
tf_model = tf_hub.load(models.stt_models.en.latest.tf)

In [None]:
# download a single file, any format compatible with TorchAudio (soundfile backend)
torch.hub.download_url_to_file('https://opus-codec.org/static/examples/samples/speech_orig.wav', dst ='speech_orig.wav', progress=True)
# Wrap the single file in a one-item batch and build the padded model input
# (prepare_model_input is called with its default device here).
test_files = ['speech_orig.wav']
batches = split_into_batches(test_files, batch_size=10)
input = prepare_model_input(read_batch(batches[0]))

In [None]:
# tf inference
# Call the model's "serving_default" signature on the first batch item only
# and read its 'output_0' entry.
res = tf_model.signatures["serving_default"](tf.constant(input.numpy()[0]))['output_0']
# Convert the TensorFlow result to a torch tensor so the shared decoder can be used.
print(decoder(torch.Tensor(res.numpy())))