<a href="https://colab.research.google.com/github/PiyumaliSandunika/e18-4yp-Multimodal-Emotion-Prediction-Using-Reinforcement-Learning/blob/main/ensemble_new.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import joblib
import librosa
import soundfile
import os
import numpy as np
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score

# Lookup from two-digit string codes to emotion names.
# NOTE(review): nothing in the visible notebook reads this mapping — confirm it is still needed.
emotions = {'01': 'Happy', '02': 'Sad', '03': 'Angry', '04': 'Neutral'}
# Emotion names; predict_batch uses each one as a sub-folder name and as the
# ground-truth label of the files in that sub-folder.
observed_emotions = ['Happy', 'Sad', 'Angry', 'Neutral']

def extract_feature(file_name, mfcc, chroma, mel):
    """Build a 1-D feature vector for one audio file.

    The file is loaded with librosa (``res_type='kaiser_fast'``). Each
    enabled feature group is computed, transposed and averaged over its
    first axis, and the groups are concatenated in the fixed order
    MFCC, chroma, mel.

    Args:
        file_name: Path of the audio file to load.
        mfcc: If truthy, include the mean of 40 MFCC coefficients.
        chroma: If truthy, include the mean chromagram computed from the
            magnitude of the short-time Fourier transform.
        mel: If truthy, include the mean mel spectrogram.

    Returns:
        A 1-D numpy array holding the selected groups; empty when no group
        is selected.
    """
    signal, rate = librosa.load(file_name, res_type='kaiser_fast')

    # Start from an empty float array so the stacked result has the same
    # dtype and shape as stacking the groups one at a time.
    pieces = [np.array([])]

    if mfcc:
        coefficients = librosa.feature.mfcc(y=signal, sr=rate, n_mfcc=40)
        pieces.append(np.mean(coefficients.T, axis=0))

    if chroma:
        # The chromagram is derived from the STFT magnitude, not the raw signal.
        magnitude = np.abs(librosa.stft(signal))
        chromagram = librosa.feature.chroma_stft(S=magnitude, sr=rate)
        pieces.append(np.mean(chromagram.T, axis=0))

    if mel:
        spectrogram = librosa.feature.melspectrogram(y=signal, sr=rate)
        pieces.append(np.mean(spectrogram.T, axis=0))

    return np.hstack(pieces)

def predict_one(model,audio_file):
    """Predict the emotion of a single audio file and print the result.

    Args:
        model: Fitted classifier exposing ``predict`` and ``predict_proba``.
        audio_file: Path of the audio file to classify.

    Returns:
        A tuple ``(probabilities_list, pred)``: the ``predict_proba`` output
        squeezed to a plain list, and the raw ``predict`` output. The
        probabilities are in whatever column order the model emits
        (NOTE(review): for scikit-learn classifiers this is ``model.classes_``,
        which need not match ``observed_emotions`` — confirm before combining
        with another model's probabilities).
    """
    features = extract_feature(audio_file, mfcc=True, chroma=True, mel=True)
    # The model expects a 2-D batch, so wrap the single sample.
    batch = np.array([features])

    pred = model.predict(batch)
    probabilities_list = model.predict_proba(batch).squeeze().tolist()

    print(probabilities_list)
    print(pred)
    return probabilities_list, pred

def predict_batch(model,folder_path):
    """Score every audio file found under the per-emotion sub-folders.

    For each name in ``observed_emotions`` the sub-folder
    ``folder_path/<name>`` is listed, features are extracted from every entry,
    and the sub-folder name is recorded as that entry's true label.

    Args:
        model: Fitted classifier exposing ``predict_proba``.
        folder_path: Directory containing one sub-folder per emotion.

    Returns:
        A tuple ``(y_pred_logits, y_true)``: the ``predict_proba`` output for
        all files (these are probabilities despite the name) and the list of
        folder-name labels in the same order.
    """
    features, y_true = [], []
    for emotion in observed_emotions:
        emotion_dir = os.path.join(folder_path, emotion)
        for entry in os.listdir(emotion_dir):
            audio_path = os.path.join(emotion_dir, entry)
            features.append(extract_feature(audio_path, mfcc=True, chroma=True, mel=True))
            y_true.append(emotion)

    y_pred_logits = model.predict_proba(np.array(features))
    return y_pred_logits, y_true



