In [7]:
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import pytorch_lightning as pl
from transformers import HubertModel, Wav2Vec2FeatureExtractor, RobertaTokenizer, RobertaModel

from datasets import load_from_disk

import numpy
import librosa


In [2]:
# Load the previously saved IEMOCAP dataset from the local ./iemocap directory.
iemocap = load_from_disk('./iemocap')
# Bare expression so the notebook renders the dataset summary as the cell output.
iemocap

DatasetDict({
    train: Dataset({
        features: ['file', 'audio', 'frustrated', 'angry', 'sad', 'disgust', 'excited', 'fear', 'neutral', 'surprise', 'happy', 'EmoAct', 'EmoVal', 'EmoDom', 'gender', 'transcription', 'major_emotion', 'speaking_rate', 'pitch_mean', 'pitch_std', 'rms', 'relative_db'],
        num_rows: 10039
    })
})

In [4]:
# Emotion classes used as classification targets; a label's position in this
# list is its integer class index.
emotion_labels = [
    'neutral', 'happy', 'sad', 'angry', 'frustrated',
    'excited', 'fear', 'disgust', 'surprise',
]
# Reverse lookup: label string -> integer class index.
label_to_idx = dict(zip(emotion_labels, range(len(emotion_labels))))

In [44]:
# Pull the first record of the 'train' split to inspect the fields of a single sample.
example = iemocap['train'][0]
# Bare expression so the notebook renders the record as the cell output.
example

{'file': 'Ses01F_impro01_F000.wav',
 'audio': {'path': 'Ses01F_impro01_F000.wav',
  'array': array([-0.0050354 , -0.00497437, -0.0038147 , ..., -0.00265503,
         -0.00317383, -0.00418091]),
  'sampling_rate': 16000},
 'frustrated': 0.0062500000931322575,
 'angry': 0.0062500000931322575,
 'sad': 0.0062500000931322575,
 'disgust': 0.0062500000931322575,
 'excited': 0.0062500000931322575,
 'fear': 0.0062500000931322575,
 'neutral': 0.949999988079071,
 'surprise': 0.0062500000931322575,
 'happy': 0.0062500000931322575,
 'EmoAct': 2.3333330154418945,
 'EmoVal': 2.6666669845581055,
 'EmoDom': 2.0,
 'gender': 'Female',
 'transcription': ' Excuse me.',
 'major_emotion': 'neutral',
 'speaking_rate': 5.139999866485596,
 'pitch_mean': 202.79881286621094,
 'pitch_std': 76.12785339355469,
 'rms': 0.00788376946002245,
 'relative_db': -17.938434600830078}

In [None]:
# Hugging Face checkpoint identifiers for the audio and text encoders.
audio_checkpoint = "facebook/hubert-base-ls960"
# Fixed: the original literal opened with ' and closed with ", which is a SyntaxError.
text_checkpoint = "roberta-base"
# Backward-compatible alias for the original (misspelled) variable name.
auido_checkpooint = audio_checkpoint

# Audio side: waveform preprocessor + HuBERT encoder.
feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(audio_checkpoint)
hubert_model = HubertModel.from_pretrained(audio_checkpoint)

# Text side: RoBERTa encoder + its tokenizer.
roberta_model = RobertaModel.from_pretrained(text_checkpoint)
tokenizer = RobertaTokenizer.from_pretrained(text_checkpoint)

# Both encoders are used purely as feature extractors (the extract_* helpers
# call them under torch.no_grad()), so switch them to evaluation mode.
hubert_model.eval()
roberta_model.eval()

# Extract HuBERT features
def extract_hubert_features(audio_array, sampling_rate=16000):
    """Embed one audio clip with HuBERT and mean-pool the result.

    Args:
        audio_array: Raw waveform samples; converted with ``torch.tensor``.
            Presumably a 1-D array for a single utterance -- confirm against callers.
        sampling_rate: Sampling rate forwarded to the feature extractor
            (defaults to 16000).

    Returns:
        ``last_hidden_state`` of ``hubert_model`` averaged over dim 1.
    """
    waveform = torch.tensor(audio_array)

    # Inference only: no autograd graph is built for the encoder.
    with torch.no_grad():
        encoded = feature_extractor(waveform, return_tensors="pt", sampling_rate=sampling_rate)
        frame_states = hubert_model(encoded.input_values).last_hidden_state

    # Average pooling across dim 1 collapses the per-frame outputs into one vector.
    return frame_states.mean(dim=1)
  
