## Setup environment

In [1]:
# Dependencies used by this notebook; the first three lines pin exact versions.
%pip install mmdet==2.13.0
%pip install Cython==0.29.5
%pip install torch==1.9.0 torchvision==0.10.0
# NOTE(review): nemo_toolkit is the only unpinned requirement here, so a fresh install
# may resolve to a release that does not match the torch pin above — confirm and pin.
%pip install nemo_toolkit['all']
# NOTE(review): this installs the PyPI package named `ffmpeg`. The flac->wav conversion
# below invokes the `ffmpeg` command-line program, which presumably has to be installed
# separately on the system — confirm.
%pip install ffmpeg

In [43]:
import os
import glob
import json
import wget
import copy
import scipy
import ffmpeg
import tarfile
import zipfile
import numpy as np 
import pandas as pd

import librosa
from librosa.display import waveplot
import IPython.display as ipd

import pytorch_lightning as pl
import matplotlib.pyplot as plt

import nemo
import nemo.collections.asr as nemo_asr
from nemo.collections.asr.metrics.wer import WER, word_error_rate
from nemo.collections.asr.models import EncDecCTCModel
import torch 

from tqdm import tqdm
from shutil import copy
from scipy.io import wavfile
from omegaconf import DictConfig
from contextlib import contextmanager

## Functions for dataset extraction and augmentation

In [7]:
def download_extract_libri(data_dir, dataset_name):
    '''
    Download a LibriSpeech archive (if missing) and extract it (if missing).

    Args:
        data_dir: directory that stores ``<dataset_name>.tar.gz`` and receives
            the extracted ``LibriSpeech/<dataset_name>/`` tree.
        dataset_name: name of the OpenSLR resource, e.g. ``dev-other``.

    The extraction check is done per dataset: every archive unpacks into the
    shared ``LibriSpeech`` folder, so checking only that folder would skip all
    datasets after the first one.
    '''
    libri_path = os.path.join(data_dir, f'{dataset_name}.tar.gz')
    if not os.path.exists(libri_path):
        libri_url = f'http://www.openslr.org/resources/12/{dataset_name}.tar.gz'
        libri_path = wget.download(libri_url, data_dir)
        print(f"Dataset in .tar format downloaded at: {libri_path}")
    else:
        print("Tarfile already exists")

    # Same 'LibriSpeech' spelling as every other path built in this notebook.
    extracted_dir = os.path.join(data_dir, 'LibriSpeech', dataset_name)
    if not os.path.exists(extracted_dir):
        # Context manager closes the archive handle even if extraction fails.
        with tarfile.open(libri_path) as tar:
            tar.extractall(path=data_dir)
        print(f"Tarfile extracted in {extracted_dir}")
    else:
        print(f"Tarfile already extracted in {extracted_dir}")
        
def flac2wav(data_dir, dataset_name):
    '''
    Convert every flac file of a dataset to wav, then delete the flac files.

    Args:
        data_dir: directory containing the ``LibriSpeech`` folder.
        dataset_name: dataset sub-folder, e.g. ``dev-other``.

    Raises:
        RuntimeError: if ffmpeg reports a failure for any file, or an expected
            wav file is missing or smaller than 100 bytes. No flac file is
            removed in that case, so the conversion can simply be re-run.
    '''
    import subprocess  # local import: not provided by the notebook's import cell

    print('Converting flac to wav....')
    flac_list = glob.glob(os.path.join(data_dir, f'LibriSpeech/{dataset_name}/**/**/*.flac'))
    wav_list = []
    broken_files = []
    for flac_path in tqdm(flac_list, position=0, leave=False):
        wav_path = os.path.splitext(flac_path)[0] + '.wav'
        wav_list.append(wav_path)
        # Argument list (no shell) keeps paths with spaces/metacharacters intact;
        # -nostdin/-y prevent ffmpeg from blocking on an "overwrite?" prompt.
        result = subprocess.run(['ffmpeg', '-nostdin', '-y', '-i', flac_path, wav_path])
        if result.returncode != 0:
            broken_files.append(wav_path)

    # A wav that was never written is as bad as a truncated one.
    for wav_path in wav_list:
        if wav_path in broken_files:
            continue
        if not os.path.isfile(wav_path) or os.path.getsize(wav_path) < 100:
            broken_files.append(wav_path)
    if broken_files:
        raise RuntimeError(
            f'{len(broken_files)} flac file(s) were not converted correctly, '
            f'e.g. {broken_files[0]}; flac sources were kept'
        )

    # Only reached when every conversion succeeded.
    for flac_path in flac_list:
        os.remove(flac_path)
    print('Finished')
    
