# <h1 style="text-align: center; font-size: 36px; color: #3498db; font-weight: bold;">Prosody Application</h1>
## <h2 style="text-align: center; font-size: 28px; color: #2ecc71; font-weight: bold;">Prosody Active Learning</h2>


In [1]:

import tkinter as tk
from tkinter import ttk
import pyaudio
import wave
import tensorflow as tf 
import os 
import pickle
import numpy as np
import librosa
import pandas as pd
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import matplotlib.pyplot as plt
import librosa.effects
from joblib import Parallel, delayed
from keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from keras.optimizers import Adam






In [2]:
#########################################################################################################
# Cloud folder: download the shared Google Drive folder once, when no local "Models" path exists yet.
# NOTE(review): presumably the Drive folder is itself named "Models" so the download lands at ./Models - confirm.
import os

models_missing = not os.path.exists("Models")
if models_missing:
    # Imported lazily so gdown is only required when the download actually has to run.
    import gdown

    folder_url = r"https://drive.google.com/drive/folders/1EZL6Ejoa5GH8DzoZcjvRAonlgWznEh14?usp=drive_link"
    gdown.download_folder(folder_url)
##########################################################################################################


In [3]:
###############################################
# Loading the model, weights, scaler and encoder
###############################################

prosody_model = tf.keras.models.load_model(r'./Models/Prosody_Model.keras')
print ("Prosody Model has been loaded")

# NOTE(security): pickle.load runs arbitrary code embedded in the file. The ./Models
# folder may have been downloaded from a shared Drive folder, so only load these
# pickles when that source is trusted.

# Loading the scaler
with open(r"./Models/Prosody_Scaler.pickle", 'rb') as f:
    prosody_scaler = pickle.load(f)

# Loading the encoder
with open(r"./Models/Prosody_Encoder.pickle", 'rb') as f:
    prosody_encoder = pickle.load(f)

OUTPUT_FILE=r"./Output/input_voice.wav"
Output_folder= r"./Output"
# exist_ok=True removes the check-then-create race and keeps the cell safe to re-run.
os.makedirs(Output_folder, exist_ok=True)

Prosody Model has been loaded


In [4]:
# Display the class labels stored in the loaded encoder.
prosody_encoder.categories_

[array(['Noise', 'angry', 'disgust', 'fear', 'happy', 'neutral', 'sad',
        'surprise'], dtype=object)]

In [5]:
#####################################
# Extracting the features from the voice
#####################################

## ZCR: Zero Crossing Rate: The rate of sign changes of the signal during the duration of a particular frame
def zcr(data, frame_length, hop_length):
    """Compute the zero-crossing rate of ``data`` frame by frame.

    Args:
        data: Audio samples passed to librosa as ``y``.
        frame_length: Number of samples per analysis frame.
        hop_length: Number of samples between successive frames.

    Returns:
        The librosa zero-crossing-rate output with singleton axes removed.
    """
    rates = librosa.feature.zero_crossing_rate(
        y=data,
        frame_length=frame_length,
        hop_length=hop_length,
    )
    return np.squeeze(rates)

## RMS: root mean square value
def rmse(data, frame_length=2048, hop_length=512):
    """Compute the root-mean-square energy of ``data`` frame by frame.

    Args:
        data: Audio samples passed to librosa as ``y``.
        frame_length: Number of samples per analysis frame.
        hop_length: Number of samples between successive frames.

    Returns:
        The librosa RMS output with singleton axes removed.
    """
    energy = librosa.feature.rms(
        y=data,
        frame_length=frame_length,
        hop_length=hop_length,
    )
    return np.squeeze(energy)

## MFCC: Mel Frequency Cepstral Coefficients form a cepstral representation where the frequency bands are not linear but distributed according to the mel-scale
def mfcc(data,sr,frame_length=2048,hop_length=512,flatten:bool=True):
    """Compute the MFCCs of ``data``, optionally flattened into one vector.

    ``frame_length`` and ``hop_length`` are now forwarded to librosa (as
    ``n_fft`` and ``hop_length``). Previously they were accepted but ignored,
    so the MFCC framing could silently differ from ``zcr``/``rmse`` whenever a
    caller passed non-default values. The defaults (2048 / 512) equal librosa's
    own, so default-argument callers get the same features as before.

    Args:
        data: Audio samples passed to librosa as ``y``.
        sr: Sample rate of ``data``.
        frame_length: FFT window size used for the underlying mel spectrogram.
        hop_length: Number of samples between successive frames.
        flatten: When True, return the transposed MFCC matrix ravelled to 1-D;
            otherwise return the transposed matrix with singleton axes removed.

    Returns:
        A numpy array of MFCC values.
    """
    coefficients = librosa.feature.mfcc(
        y=data,
        sr=sr,
        n_fft=frame_length,
        hop_length=hop_length,
    )
    # Transpose before flattening, exactly as the original expression did.
    transposed = coefficients.T
    return np.ravel(transposed) if flatten else np.squeeze(transposed)

