<a href="https://colab.research.google.com/github/KavyaD02/Continuous_Speech_Stress_Detection/blob/main/Stress_Level_Diagnosis_LSTM_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import os
import librosa
import librosa.display
from pydub.silence import split_on_silence
from pydub import AudioSegment, effects 
from scipy.io.wavfile import read, write



def preprocess_audio(audio_file_name):
    """Load a recording and return it pre-emphasised, trimmed and normalised.

    Args:
        audio_file_name: Path passed straight to ``librosa.load``.

    Returns:
        A ``(sr, audio_file)`` tuple: the sample rate reported by
        ``librosa.load`` followed by the processed signal.
    """
    signal, sample_rate = librosa.load(audio_file_name)

    emphasised = librosa.effects.preemphasis(signal)
    # Only the first element of trim()'s result (the trimmed signal) is kept.
    trimmed = librosa.effects.trim(emphasised, top_db=20)[0]

    return sample_rate, librosa.util.normalize(trimmed)

In [None]:
# Directory holding the raw recordings.
DATA_PATH = "./Data/"

def get_file_names(path = DATA_PATH):
    """Return the names of all entries found directly inside ``path``.

    Args:
        path: Directory to list; defaults to ``DATA_PATH``.

    Returns:
        The list produced by ``os.listdir(path)``.
    """
    return os.listdir(path)

def process_audio(audio_file_name):
    """Remove long silences from one recording and save the result as WAV.

    The file is read from ``DATA_PATH``, split wherever at least one second
    is quieter than -50 dBFS (500 ms of silence is kept around each chunk),
    and the chunks are concatenated and written to
    ``./Processed Data/<stem>_PROCESSED.wav``.

    Args:
        audio_file_name: Name (not path) of a WAV file inside ``DATA_PATH``.
    """
    rate, audio = read(os.path.join(DATA_PATH, audio_file_name))

    # A 2-D sample array means one column per channel; pass the real channel
    # count instead of assuming mono so the raw bytes are interpreted correctly.
    channels = 1 if audio.ndim == 1 else audio.shape[1]
    aud = AudioSegment(audio.tobytes(), frame_rate = rate,
                         sample_width = audio.dtype.itemsize, channels = channels)

    audio_chunks = split_on_silence(
        aud,
        min_silence_len = 1000,
        silence_thresh = -50,
        keep_silence = 500,)

    # sum([]) is the int 0, which has no get_array_of_samples(); when no chunk
    # was found fall back to the unmodified recording instead of crashing.
    audio_processed = sum(audio_chunks) if audio_chunks else aud
    samples = np.array(audio_processed.get_array_of_samples())
    if channels > 1:
        # Samples come back interleaved; restore the (frames, channels) layout.
        samples = samples.reshape(-1, channels)

    # splitext only strips the final extension, so dots inside the name survive.
    audio_file = os.path.splitext(audio_file_name)[0]
    os.makedirs("./Processed Data", exist_ok=True)
    write(f"./Processed Data/{audio_file}_PROCESSED.wav", rate, samples)

# Run the silence-removal pass over every entry listed in DATA_PATH.
file_names = get_file_names()
for raw_recording in file_names:
    process_audio(raw_recording)
print("Done")