def build_manifest(data_dir, dataset_name, manifest_path=os.getcwd() + '/LibriSpeech/manifest.json'):
    '''
    Build a JSON-lines manifest for one dataset.

    Every ``*.txt`` transcript file of the dataset is read line by line; each
    line is ``<audio name> <text>``. One JSON object per utterance is written
    with the keys ``audio_filepath``, ``duration`` and ``text`` (lower-cased).

    Args:
        data_dir: directory containing the ``LibriSpeech`` folder.
        dataset_name: dataset sub-folder, e.g. ``dev-other``.
        manifest_path: output file. Note the default is computed once, when
            the function is defined.
    '''
    print('Building manifest...')

    transcripts = glob.glob(os.path.join(data_dir, f'LibriSpeech/{dataset_name}/**/**/*.txt'))
    with open(manifest_path, 'w', encoding='utf-8') as out_file:
        for trans_path in tqdm(transcripts, position=0, leave=False):
            path_to_folder = os.path.dirname(trans_path)
            with open(trans_path, 'r', encoding='utf-8') as file:
                for line in file:
                    # Strip only the line terminator: slicing off the last character
                    # would truncate a final line that has no trailing newline.
                    transcript = line.rstrip('\r\n').lower()
                    if not transcript.strip():
                        continue  # blank line: no utterance to record
                    audio_name, _, text = transcript.partition(' ')
                    audio_path = os.path.join(path_to_folder, audio_name + '.wav')
                    duration = librosa.core.get_duration(filename=audio_path)

                    metadata = {
                        "audio_filepath": audio_path,
                        "duration": duration,
                        "text": text
                    }
                    json.dump(metadata, out_file)
                    out_file.write('\n')
    print('Finished')

@contextmanager
def autocast(enabled=None):
    """No-op stand-in for a mixed-precision ``autocast`` context.

    ``enabled`` is accepted and ignored; the managed block runs unchanged and
    the ``with`` target is bound to ``None``.
    """
    yield None
    
def transcribe_audios(asr_model):
    '''
    Transcribe the audios of the model's configured test manifest.

    Args:
        asr_model: CTC model whose test data was set up beforehand (the batches
            come from ``asr_model.test_dataloader()``).

    Returns:
        ``(hypotheses, references, wer_value)``: decoded greedy predictions,
        reference strings rebuilt from the batch token ids, and the word error
        rate between the two lists.
    '''
    # Use the GPU only when there is one, for the model as well as the batches.
    use_cuda = torch.cuda.is_available()
    if use_cuda:
        asr_model = asr_model.cuda()
    asr_model.eval()
    vocabulary = asr_model.decoder.vocabulary
    labels_map = dict(enumerate(vocabulary))
    wer = WER(vocabulary=vocabulary)
    hypotheses = []
    references = []
    # Pure inference: no autograd graph is needed.
    with torch.no_grad():
        for test_batch in asr_model.test_dataloader():
            if use_cuda:
                test_batch = [x.cuda() for x in test_batch]
            with autocast():
                log_probs, encoded_len, greedy_predictions = asr_model(
                    input_signal=test_batch[0], input_signal_length=test_batch[1]
                )
            hypotheses += wer.ctc_decoder_predictions_tensor(greedy_predictions)
            for batch_ind in range(greedy_predictions.shape[0]):
                # assumes test_batch[3] holds the true transcript lengths
                # (NeMo batch layout: audio, audio_len, tokens, tokens_len) — TODO confirm.
                # Cutting at that length keeps padding ids out of the reference text.
                seq_len = int(test_batch[3][batch_ind])
                seq_ids = test_batch[2][batch_ind][:seq_len].cpu().numpy()
                references.append(''.join(labels_map[c] for c in seq_ids))
            del test_batch
    wer_value = word_error_rate(hypotheses=hypotheses, references=references)
    return hypotheses, references, wer_value