## Extracting the features
def extract_features(data, sr=22050, frame_length=2048, hop_length=512):
    """Build one feature vector from an audio signal.

    The vector is the horizontal stack of, in order: the zero-crossing rate,
    the RMS energy and the flattened MFCCs of ``data``.

    Args:
        data: Audio samples.
        sr: Sample rate forwarded to ``mfcc``.
        frame_length: Frame size forwarded to every feature function.
        hop_length: Hop size forwarded to every feature function.

    Returns:
        A numpy array holding the stacked features.
    """
    framing = (frame_length, hop_length)
    feature_parts = [
        zcr(data, *framing),
        rmse(data, *framing),
        mfcc(data, sr, *framing),
    ]
    return np.hstack(feature_parts)

###############################
# Feature extraction function
###############################
def get_features(path):
    """Load an audio file and return its scaled feature row.

    Args:
        path: Path of the audio file to read.

    Returns:
        An array of shape ``(1, n_features)`` produced by passing the extracted
        features through ``prosody_scaler.transform``.
    """
    # Only the first 2.5 seconds of the recording are analysed.
    data, sr = librosa.load(path, duration=2.5, offset=0)

    # Forward the sample rate librosa actually used, instead of relying on
    # extract_features' default happening to equal it.
    features = extract_features(data, sr=sr).reshape((1, -1))

    return prosody_scaler.transform(features)

######################
# Prediction function
######################
def prediction(path):
    """Predict the emotion class of one audio file.

    Args:
        path: Path of the audio file to classify.

    Returns:
        A tuple ``(predicted_class, predicted_probs)``: the label decoded by
        ``prosody_encoder.inverse_transform`` and the first row of the model
        output (one score per class).
    """
    features = get_features(path)
    # Add the trailing channel axis explicitly, the same layout model_finetune
    # passes to fit(), rather than depending on Keras adjusting the rank.
    probabilities = prosody_model.predict(np.expand_dims(features, axis=2))

    decoded = prosody_encoder.inverse_transform(probabilities.reshape(1, -1))
    predicted_class = decoded[0][0]
    predicted_probs = probabilities[0]
    return predicted_class, predicted_probs



        



In [6]:

##########################
# Database
##########################

def df_database_function(database_folder):
    """Build a feature table from the labelled ``.wav`` files in a folder.

    The emotion label of each file is the part of its file name before the
    first underscore. Features are extracted with ``get_features`` in parallel.

    Args:
        database_folder: Directory containing the ``.wav`` recordings.

    Returns:
        A DataFrame with one row per recording: the feature values in
        integer-named columns followed by an ``'Emotions'`` label column.

    Raises:
        ValueError: If the folder contains no ``.wav`` files (previously this
            surfaced as an opaque AttributeError on the empty frame).
    """
    # Sort the listing: os.listdir order is arbitrary, sorting makes the row
    # order reproducible between runs and machines.
    records = [
        {'path': os.path.join(database_folder, filename), 'Emotions': filename.split('_')[0]}
        for filename in sorted(os.listdir(database_folder))
        if filename.endswith('.wav')
    ]
    if not records:
        raise ValueError(f"No .wav files found in {database_folder!r}")

    df0 = pd.DataFrame(records)

    def process_feature(path, emotion):
        """Return the flattened feature vector of ``path`` together with its label."""
        return get_features(path).flatten(), emotion

    # joblib returns the results in the same order as the submitted tasks, and
    # each result carries its own label, so features and labels stay paired.
    results = Parallel(n_jobs=-1)(
        delayed(process_feature)(path, emotion)
        for path, emotion in zip(df0.path, df0.Emotions)
    )

    feature_rows = [features for features, _ in results]
    labels = [label for _, label in results]

    df = pd.DataFrame(feature_rows)
    df['Emotions'] = labels

    return df

# Build the fine-tuning table from the recordings stored in ./new_recordings.
database_folder = r"./new_recordings"
df_new = df_database_function(database_folder=database_folder)

In [7]:
from datetime import datetime, timezone

def get_date_string():
    """Return the current UTC time formatted as ``YYYY_MM_DD_HH-MM``."""
    return datetime.now(timezone.utc).strftime('%Y_%m_%d_%H-%M')