In [None]:
def get_label_dict(df, file_list, test=False):
    """Map each audio file to a 4-level class derived from its PHQ score.

    Args:
        df: DataFrame with a ``Participant_ID`` column and a score column
            (``PHQ_Score`` when ``test`` is true, ``PHQ8_Score`` otherwise).
        file_list: Paths whose file name starts with ``<participant id>_``.
        test: Selects which score column is read.

    Returns:
        dict mapping ``str(file)`` to a class index:
        0 for scores 0-5, 1 for 6-14, 2 for 15-21, 3 for 22-27.

    Raises:
        ValueError: if a participant id is missing from ``df`` or a score
            lies outside 0..27.
    """
    score_column = 'PHQ_Score' if test else 'PHQ8_Score'

    # Build the id -> score lookup once, pairing rows by position so that the
    # DataFrame's index labels are irrelevant. setdefault keeps the first row
    # for a duplicated id, matching the previous list.index() behaviour.
    score_by_patient = {}
    for patient_id, raw_score in zip(df['Participant_ID'].tolist(),
                                     df[score_column].tolist()):
        score_by_patient.setdefault(patient_id, raw_score)

    label_dict = {}
    for file in file_list:
        patient_num = int(file.split("/")[-1].split("_")[0])
        if patient_num not in score_by_patient:
            raise ValueError(f"participant {patient_num} not found in Participant_ID")

        phq8_score = int(score_by_patient[patient_num])

        if phq8_score < 0 or phq8_score >= 28:
            # Previously this silently reused the label of the previous file
            # (or raised UnboundLocalError on the first one).
            raise ValueError(
                f"score {phq8_score} for participant {patient_num} is outside 0..27")
        elif phq8_score < 6:
            score = 0
        elif phq8_score < 15:
            score = 1
        elif phq8_score < 22:
            score = 2
        else:
            score = 3

        label_dict[f"{file}"] = score

    return label_dict

In [None]:
def get_set(df):
    """Return the de-duplicated processed-audio paths for a split.

    Args:
        df: DataFrame with a ``Participant_ID`` column.

    Returns:
        List of ``./Processed Data/<id>_AUDIO_PROCESSED.wav`` paths, one per
        distinct participant, in order of first appearance in ``df``.
    """
    df_files = [f"./Processed Data/{x}_AUDIO_PROCESSED.wav" for x in df['Participant_ID']]

    # dict.fromkeys removes duplicates while keeping insertion order. A plain
    # set() orders strings by their randomised hash, so the result changed
    # between kernel sessions even though later cells pair labels with
    # feature rows by position.
    return list(dict.fromkeys(df_files))

In [None]:
# Read the participant tables for the three splits.
train_df = pd.read_csv("./train.csv")
test_df = pd.read_csv("./test.csv")
val_df = pd.read_csv("./val.csv")

# Turn each table into its list of unique processed-audio paths.
train = get_set(train_df)
test = get_set(test_df)
val = get_set(val_df)

In [None]:
# Per-split {path: class} mappings; only the test table is read through the
# test=True branch of get_label_dict.
train_dict = get_label_dict(train_df, train)
test_dict = get_label_dict(test_df, test, True)
val_dict = get_label_dict(val_df, val)

# Merge in train -> test -> val order; on a duplicate path the later split wins.
data_dict = dict(train_dict)
data_dict.update(test_dict)
data_dict.update(val_dict)

In [None]:
PROCESSED_DATA_PATH = "./Processed Data/"

data_list = list(data_dict.keys())
label_list = list(data_dict.values())

# List the directory once (it was re-read for every path) and keep the names
# in a set so each membership test is constant time.
audios = set(os.listdir(PROCESSED_DATA_PATH))

# Keep only the (path, label) pairs whose audio file actually exists on disk.
# zip() pairs each path with its label directly instead of searching
# data_list again with list.index().
new_data_list = []
new_label_list = []
for data, label in zip(data_list, label_list):
    if os.path.basename(data) in audios:
        new_data_list.append(data)
        new_label_list.append(label)

In [None]:
csv_path = "./Combined Features.csv"
combined_df = pd.read_csv(csv_path)
combined = list(combined_df["Combined"])

# Every cell of the "Combined" column is text containing brackets and line
# breaks around whitespace-separated numbers; strip the decoration and parse
# what is left into a float64 vector.
combined_features = [
    np.fromstring(
        cell.replace('\n', ' ').replace('[', '').replace(']', ''),
        dtype=np.float64,
        sep=' ',
    )
    for cell in combined
]
print(combined_features)

