In [18]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import noisereduce as nr
from pathlib import Path
from scipy.io import wavfile
import librosa as lb
import pyAudioAnalysis as pyaudio
import IPython
import os, pickle, csv
import random, re
import ipywidgets as widgets

from glob import glob
from pyAudioAnalysis import ShortTermFeatures as aSF
from pyAudioAnalysis import MidTermFeatures as aMF
from pyAudioAnalysis import audioBasicIO as aIO
from random import shuffle
from math import floor

from IPython.display import Audio
from IPython.display import display

from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split

from hmmlearn import hmm                  

In [19]:
#Class to handle all HMM related processing
class HMMTrainer(object):
    def __init__(self, n_components=7, model_name ="GaussianHMM", cov_type='tied', n_iter=10000):
        self.model_name = model_name
        self.n_components = n_components
        self.cov_type = cov_type
        self.n_iter = n_iter
        self.models = []

        if self.model_name == 'GaussianHMM':
            self.model = hmm.GaussianHMM(n_components=self.n_components, covariance_type=self.cov_type, n_iter=self.n_iter)
        elif self.model_name == 'GMMHMM':
            self.model = hmm.GMMHMM(n_components=self.n_components, covariance_type=self.cov_type, n_iter=self.n_iter)
        elif self.model_name == 'MultinomialHMM':
            self.model = hmm.MultinomialHMM(n_components=self.n_components, n_iter=self.n_iter)
        else:
            raise TypeError("Invalid model type")

    # X is a 2D numpy array where each row is 13D
    def train(self, X):
        np.seterr(all='ignore')
        self.models.append(self.model.fit(X))

    # Run model on input data
    def get_score(self, input_data):
        return self.model.score(input_data)
    
    def get_predict(self, X):
        return self.model.predict(X, lengths=None)

In [20]:
#Noise Reduction via Spectral Grating
def noise_reduction_stationary(data, rate):
    reduced_noise = nr.reduce_noise(y=data, sr=rate, n_std_thresh_stationary=1.5, stationary=True)
    return reduced_noise

def noise_reduction_nonstationary(data, rate):
    reduced_noise = nr.reduce_noise(y=data, sr=rate, thresh_n_mult_nonstationary=1.25, stationary=False)
    return reduced_noise

In [21]:
import itertools
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):

    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    #print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [22]:
hmm_models = []

angHMM = pickle.load(open("Complete_ANGRY_model2.pkl", 'rb'))
hmm_models.append((angHMM, 'ANGRY'))

disHMM = pickle.load(open("Complete_DISGUST_model2.pkl", 'rb'))
hmm_models.append((disHMM, 'DISGUST'))

feaHMM = pickle.load(open("Complete_FEAR_model2.pkl", 'rb'))
hmm_models.append((feaHMM, 'FEAR'))

hapHMM = pickle.load(open("Complete_HAPPY_model2.pkl", 'rb'))
hmm_models.append((hapHMM, 'HAPPY'))

neuHMM = pickle.load(open("Complete_NEUTRAL_model2.pkl", 'rb'))
hmm_models.append((neuHMM, 'NEUTRAL'))

sadHMM = pickle.load(open("Complete_SAD_model2.pkl", 'rb'))
hmm_models.append((sadHMM, 'SAD'))

surHMM = pickle.load(open("Complete_SURPRISE_model2.pkl", 'rb'))
hmm_models.append((surHMM, 'SURPRISE'))

In [23]:
import anvil.server
from io import BytesIO
import os

anvil.server.connect('U3QCA6VOXKVVA47BQPXZT5UG-M3LFUQOGY5KGNTZI')

In [24]:
import anvil.media

emotions = ["ANG", "DIS", "FEA", "HAP", "NEU", "SAD", "SUR"]
emotion_labels = {
    'NEU':'NEUTRAL',
    'ANG':'ANGRY',
    'DIS':'DISGUST',
    'FEA':'FEAR',
    'HAP':'HAPPY',
    'SAD':'SAD',
    'SUR':'SURPRISE'
}

data_summary = pd.read_csv('raw_data_Complete_summary.csv')
data_summary.rename(index={0: data_summary.iat[0,0], 1: data_summary.iat[1,0], 2: data_summary.iat[2,0], 3: data_summary.iat[3,0], 4: data_summary.iat[4,0], 5: data_summary.iat[5,0], 6: data_summary.iat[6,0], 7: data_summary.iat[7,0]}, inplace=True)
data_summary = data_summary.drop(["Unnamed: 0"], axis=1)
data_summary = data_summary.drop(["35"], axis=1)

def get_normalized_features(df):
    minim = data_summary.loc["min"]
    minim = minim.to_numpy()
    minim = minim.reshape(-1,1)
    minim = minim.T
    maxim = data_summary.loc["max"]
    maxim = maxim.to_numpy()
    maxim = maxim.reshape(-1,1)
    maxim = maxim.T
    norm_features = (df-minim)/ (maxim-minim)
    norm_features
    
    return norm_features

def Filter(string, substr):
    return [str for str in string if
             any(sub in str for sub in substr)]

