In [None]:
#Import required packages
import pandas as pd
import numpy as np
import librosa
import pathlib
import os
from google.colab import drive
import matplotlib.pyplot as plt
from sklearn.preprocessing import scale
import warnings
import glob
from scipy import signal

In [None]:
# Read the audio classification metadata and clean it.
# The rows are tab-delimited, but the file is read with the default comma
# separator, so each row arrives as one string in a single column whose name is
# the whole tab-joined header line.
audio_class_df = pd.read_csv("audioclassification_meta.csv")

# Take the raw header from the file itself instead of repeating it as a literal,
# so the split below always targets the column that was actually read.
raw_header = audio_class_df.columns[0]
c_names = raw_header.replace(" ", "_").split("\t")

# Split every row on tabs into the cleaned column names and keep only those columns
audio_class_df[c_names] = audio_class_df[raw_header].str.split("\t", expand=True)
audio_class_df = audio_class_df[c_names]

# Set as dictionary: VoxCeleb1_ID -> list of the remaining column values for that id
audio_class_dict = audio_class_df.set_index("VoxCeleb1_ID").T.to_dict('list')

# View data
audio_class_df.head()

In [None]:
# Phoneme data needed to extract phonemes only
phoible_df = pd.read_csv("phoible.csv")

# Generate a mapping from nationality to language spoken.
# 'Ireland' is the spelling used for the English-speaking nationality list in this
# notebook, so it is mapped here as well; the older 'Irish' key is kept so that
# any existing lookup on it keeps working.
nationalities_to_language = {'Irish': 'English',
                             'Ireland': 'English',
                             'India': 'Hindi',
                             'USA': 'English (American)',
                             'Australia': 'English (Australian)',
                             'Canada': 'English',
                             'UK': 'English (British)',
                             'Norway': 'Norwegian',
                             'Italy': 'Italian',
                             'Sudan': 'Arabic',
                             'Mexico': 'Spanish',
                             'China': 'Standard Chinese; Mandarin',
                             'Switzerland': 'Swiss German',
                             'Guyana': 'English',
                             'Philippines': 'Filipino',
                             'New Zealand': 'English (New Zealand)',
                             'Germany': 'German',
                             'Portugal': 'Portuguese (European)',
                             'Netherlands': 'Dutch',
                             'Pakistan': 'Urdu',
                             'Croatia': 'Croatian',
                             'South Korea': 'Korean',
                             'Sweden': 'Swedish',
                             'Russia': 'Russian',
                             'Poland': 'Polish',
                             'Sri Lanka': 'Sinhalese',
                             'Singapore': 'Mandarin Chinese',
                             'Chile': 'Spanish',
                             'Spain': 'Spanish',
                             'Israel': 'Modern Hebrew',
                             'Brazil': 'Portuguese (Brazilian)',
                             'Trinidad and Tobago': 'English',
                             'Denmark': 'Danish',
                             'Austria': 'German',
                             'South Africa': 'English',
                             'Iran': 'Farsi'}

# Filter dataframe to only the languages that the mapping above can produce
phoible_df = phoible_df[phoible_df['LanguageName'].isin(set(nationalities_to_language.values()))]

# Find all mapped languages that are actually present in the phoneme data
# NOTE(review): a mapped language whose name does not appear in the LanguageName
# column drops out here without any warning - confirm every name matches the data.
all_languages = list(phoible_df['LanguageName'].unique())

# Build the language -> phonemes lookup:
# each key is a language name, each value the set of distinct phonemes listed for it
phonemes_per_lang = {}
for language_name in all_languages:
    rows_for_language = phoible_df['LanguageName'] == language_name
    phonemes_in_lang = phoible_df[rows_for_language]['Phoneme'].unique()
    phonemes_per_lang[language_name] = set(phonemes_in_lang)

# Pool the phonemes of every English variety into one set
eng_langs = ['English', 'English (American)','English (Australian)', \
             'English (British)', 'English (New Zealand)']
english_phonemes = set()
for lang in eng_langs:
    # Indexing (not .get) on purpose: a variety missing from the data raises KeyError
    english_phonemes |= phonemes_per_lang[lang]