[array([-4.79988800e+02,  8.19064178e+01,  2.37880669e+01,  2.76477852e+01,
        1.64174309e+01,  1.18718338e+01, -1.46540225e+00,  6.03091717e-01,
        3.43655109e+00, -2.21614882e-01,  1.62698007e+00, -2.90776347e-03,
        1.44289351e+00,  4.51081141e-04,  6.64304243e-04,  7.05572427e-04,
        6.93348760e-04,  6.08195260e-04,  4.96753200e-04,  4.12559777e-04,
        3.60709499e-04,  3.11995856e-04,  2.56050233e-04,  2.16459026e-04,
        2.10880549e-04,  2.17300185e-04, -8.55024118e-05, -9.66644002e-05,
       -4.22024568e-05,  6.02277805e-06,  2.73243350e-05,  2.84198013e-05,
        2.49642817e-05,  2.08131605e-05,  9.92893820e-06, -7.61972979e-06,
       -2.14398769e-05, -2.43459526e-05, -2.29571524e-05,  3.92782062e-01,
        4.06451583e-01,  4.09212559e-01,  4.32155341e-01,  4.71119404e-01,
        5.26509285e-01,  5.63314199e-01,  5.53642154e-01,  4.79845643e-01,
        4.36706066e-01,  4.09527898e-01,  3.95022154e-01,  7.59823394e+00,
        7.77365867e+00, 

In [None]:
# Shape the features as (samples, 64, 1). The sample count is taken from the
# data instead of being fixed at 186, so the cell keeps working when
# recordings are added or removed; 64 stays explicit because the LSTM below
# is built with input_shape=(64, 1).
X = np.array(combined_features).reshape(len(combined_features), 64, 1)

In [None]:
# Integer class labels as an array, in the same order as new_data_list.
Y = np.asarray(new_label_list)

In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout

In [None]:
# Hold out 20% of the samples. random_state pins the shuffle so the split,
# and therefore the reported accuracy, is the same on every run.
train_X, test_X, train_Y, test_Y = train_test_split(X, Y, test_size = 0.20, random_state = 42)

In [None]:
# Single-layer LSTM classifier: 64 steps of 1 feature in, 4 class
# probabilities out, with 50% dropout before the output layer.
model = Sequential([
    LSTM(64, input_shape=(64, 1)),
    Dropout(0.5),
    Dense(4, activation='softmax'),
])

In [None]:
# Import from tensorflow.keras so the callback comes from the same Keras
# implementation as the Sequential model built above.
from tensorflow.keras.callbacks import EarlyStopping

# Stop once val_loss has not improved for 5 consecutive epochs.
early_stopping = EarlyStopping(monitor='val_loss', patience=5, verbose=1)

model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Two fixes here: the callback is now actually passed to fit(), and the
# returned History object is kept because the plotting cell further down
# reads history.history.
# NOTE(review): the held-out split doubles as validation data, so the
# accuracy printed below is measured on data that also steered early stopping.
history = model.fit(
    train_X,
    train_Y,
    batch_size=32,
    epochs=70,
    validation_data=(test_X, test_Y),
    callbacks=[early_stopping],
)

# Evaluate the model on the test set
_, accuracy = model.evaluate(test_X, test_Y, verbose=0)
print("Accuracy on Test Set:", accuracy)

Epoch 1/70
Epoch 2/70
Epoch 3/70
Epoch 4/70
Epoch 5/70
Epoch 6/70
Epoch 7/70
Epoch 8/70
Epoch 9/70
Epoch 10/70
Epoch 11/70
Epoch 12/70
Epoch 13/70
Epoch 14/70
Epoch 15/70
Epoch 16/70
Epoch 17/70
Epoch 18/70
Epoch 19/70
Epoch 20/70
Epoch 21/70
Epoch 22/70
Epoch 23/70
Epoch 24/70
Epoch 25/70
Epoch 26/70
Epoch 27/70
Epoch 28/70
Epoch 29/70
Epoch 30/70
Epoch 31/70
Epoch 32/70
Epoch 33/70
Epoch 34/70
Epoch 35/70
Epoch 36/70
Epoch 37/70
Epoch 38/70
Epoch 39/70
Epoch 40/70
Epoch 41/70
Epoch 42/70
Epoch 43/70
Epoch 44/70
Epoch 45/70
Epoch 46/70
Epoch 47/70
Epoch 48/70
Epoch 49/70
Epoch 50/70
Epoch 51/70
Epoch 52/70
Epoch 53/70
Epoch 54/70
Epoch 55/70
Epoch 56/70
Epoch 57/70
Epoch 58/70
Epoch 59/70


Epoch 60/70
Epoch 61/70
Epoch 62/70
Epoch 63/70
Epoch 64/70
Epoch 65/70
Epoch 66/70
Epoch 67/70
Epoch 68/70
Epoch 69/70
Epoch 70/70
Accuracy on Test Set: 0.5263158082962036


In [None]:
# Compile again with the same loss, optimizer and metric as the training cell.
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

In [None]:
import matplotlib.pyplot as plt

In [None]:
train_acc = history.history['accuracy']
loss = history.history['loss']
epochs = range(1, len(train_acc) + 1)

# Draw one figure per training curve; the legend label and the title are the
# same text for each curve.
for curve, heading, y_label in (
    (train_acc, 'Training Accuracy', 'Accuracy'),
    (loss, 'Training Loss', 'Loss'),
):
    plt.plot(epochs, curve, label=heading)
    plt.title(heading)
    plt.xlabel('Epochs')
    plt.ylabel(y_label)
    plt.legend()
    plt.show()

In [None]:
from sklearn.model_selection import train_test_split
# get_train_test is not defined in this notebook; the only other place it can
# come from is the `from preprocess import *` in the following cell, which has
# not run yet on a fresh kernel. Import it here so Restart & Run All works.
# NOTE(review): assumes get_train_test lives in the project's preprocess
# module — confirm.
from preprocess import get_train_test

# Loading train set and test set
X_train, X_test, y_train, y_test = get_train_test()

In [None]:
from preprocess import *
import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPooling2D, LSTM
from tensorflow.keras.utils import to_categorical
import matplotlib.pyplot as plt
import tensorflow as tf

In [None]:
# One-hot encode the labels with a fixed width of 4 to match the 4-unit
# softmax output of the CNN. Without num_classes the width is inferred from
# the largest label present, so a split missing the highest class would get
# fewer columns than the model (or the other split) expects.
y_train_hot = to_categorical(y_train, num_classes=4)
y_test_hot = to_categorical(y_test, num_classes=4)

In [None]:
# Reshape each split to (samples, max_len, n_mfcc) and append a trailing
# axis of size 1 so it matches the Conv2D input_shape used below.
# NOTE(review): max_len and n_mfcc are not defined in this notebook —
# presumably they come from the preprocess star import; confirm.
X_train = tf.expand_dims(X_train.reshape(len(X_train), max_len, n_mfcc), axis=-1)
X_test = tf.expand_dims(X_test.reshape(len(X_test), max_len, n_mfcc), axis=-1)

In [None]:
from keras.callbacks import EarlyStopping

# Early-stopping callback that watches validation accuracy (higher is better).
es = EarlyStopping(monitor='val_accuracy', mode='max')

# Four 3x3 convolution stages (128 -> 64 -> 32 -> 16 filters) with 2x2 max
# pooling after the first three, then a 10-unit dense layer and a 4-way
# softmax output.
model = Sequential([
    Conv2D(128, kernel_size = (3, 3), activation = 'relu', input_shape = (max_len, n_mfcc, 1)),
    MaxPooling2D(pool_size = (2, 2)),
    Dropout(0.25),
    Conv2D(64, kernel_size = (3, 3), activation = 'relu'),
    MaxPooling2D(pool_size = (2, 2)),
    Conv2D(32, kernel_size = (3, 3), activation = 'relu'),
    MaxPooling2D(pool_size = (2, 2)),
    Conv2D(16, kernel_size = (3, 3), activation = 'relu'),
    Dropout(0.25),
    Flatten(),
    Dense(10),
    Dense(4, activation = 'softmax'),
])
model.compile(
    loss = "categorical_crossentropy",
    optimizer = "adam",
    metrics = ['accuracy'],
)

In [None]:
# Run tf.functions eagerly (set before training starts).
tf.config.run_functions_eagerly(True)
# Pass the `es` callback built in the model cell; it was created but never
# given to fit(), so early stopping on val_accuracy never took effect.
model.fit(
    X_train,
    y_train_hot,
    epochs = 50,
    validation_data = (X_test, y_test_hot),
    callbacks = [es],
)