In [None]:
import librosa
import numpy as np
import pandas as pd
import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, LSTM, GRU, Flatten, Conv1D, MaxPooling1D
from keras.layers import BatchNormalization
from keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from keras.utils import to_categorical
from algorithms import *

In [None]:
# Load the development and evaluation tables
df, df_eval = (
    pd.read_csv(csv_path)
    for csv_path in ("dsl_data/development.csv", "dsl_data/evaluation.csv")
)

In [None]:
# Encode the categorical speaker attributes on both tables.
# label_encoder comes from the star-imported `algorithms` module; its return
# value is discarded, so it presumably rewrites the column in place — TODO confirm.
# NOTE(review): if it fits a fresh encoder on every call, the integer codes may
# differ between df and df_eval — verify.
for column in ('gender', 'ageRange'):
    for frame in (df, df_eval):
        label_encoder(frame, column)

In [None]:
# Target = action string concatenated with object string (no separator),
# mapped to integer class ids by a LabelEncoder that later cells reuse.
target_class = df['action'] + df['object']
encoder = LabelEncoder().fit(target_class)
y = encoder.transform(target_class)

In [None]:
# Columns removed from the development table; the evaluation table only loses
# the first four (it is not stripped of 'action' / 'object' here).
cols = [
    'Id',
    'Self-reported fluency level ',
    'First Language spoken',
    'Current language used for work/school',
    'action',
    'object',
]
shared_cols = cols[:4]
df.drop(columns=cols, inplace=True)
df_eval.drop(columns=shared_cols, inplace=True)

In [None]:
# Run the project helper (from `algorithms`) on each table.
# NOTE(review): later cells read a 'data' column holding audio samples, so this
# presumably loads the waveforms into it — confirm against the helper.
for frame in (df, df_eval):
    audio_feature_extraction(frame)


In [None]:
def trim_audio(row):
    """Return ``row['data']`` with leading and trailing silence trimmed.

    Intended for ``DataFrame.apply(..., axis=1)``. Trimming is delegated to
    ``librosa.effects.trim`` with a 20 dB threshold, 2048-sample analysis
    frames and a 512-sample hop; the interval it also returns is discarded.
    """
    trimmed, _interval = librosa.effects.trim(
        row['data'], top_db=20, frame_length=2048, hop_length=512
    )
    return trimmed


# Overwrite each stored waveform with its silence-trimmed version
for frame in (df, df_eval):
    frame['data'] = frame.apply(trim_audio, axis=1)

In [None]:
# Extract audio features using librosa
# mfcc_feature is provided by the star-imported `algorithms` module. The next
# cell iterates over the result and pads axis 1 of every item, so it is treated
# as a sequence of 2-D arrays — presumably one per row of df; TODO confirm.
mfcc_array = mfcc_feature(df)

In [None]:
# Convert every item to an ndarray first so that `.shape` and `np.pad` below
# work even if mfcc_feature returned nested lists. (numpy is already imported
# as `np` in the notebook's import cell.)
mfcc_features = [np.asarray(clip) for clip in mfcc_array]

# Largest size along axis 1 across all items
max_len = max(clip.shape[1] for clip in mfcc_features)

# Zero-pad the end of axis 1 so every item ends up with shape (rows, max_len)
padded_mfcc_features = [
    np.pad(clip, ((0, 0), (0, max_len - clip.shape[1])), mode='constant')
    for clip in mfcc_features
]

In [None]:
# Stack the padded matrices into one array with the sample axis first
X = np.array(padded_mfcc_features)
# Integer class ids from the fitted encoder, as a column of shape (n_samples, 1)
y = encoder.transform(target_class).reshape(-1, 1)
# Seeded 80/20 hold-out split
split = train_test_split(X, y, test_size=0.2, random_state=42)
X_train, X_test, y_train, y_test = split

In [None]:
# NOTE(review): same inputs, test_size and seed as the split in the previous
# cell, so this re-creates the identical partition.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

In [None]:
# Reshape both splits to (shape[0], shape[1], shape[2]) for the RNN; for arrays
# that are already 3-D this leaves the layout unchanged.
train_dims = (X_train.shape[0], X_train.shape[1], X_train.shape[2])
X_train = np.reshape(X_train, train_dims)
test_dims = (X_test.shape[0], X_test.shape[1], X_test.shape[2])
X_test = np.reshape(X_test, test_dims)

In [None]:
# df.drop(columns=['path','speakerId'],inplace= True)
# df_eval.drop(columns=['path', 'speakerId'],inplace= True)

In [None]:
# Create the model: Conv1D -> max-pool -> LSTM -> dense softmax classifier.
# y_train holds integer class ids in a single column, so y_train.shape[1] is 1
# and cannot be used as the output width; take the class count from the fitted
# encoder instead. Integer targets also require the sparse cross-entropy loss.
n_classes = len(encoder.classes_)

