In [2]:
from transformers import Wav2Vec2Processor, Wav2Vec2Model
import librosa
import torch
import pandas as pd
import numpy as np
import os
import random
import pickle
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

In [3]:
!pip install ray
!pip install optuna
import optuna
from ray import tune, air
from ray.tune.search.optuna import OptunaSearch
from ray.air import session
from ray.air.config import RunConfig, ScalingConfig
from ray.tune import Tuner



In [4]:
# --- Notebook configuration -------------------------------------------------
# Emotion label held out of training ("" means no emotion is held out).
unseen_emotion = ""
# Directory holding the audio files; `folder` and `path` name the same location.
folder = "/content/drive/MyDrive/Audio_Speech_Actors_01-24/Actor_01"
path = folder
# Directory holding one text file of emotion word-embedding values per emotion.
fasttext_folder = '/content/drive/MyDrive/emotion_vectors'
# File name used when saving the trained model.
model_save = 'advanced_embedding_mapper.pth'
# Location of the pickle that caches the computed vector map.
pickle_path = '/content/drive/MyDrive/USER/fileTensorDict.pckl'
# Seed for the train/test split.
seed = 420

In [5]:
def get_emotion_vector(filename):
    """Return the emotion label encoded in a dash-separated file name.

    The third dash-separated field of the name (for example ``05`` in
    ``03-01-05-01-01-01-01.wav``) is looked up in a fixed code table.

    Args:
        filename: File name (not a full path), e.g. ``03-01-05-01-01-01-01.wav``.

    Returns:
        The emotion label string, or ``None`` when the code is not in the
        table or the name has fewer than three dash-separated fields.
    """
    emotion_by_code = {
        '01': 'neutral',
        '02': 'calm',
        '03': 'happy',
        '04': 'sad',
        '05': 'angry',
        '06': 'fearful',
        '07': 'disgust',
        '08': 'surprised',
    }

    parts = filename.split('-')
    if len(parts) < 3:
        # Stray directory entries (e.g. "notes.txt") have no third field;
        # indexing parts[2] on them would raise IndexError.
        return None

    return emotion_by_code.get(parts[2])

In [6]:
def load_emotion_vectors(folder):
    """Load every ``*.txt`` embedding file in ``folder`` into a dict of tensors.

    Each file is expected to hold one float per line. Blank lines (such as a
    trailing empty line) are skipped instead of crashing ``float('')``.

    Args:
        folder: Directory containing the ``<emotion>.txt`` files.

    Returns:
        Dict mapping the part of the file name before the first ``.`` to a
        1-D ``torch.float32`` tensor of the values read from that file.

    Raises:
        ValueError: If a non-blank line cannot be parsed as a float.
    """
    emotion_vectors = {}
    for filename in os.listdir(folder):
        if not filename.endswith('.txt'):
            continue
        emotion_name = filename.split('.')[0]
        filepath = os.path.join(folder, filename)
        with open(filepath, 'r') as file:
            vector = [float(line) for line in file if line.strip()]
        emotion_vectors[emotion_name] = torch.tensor(vector, dtype=torch.float32)
    return emotion_vectors

In [7]:
def addToDict(folder):
  """Write a ``{file name: emotion label}`` mapping for ``folder`` to vectors.txt.

  The mapping is built with ``get_emotion_vector`` for every entry returned by
  ``os.listdir(folder)`` and stored as the ``str()`` of the dict.

  Args:
      folder: Directory whose entries are labelled.
  """
  emo_dict = {file: get_emotion_vector(file) for file in os.listdir(folder)}
  # Open in "w" mode: appending stacked another copy of the dict onto the file
  # on every call / notebook re-run, leaving it unparseable.
  with open("vectors.txt", "w") as vec:
    vec.write(str(emo_dict))

# A single call already covers every file in the folder; calling it once per
# file wrote the same dict len(os.listdir(folder)) times.
addToDict(folder)

In [8]:
# Load the processor and the pretrained model from the same
# "facebook/wav2vec2-base-960h" checkpoint. Both are module-level objects
# read by the audio helper functions in this notebook.
processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base-960h")
wav_model = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base-960h")