In [25]:
class NoiseAgumentator:
    """Mixes randomly chosen noise recordings into an audio signal."""

    def __init__(self, dir_noises):
        """Load every ``*.wav`` file found directly inside ``dir_noises`` at 16 kHz."""
        # Each entry is (name, samples): the file name up to its first dot and
        # the sample array returned by librosa.load(..., sr=16000).
        self.noises = [
            (os.path.basename(file_path).split('.')[0], librosa.load(file_path, sr=16000)[0])
            for file_path in glob.glob(dir_noises + '/*.wav')
        ]

    def normalize_audio(self, audio):
        """Rescale ``audio`` so its peak is 0.73 of int16 full scale; return int16.

        Empty and all-zero inputs are returned as (empty / zero) int16 arrays
        instead of dividing by zero.
        """
        # Work in float64: abs() of int16 -32768 would overflow in integer math.
        audio = np.asarray(audio, dtype=np.float64)
        if audio.size == 0:
            return audio.astype(np.int16)
        max_amp = np.max(np.abs(audio))
        if max_amp == 0:
            return np.zeros(audio.shape, dtype=np.int16)
        audio = audio * 0.73 / max_amp
        audio = audio * 32768
        return audio.astype(np.int16)

    def apply_noise(self, source_audio, noise_audio, weight):
        """Return the normalized mix ``(1 - weight) * source + weight * noise``.

        Both inputs are normalized first; the noise is truncated or
        zero-padded at the end to the length of the source.
        """
        source_audio = self.normalize_audio(source_audio)
        noise_audio = self.normalize_audio(noise_audio)
        if len(source_audio) < len(noise_audio):
            noise_audio = noise_audio[:len(source_audio)]
        else:
            noise_audio = np.pad(noise_audio, (0, len(source_audio) - len(noise_audio)))
        return self.normalize_audio((1.0 - weight) * source_audio + weight * noise_audio)

    def make_noisy(self, audio, weights=[0.1], poison_coef=1):
        """Mix ``1 + Poisson(poison_coef)`` random noises into ``audio``.

        The same noise sequence is applied once per entry of ``weights``.

        Args:
            audio: source samples.
            weights: noise weights; one output audio is produced per weight.
                (The default list is never mutated.)
            poison_coef: rate of the Poisson draw for the extra noise count.

        Returns:
            ``(noised_audios, added_noises_idx)``: list of int16 arrays (one
            per weight) and the names of the noises that were mixed in.

        Raises:
            ValueError: if no noise recordings were loaded.
        """
        if not self.noises:
            raise ValueError('no noise recordings loaded')
        added_noises_idx = []
        noised_audios = len(weights) * [audio]
        total = 1 + np.random.poisson(poison_coef)
        for _ in range(total):
            # randint's upper bound is exclusive, so len(...) covers every noise,
            # including the last one, and works with a single loaded noise.
            noise_idx, noise = self.noises[np.random.randint(0, len(self.noises))]
            added_noises_idx.append(noise_idx)
            for idx, noise_weight in enumerate(weights):
                noised_audios[idx] = self.apply_noise(noised_audios[idx], noise, noise_weight)
        return noised_audios, added_noises_idx
    
