### Importing the required libraries

In [1]:
# Standard library
import os # interface with underlying OS that python is running on
import pickle
import re
import sys
import warnings
from glob import glob

# ignore warnings 
if not sys.warnoptions:
    warnings.simplefilter("ignore")
warnings.filterwarnings("ignore", category=DeprecationWarning)

# Third-party
import cv2
import keras
import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
from keras import layers, models, Model, optimizers
from keras.applications import VGG16
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.constraints import max_norm
from keras.layers import Conv1D, MaxPooling2D, AveragePooling1D
from keras.layers import Input, Flatten, Dropout, Activation, BatchNormalization
from keras.layers import Dense, Embedding, LSTM
from keras.models import Sequential, Model, model_from_json
from keras.models import load_model
from keras.regularizers import l2
from keras.utils import to_categorical
from keras.utils import  to_categorical
from matplotlib.pyplot import specgram
from sklearn import metrics
from sklearn import preprocessing
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.metrics import confusion_matrix
from sklearn.metrics import multilabel_confusion_matrix, confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.utils import shuffle


### Data Preprocessing

In [202]:
# Root folder of the RAVDESS speech recordings; its entries are listed below.
audio = "/Users/abiramvyas/Documents/My_Projects/Ravdess/audio_speech_actors_01-24"

# Two-digit emotion code (third numeric field of a file name) -> emotion label.
dicts = dict(zip(
    ('01', '02', '03', '04', '05', '06', '07', '08'),
    ('neutral', 'calm', 'happy', 'sad', 'angry', 'fearful', 'disgust', 'surprised'),
))

path_main = audio
# Names of everything directly inside the dataset root (may include '.DS_Store').
folders_main = os.listdir(path_main)

In [237]:
class audio_processing:
    """
    Preprocesses the audio data set: every audio file is trimmed, optionally augmented (noise / stretch),
    converted to a mel spectrogram and saved as an image.
    ----------
    Attributes
    ----------
    path_to_folder : str
        path to access audio files
    manipulate : str
        audio manipulation - stretch, normal, noise
    path_to_save : str
        final path to save the file

    ----------
    Methods
    ----------
    preprocess_data(path_to_folder,manipulate,path_to_save)
        walks the sub-folders of a given path and passes every audio file to spectrogram_mod
    spectrogram_mod(load_path,manipulate,path_to_save,emotion)
        trims the audio, optionally adds noise or stretch, and saves its mel spectrogram as an image
    noise(y)
        adds white noise to the audio
    stretch(y, rate)
        changes the duration of the audio by time-stretching it
    """

    def __init__(self,path_to_folder,manipulate,path_to_save) -> None: 
        """
        Constructs all the necessary attributes for the object
        ----------
        Parameters
        ----------
        path_to_folder : str
            path to access audio files
        manipulate : str
            audio manipulation - stretch, normal, noise
        path_to_save : str
            final path to save the file
        """
        self.path_to_folder = path_to_folder
        self.manipulate = manipulate
        self.path_to_save = path_to_save

    def preprocess_data(self,path_to_folder,manipulate,path_to_save):
        """
        Navigates through the sub-folders of path_to_folder and passes every audio file to spectrogram_mod.
        The emotion label is looked up in the module-level `dicts` mapping using the third numeric field
        of the file name.
        ----------
        Parameters
        ----------
        path_to_folder : str
            path to access audio files
        manipulate : str
            audio manipulation - stretch, normal, noise
        path_to_save : str
            folder under which the images are saved, one sub-folder per emotion
        -------
        Returns
        -------
        None. Prints a progress counter each time a folder has been processed
        """
        # List the folder that was actually passed in and keep only real directories,
        # so stray files such as '.DS_Store' are neither opened nor counted.
        sub_folders = sorted(
            entry for entry in os.listdir(path_to_folder)
            if os.path.isdir(os.path.join(path_to_folder, entry))
        )
        total = len(sub_folders)

        for cnt, folder in enumerate(sub_folders, start=1):
            ip_path = path_to_folder+'/{0}'.format(folder)
            for j in sorted(os.listdir(ip_path)):
                num_vals = re.findall(r'\d+',j)
                # Skip files whose name does not carry a known emotion code.
                if len(num_vals) < 3 or num_vals[2] not in dicts:
                    continue
                emotion = dicts[num_vals[2]]

                op_path = os.path.join(path_to_save, emotion, j + '.jpeg')
                load_path = '{0}/{1}'.format(ip_path,j)

                self.spectrogram_mod(load_path,manipulate,op_path,emotion)
            print("Folders processed is {0}/{1}".format(cnt, total))

    def spectrogram_mod(self,load_path,manipulate,path_to_save,emotion):
        """
        Loads and trims one audio file, applies the requested manipulation, converts it to a mel spectrogram
        in decibels and saves the plot as an image
        ----------
        Parameters
        ----------
        load_path : str
            path of the audio file to load
        manipulate : str
            audio manipulation - "normal" and "noise" are handled explicitly, any other value is
            treated as stretch
        path_to_save : str
            full path of the image file to write; its parent folder is created when missing
        emotion : str
            emotion label obtained from the file name (kept for interface compatibility)
        -------
        Returns
        -------
        None. The spectrogram image is written to path_to_save
        """
        y, sample_rate = librosa.load(load_path)
        y, _ = librosa.effects.trim(y)

        if manipulate == "noise":
            y = self.noise(y)
        elif manipulate != "normal":
            # Any value other than "normal"/"noise" has always meant stretch; kept for compatibility.
            y = self.stretch(y,0.8)

        mel_spec = librosa.feature.melspectrogram(y=y, sr=sample_rate, n_mels=128,fmax=8000)
        db_spec = librosa.power_to_db(mel_spec)

        # Draw every spectrogram on its own figure and close it afterwards; otherwise each
        # image is painted on top of the previous ones and the open figure keeps growing.
        fig = plt.figure()
        try:
            # NOTE(review): fmax here (20000) differs from the 8000 used to build the mel spectrogram;
            # left unchanged so that the images stay comparable with the ones used at prediction time.
            librosa.display.specshow(db_spec, y_axis='mel', fmax=20000, x_axis='time')

            # Create the folder the image is really written to (not a hard-coded location).
            target_dir = os.path.dirname(path_to_save)
            if target_dir:
                os.makedirs(target_dir, exist_ok=True)
            plt.savefig(path_to_save)
        finally:
            plt.close(fig)

    def noise(self,y):
        """
        Adds white (gaussian) noise to the data. The noise amplitude is a random fraction, at most 1%,
        of the maximum value of y
        ----------
        Parameters
        ----------
        y : array
            array of the audio data from files
        -------
        Returns
        -------
        Data with noise added
        """
        amp_noise = 0.01*np.random.uniform()*np.amax(y)
        y = y + amp_noise *np.random.normal(size=y.shape[0])
        return y

    def stretch(self,y, rate=0.8):
        """
        Time-stretches the data
        ----------
        Parameters
        ----------
        y : array
            array of the audio data from files
        rate : float
            stretch factor passed to librosa.effects.time_stretch. Default is 0.8
        -------
        Returns
        -------
        Data time-stretched by the given rate
        """
        data = librosa.effects.time_stretch(y, rate = rate)
        return data