# Extract RoBERTa features
def extract_roberta_features(transcript):
    """Embed a transcript with RoBERTa and return the first-token vector.

    Args:
        transcript: Text passed to ``tokenizer``; tokenization truncates at
            128 tokens.

    Returns:
        ``last_hidden_state[:, 0, :]`` of ``roberta_model``, i.e. the hidden
        state at sequence position 0 for every row in the batch.
    """
    encoded = tokenizer(
        transcript,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=128,
    )

    # Inference only: no autograd graph is built for the encoder.
    with torch.no_grad():
        hidden_states = roberta_model(**encoded).last_hidden_state

    # Position 0 is used as the sentence-level ("CLS"-style) summary.
    return hidden_states[:, 0, :]

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
class Classifier(nn.Module):
    """Two-layer MLP over concatenated audio and text embeddings.

    Args:
        audio_dim: Size of the audio embedding (default 768).
        text_dim: Size of the text embedding (default 768).
        hidden_dim: Width of the hidden layer (default 512).
        num_classes: Number of output logits (default 9).
    """

    def __init__(self, audio_dim=768, text_dim=768, hidden_dim=512, num_classes=9):
        super().__init__()
        fused_dim = audio_dim + text_dim
        self.fc1 = nn.Linear(fused_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, num_classes)

    def forward(self, audio_emb, text_emb):
        """Return unnormalised class logits.

        The two embeddings are joined along dim 1, so they are expected to be
        2-D tensors of shape (batch, audio_dim) and (batch, text_dim).
        """
        fused = torch.cat([audio_emb, text_emb], dim=1)
        return self.fc2(self.relu(self.fc1(fused)))

In [38]:
import torch.optim as optim
import torch.nn.functional as F

# Seed PyTorch's RNG before building the classifier so its random weight
# initialisation is the same on every Restart & Run All.
SEED = 42
LEARNING_RATE = 0.0005  # Adam step size (previously an inline magic number)
torch.manual_seed(SEED)

# Fusion classifier with its default dimensions, trained with cross-entropy.
classifier = Classifier()
criterion = nn.CrossEntropyLoss()
# Only the classifier's parameters are handed to the optimizer.
optimizer = optim.Adam(classifier.parameters(), lr=LEARNING_RATE)

def train_step(audio_array, transcript, label):
    """Run one single-sample optimisation step on the classifier.

    Args:
        audio_array: Waveform passed to ``extract_hubert_features``.
        transcript: Text passed to ``extract_roberta_features``.
        label: Class index used as the cross-entropy target.

    Returns:
        The loss for this sample as a Python float.
    """
    classifier.train()

    # Both feature helpers run their encoders under torch.no_grad(), and the
    # optimizer only holds classifier parameters, so only the classifier is updated.
    audio_feats = extract_hubert_features(audio_array)
    text_feats = extract_roberta_features(transcript)

    # Clear gradients left over from the previous step before the new backward pass.
    optimizer.zero_grad()

    logits = classifier(audio_feats, text_feats)
    # Wrap the label in a list to form a batch of one.
    target = torch.tensor([label]).long()
    step_loss = criterion(logits, target)

    step_loss.backward()
    optimizer.step()

    return step_loss.item()

In [None]:
num_samples = 200

# Train on the first `num_samples` records of the 'train' split, one sample per step.
for i in range(num_samples):
    sample = iemocap['train'][i]

    # Waveform and transcript are the inputs; the major-emotion index is the target.
    loss = train_step(
        audio_array=sample['audio']['array'],
        transcript=sample['transcription'],
        label=label_to_idx[sample['major_emotion']],
    )

    # Log the per-sample loss on every tenth step.
    if i % 10 == 0:
        print(f'Step {i}/{num_samples}, Loss: {loss:.4f}')

Step 0/200, Loss: 2.2045
Step 10/200, Loss: 1.9784
Step 20/200, Loss: 0.8544
Step 30/200, Loss: 5.3096
Step 40/200, Loss: 2.8518
Step 50/200, Loss: 1.1458
Step 60/200, Loss: 0.3454
Step 70/200, Loss: 4.1522
Step 80/200, Loss: 2.7448
Step 90/200, Loss: 1.8373
Step 100/200, Loss: 2.2920
Step 110/200, Loss: 1.4440
Step 120/200, Loss: 1.4263
Step 130/200, Loss: 1.1908
Step 140/200, Loss: 3.0186
Step 150/200, Loss: 2.8815
Step 160/200, Loss: 1.0167
Step 170/200, Loss: 0.7201
Step 180/200, Loss: 0.2134
Step 190/200, Loss: 2.9178


In [45]:
def predict(audio_array, transcript):
    """Predict the emotion label for one utterance.

    Args:
        audio_array: Waveform passed to ``extract_hubert_features``.
        transcript: Text passed to ``extract_roberta_features``.

    Returns:
        The entry of ``emotion_labels`` with the highest softmax probability.
    """
    classifier.eval()

    audio_embedding = extract_hubert_features(audio_array)
    text_embedding = extract_roberta_features(transcript)

    # Inference only: no gradients are needed for the classifier forward pass.
    with torch.no_grad():
        logits = classifier(audio_embedding, text_embedding)
        probabilities = F.softmax(logits, dim=1)
        best_idx = probabilities.argmax(dim=1).item()

    return emotion_labels[best_idx]

n_samples = 100
correct = 0

# Score records 1000 .. 1000 + n_samples - 1 of the 'train' split; the training
# loop in this notebook only consumed indices 0..199.
for i in range(1000, 1000 + n_samples):
    sample = iemocap['train'][i]
    prediction = predict(sample['audio']['array'], sample['transcription'])
    # A prediction counts as correct when it matches the sample's major emotion.
    correct += int(prediction == sample['major_emotion'])

print(f'total: {n_samples}, correct = {correct}, acc = {correct / n_samples}')

total: 100, correct = 49, acc = 0.49