Error while fetching `HF_TOKEN` secret value from your vault: 'Requesting secret HF_TOKEN timed out. Secrets can only be fetched when running from the Colab UI.'.
You are not authenticated with the Hugging Face Hub in this notebook.
If the error persists, please let us know by opening an issue on GitHub (https://github.com/huggingface/huggingface_hub/issues/new).
Some weights of Wav2Vec2Model were not initialized from the model checkpoint at facebook/wav2vec2-base-960h and are newly initialized: ['wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1', 'wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
def get_vector_from_audio(path):
  """Embed one audio file as the mean of the wav2vec2 last hidden state.

  The file is loaded at 16 kHz, passed through the module-level ``processor``
  and ``wav_model`` without gradient tracking, and the resulting
  ``last_hidden_state`` is averaged over dimension 1
  (presumably the time axis; confirm against the model docs).

  Args:
      path: Path of the audio file to embed.

  Returns:
      Tensor holding the averaged hidden state.
  """
  waveform, _ = librosa.load(path, sr=16000)
  model_inputs = processor(waveform, sampling_rate=16000, return_tensors="pt", padding=True)

  with torch.no_grad():
    model_outputs = wav_model(**model_inputs)

  return model_outputs.last_hidden_state.mean(dim=1)



In [10]:
def get_embeddings():
  """Embed every entry of the module-level ``path`` directory.

  Returns:
      Dict mapping each file name in ``path`` to the tensor returned by
      ``get_vector_from_audio`` for that file.
  """
  return {
      name: get_vector_from_audio(os.path.join(path, name))
      for name in os.listdir(path)
  }

In [11]:
def load_fasttext_embedding(emotion_label, fasttext_folder):
  """Read the embedding stored in ``<fasttext_folder>/<emotion_label>.txt``.

  The file is expected to hold one float per line; blank lines are skipped.

  Args:
      emotion_label: Emotion name used as the file stem.
      fasttext_folder: Directory containing the embedding text files.

  Returns:
      List of floats read from the file.

  Raises:
      FileNotFoundError: If no file exists for ``emotion_label``.
      ValueError: If a non-blank line cannot be parsed as a float.
  """
  filepath = os.path.join(fasttext_folder, f'{emotion_label}.txt')
  if not os.path.exists(filepath):
      # Report the directory that was actually searched, not the unrelated
      # module-level audio `folder`.
      raise FileNotFoundError(f"Embedding file for {emotion_label} not found in {fasttext_folder}")

  with open(filepath, 'r') as file:
      embedding = [float(line) for line in file if line.strip()]

  return embedding


In [12]:
def map_fasttext_to_wav2vec(wav2vec_dict, fasttext_folder='emotion_vectors'):
    """Pair each audio embedding with the word embedding of its emotion label.

    Args:
        wav2vec_dict: Dict mapping file name to its audio embedding.
        fasttext_folder: Directory passed on to ``load_fasttext_embedding``.

    Returns:
        Dict mapping file name to ``(audio_embedding, word_embedding)``, where
        the word embedding is a ``torch.float32`` tensor.
    """
    def _target_for(name):
        # The emotion label is derived from the file name, then its vector is read from disk.
        label = get_emotion_vector(name)
        values = load_fasttext_embedding(label, fasttext_folder)
        return torch.tensor(values, dtype=torch.float32)

    return {
        name: (audio_embedding, _target_for(name))
        for name, audio_embedding in wav2vec_dict.items()
    }

In [13]:
# filename_vector_dict = get_embeddings()
# vector_map = map_fasttext_to_wav2vec(filename_vector_dict,fasttext_folder)

In [14]:
def check_or_create_vector_map(pickle_path, fasttext_folder):
    """Return the cached vector map, building and caching it when absent.

    Args:
        pickle_path: Pickle file that stores ``{'vector_map': ...}``.
        fasttext_folder: Directory of emotion word embeddings, used only when
            the map has to be rebuilt.

    Returns:
        The vector map, either loaded from ``pickle_path`` or freshly built by
        ``get_embeddings`` + ``map_fasttext_to_wav2vec``.
    """
    if os.path.exists(pickle_path):
        # NOTE(review): pickle.load can execute arbitrary code; only point
        # pickle_path at files this notebook produced itself.
        with open(pickle_path, 'rb') as cache_file:
            cached = pickle.load(cache_file)
        if 'vector_map' in cached:
            print("vector_map loaded from pickle file.")
            return cached['vector_map']

    # Cache miss (no file, or file without a 'vector_map' entry): rebuild.
    vector_map = map_fasttext_to_wav2vec(get_embeddings(), fasttext_folder)

    # Persist the rebuilt map for the next run.
    with open(pickle_path, 'wb') as cache_file:
        pickle.dump({'vector_map': vector_map}, cache_file)
    print("vector_map created and saved to pickle file.")

    return vector_map

vector_map = check_or_create_vector_map(pickle_path, fasttext_folder)

vector_map loaded from pickle file.


In [15]:
def split_sets(dictionary, unseen_emotion, train_ratio=0.8, seed=420):
    """Split ``dictionary`` into train/test dicts, holding one emotion out of training.

    Every key whose ``get_emotion_vector`` label equals ``unseen_emotion`` goes
    to the test set. The remaining keys are shuffled and divided so that the
    overall train share is ``train_ratio`` of all samples.

    Args:
        dictionary: Dict keyed by file name.
        unseen_emotion: Emotion label excluded from the training set.
        train_ratio: Fraction of all samples assigned to training.
        seed: Seed passed to ``random.seed`` (this reseeds the global
            ``random`` module).

    Returns:
        ``(train_dict, test_dict)``.

    Raises:
        ValueError: If the held-out samples alone exceed the test budget, or
            if a key ends up in both sets.
    """
    # Set the random seed for reproducibility
    random.seed(seed)

    # A set makes the membership test below O(1) per key instead of scanning a
    # list for every key; the list comprehension keeps the original key order.
    unseen_keys = [key for key in dictionary if get_emotion_vector(key) == unseen_emotion]
    unseen_key_set = set(unseen_keys)
    filtered_keys = [key for key in dictionary if key not in unseen_key_set]

    # Shuffle the filtered keys
    random.shuffle(filtered_keys)

    # Sample budget for each side, computed over the whole dictionary
    total_samples = len(dictionary)
    num_train_samples = int(total_samples * train_ratio)
    num_test_samples = total_samples - num_train_samples

    # Test slots left for non-held-out samples once the held-out ones are counted
    num_test_samples_from_filtered = num_test_samples - len(unseen_keys)

    # Ensure there are enough samples in the filtered data
    if num_test_samples_from_filtered < 0:
        raise ValueError("Not enough samples in the filtered data to maintain the overall split ratio.")

    # Split the filtered keys into training and test sets
    train_keys = filtered_keys[:num_train_samples]
    test_keys = filtered_keys[num_train_samples:num_train_samples + num_test_samples_from_filtered]

    # Create training and test dictionaries from the filtered data
    train_dict = {key: dictionary[key] for key in train_keys}
    test_dict = {key: dictionary[key] for key in test_keys}

    # Add the unseen emotion samples to the test dictionary
    test_dict.update({key: dictionary[key] for key in unseen_keys})

    # Check for overlaps
    overlapping_keys = set(train_dict) & set(test_dict)
    if overlapping_keys:
        raise ValueError(f"Overlapping filenames found between training and test sets: {overlapping_keys}")

    return train_dict, test_dict

# Example usage: pass the notebook-level `seed` explicitly so that changing it
# in the configuration cell actually changes the split (the call previously
# relied on the function's own default).
train_dict, test_dict = split_sets(vector_map, unseen_emotion, seed=seed)

# Check the counts
print("Training samples:", len(train_dict))
print("Test samples:", len(test_dict))

# Ensure no unseen emotion samples in the training set
print("Unseen emotion in training set:", any(get_emotion_vector(key) == unseen_emotion for key in train_dict.keys()))
print("Unseen emotion in test set:", any(get_emotion_vector(key) == unseen_emotion for key in test_dict.keys()))
print("Unseen emotion:", unseen_emotion)


Training samples: 1152
Test samples: 288
Unseen emotion in training set: False
Unseen emotion in test set: False
Unseen emotion: 


In [16]:
class CNNEmbeddingMapper(nn.Module):
  """1-D convolutional mapper from 768 input channels to 300 output channels.

  ``forward`` swaps dimensions 1 and 2 before and after the convolution stack,
  so the input carries its 768 features in the last dimension and the output
  carries 300 features in the last dimension.
  """

  def __init__(self):
    super().__init__()
    # Channel widths 768 -> 512 -> 256 -> 128 -> 300. Kernel size 3 with
    # padding 1 keeps the length of the convolved dimension unchanged.
    self.conv1 = nn.Conv1d(768, 512, kernel_size=3, padding=1)
    self.bn1 = nn.BatchNorm1d(512)
    self.conv2 = nn.Conv1d(512, 256, kernel_size=3, padding=1)
    self.bn2 = nn.BatchNorm1d(256)
    self.conv3 = nn.Conv1d(256, 128, kernel_size=3, padding=1)
    self.bn3 = nn.BatchNorm1d(128)
    self.conv4 = nn.Conv1d(128, 300, kernel_size=3, padding=1)
    self.dropout = nn.Dropout(0.2)
    self.relu = nn.ReLU()

  def forward(self, x):
    """Apply three conv/batch-norm/ReLU/dropout stages and a final convolution."""
    hidden = x.transpose(1, 2)

    stages = (
        (self.conv1, self.bn1),
        (self.conv2, self.bn2),
        (self.conv3, self.bn3),
    )
    for conv, norm in stages:
      hidden = self.dropout(self.relu(norm(conv(hidden))))

    # The last convolution has no normalisation, activation or dropout.
    return self.conv4(hidden).transpose(1, 2)

In [17]:
# class RNNEmbeddingMapper(nn.module):
#   def __init__(self):

In [18]:
def create_dataloader(data_dict, batch_size=2, shuffle=True):
    """Build a ``DataLoader`` over the (input, target) tensor pairs in ``data_dict``.

    Args:
        data_dict: Dict whose values are ``(input_tensor, target_tensor)``
            pairs; keys are not stored in the dataset.
        batch_size: Number of samples per batch.
        shuffle: Whether the loader shuffles samples.

    Returns:
        ``DataLoader`` wrapping a ``TensorDataset`` of the stacked inputs and
        stacked targets, in dictionary order.
    """
    pairs = list(data_dict.values())

    inputs = torch.stack([pair[0] for pair in pairs])
    targets = torch.stack([pair[1] for pair in pairs])

    return DataLoader(TensorDataset(inputs, targets), batch_size=batch_size, shuffle=shuffle)

# Training batches are shuffled (create_dataloader's default); test batches
# keep the dictionary order of test_dict because shuffle=False.
train_dataloader = create_dataloader(train_dict)
test_dataloader = create_dataloader(test_dict, shuffle=False)

In [19]:
def get_activation_function(name):
    """Instantiate the activation module identified by ``name``.

    Args:
        name: One of ``'ReLU'``, ``'Tanh'``, ``'Sigmoid'`` or ``'LeakyReLU'``.

    Returns:
        A new instance of the matching ``torch.nn`` activation module.

    Raises:
        ValueError: If ``name`` is not one of the supported names.
    """
    supported = (
        ('ReLU', nn.ReLU),
        ('Tanh', nn.Tanh),
        ('Sigmoid', nn.Sigmoid),
        ('LeakyReLU', nn.LeakyReLU),
    )
    for known_name, factory in supported:
        if name == known_name:
            return factory()

    raise ValueError(f"Unknown activation function: {name}")

In [20]:
class AdvancedEmbeddingMapper(nn.Module):
    """Two-layer MLP mapping a 768-d input vector to a 300-d output vector.

    Args:
        hidden_size: Width of the hidden layer.
        dropout_rate: Dropout probability applied to the hidden activation.
        activation_function: Name understood by ``get_activation_function``.
    """

    def __init__(self, hidden_size, dropout_rate, activation_function):
        super().__init__()
        self.fc1 = nn.Linear(768, hidden_size)
        self.fc2 = nn.Linear(hidden_size, 300)
        self.dropout = nn.Dropout(dropout_rate)
        self.activation_function = get_activation_function(activation_function)

    def forward(self, x):
        """Return the 300-d prediction for ``x`` (last dimension 768)."""
        x = self.activation_function(self.fc1(x))
        # Regularise the hidden representation, not the prediction: dropout
        # after fc2 zeroed and rescaled components of the output embedding
        # during training, so the loss was computed on a corrupted prediction.
        # Eval-mode output is unchanged because dropout is then the identity.
        x = self.dropout(x)
        return self.fc2(x)



In [None]:
def objective(trial):
    """Optuna objective: train an ``AdvancedEmbeddingMapper`` with sampled
    hyperparameters and return its validation loss after the last epoch.

    Reads the module-level ``train_dataloader`` and ``test_dataloader``.

    NOTE(review): validation runs on ``test_dataloader``, so the test split
    influences hyperparameter selection. Confirm this is intended.

    Args:
        trial: Optuna trial used to sample hyperparameters and report progress.

    Returns:
        Mean ``CosineEmbeddingLoss`` per batch over ``test_dataloader`` after
        the final epoch.

    Raises:
        optuna.exceptions.TrialPruned: When the trial's pruner asks to stop.
        ValueError: If an unknown optimizer name is sampled.
    """
    optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'SGD', 'RMSprop'])
    hidden_size = trial.suggest_int('hidden_size', 128, 1024)
    dropout_rate = trial.suggest_float('dropout_rate', 0.1, 0.5)
    # suggest_loguniform is deprecated (it emitted a warning on every trial);
    # suggest_float(..., log=True) samples the same log-uniform range.
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-1, log=True)
    activation_function = trial.suggest_categorical('activation_function', ['ReLU', 'Tanh', 'Sigmoid', 'LeakyReLU'])
    num_epochs = trial.suggest_int('num_epochs', 5, 100)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = AdvancedEmbeddingMapper(hidden_size, dropout_rate, activation_function).to(device)
    criterion = nn.CosineEmbeddingLoss()

    # Initialize optimizer
    if optimizer_name == 'Adam':
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    elif optimizer_name == 'SGD':
        optimizer = optim.SGD(model.parameters(), lr=learning_rate)
    elif optimizer_name == 'RMSprop':
        optimizer = optim.RMSprop(model.parameters(), lr=learning_rate)
    else:
        raise ValueError(f"Unknown optimizer: {optimizer_name}")

    for epoch in range(num_epochs):
        model.train()
        for batch_x, batch_y in train_dataloader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            optimizer.zero_grad()
            outputs = model(batch_x)

            # Flatten to (batch, features) as CosineEmbeddingLoss expects
            outputs = outputs.view(outputs.size(0), -1)
            batch_y = batch_y.view(batch_y.size(0), -1)

            # Label 1 for every pair: the loss then pulls prediction and target together
            labels = torch.ones(outputs.size(0), device=outputs.device)

            loss = criterion(outputs, batch_y, labels)
            loss.backward()
            optimizer.step()

        # Validation loop
        model.eval()
        valid_loss = 0
        with torch.no_grad():
            for batch_x, batch_y in test_dataloader:
                batch_x, batch_y = batch_x.to(device), batch_y.to(device)
                outputs = model(batch_x)
                outputs = outputs.view(outputs.size(0), -1)
                batch_y = batch_y.view(batch_y.size(0), -1)

                labels = torch.ones(outputs.size(0), device=device)
                loss = criterion(outputs, batch_y, labels)
                valid_loss += loss.item()

        valid_loss /= len(test_dataloader)
        # Report the per-epoch loss so the pruner can stop unpromising trials early
        trial.report(valid_loss, epoch)

        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    return valid_loss