In [None]:
# Mount Google Drive so files under /content/drive (e.g. the saved text model)
# can be read from this Colab session.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from transformers import DistilBertForSequenceClassification, DistilBertTokenizer
import torch
import pickle
import joblib
# Assuming device is either "cpu" or "cuda:0"
device = "cuda:0" if torch.cuda.is_available() else "cpu"
model_path = "/content/drive/MyDrive/FYP_Text/emotion_text_classification_model.pkl"
# NOTE(security): joblib.load unpickles arbitrary objects; only load model
# files that come from a trusted source.
text_model = joblib.load(model_path)
# predict_emotion_with_probabilities moves its inputs to `device`, so the model
# must live on the same device; eval() disables dropout for inference.
# Assumes the pickled object is a torch nn.Module (it is called as
# model(**inputs) and exposes .logits) — TODO confirm.
text_model.to(device)
text_model.eval()
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')


In [None]:
import torch

def predict_emotion_with_probabilities(sentence, model, tokenizer, device):
    """Classify the emotion of one sentence and return its class probabilities.

    Args:
        sentence: Text to classify.
        model: Sequence classifier called as ``model(**inputs)``; its result
            must expose a ``logits`` tensor with one row of class scores.
        tokenizer: Callable that turns ``sentence`` into model inputs
            (``return_tensors="pt"``) and whose result supports ``.to(device)``.
        device: Device the tokenized inputs are moved to; the model is
            expected to already be on that device.

    Returns:
        A tuple ``(probabilities_list, predicted_emotion)``: the softmax
        probabilities as a plain list ordered as
        ``["happiness", "sadness", "anger", "neutral"]``, and the label with
        the highest score.
    """
    # Index i of the model output corresponds to emotion_labels[i].
    emotion_labels = ["happiness", "sadness", "anger", "neutral"]

    # Tokenize the input sentence and place it on the inference device.
    inputs = tokenizer(sentence, return_tensors="pt").to(device)

    # Pure inference: skip building the autograd graph.
    with torch.no_grad():
        logits = model(**inputs).logits

    # Softmax over the class dimension turns scores into probabilities.
    probabilities = torch.nn.functional.softmax(logits, dim=1)
    probabilities_list = probabilities.squeeze().tolist()

    predicted_emotion = emotion_labels[torch.argmax(logits, dim=1).item()]

    return probabilities_list, predicted_emotion


In [None]:

device = "cuda:0" if torch.cuda.is_available() else "cpu"
sentence = "Yesterday was good"
audio_file = 'Happy_out_8.wav'
# NOTE(security): joblib.load unpickles arbitrary objects; only load trusted files.
model_vocal = joblib.load("TrainedUsing95percCommonData&IncLearnedUsingPersAllDataBatchSize1.pkl")
probabilities1, _ = predict_emotion_with_probabilities(sentence, text_model, tokenizer, device)
probabilities2, _ = predict_one(model_vocal,audio_file)

# predict_proba columns follow model_vocal.classes_, which is not the
# Happy/Sad/Angry/Neutral order used by the text model's output. Reorder the
# vocal probabilities to observed_emotions so that index i means the same
# emotion in both lists before they are combined.
vocal_classes = list(model_vocal.classes_)
probabilities2 = [probabilities2[vocal_classes.index(label)] for label in observed_emotions]

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


[5.047011827514818e-10, 0.9999999950462861, 5.1431132235680246e-11, 4.3975816617435085e-09]
['Happy']


In [None]:
# Show the text-model probabilities, then the vocal-model probabilities.
print(probabilities1)
print(probabilities2)

[0.6171170473098755, 0.13686636090278625, 0.02497868239879608, 0.2210378646850586]
[5.047011827514818e-10, 0.9999999950462861, 5.1431132235680246e-11, 4.3975816617435085e-09]