def copy_directory_folders(inputpath, outputpath):
    """
    Recreate the folder tree of ``inputpath`` under ``outputpath`` (no files).

    Folders that already exist in the destination are left as they are.
    """
    if not os.path.isdir(outputpath):
        os.mkdir(outputpath)
    # os.walk is top-down, so a parent is always created before its children.
    for current_dir, _subdirs, _files in os.walk(inputpath):
        relative = os.path.relpath(current_dir, inputpath)
        mirrored = os.path.join(outputpath, relative)
        if os.path.isdir(mirrored):
            continue
        os.mkdir(mirrored)
            
def copy_dir_txts_to_aug_folders(data_dir, dataset_name, noise_levels_idx, variants_per_noise_level):
    """
    Prepare one destination dataset folder per (noise level, variant) pair.

    For each pair the folder tree of the source dataset is mirrored into
    ``<dataset_name>_noised_<level idx>_variant_<variant>`` and every
    transcript ``*.txt`` file is copied to the matching sub-folder.
    """
    source_root = os.path.join(data_dir, 'LibriSpeech', dataset_name)
    transcript_paths = glob.glob(os.path.join(data_dir, f'LibriSpeech/{dataset_name}/**/**/*.txt'))
    dataset_separator = f"/{dataset_name}/"

    for noise_level_idx in noise_levels_idx:
        for variant in range(variants_per_noise_level):
            noise_dataset_name = f'{dataset_name}_noised_{noise_level_idx}_variant_{variant}'
            target_root = os.path.join(data_dir, 'LibriSpeech', noise_dataset_name)
            copy_directory_folders(source_root, target_root)

            for transcript_path in transcript_paths:
                # prefix = everything before the dataset folder,
                # relative = path of the transcript inside the dataset.
                prefix, relative = transcript_path.split(dataset_separator)
                relative_dir = relative.rsplit('/', 1)[0]
                copy(transcript_path, os.path.join(prefix, noise_dataset_name, relative_dir))
            
def aug_noise_datasets(dataset_name, noise_levels_idx, noise_levels, variants_per_noise_level):
    """
    Write noised copies of every wav file of ``dataset_name``.

    Relies on two notebook globals: ``data_dir`` (root folder) and ``aug``
    (object whose ``make_noisy(audio, noise_levels)`` returns one noised audio
    per level plus the names of the noises used). The destination folders
    ``<dataset_name>_noised_<level idx>_variant_<variant>`` must already exist.

    Args:
        dataset_name: source dataset sub-folder of ``LibriSpeech``.
        noise_levels_idx: identifiers used in the destination folder names,
            aligned position by position with ``noise_levels``.
        noise_levels: noise weights passed to ``aug.make_noisy``.
        variants_per_noise_level: number of independent noisy variants.

    Returns:
        List of ``(dataset_name, audio file name, variant, noise names)``.
    """
    libri_root = os.path.join(data_dir, 'LibriSpeech')
    dataset_root = os.path.join(libri_root, dataset_name)
    audios = glob.glob(os.path.join(data_dir, f'LibriSpeech/{dataset_name}/**/**/*.wav'))
    noises_info = []
    for audio_path in tqdm(audios, position=0, leave=False):
        sr, audio = wavfile.read(audio_path)
        assert sr == 16000, f'expected 16 kHz audio, got {sr} Hz: {audio_path}'
        # Path inside the dataset, without extension. Derived from the known
        # dataset root rather than by splitting on the dataset name, which
        # breaks when that name also occurs elsewhere in the path.
        relative_stem = os.path.splitext(os.path.relpath(audio_path, dataset_root))[0]
        audio_name = os.path.basename(audio_path)
        for variant in range(variants_per_noise_level):
            noised, noises_idx = aug.make_noisy(audio, noise_levels)
            for i, noise_level_idx in enumerate(noise_levels_idx):
                noised_dataset = f'{dataset_name}_noised_{noise_level_idx}_variant_{variant}'
                audio_noised_path = os.path.join(libri_root, noised_dataset, relative_stem + '.wav')
                wavfile.write(audio_noised_path, sr, noised[i])
            # dataset, audio_name, variant, noises
            noises_info.append((dataset_name, audio_name, variant, noises_idx))

    return noises_info

