## End-to-end speech recognition

- TIMIT database
- LSTM and CTC to start

In [1]:
import torch
import torch.nn as nn
import matplotlib.pyplot as plt 
import numpy as np


## Base de données

In [2]:
import zipfile
import shutil
import gzip
from IPython.display import display, Audio 
import librosa
import datetime
import os 
import pandas as pd
import csv

In [3]:
def extract_zip_dir(dir_path, display_info=True):
    """Extract the members of a zip archive into the current working directory.

    Members that already exist on disk are reported and left untouched; only
    the missing ones are extracted, one at a time.

    Args:
        dir_path (str): Path to the zip archive.
        display_info (bool): When True, print the metadata of every archive
            member once the extraction pass is done.
    """
    with zipfile.ZipFile(dir_path, 'r') as zip_dir:

        for member in zip_dir.namelist():
            # os.path.exists is true for files and directories alike.
            if os.path.exists(os.path.join('.', member)):
                print(member, 'already exists.')
            else:
                # Extract this member only: calling extractall() here would
                # unpack the whole archive again for every missing member and
                # overwrite the files that were just reported as existing.
                zip_dir.extract(member)

        if display_info:
            # The archive is still open, so its handle is reused.
            for info in zip_dir.infolist():
                print(info.filename)
                print('\tModified:\t' + str(datetime.datetime(*info.date_time)))
                print('\tSystem:\t\t' + str(info.create_system) + ' (0 = Windows, 3 = Unix)')
                print('\tZIP version:\t' + str(info.create_version))
                print('\tCompressed:\t' + str(info.compress_size) + ' bytes')
                print('\tUncompressed:\t' + str(info.file_size) + ' bytes')


# Write CSV manifests so that the data is easier to manipulate
def create_csv_files(word_file_list,
                     txt_csv_file, audio_csv_file):
    """Build two CSV manifests (transcripts and audio) from a list of word files.

    Every line of ``word_file_list`` produces one row in each manifest. Both
    manifests are (re)written together unless both of them already exist.

    The line parsing relies on fixed offsets around the "/DR" marker.
    NOTE(review): this assumes lines shaped like
    ``.../DR<digit>/<5-character speaker id>/<name>.WRD`` - confirm against
    the actual list file.

    Args:
        word_file_list (str): Path to the file listing one word file per line
            (read with ``pd.read_csv(..., header=None)``).
        txt_csv_file (str): Output path of the transcript manifest.
        audio_csv_file (str): Output path of the audio manifest.
    """
    word_file_list_df = pd.read_csv(word_file_list, header=None)

    # Skip the work only when BOTH manifests are present; a missing audio
    # manifest must trigger a regeneration as well.
    if os.path.isfile(txt_csv_file) and os.path.isfile(audio_csv_file):
        print("CSV files already exist")
        return

    header = ["index","test_or_train","dialect_region","speaker_id","filename",
              "path_from_data_dir",
              "is_audio","is_word_file","is_phonetic_file","is_sentence_file"]

    try:

        with open(txt_csv_file, 'w', newline='') as txt_file, open(audio_csv_file, 'w', newline='') as audio_file:

            writer_txt   = csv.writer(txt_file)
            writer_audio = csv.writer(audio_file)

            writer_txt.writerow(header)
            writer_audio.writerow(header)

            # Column 0 holds the raw line of the list file.
            for index, line_str in enumerate(word_file_list_df[0]):

                # Looking for info in the line to fill the CSV
                if "TEST" in line_str:
                    test_or_train = "TEST"
                else:
                    test_or_train = "TRAIN"
                dialect_region    = "DR" + line_str[line_str.find("/DR") + 3]
                region_start      = line_str.find(dialect_region)
                speaker_id        = line_str[region_start + 4 : region_start + 9]
                file_name         = line_str[line_str.find(speaker_id) + 6 :]
                # Remove suffix WRD from the name to replace it by TXT / WAV
                if ".WRD" in file_name:
                    file_name = file_name.replace(".WRD", "")

                txt_file_name     = file_name + ".TXT.gz"
                audio_file_name   = file_name + ".WAV.gz"

                speaker_dir = "./TIMIT/" + test_or_train + "/" + dialect_region + "/" + speaker_id + "/"
                path_from_data_dir_txt   = speaker_dir + txt_file_name
                path_from_data_dir_audio = speaker_dir + audio_file_name

                # NOTE(review): the transcript row stores the bare name while the
                # audio row stores the name with its ".WAV.gz" suffix - kept as is,
                # confirm whether the difference is intended.
                writer_txt.writerow([index, test_or_train, dialect_region, speaker_id, file_name, path_from_data_dir_txt,
                                     False, False, False, True])
                writer_audio.writerow([index, test_or_train, dialect_region, speaker_id, audio_file_name, path_from_data_dir_audio,
                                       True, False, False, False])

    except IOError as err:
        print ('Failed to open csv files:', err.strerror)
        