In [None]:
# def ensemble_predictions(probabilities1,probabilities2, model_weights=None):

#     # Apply model-wise weights if provided
#     if model_weights:
#         probabilities1_ = [p1 * w for p1, w in zip(probabilities1, model_weights[0])]
#         probabilities2_ = [p2 * w for p2, w in zip(probabilities2, model_weights[1])]

#     # # Apply class-wise weights if provided
#     # if class_weights:
#     #     probabilities1 = [p1 * w for p1, w in zip(probabilities1, class_weights)]
#     #     probabilities2 = [p2 * w for p2, w in zip(probabilities2, class_weights)]

#     # Calculate ensemble probabilities by averaging
#     ensemble_probabilities = [(p1 + p2) / 2 for p1, p2 in zip(probabilities1_, probabilities2_)]

#     # Map ensemble probabilities to emotion labels
#     emotion_labels = ["happiness", "sadness", "anger", "neutral"]
#     predicted_emotion = emotion_labels[np.argmax(ensemble_probabilities)]

#     return probabilities1, probabilities2, ensemble_probabilities, predicted_emotion



In [None]:
def ensemble_predictions(probabilities1, probabilities2, user_labels, model_weights=None, learning_rate=0.1):
    """Combine two models' class probabilities and update the weights online.

    Each probability is multiplied by its per-class weight, the two weighted
    lists are averaged element-wise, and the class with the largest value is
    the ensemble prediction. When weights are supplied they are then nudged
    up by ``learning_rate`` where the prediction matches the user label and
    down otherwise.

    Args:
        probabilities1: Class probabilities from the first model, ordered as
            ``["happiness", "sadness", "anger", "neutral"]``.
        probabilities2: Class probabilities from the second model in the same
            order.
        user_labels: The true emotion. Either a single label string (applied
            to every class weight) or a sequence with one label per class
            index. ``None`` skips the weight update.
        model_weights: ``[weights_for_model1, weights_for_model2]``, each a
            list of per-class weights. Updated in place. When omitted or
            empty, all weights are treated as 1.0 and no update happens.
        learning_rate: Step added to or subtracted from each weight.

    Returns:
        A tuple ``(probabilities1, probabilities2, ensemble_probabilities,
        predicted_emotion, model_weights)``. The two inputs are returned
        unchanged; ``model_weights`` is the (updated) object that was passed
        in, or the original falsy value when no weights were given.
    """
    emotion_labels = ["happiness", "sadness", "anger", "neutral"]

    # Without weights every class of both models counts equally.
    if model_weights:
        weights1, weights2 = model_weights[0], model_weights[1]
    else:
        weights1 = [1.0] * len(probabilities1)
        weights2 = [1.0] * len(probabilities2)

    weighted1 = [p * w for p, w in zip(probabilities1, weights1)]
    weighted2 = [p * w for p, w in zip(probabilities2, weights2)]

    # Calculate ensemble probabilities by averaging.
    ensemble_probabilities = [(a + b) / 2 for a, b in zip(weighted1, weighted2)]
    predicted_emotion = emotion_labels[np.argmax(ensemble_probabilities)]

    # Update model weights using online learning.
    if model_weights and user_labels is not None:
        n_classes = min(len(probabilities1), len(probabilities2))
        if isinstance(user_labels, str):
            # A lone label must be compared as a whole; iterating a string
            # would compare the prediction against single characters.
            labels = [user_labels] * n_classes
        else:
            labels = list(user_labels)[:n_classes]

        for i, user_label in enumerate(labels):
            step = learning_rate if predicted_emotion == user_label else -learning_rate
            # Clamp at zero: a negative weight would flip the ranking and
            # produce negative "probabilities".
            model_weights[0][i] = max(0.0, model_weights[0][i] + step)
            model_weights[1][i] = max(0.0, model_weights[1][i] + step)

    return probabilities1, probabilities2, ensemble_probabilities, predicted_emotion, model_weights