# Define a mapping from each English phoneme to the set of symbols treated as
# equivalent to it: the phoneme itself plus the allophones recorded for it in the
# English varieties. These sets are what get_key_english_phonemes_to_allophones
# searches when a phoneme from another language is looked up.
english_rows = phoible_df[phoible_df['LanguageName'].isin(eng_langs)]  # loop-invariant filter
english_phonemes_to_allophones = {}
for phoneme in english_phonemes:
    # {phoneme}, not set(phoneme): set() on a string yields its individual
    # characters, which breaks a multi-character phoneme into pieces
    english_phonemes_to_allophones[phoneme] = {phoneme}
    for allophones in english_rows[english_rows['Phoneme'] == phoneme].Allophones:
        # Skip missing values and any entry that is not purely alphanumeric.
        # NOTE(review): isalnum() is False for any string containing a space, so if
        # this column holds space-separated allophone lists those rows are skipped
        # entirely (as they were before this change) - confirm that is intended.
        if pd.notnull(allophones) and allophones.isalnum():
            # Store the entry as one symbol instead of adding it character by character
            english_phonemes_to_allophones[phoneme].add(allophones)

def get_key_english_phonemes_to_allophones(val):
    '''
    Function: Look up the English phoneme whose allophone set contains a given phoneme
    Inputs: 
        - val: a phoneme
    Outputs:
        - the first key of english_phonemes_to_allophones whose value contains val,
          or None when no value contains it
    '''
    matching_keys = (
        english_phoneme
        for english_phoneme, allophone_set in english_phonemes_to_allophones.items()
        if val in allophone_set
    )
    # First hit in dictionary order wins; None signals "no English counterpart"
    return next(matching_keys, None)

# Collapse every language's phoneme set onto the English inventory: a phoneme that
# is not English is swapped for the English phoneme whose allophone set contains
# it, or for the 'unseen' tag when no such English phoneme exists.
# The sets stored in phonemes_per_lang are modified in place.
for language in phonemes_per_lang:
    language_phonemes = phonemes_per_lang[language]
    # Collect first, mutate afterwards: a set cannot change size while being iterated
    unseen_phonemes = {candidate for candidate in language_phonemes
                       if candidate not in english_phonemes}
    for unseen_phoneme in unseen_phonemes:
        language_phonemes.remove(unseen_phoneme)
        possible_allophone = get_key_english_phonemes_to_allophones(unseen_phoneme)
        language_phonemes.add('unseen' if possible_allophone is None else possible_allophone)

# Skip the language step: look up each nationality's phoneme set directly.
# The values are the same set objects that phonemes_per_lang holds (no copies),
# and a language missing from phonemes_per_lang raises KeyError.
nationalities_to_phonemes = {
    nationality: phonemes_per_lang[language_spoken]
    for nationality, language_spoken in nationalities_to_language.items()
}

In [None]:
# Mount Google Drive under /content/drive (force_remount is switched on)
from google.colab import drive
drive.mount('/content/drive', force_remount = True)
# IPython magic: make the shared project folder the working directory
# NOTE(review): the CSV files in the cells above are opened with relative paths
# before this directory change - confirm they resolve on a fresh runtime.
%cd "/content/drive/Shareddrives/CIS_519_Final_Project"


In [None]:
# Root folder searched for the audio npz files (speech_path) and for the
# phoneme npz files (phoneme_path); both point at the shared project folder
speech_path = phoneme_path = "/content/drive/Shareddrives/CIS_519_Final_Project/"

In [None]:
def pull_id_npz(file_name):
    '''
    Function: Load in files in npz format that are stored as dictionaries
    Inputs: 
        - file_name: string of file name containing path to file
    Outputs:
        - container_list: nested list with a single inner list that holds every
          array stored in the npz file, in the file's key order
    '''
    # NOTE: allow_pickle=True lets np.load unpickle object arrays, and unpickling
    # can execute arbitrary code - only load npz files from a trusted source.
    # The with-block closes the underlying file handle, which was previously left open.
    with np.load(file_name, allow_pickle=True) as container:
        # Read every array while the file is still open
        arrays = [container[key] for key in container]
    container_list = [arrays]
    return container_list