model = Sequential()
model.add(Conv1D(filters=64, kernel_size=3, activation='relu', input_shape=(X_train.shape[1], X_train.shape[2])))
model.add(MaxPooling1D(pool_size=2))
model.add(LSTM(100))
model.add(Dense(100, activation='relu'))
model.add(Dense(n_classes, activation='softmax'))
model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])


In [None]:
# Callbacks: stop after 10 epochs without val_loss improvement, and write the
# weights of the best val_loss epoch to best_weights.h5.
early_stopping = EarlyStopping(
    monitor='val_loss',
    mode='auto',
    patience=10,
    verbose=1,
)
best_weights = ModelCheckpoint(
    filepath='best_weights.h5',
    monitor='val_loss',
    mode='auto',
    save_best_only=True,
    save_weights_only=True,
    verbose=1,
)


In [None]:
# Train with the callbacks built in the previous cell: early stopping ends the
# run once val_loss stalls instead of always spending all 500 epochs, and the
# checkpoint keeps the best weights in best_weights.h5.
history = model.fit(
    X_train,
    y_train,
    validation_data=(X_test, y_test),
    epochs=500,
    batch_size=32,
    callbacks=[early_stopping, best_weights],
)

In [None]:
# Fitting the model on training data
# history = model.fit(X_train, y_train, epochs=100, batch_size=32, validation_data=(X_test, y_test), callbacks=[early_stopping, best_weights], verbose=1)


In [None]:
# Training vs. validation accuracy per epoch
for metric_key in ('accuracy', 'val_accuracy'):
    plt.plot(history.history[metric_key])