def extract_gzip_files(path_to_files):
    """Decompress every gzip file listed in a CSV manifest.

    The manifest must have a ``path_from_data_dir`` column. The kind of file
    (audio or transcript) is decided from the first row only; each archive is
    decompressed next to itself with its suffix rewritten
    (``WAV.gz`` -> ``wav`` or ``TXT.gz`` -> ``txt``).

    Args:
        path_to_files (str): Path to the CSV manifest.
    """
    paths = pd.read_csv(path_to_files)["path_from_data_dir"]

    # An empty manifest has nothing to extract (and no first row to inspect).
    if paths.empty:
        return

    first_path = paths.iloc[0]

    if ".WAV.gz" in first_path:
        # Case where we extract audio files
        gz_suffix, out_suffix = "WAV.gz", "wav"
    elif ".TXT.gz" in first_path:
        # Case where we extract txt files
        gz_suffix, out_suffix = "TXT.gz", "txt"
    else:
        # Neither kind recognised: nothing to do.
        return

    for path in paths:
        with gzip.open(path, 'rb') as f_in, open(path.replace(gz_suffix, out_suffix), 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)


if __name__ == '__main__':

    # Locations of the archive, the word-file list and the two CSV manifests.
    dir_path       = "TIMIT.zip"
    word_file_path = "TIMIT/word_files.txt.gz"
    txt_csv_path   = "TIMIT/txt_data.csv"
    audio_csv_path = "TIMIT/audio_data.csv"

    # Unpack the archive without printing the per-member metadata.
    extract_zip_dir(dir_path, display_info=False)

    # Create and fill the CSV manifests.
    create_csv_files(word_file_path, txt_csv_path, audio_csv_path)

    # Decompress the gz files listed in each manifest: audio first, then text.
    extract_gzip_files(audio_csv_path)
    extract_gzip_files(txt_csv_path)

TIMIT/ already exists.
TIMIT/DOC/ already exists.
TIMIT/DOC/PHONCODE.DOC already exists.
TIMIT/DOC/PROMPTS.TXT.gz already exists.
TIMIT/DOC/SPKRINFO.TXT.gz already exists.
TIMIT/DOC/SPKRSENT.TXT.gz already exists.
TIMIT/DOC/TESTSET.DOC.gz already exists.
TIMIT/DOC/TIMITDIC.DOC already exists.
TIMIT/DOC/TIMITDIC.TXT already exists.
TIMIT/README.DOC already exists.
TIMIT/README.DOC.gz already exists.
TIMIT/TEST/ already exists.
TIMIT/TEST/DR1/ already exists.
TIMIT/TEST/DR1/FAKS0/ already exists.
TIMIT/TEST/DR1/FAKS0/SA1.PHN.gz already exists.
TIMIT/TEST/DR1/FAKS0/SA1.PHN_SP.gz already exists.
TIMIT/TEST/DR1/FAKS0/SA1.TXT.gz already exists.
TIMIT/TEST/DR1/FAKS0/SA1.WAV.gz already exists.
TIMIT/TEST/DR1/FAKS0/SA1.WRD.gz already exists.
TIMIT/TEST/DR1/FAKS0/SA1.lab.gz already exists.
TIMIT/TEST/DR1/FAKS0/SA1.mfc.gz already exists.
TIMIT/TEST/DR1/FAKS0/SA2.PHN.gz already exists.
TIMIT/TEST/DR1/FAKS0/SA2.PHN_SP.gz already exists.
TIMIT/TEST/DR1/FAKS0/SA2.TXT.gz already exists.
TIMIT/TEST/DR1