# Run the hyperparameter search; lower validation loss is better.
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=1)

# Keep the winning hyperparameters for the final training cell and show them.
best_params = study.best_params
print('Best hyperparameters:', best_params)

[I 2024-08-13 19:01:07,090] A new study created in memory with name: no-name-9f5fab87-d18e-452e-bced-1e99ced3aa06
  learning_rate = trial.suggest_loguniform('learning_rate', 1e-5, 1e-1)
[W 2024-08-13 19:02:25,974] Trial 0 failed with parameters: {'optimizer': 'SGD', 'hidden_size': 653, 'dropout_rate': 0.35052041596716443, 'learning_rate': 0.030593109611005826, 'activation_function': 'Sigmoid', 'num_epochs': 58} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/optuna/study/_optimize.py", line 196, in _run_trial
    value_or_values = func(trial)
  File "<ipython-input-21-ab376fd02a47>", line 27, in objective
    outputs = model(batch_x)
  File "/usr/local/lib/python3.10/dist-packages/torch/nn/modules/module.py", line 1532, in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/torch/nn/modules/module.py", line 1541, in _call_impl
    return for

In [None]:
# Hyperparameters selected by the Optuna study.
best_params = study.best_params

(optimizer_name, hidden_size, dropout_rate,
 learning_rate, activation_function, num_epochs) = (
    best_params[key]
    for key in ('optimizer', 'hidden_size', 'dropout_rate',
                'learning_rate', 'activation_function', 'num_epochs')
)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
criterion = nn.CosineEmbeddingLoss()