In [None]:
# Example usage with model and class weights
# sentence = "This is a test sentence."
model_weights = [[1.0, 1.0, 1.0, 1.0], [1.0,1.0,1.0,1.0]]  # Example weights for each model
class_weights = [1.0, 1.0, 0.8, 1.2]  # Example weights for each class (not used by ensemble_predictions)
# Pass the freshly initialised model_weights: `w` only exists after this call
# returns, so using it as the argument fails on a clean kernel.
probabilities1_new, probabilities2_new, ensemble_probabilities, predicted_emotion, w = ensemble_predictions(probabilities1, probabilities2, 'happiness', model_weights, 0.05)
print("Probabilities (Model 1):", probabilities1_new)
print("Probabilities (Model 2):", probabilities2_new)
print("Ensemble Probabilities:", ensemble_probabilities)
print("Ensemble Predicted Emotion:", predicted_emotion)
print(w)

Probabilities (Model 1): [0.6171170473098755, 0.13686636090278625, 0.02497868239879608, 0.2210378646850586]
Probabilities (Model 2): [5.047011827514818e-10, 0.9999999950462861, 5.1431132235680246e-11, 4.3975816617435085e-09]
Ensemble Probabilities: [-5.047021676367172e-11, -0.0999999995046287, -2.5715605982458395e-12, 4.3975813089790893e-10]
Ensemble Predicted Emotion: neutral
[[-0.05000000000000032, -0.05000000000000032, -0.05000000000000032, -0.05000000000000032], [-0.2500000000000001, -0.2500000000000001, -0.15000000000000022, 0.14999999999999947]]


In [None]:
# Assuming device is either "cpu" or "cuda:0"
# Use the first CUDA GPU when torch reports one, otherwise fall back to the CPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu"



In [None]:
import pickle
import numpy as np
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification
# Load pretrained models for vocal tone classification and text classification
# NOTE(security): joblib.load unpickles arbitrary objects; only load model
# files that come from a trusted source.
vocal_tone_model = joblib.load("TrainedUsing95percCommonData&IncLearnedUsingPersAllDataBatchSize1.pkl")
text_model = joblib.load("/content/drive/MyDrive/FYP_Text/emotion_text_classification_model.pkl")
# Tokenizer for the text model, taken from the stock 'distilbert-base-uncased' checkpoint.
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


In [None]:
import numpy as np
from sklearn.neural_network import MLPClassifier
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification
import torch

# Define Fusion Module for ensembling
class FusionModule:
    """Late-fusion ensemble of a vocal-tone classifier and a text classifier.

    Both models are asked for class probabilities, the two probability
    vectors are aligned to a common class order, and their element-wise mean
    is returned.
    """

    # Class order of the fused output. The vocal model's columns are looked up
    # by these names; the text model is assumed to emit its classes in this
    # same order (happiness, sadness, anger, neutral) — TODO confirm against
    # the text model's training labels.
    EMOTION_LABELS = ("Happy", "Sad", "Angry", "Neutral")

    def __init__(self, vocal_tone_model, text_model, tokenizer):
        """Store the two fitted models and the text model's tokenizer."""
        self.vocal_tone_model = vocal_tone_model
        self.text_model = text_model
        self.tokenizer = tokenizer

    def combine_predictions(self, vocal_tone_features, text_features):
        """Fuse both models' class probabilities for one sample.

        Args:
            vocal_tone_features: 1-D numpy feature vector for the audio
                sample; it is reshaped to a single-row batch.
            text_features: The raw text passed to ``tokenizer.encode_plus``
                (despite the name, not pre-computed features).

        Returns:
            A numpy array with one averaged probability per entry of
            ``EMOTION_LABELS``, in that order.

        Raises:
            ValueError: If the vocal model's ``classes_`` lacks one of
                ``EMOTION_LABELS``.
        """
        # Probabilities from the vocal tone model. Its columns follow
        # classes_, so pick them out by label to reach the fused order.
        vocal_proba = self.vocal_tone_model.predict_proba(vocal_tone_features.reshape(1, -1))[0]
        vocal_classes = list(self.vocal_tone_model.classes_)
        vocal_probs = np.array(
            [vocal_proba[vocal_classes.index(label)] for label in self.EMOTION_LABELS]
        )

        # Probabilities from the text model. The batch dimension is kept
        # because the model expects (batch, sequence) inputs; truncation makes
        # max_length effective for long texts.
        text_inputs = self.tokenizer.encode_plus(
            text_features,
            add_special_tokens=True,
            max_length=512,
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )
        with torch.no_grad():
            text_logits = self.text_model(**text_inputs)[0]
        text_probs = torch.nn.functional.softmax(text_logits, dim=-1).squeeze(0).cpu().numpy()

        # Combine predictions by simple averaging of the aligned probabilities.
        # Averaging a class label with a class index, as before, has no meaning.
        fused_prediction = (vocal_probs + text_probs) / 2

        return fused_prediction