# Generate the spectrogram images for every variant that the pre-modelling step reads back
# (normal/, noise/ and stretch/). Running only "normal" leaves the other two folders missing
# on a fresh run. "normal" is processed last so that `audio_process` ends up bound to the
# same configuration as before.
for manipulation in ("noise", "stretch", "normal"):
    audio_process = audio_processing(
        '/Users/abiramvyas/Documents/My_Projects/Ravdess/audio_speech_actors_01-24',
        manipulation,
        "/Users/abiramvyas/Documents/My_Projects/Ravdess/{0}/".format(manipulation))
    audio_process.preprocess_data(
        '/Users/abiramvyas/Documents/My_Projects/Ravdess/audio_speech_actors_01-24',
        manipulation,
        "/Users/abiramvyas/Documents/My_Projects/Ravdess/{0}/".format(manipulation))


### Pre-Modelling

In [None]:
class pre_modelling :
    """
    This class gets the data ready for modelling. It fetches the spectrogram images of every variant
    (normal, noise, stretch) and performs the train test split for modelling
    ----------
    Attributes
    ----------
    No specific input attributes needed
    ----------
    Methods
    ----------
    img_fetch(directory)
        loads and resizes the images found in the emotion sub-folders of a directory, together with their labels
    train_test_data_split(img_,labels_)
        Performs a 70-30 stratified split, scales the images and one hot encodes the labels
    run_steps(base_dir)
        runs img_fetch for every variant and the train test split; returns X_train, X_test, y_train, y_test
        and the fitted label encoder
    """

    # Folder names that are accepted as emotion labels; anything else is ignored.
    EMOTIONS = ('angry', 'calm', 'disgust', 'fearful', 'happy', 'neutral', 'sad', 'surprised')

    def __init__(self) -> None:
        """
        Constructs all the necessary attributes for the object
        ----------
        Parameters
        ----------
        No input parameters are needed
        """
        self.Images = []
        self.Labels = []
        
    def img_fetch(self,directory):
        """
        Obtains the processed images from the desired location. Every sub-folder named after a known
        emotion is read; the folder name is used as the label of the images it contains
        ----------
        Parameters
        ----------
        directory : str
            Directory where all the preprocessed files are stored
        -------
        Returns
        -------
        Images resized to 224x224 and their labels, shuffled consistently (random_state=200)
        """
        Images = []
        Labels = []

        for label_ in sorted(os.listdir(directory)):
            label_dir = os.path.join(directory, label_)
            # Skip '.DS_Store', loose files and folders that are not emotion labels, instead of
            # silently re-using the label of the previously visited folder.
            if label_ not in self.EMOTIONS or not os.path.isdir(label_dir):
                continue

            for file_name in sorted(os.listdir(label_dir)):
                img_ = cv2.imread(os.path.join(label_dir, file_name))
                # cv2.imread returns None for unreadable / non-image files; resizing it would crash.
                if img_ is None:
                    continue
                Images.append(cv2.resize(img_,(224,224)))
                Labels.append(label_)

        return shuffle(Images,Labels,random_state=200) 

    def run_steps(self, base_dir='/Users/abiramvyas/Documents/My_Projects/Ravdess/'):
        """
        Runs all the steps needed prior to modelling. This retrieves the normal, noise and stretch images,
        calls the train test split function and returns the data ready for modelling
        ----------
        Parameters
        ----------
        base_dir : str
            Directory that contains the 'normal', 'stretch' and 'noise' image folders
        -------
        Returns
        -------
        X_train, X_test, y_train, y_test, lb (the fitted label encoder)
        """
        base = base_dir.rstrip('/') + '/'

        Images_norm, Labels_norm  = self.img_fetch(base + 'normal/')
        Images_stretch, Labels_stretch  = self.img_fetch(base + 'stretch/') 
        Images_noise, Labels_noise = self.img_fetch(base + 'noise/')

        img_ = Images_norm + Images_noise + Images_stretch
        labels_ = Labels_norm + Labels_noise + Labels_stretch
        X_train, X_test, y_train, y_test, lb = self.train_test_data_split(img_,labels_)

        # The label encoder is returned as well: the modelling step needs it for the class count.
        return X_train, X_test, y_train, y_test, lb

    def train_test_data_split(self,img_,labels_):
        """
        Performs the train test split of the data, one hot encodes the Y variable and scales the X variables
        to the range 0-1
        ----------
        Parameters
        ----------
        img_ : arr
            Array of various images - stretched, normal, noise induced
        labels_ : str
            Emotion labels to be one hot encoded
        -------
        Returns
        -------
        X_train, X_test, y_train, y_test variables processed and one hot encoded, and the fitted label encoder
        """
        X_train, X_test, y_train, y_test = train_test_split(img_, labels_, test_size=0.3,
                                                            random_state=22, stratify=labels_)

        X_train = np.asarray(X_train, dtype='float32') / 255
        X_test = np.asarray(X_test, dtype='float32') / 255

        # Fit the encoder on the training labels only and re-use that mapping for the test labels;
        # fixing num_classes keeps both one hot matrices the same width.
        lb = LabelEncoder()
        y_train = lb.fit_transform(y_train)
        y_test = lb.transform(y_test)
        n_classes = len(lb.classes_)
        y_train = to_categorical(y_train, num_classes=n_classes)
        y_test = to_categorical(y_test, num_classes=n_classes)

        return X_train, X_test, y_train, y_test,lb