model = AdvancedEmbeddingMapper(hidden_size, dropout_rate, activation_function)
#model = CNNEmbeddingMapper()

# Build the optimizer class named by the study; any other name leaves
# `optimizer` unassigned, exactly as before.
if optimizer_name in ('Adam', 'SGD', 'RMSprop'):
    optimizer = getattr(optim, optimizer_name)(model.parameters(), lr=learning_rate)

def train_model(model, train_dataloader, best_params):
    """Train ``model`` on ``train_dataloader`` and print the mean loss per epoch.

    Uses the module-level ``optimizer`` and ``criterion``.

    Args:
        model: Module to train; it is called on each input batch.
        train_dataloader: Iterable of ``(batch_x, batch_y)`` pairs.
        best_params: Hyperparameter dict. Its ``'num_epochs'`` entry sets the
            number of epochs; when the entry is missing, the module-level
            ``num_epochs`` is used as before.
    """
    # The argument was previously ignored and the epoch count came from a
    # hidden global; prefer the value the caller actually passed in.
    if best_params and 'num_epochs' in best_params:
        epochs = best_params['num_epochs']
    else:
        epochs = num_epochs

    for epoch in range(epochs):
        model.train()
        epoch_loss = 0
        for batch_x, batch_y in train_dataloader:
            optimizer.zero_grad()
            outputs = model(batch_x)

            # Flatten to (batch, features) as CosineEmbeddingLoss expects
            outputs = outputs.view(outputs.size(0), -1)
            batch_y = batch_y.view(batch_y.size(0), -1)

            # Label 1 for every pair: the loss then pulls prediction and target together
            labels = torch.ones(outputs.size(0), device=outputs.device)

            loss = criterion(outputs, batch_y, labels)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()

        # Printed every epoch (the old `(epoch + 1) % 1 == 0` guard was always true)
        print(f'Epoch [{epoch + 1}/{epochs}], Loss: {epoch_loss / len(train_dataloader):.4f}')
        #evaluate_model_more(model, test_dataloader)


