In [None]:
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

import librosa
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
from keras import utils
from keras.callbacks import EarlyStopping, TensorBoard
from keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPooling2D
from keras.models import Sequential
from keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

In [None]:
# Location of the speaker metadata CSV that the cells below read with pd.read_csv.
# The path is relative, so it resolves against the notebook's working directory.
CSV_FILE_PATH = "../../bio_metadata.csv"

In [None]:
def predict_class_audio(MFCCs, model):
    '''
        Predict the class of one recording by majority vote over its MFCC segments
        :param MFCCs: Numpy array of MFCC segments, indexed as (segment, row, column)
        :param model: Trained model whose predict() returns one row of class scores per segment
        :return (int): Index of the class that wins the most segments
    '''
    # Append the single-channel axis so each segment is (rows, cols, 1)
    MFCCs = MFCCs.reshape(MFCCs.shape[0], MFCCs.shape[1], MFCCs.shape[2], 1)
    scores = model.predict(MFCCs, verbose=0)
    # predict() yields per-class scores, not labels. Reduce every segment to its
    # best class first: the raw score rows are arrays, which Counter cannot hash.
    segment_classes = np.argmax(scores, axis=1)
    votes = Counter(segment_classes.tolist())
    return int(votes.most_common(1)[0][0])


def predict_prob_class_audio(MFCCs, model):
    '''
        Predict the class of one recording by summing the class probabilities of its MFCC segments
        :param MFCCs: Numpy array of MFCC segments, indexed as (segment, row, column)
        :param model: Trained model whose predict() returns one row of class probabilities per segment
        :return (int): Index of the class with the largest summed probability
    '''
    # Append the single-channel axis so each segment is (rows, cols, 1)
    MFCCs = MFCCs.reshape(MFCCs.shape[0], MFCCs.shape[1], MFCCs.shape[2], 1)
    # predict_proba() is no longer available on Keras models; with a softmax
    # output layer predict() already returns the per-class probabilities.
    y_predicted = model.predict(MFCCs, verbose=0)
    return int(np.argmax(np.sum(y_predicted, axis=0)))

def predict_class_all(X_train, model):
    '''
        Predict one class per recording
        :param X_train: List of segmented mfccs, one entry per recording
        :param model: trained model
        :return: list of predictions, in the same order as X_train
    '''
    # predict_prob_class_audio(segments, model) is the probability-summing alternative
    return [predict_class_audio(segments, model) for segments in X_train]

def confusion_matrix(y_predicted,y_test):
    '''
        Count how often each actual class was predicted as each class
        :param y_predicted: list of predicted class indices
        :param y_test: numpy array of shape (len(y_test), number of classes). 1.'s at index of actual, otherwise 0.
        :return: square numpy int array; rows are indexed by actual class, columns by predicted class
    '''
    n_classes = len(y_test[0])
    counts = np.zeros((n_classes, n_classes), dtype=int)
    for position, guess in enumerate(y_predicted):
        # The actual class is the position of the largest entry in the one-hot row
        actual = np.argmax(y_test[position])
        counts[actual][guess] += 1
    return counts

def get_accuracy(y_predicted,y_test):
    '''
        Fraction of predictions that match the actual class
        :param y_predicted: numpy array of predictions
        :param y_test: numpy array of actual
        :return: accuracy (diagonal of the confusion matrix divided by its total)
    '''
    counts = confusion_matrix(y_predicted, y_test)
    # Correct predictions sit on the diagonal (actual == predicted)
    correct = np.sum(counts.diagonal())
    total = float(np.sum(counts))
    return correct / total

In [None]:

def filter_df(df):
    '''
        Function to filter audio files based on df columns
        df column options: [age,age_of_english_onset,age_sex,birth_place,english_learning_method,
        english_residence,length_of_english_residence,native_language,other_languages,sex]

        Keeps every 'english' row, plus the 'mandarin' and 'arabic' rows whose
        length_of_english_residence is below 2 (presumably years -- confirm against the CSV).
        :param df (DataFrame): Full unfiltered DataFrame
        :return (DataFrame): Filtered DataFrame with a fresh 0..n-1 index, ordered mandarin, arabic, english
    '''
    arabic = df[df.native_language == 'arabic']
    mandarin = df[df.native_language == 'mandarin']
    english = df[df.native_language == 'english']
    mandarin = mandarin[mandarin.length_of_english_residence < 2] # 10
    arabic = arabic[arabic.length_of_english_residence < 2] # 10

    # Concatenate only the selected subsets. Adding `df` itself to this list
    # would return every unfiltered row plus a second copy of the selected
    # ones, and the copies could end up on both sides of a train/test split.
    return pd.concat(
        [
            mandarin,
            arabic,
            english,
        ],
        ignore_index=True
    )