## Downloading of LibriSpeech datasets: dev-other, test-other

More LibriSpeech subsets are available at: http://www.openslr.org/12/

In [10]:
# Working directory at the time this cell runs; the download and dataset helpers
# in this notebook build their paths from it.
data_dir = os.getcwd()
# LibriSpeech subsets to download, augment and evaluate.
dataset_names = ['dev-other', 'test-other']

In [11]:
def download_dataset(dataset_name):
    """Fetch and extract ``dataset_name`` into ``data_dir``, then convert its flac files to wav."""
    # Both steps take the same arguments and must run in this order.
    for step in (download_extract_libri, flac2wav):
        step(data_dir, dataset_name)

In [12]:
# Download, extract and convert every configured subset.
# Note: `dataset_name` stays defined at module level after the loop.
for dataset_name in dataset_names:
    download_dataset(dataset_name)

## Downloading of noises (test dataset ~304 MB)

A larger noise set (~9.2 GB) is available at: https://zenodo.org/record/2529934/files/FSDnoisy18k.audio_train.zip?download=1

In [13]:
# Fetch the FSDnoisy18k test archive once and unpack it under <data_dir>/noises.
# The archive path is explicit, so the cell does not depend on the current
# working directory, and a re-run does not download a second copy.
noise_zip_url = 'https://zenodo.org/record/2529934/files/FSDnoisy18k.audio_test.zip?download=1'
noise_zip_path = os.path.join(data_dir, 'FSDnoisy18k.audio_test.zip')
if not os.path.exists(noise_zip_path):
    wget.download(noise_zip_url, out=noise_zip_path)
with zipfile.ZipFile(noise_zip_path, "r") as zip_ref:
    zip_ref.extractall(os.path.join(data_dir, 'noises'))

In [26]:
# Folder the noise archive was extracted into.
dir_noises = os.path.join(data_dir, 'noises', 'FSDnoisy18k.audio_test')
# Loads every wav file of that folder into memory (see NoiseAgumentator.__init__);
# `aug` is later read as a global by aug_noise_datasets.
aug = NoiseAgumentator(dir_noises)

## Augment data with noises

$ noised\ audio = (1 - coef)\cdot audio + coef \cdot noise $

In [15]:
# Candidate noise levels: a grid from 0.25% to 40.0% in steps of 0.25%.

noise_level_step = 0.0025
noise_levels_count = 160
noise_levels_idx = [*range(noise_levels_count)]
# Grid index i corresponds to the level (i + 1) * step.
noise_levels = [
    level_number * noise_level_step
    for level_number in range(1, noise_levels_count + 1)
]

# How many independent noisy variants are generated per audio at each level.
variations_per_noise_level = 1

print(f'noise_levels first 10 {noise_levels[:10]}')
print(f'noise_levels last 10 {noise_levels[-10:]}')

# Grid indices 9, 19, ..., 79, i.e. levels 2.5%, 5%, ..., 20%; edit to taste.
noise_levels_idx_chosen = range(9, 80, 10)
noise_levels_chosen = [noise_levels[chosen_idx] for chosen_idx in noise_levels_idx_chosen]
print(f'noise_levels chosen {noise_levels_chosen}')

In [27]:
# Per-audio record of which noises were mixed in, accumulated over all datasets.
noises_info = []
for dataset_name in dataset_names:
    # Destination folders and transcripts first, so the noised wavs have a place to go.
    copy_dir_txts_to_aug_folders(
        data_dir, dataset_name, noise_levels_idx_chosen, variations_per_noise_level
    )
    dataset_noises_info = aug_noise_datasets(
        dataset_name, noise_levels_idx_chosen, noise_levels_chosen, variations_per_noise_level
    )
    noises_info.extend(dataset_noises_info)