train_model(model, train_dataloader, best_params)


# Save the model and optimizer state to the location set in the configuration
# cell (`model_save`) instead of repeating the file name here.
model_save_path = model_save
torch.save({
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'best_params': best_params
}, model_save_path)

print(f'Model saved to {model_save_path}')

#train_model(CNNmodel, train_dataloader_cnn)

In [None]:
def evaluate_model_more(model, test_dataloader, train_dataloader):
    """Print loss and regression metrics for ``model`` on ``test_dataloader``.

    Uses the module-level ``criterion`` for the loss. ``train_dataloader`` is
    accepted but not used by the current implementation.

    Args:
        model: Module to evaluate; it is switched to eval mode.
        test_dataloader: Iterable of ``(batch_x, batch_y)`` pairs.
        train_dataloader: Unused.
    """
    model.eval()
    running_loss = 0
    target_batches = []
    prediction_batches = []

    with torch.no_grad():
        for batch_x, batch_y in test_dataloader:
            # Drop dimension 1 when it has size 1, then flatten to (batch, features)
            predictions = model(batch_x).squeeze(1)
            ones = torch.ones(predictions.size(0)).to(predictions.device)

            predictions = predictions.view(predictions.size(0), -1)
            batch_y = batch_y.view(batch_y.size(0), -1)

            running_loss += criterion(predictions, batch_y, ones).item()

            target_batches.append(batch_y)
            prediction_batches.append(predictions)

    # Mean loss per batch
    test_loss = running_loss / len(test_dataloader)
    #train_loss /= len(train_dataloader)

    # One array of targets and one of predictions for the sklearn metrics
    all_targets = torch.cat(target_batches).cpu().numpy()
    all_predictions = torch.cat(prediction_batches).cpu().numpy()

    mse = mean_squared_error(all_targets, all_predictions)
    mae = mean_absolute_error(all_targets, all_predictions)
    r2 = r2_score(all_targets, all_predictions)

    #print(f'Train Loss: {train_loss:.4f}')
    print(f'Test Loss: {test_loss:.4f}')
    print(f'Mean Squared Error (MSE): {mse:.4f}')
    print(f'Mean Absolute Error (MAE): {mae:.4f}')
    print(f'R-squared (R²): {r2:.4f}')