def split_people(df,test_size=0.2):
    '''
        Create train test split of DataFrame
        :param df (DataFrame): Pandas DataFrame of audio files to be split
        :param test_size (float): Fraction of the files to be split into test
        :return X_train, X_test, y_train, y_test: Xs come from df['language_num'], ys from df['native_language']
    '''
    file_ids = df['language_num']
    languages = df['native_language']
    # A fixed random_state makes the split repeatable between runs
    return train_test_split(file_ids, languages, test_size=test_size, random_state=124)



In [None]:
def extract_acoustic_features(file_path, features=('mfcc', 'chroma_stft', 'spectral_centroid')):
    """
        Extracts acoustic features from an audio file.
        Args:
            file_path (str): File name without extension; the audio is read from
                '../../data/audio/<file_path>.wav'.
            features (tuple, optional): A tuple of feature names to extract. Defaults to ('mfcc', 'chroma_stft', 'spectral_centroid').
                Supported names: 'mfcc', 'chroma_stft', 'spectral_centroid', 'spectral_bandwidth',
                'zero_crossing_rate', 'rmse'.
        Returns:
            dict: A dictionary mapping each requested feature name to its extracted value,
            or None if loading or extraction fails (the error is printed).
        Raises:
            ValueError: If an unsupported feature is requested.
    """
    # One extractor per supported name; each receives the loaded signal and its sample rate.
    extractors = {
        'mfcc': lambda y, sr: librosa.feature.mfcc(y=y, sr=sr, n_mfcc=40),
        'chroma_stft': lambda y, sr: librosa.feature.chroma_stft(y=y, sr=sr, n_chroma=12),
        'spectral_centroid': lambda y, sr: librosa.feature.spectral_centroid(y=y, sr=sr),
        'spectral_bandwidth': lambda y, sr: librosa.feature.spectral_bandwidth(y=y, sr=sr),
        'zero_crossing_rate': lambda y, sr: librosa.feature.zero_crossing_rate(
            y=y,
            frame_length=2048,
            hop_length=512
        ),
        'rmse': lambda y, sr: librosa.feature.rms(y=y),
    }

    # Validate before entering the try block: raised inside it, the ValueError
    # would be caught by the broad handler below and reported as a load error.
    features = tuple(features)
    for feature_name in features:
        if feature_name not in extractors:
            raise ValueError(f"Unsupported feature: {feature_name}")

    try:
        y, sr = librosa.load(f'../../data/audio/{file_path}.wav')
        return {feature_name: extractors[feature_name](y, sr) for feature_name in features}
    except Exception as e:
        # Best effort: a file that cannot be processed yields None instead of aborting a batch run
        print(f"Error loading file {file_path}: {e}")
        return None


def extract_prosodic_features(file_path, features=('pitch', 'intensity', 'duration')):
    """
        Extracts prosodic features from an audio file.

        Args:
            file_path (str): File name without extension; the audio is read from
                '../../data/audio/<file_path>.wav'.
            features (tuple, optional): A tuple of feature names to extract. Defaults to ('pitch', 'intensity', 'duration').
                Supported names: 'pitch', 'intensity', 'duration', 'formants'
                ('formants' is a placeholder that always yields None).

        Returns:
            dict: A dictionary mapping each requested feature name to its extracted value,
            or None if loading or extraction fails (the error is printed).

        Raises:
            ValueError: If an unsupported feature is requested.
    """
    # One extractor per supported name; each receives the loaded signal and its sample rate.
    extractors = {
        # Pass the loaded sample rate explicitly rather than relying on yin's own default
        'pitch': lambda y, sr: librosa.yin(y=y, fmin=65, fmax=2093, sr=sr),
        'intensity': lambda y, sr: librosa.feature.rms(y=y),
        # Length of the signal in seconds
        'duration': lambda y, sr: len(y) / sr,
        # Formant extraction is not implemented yet
        'formants': lambda y, sr: None,
    }

    # Validate before entering the try block: raised inside it, the ValueError
    # would be caught by the broad handler below and reported as a load error.
    features = tuple(features)
    for feature_name in features:
        if feature_name not in extractors:
            raise ValueError(f"Unsupported feature: {feature_name}")

    try:
        y, sr = librosa.load(f'../../data/audio/{file_path}.wav')
        return {feature_name: extractors[feature_name](y, sr) for feature_name in features}
    except Exception as e:
        # Best effort: a file that cannot be processed yields None instead of aborting a batch run
        print(f"Error loading file {file_path}: {e}")
        return None