plt.title('Model Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend(['Train', 'Test'], loc='upper left')
plt.show()

In [None]:
# Score the current in-memory model on the held-out split
# model.load_weights('best_weights.h5')
scores = model.evaluate(X_test, y_test, verbose=1)
held_out_accuracy = scores[1]  # index 1 is the 'accuracy' metric requested at compile time
print("Accuracy:", held_out_accuracy)

In [None]:
# Plot the accuracy history
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
# One legend entry per plotted curve (training first, validation second)
plt.legend(['Train', 'Test'])
plt.show()

In [None]:
# Evaluate the model on the test data
score = model.evaluate(X_test, y_test, verbose=0)
print("Test loss:", score[0])
print("Test accuracy:", score[1])

# Predict the class ids for the test data. Sequential.predict_classes is no
# longer available in current Keras, so take the argmax over the softmax
# output returned by predict().
predictions = np.argmax(model.predict(X_test), axis=1)

# Decode the ids with `encoder`, the LabelEncoder fitted on the targets earlier
# in the notebook (no variable named `le` exists yet at this point).
predictions = encoder.inverse_transform(predictions)

In [None]:
# Model output per sample -> index of the largest score -> original label
class_scores = model.predict(X_test)
class_ids = np.argmax(class_scores, axis=1)
predictions = encoder.inverse_transform(class_ids)

In [None]:
def preprocess_data(dataframe, n_mfcc=30, sr=22050):
    """Build a flat feature matrix and encoded labels, then split them 80/20.

    Each row's features are the MFCC matrix of the waveform in
    ``dataframe["data"]`` (zero-padded to the longest clip and flattened),
    followed by the standardised ``gender`` and ``ageRange`` values.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Must provide the columns ``data`` (audio samples per row), ``gender``
        and ``ageRange`` (numeric codes) and ``label`` (target class).
    n_mfcc : int, default 30
        Number of MFCC coefficients computed per frame.
    sr : int, default 22050
        Sampling rate handed to librosa.
        NOTE(review): assumes the waveforms were loaded at this rate — confirm
        against the loading helper.

    Returns
    -------
    tuple
        ``(X_train, X_test, y_train, y_test, le)`` where ``le`` is the fitted
        LabelEncoder that maps class ids back to labels.

    Raises
    ------
    ValueError
        If ``dataframe`` has no rows.
    """
    # Local import: StandardScaler is not among the notebook's visible imports.
    from sklearn.preprocessing import StandardScaler

    if len(dataframe) == 0:
        raise ValueError("preprocess_data needs at least one row")

    # One (n_mfcc, n_frames) matrix per clip; n_frames depends on clip length.
    mfcc = [
        librosa.feature.mfcc(y=np.asarray(signal), sr=sr, n_mfcc=n_mfcc)
        for signal in dataframe["data"]
    ]
    # Zero-pad the frame axis so all clips flatten to the same width.
    n_frames = max(m.shape[1] for m in mfcc)
    mfcc = [
        np.pad(m, ((0, 0), (0, n_frames - m.shape[1])), mode="constant")
        for m in mfcc
    ]
    X = np.array(mfcc).reshape(len(mfcc), -1)

    # Append each speaker attribute as one standardised column.
    # NOTE(review): the scalers are fitted before the split, so test rows
    # influence the scaling statistics.
    for column in ("gender", "ageRange"):
        values = dataframe[column].values.reshape(-1, 1)
        X = np.hstack((X, StandardScaler().fit_transform(values)))

    le = LabelEncoder()
    y = le.fit_transform(dataframe["label"].values)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    return X_train, X_test, y_train, y_test, le

In [None]:
# preprocess_data reads its target from a "label" column. No visible cell adds
# one to df (action/object were dropped earlier), so attach the combined
# action+object target on a copy for this call.
X_train, X_test, y_train, y_test, le = preprocess_data(df.assign(label=target_class), n_mfcc=30)

In [None]:
# Define the RNN model
def create_RNN_model(input_shape, n_classes):
    """Assemble and compile a stacked-LSTM softmax classifier.

    Layers, in order: LSTM(128, returning the full sequence), Dropout(0.5),
    LSTM(128), Dropout(0.5), Dense(n_classes, softmax). The model is compiled
    with the Adam optimizer, sparse categorical cross-entropy (integer class
    ids as targets) and the accuracy metric.

    Parameters
    ----------
    input_shape : tuple
        Passed unchanged as ``input_shape`` of the first LSTM layer.
    n_classes : int
        Width of the softmax output layer.

    Returns
    -------
    The compiled ``Sequential`` model.
    """
    layers = [
        LSTM(128, input_shape=input_shape, return_sequences=True),
        Dropout(0.5),
        LSTM(128),
        Dropout(0.5),
        Dense(n_classes, activation='softmax'),
    ]
    network = Sequential(layers)
    network.compile(
        loss='sparse_categorical_crossentropy',
        optimizer='adam',
        metrics=['accuracy'],
    )
    return network

In [None]:
# Keras input_shape excludes the batch axis, so only (timesteps, features) is
# passed. n_classes must be a count; y is a NumPy array, so it has no `.values`.
# NOTE(review): the arrays given to fit() must then be (n_samples, 50, 30) — confirm.
model = create_RNN_model((50, 30), len(np.unique(y)))

In [None]:
# Train the model
def train_model(model, X_train, y_train, X_test, y_test, epochs=20, batch_size=32):
    """Fit ``model`` and hand back both the model and the fit result.

    ``X_test`` / ``y_test`` are supplied to ``model.fit`` as validation data.

    Returns
    -------
    tuple
        ``(model, history)`` where ``history`` is whatever ``model.fit`` returned.
    """
    fit_options = {
        'epochs': epochs,
        'batch_size': batch_size,
        'validation_data': (X_test, y_test),
    }
    training_history = model.fit(X_train, y_train, **fit_options)
    return model, training_history

In [None]:
# Single-epoch run through the train_model wrapper
model, history = train_model(
    model,
    X_train,
    y_train,
    X_test,
    y_test,
    epochs=1,
    batch_size=32,
)

In [None]:
# Evaluate the model
def evaluate_model(model, X_test, y_test):
    """Return the result of ``model.evaluate`` on the given data, run with ``verbose=0``."""
    return model.evaluate(X_test, y_test, verbose=0)

In [None]:
# Display the evaluation result for the held-out split as the cell output
evaluate_model(model=model, X_test=X_test, y_test=y_test)

In [None]:
# Plot the model training history
def plot_history(history):
    """Plot training and validation accuracy per epoch.

    Parameters
    ----------
    history
        Object whose ``history`` attribute is a mapping containing the
        ``'accuracy'`` and ``'val_accuracy'`` series (as returned by
        ``model.fit`` when the model is compiled with the accuracy metric and
        validation data is supplied).
    """
    plt.plot(history.history['accuracy'])
    plt.plot(history.history['val_accuracy'])
    # Title, axis labels and legend match the standalone accuracy plot earlier
    # in the notebook so the figure can be read on its own.
    plt.title('Model Accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Test'], loc='upper left')
    plt.show()

In [None]:
# Draw the accuracy curves of the most recent training run
plot_history(history=history)

In [None]:
def predict(model, X_test, le):
    """Predict a decoded class label for every sample in ``X_test``.

    Parameters
    ----------
    model
        Object whose ``predict(X_test)`` returns one row of class scores per
        sample (for example a softmax output).
    X_test
        Samples to classify; passed to ``model.predict`` unchanged.
    le
        Fitted encoder whose ``inverse_transform`` maps class ids to labels.

    Returns
    -------
    pandas.Series
        Decoded labels in the same order as ``X_test``.
    """
    # Sequential.predict_classes is no longer available in current Keras; the
    # predicted class is the index of the highest score in each row.
    class_scores = np.asarray(model.predict(X_test))
    class_ids = np.argmax(class_scores, axis=1)
    return pd.Series(le.inverse_transform(class_ids))

In [None]:
# Decoded label predictions for the held-out split
y_pred = predict(model=model, X_test=X_test, le=le)

In [None]:
# Write the decoded predictions held in y_pred. `index_label` names the index
# column "Id"; to_csv's `index` argument is only a write / don't-write flag.
# NOTE(review): y_pred holds predictions for the held-out split X_test, not for
# df_eval — confirm which set the output file is meant to contain.
pd.Series(y_pred).to_csv('predictions.csv', index_label='Id')