In [28]:
# Persist the applied-noise log as a list of JSON records in the working directory.
noises_columns = ["dataset", "record", "variant", "noises"]
noises_df = pd.DataFrame(noises_info, columns=noises_columns)
noises_df.to_json('noises.json', orient="records")

## Check noise effect

In [57]:
def plot(audio, sr=16000):
    """Draw the waveform of ``audio`` in a wide, short figure.

    Args:
        audio: numpy array of samples; it is cast to float before plotting.
        sr: sampling rate in Hz used to scale the time axis. Defaults to
            16000, the rate used throughout this notebook. Without it
            ``waveplot`` assumes its own default rate and the time axis of
            16 kHz audio is mislabelled.
    """
    plt.figure(figsize=(10, 2))
    plt.title('Waveform')
    plt.ylabel('Amplitude')
    waveplot(audio.astype(float), sr=sr)
    
def get_sample_audio(dir_suffix=''):
    """Read one fixed sample utterance of the first dataset.

    Args:
        dir_suffix: text appended to the dataset folder name, e.g.
            ``'_noised_19_variant_0'`` to read the noised copy.

    Returns:
        ``(sample rate, samples)`` as returned by ``wavfile.read``.
    """
    dataset_dir = f'{dataset_names[0]}{dir_suffix}'
    relative_path = f'LibriSpeech/{dataset_dir}/4323/13259/4323-13259-0014.wav'
    rate_and_samples = wavfile.read(os.path.join(data_dir, relative_path))
    return rate_and_samples

In [58]:
# Clean sample: waveform plot, then an audio widget as the cell's output
# (the ipd.Audio expression has to stay last to be displayed).
sr, audio = get_sample_audio()
plot(audio)
ipd.Audio(audio, rate=sr)

In [60]:
# Same utterance from the copy noised at grid index 19, i.e. level (19 + 1) * 0.0025 = 5%.
sr, audio = get_sample_audio('_noised_19_variant_0')
plot(audio)
ipd.Audio(audio, rate=sr)

In [61]:
# Same utterance from the copy noised at grid index 79, i.e. level (79 + 1) * 0.0025 = 20%.
sr, audio = get_sample_audio('_noised_79_variant_0')
plot(audio)
ipd.Audio(audio, rate=sr)

## Building manifests for transcribing audios

In [62]:
# Clean datasets first, followed by every noised copy in generation order.
datasets = [*dataset_names]
for dataset_name in dataset_names:
    for variant in range(variations_per_noise_level):
        noised_datasets = [
            f'{dataset_name}_noised_{noise_level_idx}_variant_{variant}'
            for noise_level_idx in noise_levels_idx_chosen
        ]
        datasets.extend(noised_datasets)

# One manifest per dataset, stored next to the dataset folders.
for dataset in datasets:
    manifest_file = os.path.join(data_dir, 'LibriSpeech', f'{dataset}_manifest.json')
    build_manifest(data_dir, dataset, manifest_path=manifest_file)

## Loading models: Jasper10x5Dr-En, QuartzNet15x5Base-En

In [63]:
#!L
# other available models
# The result of this call is the cell's output. Presumably these are the names
# accepted by EncDecCTCModel.from_pretrained — confirm against the NeMo docs.
EncDecCTCModel.list_available_models()

In [66]:
#!L
# Pretrained checkpoints to evaluate; `asr_models` is built in the same order,
# so position i in both lists refers to the same model.
model_names = ["stt_en_jasper10x5dr", "QuartzNet15x5Base-En"]

# Loads each checkpoint by name (presumably downloading it on first use — confirm).
asr_models = [EncDecCTCModel.from_pretrained(model_name=model_name) for model_name in model_names]