# PLP extraction
def extract_plp(file_path):
    """
        Loads an audio file and runs librosa.beat.plp on it.

        NOTE(review): librosa.beat.plp estimates the predominant local pulse, a
        rhythm/beat feature. If perceptual linear prediction coefficients were
        intended here, this is a different feature -- confirm.

        Args:
            file_path (str): File name without extension; the audio is read from
                '../../data/audio/<file_path>.wav'.

        Returns:
            The value returned by librosa.beat.plp. Unlike the other extractors
            in this notebook, errors are not caught here.
    """
    signal, sample_rate = librosa.load(f'../../data/audio/{file_path}.wav')
    return librosa.beat.plp(y=signal, sr=sample_rate)

In [None]:
# Load the metadata, filter it and split it once.
# NOTE(review): the same load/filter/split steps are repeated in a later cell, and
# `split_peo` is not referenced again in the visible notebook -- this cell looks redundant.
csv_file = CSV_FILE_PATH
df = pd.read_csv(csv_file)
filtered_df = filter_df(df)
split_peo = split_people(filtered_df)

In [None]:
# Tunable settings for the pipeline. Trailing values in comments are earlier settings.
# DEBUG: print a progress message before each long-running step.
DEBUG = True
# NOTE(review): SILENCE_THRESHOLD is not referenced by the code visible in this notebook
# (remove_silence has its own `thresh` default).
SILENCE_THRESHOLD = .001 # .01
# Target sample rate used by get_wav (resampling) and to_mfcc.
# NOTE(review): 2205 Hz is unusually low for speech -- confirm it is not a typo for 22050.
RATE = 2205 # 2205
# Number of MFCC coefficients per frame (rows of each MFCC array).
N_MFCC = 15 #13 #15 # 20
# Number of MFCC frames (columns) in each training segment.
COL_SIZE = 30
# Number of training epochs.
EPOCHS = 10 #35 #250
# Default batch size for train_model.
BATCH_SIZE=16

def to_categorical(y, classes=None):
    '''
        Converts list of languages into a binary class matrix
        :param y (list): list of languages
        :param classes (list, optional): languages in column order. Pass the same list for
            every call whose outputs must share one column layout (e.g. train and test).
            Defaults to the sorted distinct values of y.
        :return (numpy array): binary class matrix with one row per entry of y and one column per class
    '''
    if classes is None:
        # Sort the distinct labels: iterating a bare set gives an order that can
        # change between runs and between two sets holding the same labels, so
        # the column assigned to a language would not be reproducible.
        classes = sorted(set(y))
    lang_dict = {language: index for index, language in enumerate(classes)}
    indices = [lang_dict[language] for language in y]
    return utils.to_categorical(indices, len(lang_dict))

def get_wav(filename):
    """
        Loads a wav file from disk and resamples it to RATE.

        Args:
            filename (str): File name without extension; the audio is read from
                '../../data/audio/<filename>.wav'.

        Returns:
            numpy.ndarray: Resampled signal, or None if loading or resampling
            fails (the error is printed).
    """
    try:
        signal, native_rate = librosa.load(f'../../data/audio/{filename}.wav')
        resampled = librosa.core.resample(
            y=signal,
            orig_sr=native_rate,
            target_sr=RATE,
            scale=True,
        )
    except Exception as e:
        print(f"Error loading wav: {filename} - {e}")
        return None
    return resampled