In [4]:
def create_data_frame(csv_path):
    """Load a CSV manifest and rewrite its paths to the decompressed file names.

    The first row of ``path_from_data_dir`` decides which suffix is rewritten:
    ``WAV.gz`` becomes ``wav`` and ``TXT.gz`` becomes ``txt``.

    Args:
        csv_path (str): Path to a manifest with a ``path_from_data_dir`` column.

    Returns:
        pandas.DataFrame: The manifest with the rewritten path column.
    """
    dataframe = pd.read_csv(csv_path)

    # No rows: there is no first path to inspect and nothing to rewrite.
    if dataframe.empty:
        return dataframe

    # regex=False: the suffixes are literal text. As a regular expression the
    # "." would match any character (e.g. "WAV_gz" inside a directory name).
    if "WAV.gz" in dataframe["path_from_data_dir"].iloc[0]:
        dataframe["path_from_data_dir"] = dataframe["path_from_data_dir"].str.replace("WAV.gz", "wav", regex=False)

    if "TXT.gz" in dataframe["path_from_data_dir"].iloc[0]:
        dataframe["path_from_data_dir"] = dataframe["path_from_data_dir"].str.replace("TXT.gz", "txt", regex=False)

    return dataframe


if __name__=="__main__":

    # Load both manifests into data frames. audio_csv_path and txt_csv_path are
    # the notebook-level variables set in the extraction cell, so that cell
    # must have been run first.
    audio_df = create_data_frame(audio_csv_path)
    txt_df   = create_data_frame(txt_csv_path)

      index test_or_train dialect_region speaker_id       filename  \
0         0          TEST            DR1      FAKS0     SA1.WAV.gz   
1         1          TEST            DR1      FAKS0     SA2.WAV.gz   
2         2          TEST            DR1      FAKS0  SI1573.WAV.gz   
3         3          TEST            DR1      FAKS0  SI2203.WAV.gz   
4         4          TEST            DR1      FAKS0   SI943.WAV.gz   
...     ...           ...            ...        ...            ...   
6295   6295         TRAIN            DR8      MTCS0   SX172.WAV.gz   
6296   6296         TRAIN            DR8      MTCS0   SX262.WAV.gz   
6297   6297         TRAIN            DR8      MTCS0   SX352.WAV.gz   
6298   6298         TRAIN            DR8      MTCS0   SX442.WAV.gz   
6299   6299         TRAIN            DR8      MTCS0    SX82.WAV.gz   

                     path_from_data_dir  is_audio  is_word_file  \
0        ./TIMIT/TEST/DR1/FAKS0/SA1.wav      True         False   
1        ./TIMIT/TEST/DR1

In [10]:
import pickle as pkl

def fill_X_and_y(list_type):
    """Build the feature list and the transcript list for one split.

    The audio paths are taken from the notebook-level ``audio_df`` frame, keeping
    the rows whose ``test_or_train`` column equals ``list_type``. For each audio
    file the MFCC matrix is computed, and the content of the ``.txt`` file that
    sits next to it is read as the target.

    Args:
        list_type (str): "TRAIN" or "TEST".

    Returns:
        tuple: ``(X, y)`` where ``X`` is the list of MFCC arrays and ``y`` the
        list of raw transcript file contents, in the same order.
    """

    X = []
    y = []

    for path in audio_df["path_from_data_dir"][(audio_df["test_or_train"]==list_type)]:

        # fill X
        audio, sample_rate = librosa.load(path)
        # hop length is set as in the cited paper
        # NOTE(review): n_mels sets the number of mel bands; the number of
        # coefficients is n_mfcc - confirm that n_mels=13 is what is intended.
        mfcc_spectrogram   = librosa.feature.mfcc(y=audio, sr=sample_rate, n_fft=254, hop_length=254-128, n_mels=13)
        X.append(mfcc_spectrogram)

        # fill y
        # Swap the extension only: a plain replace("wav", "txt") would also
        # rewrite any "wav" found elsewhere in the path.
        txt_path = os.path.splitext(path)[0] + ".txt"
        # The context manager closes the file; one is opened per utterance.
        with open(txt_path, 'r') as txt_file:
            y.append(txt_file.read())

    return X, y