evaluate_model_more(model, test_dataloader, train_dataloader)

In [None]:
def cosine_similarity(model_output, target):
  """Return the cosine similarity of two tensors computed along dimension 0.

  Args:
      model_output: First tensor.
      target: Second tensor.

  Returns:
      Tensor of similarities (a 0-d tensor for 1-D inputs).
  """
  return F.cosine_similarity(model_output, target, dim=0, eps=1e-6)

# def evaluate_cosine():

#   model = torch.tensor(model(batch_x).squeeze(1))
#   target = torch.tensor(load_fasttext_embedding(unseen_emotion, fasttext_folder))

# cosine_similarity(model, target)

In [None]:
def custom_cosine_similarity(tensor1, tensor2, eps=1e-8):
    """Return the cosine similarity of two tensors as a Python float.

    Both tensors are flattened to 1-D first, so they must hold the same number
    of elements.

    Args:
        tensor1: First tensor.
        tensor2: Second tensor.
        eps: Lower bound applied to the product of the two norms, so a
            zero-norm input gives 0.0 instead of NaN.

    Returns:
        The cosine similarity as a float.
    """
    # reshape (unlike view) also handles non-contiguous tensors such as transposes
    flat1 = tensor1.reshape(-1)
    flat2 = tensor2.reshape(-1)

    # Compute the dot product between the two tensors
    dot_product = torch.dot(flat1, flat2)

    # Product of the L2 (Euclidean) norms, bounded away from zero
    norm_product = torch.norm(flat1, p=2) * torch.norm(flat2, p=2)
    norm_product = norm_product.clamp_min(eps)

    return (dot_product / norm_product).item()