def to_mfcc(wav, sr=None, n_mfcc=None):
    """
        Converts wav form data to Mel Frequency Cepstral Coefficients (MFCCs).
        Args:
            wav (numpy array): The wav form data.
            sr (int, optional): The sample rate of the audio. Defaults to None, which uses RATE.
            n_mfcc (int, optional): The number of MFCC coefficients to extract. Defaults to None, which uses N_MFCC.

        Returns:
            numpy.ndarray: A 2D numpy array containing the MFCC features, or None
            if the conversion fails (the error is printed, not raised).
    """
    try:
        return librosa.feature.mfcc(
            y=wav,
            sr=RATE if sr is None else sr,
            n_mfcc=N_MFCC if n_mfcc is None else n_mfcc,
        )
    except Exception as e:
        # Best effort: a failed conversion yields None instead of aborting a batch run
        print(f"Error converting wav to MFCC: {e}")
        return None



def remove_silence(wav, thresh=0.04, chunk=5000):
    '''
        Searches wav form for segments of silence. The signal is cut into consecutive
        blocks of 'chunk' samples; a block is kept only if at least one of its samples
        has magnitude >= 'thresh'. Samples after the last complete block are always dropped.
        :param wav (np array): 1-D wav array to be filtered
        :param thresh (float): Magnitude a sample must reach for its block to count as non-silent
        :param chunk (int): Block length in samples
        :return (np array): Wav array with silence removed
    '''
    wav = np.asarray(wav)
    # Integer division: range() and reshape() need a whole number of blocks
    n_chunks = len(wav) // chunk
    blocks = wav[:n_chunks * chunk].reshape(n_chunks, chunk)
    # |x| >= thresh covers both the positive and the negative excursions
    is_loud = (np.abs(blocks) >= thresh).any(axis=1)
    return blocks[is_loud].reshape(-1)

def normalize_mfcc(mfcc):
    '''
        Normalize mfcc
        :param mfcc: array of MFCC values
        :return: absolute values of mfcc after MinMaxScaler.fit_transform
    '''
    magnitudes = np.abs(mfcc)
    scaler = MinMaxScaler()
    return scaler.fit_transform(magnitudes)

def make_segments(mfccs,labels):
    '''
        Cuts every mfcc into consecutive windows of COL_SIZE columns and repeats
        its label once per window. Trailing columns that do not fill a whole
        window are dropped.
        :param mfccs: list of mfccs
        :param labels: list of labels
        :return (tuple): (list of segments, list of matching labels)
    '''
    segments, seg_labels = [], []
    for features, tag in zip(mfccs, labels):
        n_windows = int(features.shape[1] / COL_SIZE)
        windows = [
            features[:, k * COL_SIZE:(k + 1) * COL_SIZE]
            for k in range(n_windows)
        ]
        segments.extend(windows)
        seg_labels.extend([tag] * n_windows)
    return (segments, seg_labels)

def segment_one(mfcc):
    '''
        Creates segments from one mfcc image by cutting it into consecutive windows
        of COL_SIZE columns. Trailing columns that do not fill a whole window are dropped.
        :param mfcc (numpy array): MFCC array
        :return (numpy array): Segmented MFCC array, one window per entry along the first axis
    '''
    n_windows = int(mfcc.shape[1] / COL_SIZE)
    windows = [
        mfcc[:, k * COL_SIZE:(k + 1) * COL_SIZE]
        for k in range(n_windows)
    ]
    return np.array(windows)

def create_segmented_mfccs(X_train):
    '''
        Applies segment_one to every MFCC in X_train
        :param X_train: list of MFCCs
        :return: list of segmented mfccs, in the same order as X_train
    '''
    return [segment_one(features) for features in X_train]