def save_datasets(X, y, file_name):
    """Pickle ``X`` and ``y`` together, as the two-element list ``[X, y]``.

    Args:
        X: Features to store.
        y: Targets to store.
        file_name (str): Path of the pickle file to write (overwritten if present).
    """
    payload = [X, y]

    with open(file_name, "wb") as handle:
        pkl.dump(payload, handle)


def load_datasets(file_name):
    """Read back a pickled pair of features and targets.

    Args:
        file_name (str): Path of a pickle file holding a two-element sequence.

    Returns:
        tuple: ``(X, y)`` as stored in the file.
    """
    # Unpickling runs arbitrary code: only load files produced by this notebook.
    with open(file_name, "rb") as handle:
        features, targets = pkl.load(handle)

    return features, targets



if __name__ == "__main__":

    # Both splits are recomputed and re-saved unless both pickle files exist.
    if os.path.isfile("train.pkl") and os.path.isfile("test.pkl"):
        print("Files: test.pkl and train.pkl already exist")

    else:
        X_train, y_train = fill_X_and_y(list_type="TRAIN")
        X_test, y_test = fill_X_and_y(list_type="TEST")

        # Training split first, then the test split.
        file_name = "train.pkl"
        save_datasets(X_train, y_train, file_name)

        file_name = "test.pkl"
        save_datasets(X_test, y_test, file_name)


Files: test.pkl and train.pkl already exist


In [6]:
# Reload both splits from their pickle files. file_name is rebound for each
# split, so it ends up holding "test.pkl".
file_name = "train.pkl"
X_train, y_train = load_datasets(file_name)
file_name = "test.pkl"
X_test, y_test   = load_datasets(file_name)

In [12]:
# Quick inspection of the loaded data: shape of one test feature matrix, the
# first training transcript, and whether the two transcript lists are equal.
print(X_test[5].shape)
print(y_train[0])
print(y_test == y_train)

(13, 581)
0 46797 She had your dark suit in greasy wash water all year.

False


## Modèle

In [8]:
class coupled_LSTM_CTC(nn.Module):
    """Skeleton of the LSTM + CTC model; no layer is defined yet.

    Args:
        input_layer: Not used yet by this skeleton.
        output_layer: Not used yet by this skeleton.
    """

    def __init__(self, input_layer, output_layer):
        # super must be called to get the parent proxy: `super.__init__()`
        # raises a TypeError, so the module could never be instantiated.
        super().__init__()


    def forward(self, x):
        """Return ``x`` unchanged (placeholder until the layers are added)."""
        return x

## Utils

In [9]:
def save_model(model, path):
    """Write the model's state dict to ``path`` with ``torch.save``.

    Args:
        model: Object exposing ``state_dict()``.
        path: Destination handed to ``torch.save``.
    """
    state_params = model.state_dict()
    torch.save(state_params, path)

def load_model(model, file_path, device):
    """Load a saved state dict from ``file_path`` into ``model``, in place.

    Args:
        model: Module whose ``load_state_dict`` receives the loaded parameters.
        file_path: File previously written with ``torch.save``.
        device: Forwarded to ``torch.load`` as ``map_location`` so that a
            checkpoint saved on a device that is not available here can still
            be read. ``None`` keeps the default behaviour of ``torch.load``.
    """
    # Without map_location the tensors are restored onto the device they were
    # saved from, and the `device` argument was silently ignored.
    state_params = torch.load(file_path, map_location=device)
    model.load_state_dict(state_params)



## Main