In [None]:
# Predictions from vocal tone model
# Extract all three feature groups from the sample clip, then classify it as a
# single-row batch.
vocal_tone_features = extract_feature('Happy_out_8.wav', mfcc=True, chroma=True, mel=True)
vocal_tone_prediction = vocal_tone_model.predict(vocal_tone_features.reshape(1, -1))

# # Predictions from text model
# text_inputs = self.tokenizer.encode_plus(
#     text_features,
#     add_special_tokens=True,
#     max_length=512,
#     padding='max_length',
#     return_attention_mask=True,
#     return_tensors='pt'
# )
# text_inputs = {k: v.squeeze(0) for k, v in text_inputs.items()}
# text_prediction = self.text_model(**text_inputs)[0]
# text_prediction = torch.argmax(text_prediction).item()

# # Combine predictions (e.g., simple averaging)
# fused_prediction = (vocal_tone_prediction + text_prediction) / 2  # Simple averaging


In [None]:
# Tokenize a sample sentence into PyTorch tensors and move them to `device`.
text_features = tokenizer('This is great', return_tensors="pt").to(device)

In [None]:
# Example code to get text features
sentence = "I am feeling happy today!"
# truncation=True only takes effect when a maximum length is known; without
# max_length the tokenizer warns and does not truncate at all.
text_features = tokenizer(sentence, return_tensors="pt", padding=True, truncation=True, max_length=512)


Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


# **Vocal tone --> Text conversion**

In [None]:
import speech_recognition as sr

def audio_to_text(audio_file):
    """Transcribe an audio file with the Google Web Speech recognizer.

    Args:
        audio_file: Path (or file-like object) accepted by ``sr.AudioFile``.

    Returns:
        The recognized text, or ``None`` when the speech could not be
        understood or the recognition service could not be reached (a message
        is printed in both cases).
    """
    # Initialize the recognizer
    recognizer = sr.Recognizer()

    # Read the whole clip. No ambient-noise calibration is done here: it reads
    # audio from the start of the source, so record() would begin after that
    # portion and the opening words of the clip would be lost. The threshold
    # it sets is not used by record().
    with sr.AudioFile(audio_file) as source:
        audio_data = recognizer.record(source)

    # The file is already closed; only the recognition request can fail here.
    try:
        return recognizer.recognize_google(audio_data)
    except sr.UnknownValueError:
        print("Speech Recognition could not understand audio")
        return None
    except sr.RequestError as e:
        print("Could not request results from Speech Recognition service; {0}".format(e))
        return None


In [None]:
%pip install SpeechRecognition


Defaulting to user installation because normal site-packages is not writeableNote: you may need to restart the kernel to use updated packages.



In [None]:
import os

# Emotion name -> class index.
# NOTE(review): not read by the loop below — confirm it is used elsewhere.
emotions = {"happiness": 0, "sadness": 1, "anger": 2, "neutral": 3}
# Sub-folder names of the dataset; each doubles as the label of its files.
observed_emotions = ['Happy', 'Sad', 'Angry', 'Neutral']