def train_model(X_train,y_train,X_validation, y_validation, batch_size=BATCH_SIZE):
    '''
        Trains 2D convolutional neural network
        :param X_train: Numpy array of mfcc segments, indexed as (segment, row, column)
        :param y_train: Binary matrix based on labels
        :param X_validation: Numpy array of validation mfcc segments, same layout as X_train
        :param y_validation: Binary matrix based on validation labels
        :param batch_size: Number of segments per training batch
        :return: Trained model
    '''
    # Get row, column, and class sizes
    rows = X_train[0].shape[0]
    cols = X_train[0].shape[1]
    val_rows = X_validation[0].shape[0]
    val_cols = X_validation[0].shape[1]
    num_classes = len(y_train[0])
    input_shape = (rows, cols, 1)
    # Append the single-channel axis expected by the Conv2D input layer
    X_train = X_train.reshape(X_train.shape[0], rows, cols, 1 )
    X_validation = X_validation.reshape(X_validation.shape[0],val_rows,val_cols,1)

    model = Sequential()
    model.add(
        Conv2D(
            32,
            kernel_size=(3,3),
            activation='relu',
            data_format="channels_last",
            input_shape=input_shape
        )
    )
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Conv2D(64,kernel_size=(3,3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.25))
    model.add(Flatten())
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(num_classes, activation='softmax'))
    model.compile(
        loss='categorical_crossentropy',
        optimizer='adadelta',
        metrics=['accuracy']
    )

    # Early stopping is not enabled. To enable it, add an EarlyStopping callback
    # below and monitor 'accuracy' (the metric name compiled above), not 'acc'.

    # Creates log file for graphical interpretation using TensorBoard.
    # Only arguments the TF2/Keras callback still supports are passed; the
    # former batch_size, write_grads and embeddings_layer_names options are gone.
    tb = TensorBoard(
        log_dir='../../logs',
        histogram_freq=0,
        write_graph=True,
        write_images=True,
        embeddings_freq=0,
        embeddings_metadata=None,
    )

    # Image shifting
    datagen = ImageDataGenerator(width_shift_range=0.05)

    # Fit model using ImageDataGenerator. The generator and steps_per_epoch use
    # the same batch_size so that one epoch is exactly one pass over X_train;
    # steps_per_epoch must be a whole number, hence the ceil.
    model.fit(
        datagen.flow(
            X_train,
            y_train,
            batch_size=batch_size
        ),
        steps_per_epoch=int(np.ceil(len(X_train) / batch_size)),
        epochs=EPOCHS,
        callbacks=[
            tb
        ],
        validation_data=(
            X_validation,
            y_validation
        )
    )

    return (model)

def save_model(model, model_filename):
    '''
        Save model to '../../output/models/<model_filename>.keras'
        :param model: Trained model to be saved
        :param model_filename: File name without directory or extension
        :return: None
    '''
    destination = f'../../output/models/{model_filename}.keras'
    model.save(destination)



In [None]:
# Run settings: metadata location and the name the trained model is saved under
file_name = CSV_FILE_PATH
model_filename = 'model5'
# Load metadata
df = pd.read_csv(file_name)
# Filter metadata to retrieve only files desired
filtered_df = filter_df(df)
# Train test split: the X values come from the 'language_num' column (later used as
# audio file names by get_wav), the y values from the 'native_language' column
X_train, X_test, y_train, y_test = split_people(filtered_df)
# Keep only the text before the first newline in every label
y_test = y_test.apply(lambda x: x.split('\n')[0])
y_train = y_train.apply(lambda x: x.split('\n')[0])
# Count the recordings per language in each split
train_count = Counter(y_train)
test_count = Counter(y_test)
print('Train count:', train_count)
print('Test count:', test_count)

In [None]:
# Baseline: accuracy obtained by always guessing the most common language in the test set
acc_to_beat = test_count.most_common(1)[0][1] / float(np.sum(list(test_count.values())))

# To categorical
# Encode the train and test labels in a single call so that both share one
# language -> column mapping and the same number of columns. Two separate calls
# build two independent mappings, which can disagree whenever the splits do not
# contain exactly the same languages.
n_train = len(y_train)
y_encoded = to_categorical(list(y_train) + list(y_test))
y_train = y_encoded[:n_train]
y_test = y_encoded[n_train:]
print(y_train)

In [None]:

# Extract prosodic, acoustic and PLP features for the training files on a thread pool.
# X_train still holds the file names at this point.
# NOTE(review): Executor.map returns a one-shot iterator, so each of the three
# results below can be iterated only once; wrap them in list() if they need to be
# reused. Nothing in the visible notebook consumes them yet.
if DEBUG:
    print('Extracting Features....')

with ThreadPoolExecutor() as pool:
    X_prosodic = pool.map(extract_prosodic_features, X_train)
    X_acoustic = pool.map(extract_acoustic_features, X_train)
    X_plp = pool.map(extract_plp, X_train)

In [None]:
# do something with the prosodic features
# ...

# do something with the acoustic features
# ...

In [None]:

# Load and resample the wav files on a thread pool
if DEBUG:
    print('Loading wav files....')

with ThreadPoolExecutor() as pool:
    # Executor.map returns a one-shot iterator; list() keeps the loaded audio
    # available for inspection and for more than one pass.
    X_train = list(pool.map(get_wav, X_train))
    X_test = list(pool.map(get_wav, X_test))