def get_latest_experiment(experiments_dir = r"tmp"):
    """Return the name of the most recent experiment folder.

    Experiment folders are sub-directories of ``experiments_dir`` named
    ``exp_YYYY_MM_DD_HH-MM``; entries that are not directories or whose name
    does not match that pattern are ignored.

    Args:
        experiments_dir: Directory that holds the experiment folders.

    Returns:
        The folder name (not the full path) with the latest timestamp. When
        ``experiments_dir`` is missing or holds no valid experiment folder, a
        fresh ``"exp_" + get_date_string()`` name is returned. This is the same
        fallback the empty-directory case already used, so callers never
        receive None.
    """
    name_format = 'exp_%Y_%m_%d_%H-%M'

    candidates = []
    if os.path.isdir(experiments_dir):
        for entry in os.listdir(experiments_dir):
            # Only real folders can be experiment folders.
            if not os.path.isdir(os.path.join(experiments_dir, entry)):
                continue
            try:
                stamp = datetime.strptime(entry, name_format)
            except ValueError:
                # Name does not follow the expected format: skip it quietly.
                continue
            candidates.append((stamp, entry))

    if not candidates:
        return "exp_" + get_date_string()

    # Compare on the timestamp only; on ties the first listed entry wins.
    latest_folder = max(candidates, key=lambda item: item[0])[1]

    print("Latest experiment folder:", latest_folder)

    return latest_folder



In [8]:
# Read by model_finetune: True writes into a fresh timestamped tmp/exp_* folder,
# False asks get_latest_experiment for an existing experiment name instead.
new_experiment = True


def model_finetune(df):
    """Fine-tune ``prosody_model`` on a feature table and save the results.

    Checkpoints are written to ``<exp_dir>/ckpts`` and the training history to
    ``<exp_dir>/history.pkl``. ``exp_dir`` is a new timestamped folder under
    ``tmp`` when the global ``new_experiment`` is true, otherwise the latest
    existing experiment under ``tmp``.

    Args:
        df: DataFrame whose last column is ``'Emotions'`` and whose other
            columns are features already scaled by ``get_features`` (the shape
            ``df_database_function`` returns).

    Raises:
        ValueError: If ``df`` has no rows.
    """
    if df.empty:
        raise ValueError("model_finetune received an empty DataFrame; nothing to train on")

    # The features were already passed through prosody_scaler.transform inside
    # get_features. Re-fitting the shared scaler here would scale them a second
    # time and leave prosody_scaler fitted to scaled data, which breaks every
    # later call to prediction(). Use the values as they are.
    X = df.iloc[:, :-1].to_numpy()
    # transform only: fit_transform would drop the encoder's known classes and
    # keep just the classes present in the current dataset.
    y = prosody_encoder.transform(df['Emotions'].to_numpy().reshape(-1, 1)).toarray()
    X_cnn = np.expand_dims(X, axis=2)

    experiments_root = "tmp"
    if new_experiment:
        exp_name = f"exp_{get_date_string()}"
    else:
        # get_latest_experiment lists this directory, so it has to exist.
        os.makedirs(experiments_root, exist_ok=True)
        exp_name = get_latest_experiment(experiments_dir=experiments_root) or f"exp_{get_date_string()}"
    # get_latest_experiment returns a bare folder name; always place the
    # experiment under tmp/ so resumed runs land next to new ones.
    exp_dir = f"{experiments_root}/{os.path.basename(exp_name)}"
    os.makedirs(exp_dir + "/ckpts", exist_ok=True)

    checkpoint_path = exp_dir + r"/ckpts/Model_{epoch:02d}-{accuracy:.2f}-{loss:.4f}.keras"

    model_checkpoint = ModelCheckpoint(checkpoint_path, monitor='loss', save_best_only=True, save_weights_only=False)

    early_stop = EarlyStopping(monitor='loss', mode='auto', patience=5, restore_best_weights=True)

    # NOTE(review): this callback watches 'accuracy' while the two above watch
    # 'loss'; confirm the learning rate is meant to follow accuracy.
    lr_reduction = ReduceLROnPlateau(monitor='accuracy', patience=3, verbose=1, factor=0.5, min_lr=0.000001)
    optimiser = Adam(learning_rate= 1e-3)
    prosody_model.compile(optimizer=optimiser, loss='categorical_crossentropy', metrics=['accuracy'])

    history = prosody_model.fit(X_cnn, y, epochs=50, batch_size=64, callbacks=[early_stop, lr_reduction, model_checkpoint])

    # Save the per-epoch metrics; an existing history.pkl in exp_dir is overwritten.
    history_path = exp_dir + '/history.pkl'
    with open(history_path, 'wb') as file_pi:
        pickle.dump(history.history, file_pi)
    
# Fine-tune the loaded prosody model on the table built from the new recordings.
model_finetune(df_new)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 10: ReduceLROnPlateau reducing learning rate to 0.0005000000237487257.
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 13: ReduceLROnPlateau reducing learning rate to 0.0002500000118743628.
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 16: ReduceLROnPlateau reducing learning rate to 0.0001250000059371814.
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 19: ReduceLROnPlateau reducing learning rate to 6.25000029685907e-05.
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 22: ReduceLROnPlateau reducing learning rate to 3.125000148429535e-05.
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 25: ReduceLROnPlateau reducing learning rate to 1.5625000742147677e-05.