# x collects the transcriptions, y_label the matching folder-name labels.
x, y_label = [], []
folder_path = "C:/Users/HP/Downloads/audio_dataset"
for folder in observed_emotions:
    filepath = os.path.join(folder_path, folder)
    print(filepath)
    for filename in os.listdir(filepath):
        hj = os.path.join(filepath, filename)
        try:
            transcribed_text = audio_to_text(hj)
        except Exception as exc:
            # Best effort: report the failing file and its cause, then go on.
            # Catching Exception (not a bare except) lets Ctrl-C still stop
            # the loop.
            print("Error occurred while processing:", hj, "-", exc)
            continue
        # Keep only files that produced a non-empty transcription.
        if transcribed_text:
            print("Transcribed Text:", transcribed_text)
            x.append(transcribed_text)
            y_label.append(folder)

C:/Users/HP/Downloads/audio_dataset\Happy
Transcribed Text: it's such a great day spend a time
Transcribed Text: office recording let's
Transcribed Text: Sunny Deol beaches you happiness
Transcribed Text: very very beautiful day I love this yeah of course it's a very very happy
Transcribed Text: very very funny right yeah he is a very very funny person I like that I
Transcribed Text: Google you are really cool person I like you I want to
Transcribed Text: yes of course it's really nice I do like that I want
Transcribed Text: understand that your school completing
Transcribed Text: give me a sense of achievement feeling the worth of the son of my skin makes
Transcribed Text: money in my pocket that I forgot about places of Christ
Transcribed Text: it's really good but since you thank you from some
Transcribed Text: always make me happy sing my favourite man performs live version
Transcribed Text: feel appreciate find the money in my pocket that I forgot about
Transcribed Text: I can't h

In [None]:
import pandas as pd
# One row per transcribed clip: the text in 'content', its folder label in 'emotion'.
df = pd.DataFrame({'content': x, 'emotion': y_label})

In [None]:
# Display the table of transcriptions and labels.
df

Unnamed: 0,content,emotion
0,it's such a great day spend a time,Happy
1,office recording let's,Happy
2,Sunny Deol beaches you happiness,Happy
3,very very beautiful day I love this yeah of co...,Happy
4,very very funny right yeah he is a very very f...,Happy
...,...,...
83,Yadav about a situation the weather today's ne...,Neutral
84,contact DC and see what happens today size in,Neutral
85,I am feeling OK nothing special happen,Neutral
86,parrot non spinal about today's plans Just Ano...,Neutral


In [None]:
# Save the transcriptions to transcribed.csv without the row index column.
df.to_csv("transcribed.csv", index=False)

In [None]:
# # Assuming we have functions to load text and audio data, and to perform speech-to-text conversion and feature extraction

# # Step 1: Load data
# text_data = load_text_data("text_samples.csv")
# audio_data = load_audio_data("audio_samples.wav")

# # Step 2: Convert audio data into text
# converted_text_data = []
# for audio_sample in audio_data:
#     converted_text = audio_to_text(audio_sample)
#     converted_text_data.append(converted_text)

# # Step 3: Extract features
# text_features = extract_text_features(text_data)
# audio_features = extract_audio_features(converted_text_data)

# # Step 4: Make initial predictions
# probabilities1, probabilities2 = model_predictions(text_features, audio_features)

# # Step 5: Gather user feedback or ground truth labels
# user_labels = get_user_labels(text_data, audio_data)  # Assuming you have a way to get user feedback

# # Step 6: Update model weights based on feedback
# updated_probabilities1, updated_probabilities2, ensemble_probabilities, predicted_emotion, model_weights = ensemble_predictions(probabilities1, probabilities2, user_labels, model_weights, learning_rate=0.1)

# # Step 7: Repeat for each sample
# # This step depends on the size of your dataset and the available computational resources

# # Step 8: Evaluate performance on a validation set
# validation_text_data = load_text_data("validation_text.csv")
# validation_audio_data = load_audio_data("validation_audio.wav")
# validation_user_labels = get_user_labels(validation_text_data, validation_audio_data)
# validation_text_features = extract_text_features(validation_text_data)
# validation_audio_features = extract_audio_features(validation_audio_data)
# validation_probabilities1, validation_probabilities2 = model_predictions(validation_text_features, validation_audio_features)
# accuracy = evaluate(validation_probabilities1, validation_probabilities2, validation_user_labels)
# print("Accuracy:", accuracy)