In [None]:
# Unpack the npz file storing raw speech for one id
def pull_speech_npz(speech_path, id):
    '''
    Function: Unpack the npz file storing raw audio for one id
    Inputs: 
        - speech_path: path to audio npz files; it is searched recursively
        - id: id whose file "<id>.npz" is looked for
    Outputs:
        - nested list containing values in the npz file, or None when no
          matching file is found
    Notes:
        - directories with "phoneme" in their path are skipped
        - the working directory is left untouched: the file is opened through
          its full path instead of changing into its folder
    '''
    target_file = id + '.npz'
    for path, subdirs, files in os.walk(speech_path):
        # Phoneme npz files are looked up under the same "<id>.npz" name,
        # so their directories must not be searched for audio
        if "phoneme" in path:
            continue
        if target_file in files:
            return pull_id_npz(os.path.join(path, target_file))
    return None

In [None]:
# Pull the phoneme npz file associated with a given Voxceleb id
def pull_phoneme_npz(phoneme_path, id):
    '''
    Function: Unpack the npz file storing phonemes for one id
    Inputs: 
        - phoneme_path: path to phoneme npz files; it is searched recursively
        - id: id whose file "<id>.npz" is looked for
    Outputs:
        - nested list containing values in the npz file, or None when no
          matching file is found
    Notes:
        - only directories with "phoneme" in their path are searched
        - the working directory is left untouched: the file is opened through
          its full path instead of changing into its folder
    '''
    target_file = id + '.npz'
    for path, subdirs, files in os.walk(phoneme_path):
        # Audio npz files are looked up under the same "<id>.npz" name, so a
        # directory outside the phoneme folders must never produce a match
        if "phoneme" not in path:
            continue
        if target_file in files:
            return pull_id_npz(os.path.join(path, target_file))
    return None

In [None]:
from python_speech_features import fbank
# Turn raw speech into fixed-length MFCC feature vectors
def compute_mfcc(data_to_ids_dict,numcep=10,fs=16e3, max_len=10000):
    '''
    Function: Calculate Mel frequency cepstral coefficients from raw audio npz
    Inputs: 
        - data_to_ids_dict: dictionary of ids to raw audio extracted from npz file;
          ids mapped to None are skipped
        - numcep: accepted but not used; mfcc() is called with its own defaults
        - fs: accepted but not used; mfcc() is called with its own defaults
        - max_len: length that every flattened coefficient vector is padded
          (with zeros) or cut down to
    Outputs:
        - X_mfcc: list with one 1-D array of length max_len per audio sample
    '''
    X_mfcc = []
    for recordings in data_to_ids_dict.values():
        if recordings is None:
            continue
        # The first element of the nested list holds the individual audio samples
        for data in recordings[0]:
            coefficients = np.array(mfcc(data).flatten())
            shortfall = max_len - coefficients.shape[0]
            if shortfall > 0:
                # Too short: append zeros at the end up to max_len
                coefficients = np.pad(coefficients, pad_width=(0, shortfall), mode='constant')
            else:
                # Long enough: keep only the first max_len values
                coefficients = coefficients[:max_len]
            X_mfcc.append(coefficients)
    return X_mfcc

In [None]:
def extract_phonemes_for_training(phoneme_dict):
    '''
    Function: Extract phonemes per training instance 
    Inputs: 
        - phoneme_dict: dictionary of ids to the nested list loaded from that
          id's phoneme npz file; a None value raises TypeError
    Outputs:
        - X: flat list with one unwrapped entry (the result of .item()) per
          array stored for each id, in dictionary order
    '''
    X = [
        stored_array.item()
        for loaded_file in phoneme_dict.values()
        for stored_array in loaded_file[0]
    ]
    return X

In [None]:
def extract_nationalities(npz_to_speech_ids_dict, audio_class_dict):
    '''
    Function: Extract nationalities per training instance 
    Inputs: 
        - npz_to_speech_ids_dict: dictionary of ids to raw audio extracted from npz file;
          ids mapped to None contribute no labels
        - audio_class_dict: dictionary of id to a list of metadata values, whose
          element at index 2 is used as the nationality
    Outputs:
        - y: list of nationalities, one entry per audio sample stored for each id
    '''
    y = []
    for key in npz_to_speech_ids_dict.keys():
        y_val = audio_class_dict[key][2]
        if npz_to_speech_ids_dict[key] is not None:
            # Repeat the id's nationality once for every audio sample of that id
            # (the first element of the nested list holds the samples)
            y.extend([y_val] * len(npz_to_speech_ids_dict[key][0]))
    return y