@anvil.server.callable
def classify_emotion(file):
    with anvil.media.TempFile(file) as filename:
        s, fs = lb.load(filename, sr=16000)
    
    file_name = file.name
    #file_name = file_name.split('.')[0]
    contains_emotion = any(label in file_name for label in emotions)
    if contains_emotion:
        for i in emotions:
            res = re.findall(i, file_name)
            if res:
                break
        #res = [x for x in emotions if re.search(file_name, x)]
        f_label = res[0]
        file_label = emotion_labels[f_label]
    else:
        file_label = "Unknown"
    win, step = 0.03, 0.015
    #blob = anvil.BlobMedia('audio/', data, name='audio.mp3')
    reduced_audio = noise_reduction_nonstationary(s, fs)
    IPython.display.Audio(data=reduced_audio, rate=fs)
    f, fn = aSF.feature_extraction(reduced_audio, fs, int(fs*win), int(fs*step))
    f_cut = f[:34]
    features = np.mean(f_cut, axis=1)
    features = features.reshape(-1,1)
    features = features.T
    column_names = list(range(1,36+1))
    column_names = [str(x) for x in column_names]
    df = pd.DataFrame(features)
    
    norm_f = get_normalized_features(df)
    
    max_score = -9999999999999999999
    output_label = None
    
    for item in hmm_models:
        hmm_model, label = item
        score = hmm_model.get_score(norm_f)
        if score > max_score:
            max_score = score
            output_label = label
    
    return output_label, score, file_name, file_label, file

def display_file(file):
    with anvil.media.TempFile(file) as filename:
        s, fs = lb.load(filename, sr=16000)


@anvil.server.callable
def speak(audio_string):
    wav_fp = BytesIO()
    tts = gTTS(audio_string, lang='en')
    tts.write_to_fp(mp3_fp)

    tts.save("my_file.mp3")

    with open('my_file.mp3', 'rb') as f:
        data=f.read()

    tts_blob = anvil.BlobMedia('audio/mpeg', data, name='audio.mp3')
    os.remove('my_file.mp3')

    return tts_blob

In [27]:
from IPython.core.display import display, HTML
display(HTML('<h1><center>Speech Emotion Recognition (HMM)</center></h1>'))

In [28]:
uploader = widgets.FileUpload(
    accept='audio/*',  # Accepted file extension e.g. '.txt', '.pdf', 'image/*', 'image/*,.pdf'
    multiple=False  # True to accept multiple files upload else False
)
display(uploader)

FileUpload(value={}, accept='audio/*', description='Upload')

In [29]:
uploader.value

{'05_01_NEU_010_01_0.wav': {'metadata': {'name': '05_01_NEU_010_01_0.wav',
   'type': 'audio/wav',
   'size': 55504,
   'lastModified': 1653987994116},
  'content': b'RIFF\xc8\xd8\x00\x00WAVEfmt \x10\x00\x00\x00\x01\x00\x01\x00\x80>\x00\x00\x00}\x00\x00\x02\x00\x10\x00data\xa4\xd8\x00\x00\xff\xff\x03\x00\x04\x00\xfe\xff\xfd\xff\xfd\xff\xfd\xff\xff\xff\xfd\xff\xf6\xff\xf7\xff\xf4\xff\xf6\xff\xf6\xff\xf5\xff\xf1\xff\xee\xff\xf3\xff\xe8\xff\xe8\xff\xe9\xff\xe2\xff\xe7\xff\xe4\xff\xe3\xff\xe1\xff\xdb\xff\xe0\xff\xdb\xff\xd7\xff\xd7\xff\xd6\xff\xd0\xff\xd2\xff\xcc\xff\xcd\xff\xce\xff\xcd\xff\xc6\xff\xbf\xff\xc5\xff\xbc\xff\xc0\xff\xc6\xff\xc2\xff\xb4\xff\xbb\xff\xb8\xff\xb8\xff\xbc\xff\xad\xff\xb0\xff\xb0\xff\xaf\xff\xae\xff\xae\xff\xb0\xff\xa8\xff\xa1\xff\xa6\xff\xa2\xff\xa0\xff\xa1\xff\x9d\xff\x9d\xff\x8c\xff\x94\xff\x93\xff\x90\xff\x95\xff\x90\xff\x8b\xff\x88\xff\x90\xff\x86\xff\x7f\xff\x83\xff\x7f\xffx\xffz\xffw\xffy\xfft\xffn\xffq\xffs\xffn\xffg\xffh\xffb\xffc\xffb\xffb\xff\\\xff]\xff\

Anvil websocket closed (code 1006, reason=Going away)


Exception in thread Thread-9 (heartbeat_until_reopened):
Traceback (most recent call last):
  File "C:\Users\Ardel\AppData\Local\Programs\Python\Python310\lib\site-packages\anvil\server.py", line 401, in call


Reconnecting Anvil Uplink...
Connecting to wss://anvil.works/uplink
Reconnection failed. Waiting 10 seconds, then retrying.


    return _do_call(args, kwargs, fn_name=fn_name)
  File "C:\Users\Ardel\AppData\Local\Programs\Python\Python310\lib\site-packages\anvil\server.py", line 393, in _do_call
    return _threaded_server.do_call(args, kwargs, fn_name=fn_name, live_object=live_object)
  File "C:\Users\Ardel\AppData\Local\Programs\Python\Python310\lib\site-packages\anvil\_threaded_server.py", line 429, in do_call
    raise error_from_server
anvil._server.AnvilWrappedError: 'Connection to Anvil Uplink server lost'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\Ardel\AppData\Local\Programs\Python\Python310\lib\threading.py", line 1009, in _bootstrap_inner
    self.run()
  File "C:\Users\Ardel\AppData\Local\Programs\Python\Python310\lib\threading.py", line 946, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Ardel\AppData\Local\Programs\Python\Python310\lib\site-packages\anvil\server.py", line 204, in heartbeat_unti