# Build the train/test arrays and keep the label encoder (lb_) for the modelling step.
run_premodel_steps = pre_modelling()
(
    X_train,
    X_test,
    y_train,
    y_test,
    lb_,
) = run_premodel_steps.run_steps()

### Modelling

In [286]:
class modelling:
    """
    This class performs the modelling process: transfer learning with VGG16, followed by a random forest
    classifier that is fit on the outputs of the trained network
    ----------
    Attributes
    ----------
    X_train : array
        Training data
    X_test : array
        Test data
    y_train : array
        Label training data
    y_test : array
        Label test data
    ----------
    Methods
    ----------
    run_vgg16(X_train,X_test,y_train,y_test,lb_)
        builds a classifier on top of a pre-trained VGG16, trains and saves it, then runs ensemble_classifier
    ensemble_classifier(vgg_transfer_model,X_train,X_test,y_train,y_test)
        uses the predictions of the trained network as features to train a RF classifier
    """
    def __init__(self,X_train,X_test,y_train,y_test) -> None:
        """
        Constructs all the necessary attributes for the object
        ----------
        Parameters
        ----------
        X_train : array
            Training data
        X_test : array
            Test data
        y_train : array
            Label training data
        y_test : array
            Label test data
        """
        self.X_train = X_train
        self.X_test = X_test
        self.y_train = y_train
        self.y_test = y_test

    def run_vgg16(self,X_train,X_test,y_train,y_test,lb_):
        """
        Calls a VGG16 model (pre-trained on imagenet, without its top). The layers of the pre-trained base are
        frozen and only the newly added dense layers are trained. The output layer uses softmax with one unit
        per class of the label encoder. The trained model is saved to "vgg16_base_v2.h5"
        ----------
        Parameters
        ----------
        X_train : array
            Training data
        X_test : array
            Test data
        y_train : array
            Label training data
        y_test : array
            Label test data
        lb_ : LabelEncoder
            Fitted label encoder; the number of its classes_ sets the size of the output layer
        -------
        Returns
        -------
        The classification report, y_test and the random forest predictions returned by ensemble_classifier
        """
        vgg16_model = VGG16(weights='imagenet',
                include_top=False,
                input_shape=(224, 224, 3))

        x_pretrained = vgg16_model.output
        x_pretrained = Flatten()(x_pretrained)
        x_pretrained = Dense(256, activation='relu')(x_pretrained)
        x_pretrained = Dropout(0.1)(x_pretrained)
        x_pretrained = Dense(256, activation='relu')(x_pretrained)
        # Use the encoder that was passed in (lb_); the bare name `lb` does not exist in this scope.
        x_pretrained = Dense(len(lb_.classes_), activation='softmax')(x_pretrained)
        vgg_transfer_model = Model(inputs=vgg16_model.input, outputs=x_pretrained)

        # Only the newly added head is trained; every layer of the pre-trained base is re-used as is.
        # The base's layers are the first 19 layers of the combined model.
        for layer in vgg16_model.layers:
            layer.trainable = False
        
        for i, layer in enumerate(vgg_transfer_model.layers):
            print(i, layer.name, layer.trainable)
        
        lr= 5e-5
        opt = tf.keras.optimizers.legacy.RMSprop(learning_rate=lr)
        vgg_transfer_model.compile(loss="categorical_crossentropy", optimizer=opt, metrics=["accuracy"])
        vgg_transfer_model.fit(X_train, y_train, batch_size = 16, epochs=50, validation_data=(X_test,y_test))
        vgg_transfer_model.save("vgg16_base_v2.h5")
        cm,y_test,pred_rf = self.ensemble_classifier(vgg_transfer_model,X_train,X_test,y_train,y_test)
        return cm, y_test, pred_rf

    def ensemble_classifier(self,vgg_transfer_model,X_train,X_test,y_train,y_test):
        """
        The predictions of the trained VGG16 model on the training data are used as features to fit a random
        forest classifier, which is then evaluated on the test data. The random forest is saved to
        "rf_model.sav"
        ----------
        Parameters
        ----------
        vgg_transfer_model : model_obj
            The VGG16 model that has been fit on the data
        X_train : array
            Training data
        X_test : array
            Test data
        y_train : array
            Label training data
        y_test : array
            Label test data
        -------
        Returns
        -------
        The classification report (text) of the test predictions, the y_test and the rf_predictions
        """
        # NOTE: these "features" are the outputs of the network's final (softmax) layer.
        feature_extractor = vgg_transfer_model.predict(X_train)
        features = feature_extractor.reshape(feature_extractor.shape[0], -1)
        
        RF_model = RandomForestClassifier(n_estimators=100, random_state=42)
        RF_model.fit(features, y_train)

        X_test_feature = vgg_transfer_model.predict(X_test)
        X_test_features = X_test_feature.reshape(X_test_feature.shape[0], -1)

        prediction_RF = RF_model.predict(X_test_features)

        # Close the file deterministically instead of leaking the handle.
        with open("rf_model.sav","wb") as model_file:
            pickle.dump(RF_model, model_file)

        confusion_matrix_final =classification_report(
        y_test,
        prediction_RF,
        )
        return(confusion_matrix_final,y_test,prediction_RF)