In [None]:

# Convert to MFCC
if DEBUG:
    print('Converting to MFCC....')
with ThreadPoolExecutor() as pool:
    # Materialise the results. X_test is read twice further down (once to build
    # the validation segments, once for per-file prediction); left as the
    # one-shot iterator that Executor.map returns, the second read would see
    # nothing.
    X_train = list(pool.map(to_mfcc, X_train))
    X_test = list(pool.map(to_mfcc, X_test))

In [None]:
# Create segments from MFCCs: every MFCC is cut into COL_SIZE-column windows and its
# label is repeated once per window. From here on X_train / y_train hold segments and
# per-segment labels; X_test / y_test are left unchanged.
X_train, y_train = make_segments(X_train, y_train)
X_validation, y_validation = make_segments(X_test, y_test)

In [None]:
# Train model on the training segments, validating on the segments built from the test files
model = train_model(np.array(X_train), np.array(y_train), np.array(X_validation),np.array(y_validation))


In [None]:
# Stack the validation segments into one array and append a trailing axis of size 1
val_rows = X_validation[0].shape[0]
val_cols = X_validation[0].shape[1]
X_validation = np.array(X_validation)
X_validation = X_validation.reshape(X_validation.shape[0],val_rows,val_cols, 1)
# Shown as the cell output
X_validation.shape

In [None]:


# for mfcc in X_validation:
    # print(mfcc.shape)
    
    # MFCCs = mfcc.reshape(mfcc.shape[0], mfcc.shape[1], mfcc.shape[2])  # MFCCs.shape[2]
    # print(MFCCs.shape)
    # print(mfcc.shape[0],mfcc.shape[1])
    # reshare mfcc to get this shape format shape=(None, 15, 30, 1), 1 is the number of channels
    # mfcc = mfcc.reshape(mfcc.shape[0],mfcc.shape[1],mfcc.shape[2])  # MFCCs.shape[2]
    # y_predicted = model.predict(mfcc,verbose=0)
    # print(Counter(list(y_predicted)).most_common(1)[0]) # [0] shape=(None, 15, 30, 1)
    # return(Counter(list(y_predicted)).most_common(1)[0]) # [0]

In [None]:
# Make predictions on full X_test MFCCs
# X_test must be a materialised list of MFCC arrays here; an already-consumed
# iterator would produce no segment groups and therefore no predictions.
X_val = create_segmented_mfccs(X_test)
# Predict from the per-file segment groups: predict_class_all expects one
# (segments, rows, cols) array per recording and returns one class per
# recording, which lines up with the per-file rows of y_test.
y_predicted = predict_class_all(X_val, model)
y_predicted

In [None]:
# Display some X_test samples (X_test may be a generator rather than a list)

In [None]:
# Print statistics
print('Training samples:', train_count)
print('Testing samples:', test_count)
# Share of the most common language in the test set
print('Accuracy to beat:', acc_to_beat)
# Row sums of the confusion matrix, i.e. the number of evaluated samples per actual class
print('Confusion matrix of total samples:\n', np.sum(confusion_matrix(y_predicted, y_test),axis=1))
# Rows are indexed by actual class, columns by predicted class
print('Confusion matrix:\n',confusion_matrix(y_predicted, y_test))
print('Accuracy:', get_accuracy(y_predicted,y_test))

# Save model
save_model(model, model_filename)

In [None]:
# Plot the confusion matrix as an annotated heatmap and save it to file
conf_matrix = confusion_matrix(y_predicted, y_test)

fig, ax = plt.subplots(figsize=(10, 7))
sns.heatmap(conf_matrix, annot=True, fmt='d', ax=ax)
# confusion_matrix indexes rows by actual class and columns by predicted class
ax.set(xlabel='Predicted', ylabel='Actual', title='Confusion matrix')
fig.savefig('../../output/confusion_matrix.png')
plt.show()


In [None]:
# Render a diagram of the trained network to '../../output/model.png',
# laid out top to bottom ('TB') with shapes, layer names, activations and trainable flags shown
tf.keras.utils.plot_model(
    model,
    to_file=f'../../output/model.png',
    show_shapes=True,
    show_layer_names=True,
    rankdir='TB',
    expand_nested=True,
    show_layer_activations=True,
    show_trainable=True,
    dpi=200
)