In [None]:
# Collect every nationality in the metadata, then split the list into
# English speaking and non English speaking nationalities
all_nationalities = list(audio_class_df['Nationality'].unique())
# Drop the English speaking nationalities with only 1 speaker in the set,
# and Germany (difficulty generating npz files here).
# list.remove raises ValueError if one of them is not in the metadata.
for excluded_nationality in ('South Africa', 'Guyana', 'Trinidad and Tobago', 'Germany'):
    all_nationalities.remove(excluded_nationality)


eng_nationalities =  ['USA', 'UK', 'Australia', 'Canada', 'New Zealand', 'Ireland']
non_eng_nationalities = [candidate for candidate in all_nationalities
                         if candidate not in eng_nationalities]

In [None]:
from python_speech_features import mfcc

def generate_zero_shot_data(nationalities):
    '''
    Function: Extract MFCCs and nationalities for ids belonging to a nationality within "nationalities"
    Inputs: 
        - nationalities: a list of nationalities for which to determine MFCCs and nationalities
    Outputs:
        - X_mfcc_train_flat: list with one MFCC feature vector (as produced by
          compute_mfcc) per example 
        - y_nationality_train_flat: list of the nationality corresponding to a given example 
    '''
    corrupt_ids = ('id11240', 'id10155', 'id10347', 'id10409', 'id10061')
    X_mfcc_train_flat = []
    y_nationality_train_flat = []
    for nationality in nationalities:
        print('Working on importing ' + nationality)
        in_nationality = audio_class_df['Nationality'].isin([nationality])
        speech_to_phoneme_training_ids = list(audio_class_df[in_nationality].VoxCeleb1_ID)
        # Remove corrupt files and limit input sizes
        for corrupt_id in corrupt_ids:
            if corrupt_id in speech_to_phoneme_training_ids:
                speech_to_phoneme_training_ids.remove(corrupt_id)
        # NOTE(review): a list longer than 50 is cut to its first 49 ids, while a
        # list of exactly 50 stays at 50 - confirm the intended cap. The other
        # generate_* functions use the same rule, so change them together.
        if len(speech_to_phoneme_training_ids) > 50:
            speech_to_phoneme_training_ids = speech_to_phoneme_training_ids[:49]

        # Define dictionary from id to raw audio
        npz_to_speech_ids_dict = {speaker_id: pull_speech_npz(speech_path, speaker_id)
                                  for speaker_id in speech_to_phoneme_training_ids}
        # Compute MFCCs off raw audio and add them example by example
        nationality_mfccs = np.array(compute_mfcc(npz_to_speech_ids_dict))
        X_mfcc_train_flat.extend(nationality_mfccs)
        # Extract nationalities from ids and add them example by example
        y_nationality_train_flat.extend(extract_nationalities(npz_to_speech_ids_dict, audio_class_dict))
        # Drop the references to the large objects before loading the next nationality
        del npz_to_speech_ids_dict
        del nationality_mfccs
    return X_mfcc_train_flat, y_nationality_train_flat