In [None]:
def find_most_similar_emotion(predicted_vector, emotion_vectors):
    """Find the emotion whose vector is most cosine-similar to ``predicted_vector``.

    Args:
        predicted_vector: Tensor to compare against every emotion vector.
        emotion_vectors: Dict mapping emotion name to its vector.

    Returns:
        ``(emotion_name, similarity)`` for the highest similarity; on ties the
        first emotion in dictionary order wins.
    """
    labels = list(emotion_vectors)
    scores = np.array([
        custom_cosine_similarity(predicted_vector, emotion_vectors[label])
        for label in labels
    ])

    # argmax returns the first index holding the maximum score
    best_index = np.argmax(scores)
    return labels[best_index], scores[best_index]

In [None]:
def calculate_cosine_similarity(model, test_dataloader, test_dict, emotion_vectors):
  """Score every test sample against its target and against all emotion vectors.

  NOTE(review): file names are recovered by iterating
  ``test_dataloader.batch_sampler`` separately from the loader itself. This
  lines up only when the loader yields the same index order on both passes
  (e.g. ``shuffle=False``). Confirm before using it with a shuffling loader.

  Args:
      model: Module to evaluate; it is switched to eval mode.
      test_dataloader: Loader over the samples of ``test_dict``, in the same order.
      test_dict: Dict whose keys are the test file names.
      emotion_vectors: Dict mapping emotion name to its vector.

  Returns:
      Dict mapping file name to a dict with keys ``'cosine_similarity'``,
      ``'predicted_emotion'``, ``'actual_emotion'`` and ``'emotion_similarity'``.
  """
  model.eval()
  results = {}
  correct_predictions = 0

  # Built once: rebuilding list(test_dict.keys()) per sample was O(n) each time.
  filenames = list(test_dict.keys())

  with torch.no_grad():
      indices_list = list(test_dataloader.batch_sampler)

      for batch_idx, (batch_x, batch_y) in enumerate(test_dataloader):
          outputs = model(batch_x)
          batch_indices = indices_list[batch_idx]

          for i, output in enumerate(outputs):
              filename = filenames[batch_indices[i]]
              cosine_sim = custom_cosine_similarity(output, batch_y[i])
              predicted_emotion, similarity_score = find_most_similar_emotion(output, emotion_vectors)
              actual_emotion = get_emotion_vector(filename)

              results[filename] = {
                  'cosine_similarity': cosine_sim,
                  'predicted_emotion': predicted_emotion,
                  'actual_emotion': actual_emotion,
                  # Same value find_most_similar_emotion already computed for the winner
                  'emotion_similarity': float(similarity_score)
              }
              if predicted_emotion == actual_emotion:
                  correct_predictions += 1

  print(f'Number of correct labels: {correct_predictions}')
  return results