# Train the VGG16-based model and the random forest on top of it.
run_modelling = modelling(
    X_train=X_train,
    X_test=X_test,
    y_train=y_train,
    y_test=y_test,
)
final_result = run_modelling.run_vgg16(
    X_train=X_train,
    X_test=X_test,
    y_train=y_train,
    y_test=y_test,
    lb_=lb_,
)

### Prediction on unseen audio

In [288]:
# Record a short clip from the microphone with PyAudio and store it as a .wav file.
# If recording through python is not required, the user can provide an existing .wav file at
# `filename` instead.
import pyaudio
import wave

chunk = 1024  # number of frames read from the stream per call
sample_format = pyaudio.paInt16 
channels = 1
fs = 44100  # Record at 44100 samples per second
seconds = 3
filename = "/Users/abiramvyas/Documents/My_Projects/av_op1.wav"

p = pyaudio.PyAudio()
frames = [] 

try:
    print('Recording')

    stream = p.open(format=sample_format,
                    channels=channels,
                    rate=fs,
                    frames_per_buffer=chunk,
                    input=True)
    try:
        for i in range(0, int(fs / chunk * seconds)):
            data = stream.read(chunk)
            frames.append(data)
    finally:
        # Stop and close the stream even when reading fails, so the device is released
        stream.stop_stream()
        stream.close()

    sample_width = p.get_sample_size(sample_format)