In [None]:
def generate_phoneme_data(nationalities):
    '''
    Function: Extract the phoneme entries for ids belonging to a nationality within "nationalities"
    Inputs: 
        - nationalities: a list of nationalities for which to collect phonemes
    Outputs:
        - y_phoneme_train_flat: flat list of the entries returned by
          extract_phonemes_for_training, nationality after nationality
    '''
    corrupt_ids = ('id11240', 'id10155', 'id10347', 'id10409', 'id10061')
    y_phoneme_train_flat = []
    for nationality in nationalities:
        print('Working on importing ' + nationality)
        in_nationality = audio_class_df['Nationality'].isin([nationality])
        speech_to_phoneme_training_ids = list(audio_class_df[in_nationality].VoxCeleb1_ID)
        # Remove corrupt files and limit input sizes
        for corrupt_id in corrupt_ids:
            if corrupt_id in speech_to_phoneme_training_ids:
                speech_to_phoneme_training_ids.remove(corrupt_id)
        # NOTE(review): a list longer than 50 is cut to its first 49 ids, while a
        # list of exactly 50 stays at 50 - confirm the intended cap. The other
        # generate_* functions use the same rule, so change them together.
        if len(speech_to_phoneme_training_ids) > 50:
            speech_to_phoneme_training_ids = speech_to_phoneme_training_ids[:49]
        # Only the phoneme files are loaded here; the raw audio that used to be
        # loaded alongside them was never read, so that load is gone.
        # Define dictionary from id to phoneme npz contents
        phoneme_ids_dict = {speaker_id: pull_phoneme_npz(phoneme_path, speaker_id)
                            for speaker_id in speech_to_phoneme_training_ids}
        # Extract phonemes from the loaded files and add them entry by entry
        y_phoneme_train_flat.extend(extract_phonemes_for_training(phoneme_ids_dict))
        # Drop the reference to the loaded files before the next nationality
        del phoneme_ids_dict
    return y_phoneme_train_flat

In [None]:
def generate_specs(nationalities):
    '''
    Function: Compute spectrogram features for ids belonging to a nationality within "nationalities"
    Inputs: 
        - nationalities: a list of nationalities for which to compute features
    Outputs:
        - X_spec_train_flat: flat list of the items returned by get_feats_mod,
          nationality after nationality
    Notes:
        - get_feats_mod is not defined in this part of the notebook; it has to
          be defined before this function is called
    '''
    corrupt_ids = ('id11240', 'id10155', 'id10347', 'id10409', 'id10061')
    X_spec_train_flat = []
    for nationality in nationalities:
        print('Working on importing ' + nationality)
        in_nationality = audio_class_df['Nationality'].isin([nationality])
        speech_to_phoneme_training_ids = list(audio_class_df[in_nationality].VoxCeleb1_ID)
        # Remove corrupt files and limit input sizes
        for corrupt_id in corrupt_ids:
            if corrupt_id in speech_to_phoneme_training_ids:
                speech_to_phoneme_training_ids.remove(corrupt_id)
        # NOTE(review): a list longer than 50 is cut to its first 49 ids, while a
        # list of exactly 50 stays at 50 - confirm the intended cap. The other
        # generate_* functions use the same rule, so change them together.
        if len(speech_to_phoneme_training_ids) > 50:
            speech_to_phoneme_training_ids = speech_to_phoneme_training_ids[:49]
        # Define dictionary from id to raw audio
        npz_to_speech_ids_dict = {speaker_id: pull_speech_npz(speech_path, speaker_id)
                                  for speaker_id in speech_to_phoneme_training_ids}
        # Compute spectrograms off raw audio and add them item by item
        spec = get_feats_mod(npz_to_speech_ids_dict)
        X_spec_train_flat.extend(spec)
        # Drop the references to the large objects before the next nationality
        del npz_to_speech_ids_dict
        del spec
    return X_spec_train_flat

In [None]:
# Generate training and testing data
# English speaking nationalities supply the training examples and every other
# remaining nationality supplies the test examples.
# MFCC features with their nationality labels:
X_train, y_train_nationalities = generate_zero_shot_data(eng_nationalities)
X_test, y_test_nationalities = generate_zero_shot_data(non_eng_nationalities)
# Phoneme targets are generated for the training nationalities only
y_train_phonemes = generate_phoneme_data(eng_nationalities)
# Spectrogram features for the same two groups of nationalities
X_train_spec = generate_specs(eng_nationalities)
X_test_spec = generate_specs(non_eng_nationalities)

In [None]:
# Save files for later use 
%cd "/content/drive/Shareddrives/CIS_519_Final_Project/train_test_data_to_load_in"
np.savez('X_train_mfcc_no_corrupt.npz', *X_train)
np.savez('y_train_nationalities_no_corrupt.npz', *y_train_nationalities)
np.savez('X_test_mfcc.npz', *X_test)
np.savez('y_test_nationalities.npz', *y_test_nationalities)
np.savez('y_train_phonemes_no_corrupt.npz', *y_train_phonemes)
np.savez('X_train_spec.npz', *X_train_spec) 
np.savez('X_test_spec.npz', *X_test_spec) 