# Load the reference emotion vectors and score every test sample against them.
emotion_vectors = load_emotion_vectors(fasttext_folder)
results = calculate_cosine_similarity(model, test_dataloader, test_dict, emotion_vectors)

# Print one line per test file: similarity to its target, predicted vs. actual
# emotion, and similarity to the predicted emotion's vector.
for filename, result in results.items():
    print(f'{filename}: Cosine Similarity: {result["cosine_similarity"]}, Predicted Emotion: {result["predicted_emotion"]}, '
          f'Actual Emotion: {result["actual_emotion"]}, Emotion Similarity: {result["emotion_similarity"]}')

**Testing ground below**

In [None]:
!pip install fasttext
import fasttext
import fasttext.util
# Sample recording used by the experiment cells below.
wav_file = "/content/drive/MyDrive/Audio_Speech_Actors_01-24/Actor_01/03-01-01-01-01-01-01.wav"
# NOTE(review): this path is relative, unlike the absolute /content/... paths
# used elsewhere in the notebook, so it resolves against the current working
# directory. Confirm that is intended.
ft = fasttext.load_model('drive/MyDrive/fasttext/cc.en.300.bin')

In [None]:
def knn(custom_vector, top_n=10, model=None):
  """Print the vocabulary words whose vectors are most cosine-similar to ``custom_vector``.

  The query is flattened to 1-D and compared with NumPy. The earlier version
  handed a NumPy word vector and an un-flattened model output to
  ``nn.CosineSimilarity(dim=0)``, which cannot yield one scalar per word.

  Args:
      custom_vector: Query vector (tensor or array-like); it is flattened, so
          its element count must match the word-vector size.
      top_n: Number of words to print.
      model: Object exposing ``get_words()`` and ``get_word_vector(word)``;
          defaults to the module-level ``ft`` model.
  """
  if model is None:
    model = ft

  if isinstance(custom_vector, torch.Tensor):
    query = custom_vector.detach().cpu().numpy()
  else:
    query = np.asarray(custom_vector)
  query = query.astype(np.float64).reshape(-1)
  query_norm = np.linalg.norm(query)

  # Create a list to hold (word, similarity) pairs
  similarity_list = []

  for word in model.get_words():
      word_vector = np.asarray(model.get_word_vector(word), dtype=np.float64).reshape(-1)
      denominator = query_norm * np.linalg.norm(word_vector)
      # A zero-norm vector has no direction; score it 0.0 rather than NaN
      similarity = float(np.dot(query, word_vector) / denominator) if denominator > 0 else 0.0
      similarity_list.append((word, similarity))

  # Sort the list by similarity in descending order
  similarity_list.sort(key=lambda x: x[1], reverse=True)

  # Print the results
  for word, similarity in similarity_list[:top_n]:
      print(f"Word: {word}, Similarity: {similarity}")

In [None]:
def sample_to_tensor(wav_file):
    """Load an audio file at 16 kHz and return the processor's ``input_values``.

    Uses the module-level ``processor``.

    Args:
        wav_file: Path of the audio file.

    Returns:
        The ``input_values`` tensor produced by the processor for this file.
    """
    # Resample on load so the audio matches the 16 kHz rate passed to the processor
    waveform, _ = librosa.load(wav_file, sr=16000)

    processed = processor(waveform, sampling_rate=16000, return_tensors="pt", padding=True)
    return processed.input_values

In [None]:
def run_model_on_sample(model, audio_tensor):
    """Run ``model`` in eval mode on one sample, without gradient tracking.

    Args:
        model: Module to call; it is switched to eval mode.
        audio_tensor: Input tensor; a leading dimension of size 1 is added
            before it is passed to the model.

    Returns:
        The model output for the unsqueezed input.
    """
    model.eval()
    with torch.no_grad():
        return model(audio_tensor.unsqueeze(0))

In [None]:
# Embed the sample recording, map it through the trained model, and print the
# vocabulary words closest to the predicted vector.
tensor = get_vector_from_audio(wav_file)
output = run_model_on_sample(model, tensor)
knn(output)