finally:
    p.terminate()

print('Finished recording')

# Save the recorded data as a WAV file; the context manager closes it even on error
with wave.open(filename, 'wb') as wf:
    wf.setnchannels(channels)
    wf.setsampwidth(sample_width)
    wf.setframerate(fs)
    wf.writeframes(b''.join(frames))

In [287]:
# Pre-process the recorded audio, save its spectrogram as a .jpeg image and predict its emotion
# with the saved VGG16 + random forest models.
from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
from keras.applications.vgg16 import preprocess_input
from keras.applications.vgg16 import decode_predictions
import re

y_new, sr_new = librosa.load('/Users/abiramvyas/Documents/My_Projects/av_op1.wav')
yt_new, _trim_index = librosa.effects.trim(y_new)
y_new=yt_new
y_new = librosa.feature.melspectrogram(y=y_new, sr=sr_new, n_mels=128,fmax=8000)
db_spec_new = librosa.power_to_db(y_new)
librosa.display.specshow(db_spec_new, y_axis='mel', fmax=20000, x_axis='time')

unseen_dir = '/Users/abiramvyas/Documents/My_Projects/Ravdess/unseen/'
unseen_img_path = unseen_dir + 'new_recording2.jpeg'
os.makedirs(unseen_dir, exist_ok=True)
plt.savefig(unseen_img_path)

# Load the image exactly like the training images were loaded (cv2.imread + cv2.resize, i.e. BGR
# channel order). keras' load_img returns RGB and resizes differently, which would feed the
# network inputs that do not match what it was trained on.
loaded_img = cv2.imread(unseen_img_path)
if loaded_img is None:
    raise FileNotFoundError('Could not read spectrogram image: ' + unseen_img_path)
loaded_img = cv2.resize(loaded_img,(224,224))

img_arr = loaded_img.astype('float32')
img_arr = img_arr/255
img_arr_final = img_arr.reshape(1, img_arr.shape[0],img_arr.shape[1],img_arr.shape[2])

# NOTE(review): the training cell saves "vgg16_base_v2.h5" and "rf_model.sav" in the working
# directory, while the files below are read from Results/ - confirm these are the intended models.
loaded_model = load_model('/Users/abiramvyas/Documents/My_Projects/Ravdess/Results/vgg16_base.h5')
# NOTE: pickle can execute arbitrary code while loading; only load model files you created yourself.
with open("/Users/abiramvyas/Documents/My_Projects/Ravdess/Results/rf_model.sav", 'rb') as rf_file:
    rf_loaded = pickle.load(rf_file)

X_test_feature = loaded_model.predict(img_arr_final)
X_test_features = X_test_feature.reshape(X_test_feature.shape[0], -1)

prediction_RF = rf_loaded.predict(X_test_features)

emotion_dict = {0:'angry', 1:'calm', 2:'disgust', 3:'fearful', 4:'happy', 5:'neutral',6:'sad',7:'surprised'}

# The random forest was fit on one hot labels, so it may flag no class or several classes.
# Use its answer when it is unambiguous, otherwise fall back to the most probable class of the network.
flagged_classes = np.flatnonzero(np.asarray(prediction_RF)[0] == 1)
if flagged_classes.size == 1:
    location_of_emotion = int(flagged_classes[0])
else:
    location_of_emotion = int(np.argmax(X_test_features[0]))
emotion_dict.get(location_of_emotion)


'fearful'

The predicted emotion shown above is passed as input to the Spotify step, which then recommends songs that match that emotion.