In [67]:
#!L
# Transcribe every (model, dataset) pair.
# recognitions_data: one row per utterance; wers_data: one row per pair.
recognitions_data = []
wers_data = []
for model_name, asr_model in zip(model_names, asr_models):
    for dataset in datasets:
        print(f'model {model_name} dataset {dataset}', flush=True)

        manifest_filepath = os.path.join(data_dir,  'LibriSpeech', f'{dataset}_manifest.json')
        asr_model.setup_test_data(
            test_data_config={
                'sample_rate': 16000,
                'manifest_filepath': manifest_filepath,
                'labels': asr_model.decoder.vocabulary,
                'batch_size': 4,
                'normalize_transcripts': True,
            }
        )

        # File name of every manifest row, in manifest order. basename() does
        # not depend on where (or how often) the dataset name occurs in the path.
        with open(manifest_filepath) as f:
            audios_names = [
                os.path.basename(json.loads(line)['audio_filepath'])
                for line in f.read().split('\n') if len(line) > 0
            ]

        if dataset in dataset_names:
            # Clean dataset: no noise information.
            dataset_name = dataset
            noise_level_idx = None
            noise_level = None
            variant = None
        else:
            # f'{dataset_name}_noised_{i}_variant_{j}'
            dataset_name, _, noise_level_idx, _, variant = dataset.split('_')
            variant = int(variant)
            noise_level_idx = int(noise_level_idx)
            noise_level = noise_levels[noise_levels_idx.index(noise_level_idx)]

        hyps, refs, wer = transcribe_audios(asr_model)
        # Results are matched to file names by position, so the counts must agree.
        if len(hyps) != len(audios_names):
            raise RuntimeError(
                f'{dataset}: got {len(hyps)} transcriptions for {len(audios_names)} manifest rows'
            )
        for audio_name, hyp, ref in zip(audios_names, hyps, refs):
            # Store the noise level itself (not its grid index): the column is
            # saved as 'noise_level', and wers_data below uses the same value.
            recognitions_data.append((
                dataset_name, model_name, audio_name, noise_level, variant, ref, hyp
            ))

        wers_data.append((
            dataset_name, model_name, noise_level, variant, wer
        ))

## Save dataset

In [68]:
# Per-utterance recognitions, saved as a list of JSON records.
recognition_columns = ['dataset', 'model', 'record', 'noise_level', 'variant', 'reference', 'hypothesis']
recognitions_df = pd.DataFrame(recognitions_data, columns=recognition_columns)
recognitions_df.to_json('recognitions.json', orient="records")

# Per (dataset, model, noise level, variant) WER; `wer_df` is reused by the plots below.
wer_columns = ['dataset', 'model', 'noise_level', 'variant', 'wer']
wer_df = pd.DataFrame(wers_data, columns=wer_columns)
wer_df.to_json('wers.json', orient="records")

## (Noise level -> WER) plots

In [70]:
import matplotlib.pyplot as plt

def plot_wer_sample():
    """Plot WER against noise level: one figure per dataset, one line per model.

    Reads the notebook globals ``dataset_names``, ``model_names`` and
    ``wer_df``; only rows with ``variant == 0`` are drawn.
    """
    for dataset in dataset_names:
        # Explicit figure/axes per dataset; the title is set on the axes
        # because passing keyword arguments to plt.gca() is no longer supported
        # by recent matplotlib releases.
        _, ax = plt.subplots()
        ax.set_title(dataset)
        # One combined mask: indexing an already filtered frame with a mask
        # built from the full frame relies on implicit reindexing.
        selected = (wer_df['dataset'] == dataset) & (wer_df['variant'] == 0)
        dataset_df = wer_df[selected]
        for model in model_names:
            model_df = dataset_df[dataset_df['model'] == model]
            model_df.plot(kind='line', x='noise_level', y='wer', label=model, ax=ax)
        plt.show()
    
# Draw one WER-vs-noise-level figure per dataset.
plot_wer_sample()