In [None]:
import os
import pandas as pd
import numpy as np
from tqdm import tqdm
import torch
import torchaudio
from transformers import Wav2Vec2Processor, Wav2Vec2ForSequenceClassification
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# Emotion codes as they appear in RAVDESS filenames, mapped to readable names.
# Codes are 1-based; model labels are derived from them as ``code - 1``.
emotion_map = {
    1: 'neutral',
    2: 'calm',
    3: 'happy',
    4: 'sad',
    5: 'angry',
    6: 'fearful',
    7: 'disgust',
    8: 'surprised'
}

# Create dataframe from RAVDESS directory
def create_dataframe(dataset_path):
    """Index every RAVDESS ``.wav`` file found under ``dataset_path``.

    The directory tree is walked recursively. For each file whose extension
    is ``.wav`` (case-insensitive) the emotion code is read from the third
    dash-separated field of the filename and looked up in ``emotion_map``.

    Files whose names cannot be parsed, or whose code is not in
    ``emotion_map``, are skipped and reported in a single warning instead of
    aborting the whole scan.

    Args:
        dataset_path: Root directory to search.

    Returns:
        A ``pandas.DataFrame`` with columns ``path`` (file path), ``emotion``
        (name from ``emotion_map``) and ``label`` (0-based class id, i.e. the
        emotion code minus one). The columns are present even when no file
        matched. Rows are in sorted traversal order so that a seeded
        train/test split is reproducible regardless of filesystem ordering.
    """
    import warnings

    columns = ['path', 'emotion', 'label']
    data = []
    skipped = []
    for root, dirs, files in os.walk(dataset_path):
        # Sorting in place makes os.walk descend into sub-directories in a
        # deterministic order.
        dirs.sort()
        for file in sorted(files):
            if not file.lower().endswith('.wav'):
                continue
            path = os.path.join(root, file)
            # RAVDESS filename format: 03-01-06-01-02-01-12.wav
            # Emotion is the 3rd number (06 in this example)
            try:
                emotion_code = int(file.split('-')[2])
                emotion = emotion_map[emotion_code]
            except (IndexError, ValueError, KeyError):
                skipped.append(path)
                continue
            # -1 to make labels 0-based
            data.append({'path': path, 'emotion': emotion, 'label': emotion_code - 1})

    if skipped:
        warnings.warn(
            f"Skipped {len(skipped)} .wav file(s) that do not follow the "
            f"RAVDESS naming pattern, e.g. {skipped[0]}"
        )
    return pd.DataFrame(data, columns=columns)

# Replace with your RAVDESS dataset path
dataset_path = 'path/to/ravdess/audio'
df = create_dataframe(dataset_path)

# Fail early with an actionable message: an empty frame (e.g. the placeholder
# path above was not edited) would otherwise surface as an opaque error when
# the 'label' column is accessed for stratification.
if df.empty:
    raise FileNotFoundError(
        f"No RAVDESS .wav files found under {dataset_path!r}; "
        "point dataset_path at the extracted audio directory."
    )

# Split data into train and test (80/20), stratified so both splits keep the
# same emotion distribution; random_state fixes the shuffle.
# NOTE(review): the split is per file, so recordings of the same speaker can
# presumably land in both splits -- confirm whether a speaker-independent
# split is wanted.
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42, stratify=df['label'])

In [None]:
from torch.utils.data import Dataset, DataLoader

class EmotionDataset(Dataset):
    """Dataset that turns rows of a path/label dataframe into model inputs.

    Each item is loaded from disk, resampled to 16 kHz if needed, mixed down
    to mono, peak-normalised, trimmed or zero-padded to ``max_length``
    samples, and finally passed through ``processor``.

    Args:
        df: DataFrame with a ``path`` column (audio file path) and a
            ``label`` column (integer class id). Rows are accessed by
            position, so the index does not need to be reset.
        processor: Callable invoked as
            ``processor(audio, sampling_rate=16000, return_tensors="pt")``
            whose result exposes ``input_values``.
        max_length: Fixed number of samples per item (default 4 s at 16 kHz).
    """

    def __init__(self, df, processor, max_length=16000*4):
        self.df = df
        self.processor = processor
        self.max_length = max_length  # 4 seconds at 16kHz

    def __len__(self):
        """Return the number of rows in the underlying dataframe."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load and preprocess the ``idx``-th clip.

        Returns:
            A dict with ``input_values`` (the processor output with its
            leading batch axis removed) and ``label`` (a ``torch.long``
            scalar tensor).
        """
        # One positional lookup instead of one per column.
        row = self.df.iloc[idx]

        # Load audio file
        waveform, sample_rate = torchaudio.load(row['path'])

        # Resample if necessary (Wav2Vec2 expects 16kHz)
        if sample_rate != 16000:
            resampler = torchaudio.transforms.Resample(sample_rate, 16000)
            waveform = resampler(waveform)

        # Convert stereo to mono if needed
        if waveform.shape[0] > 1:
            waveform = torch.mean(waveform, dim=0, keepdim=True)

        # Peak-normalise. A silent (all-zero) or empty clip is left untouched:
        # dividing by a zero peak would fill the whole input with NaN.
        if waveform.numel() > 0:
            peak = torch.max(torch.abs(waveform))
            if peak > 0:
                waveform = waveform / peak

        # Trim or pad audio to max_length
        if waveform.shape[1] > self.max_length:
            waveform = waveform[:, :self.max_length]
        else:
            padding = self.max_length - waveform.shape[1]
            waveform = torch.nn.functional.pad(waveform, (0, padding))

        # Process with Wav2Vec2 processor. squeeze(0) removes only the
        # channel/batch axis, so a length-1 time axis is never dropped too.
        input_values = self.processor(
            waveform.squeeze(0).numpy(),
            sampling_rate=16000,
            return_tensors="pt"
        ).input_values.squeeze(0)

        return {
            'input_values': input_values,
            'label': torch.tensor(int(row['label']), dtype=torch.long)
        }

# Processor belonging to the pretrained checkpoint
processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base-960h")

# One dataset per split, both sharing the same processor
train_dataset, test_dataset = (
    EmotionDataset(split_df, processor) for split_df in (train_df, test_df)
)

# Batch the datasets; only the training data is reshuffled every epoch
batch_size = 4
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True,
)
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=batch_size,
    shuffle=False,
)

In [None]:
from transformers import Wav2Vec2ForSequenceClassification

# Pretrained Wav2Vec2 weights with a classification head sized to the
# number of entries in emotion_map
model = Wav2Vec2ForSequenceClassification.from_pretrained(
    "facebook/wav2vec2-base-960h",
    problem_type="single_label_classification",
    num_labels=len(emotion_map),
)

# Prefer the GPU when one is visible, otherwise stay on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model = model.to(device)

# Loss and optimizer used by the training cells
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(params=model.parameters(), lr=1e-5)

In [None]:
def train_epoch(model, dataloader, optimizer, criterion, device):
    """Run one optimisation pass over ``dataloader``.

    Args:
        model: Module called as ``model(input_values)`` whose result exposes
            ``logits``.
        dataloader: Iterable of batches; each batch is a mapping with
            ``'input_values'`` and ``'label'`` tensors.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(logits, labels)``.
        device: Device the batch tensors are moved to.

    Returns:
        ``(avg_loss, accuracy)`` where ``avg_loss`` is the mean of the
        per-batch loss values and ``accuracy`` is the fraction of correctly
        classified samples. Both are ``0.0`` when the dataloader yields
        nothing, instead of raising ``ZeroDivisionError``.
    """
    model.train()
    total_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0  # counted while iterating, so len(dataloader) is not required

    for batch in tqdm(dataloader, desc="Training"):
        input_values = batch['input_values'].to(device)
        labels = batch['label'].to(device)

        optimizer.zero_grad()

        outputs = model(input_values)
        logits = outputs.logits
        loss = criterion(logits, labels)

        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        predicted = logits.argmax(dim=1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)
        num_batches += 1

    # Guard both denominators: an empty loader has no batches and no samples.
    avg_loss = total_loss / num_batches if num_batches else 0.0
    accuracy = correct / total if total else 0.0
    return avg_loss, accuracy

def evaluate(model, dataloader, criterion, device):
    """Score ``model`` on ``dataloader`` without updating its weights.

    The model is switched to eval mode and gradients are disabled for the
    whole pass.

    Args:
        model: Module called as ``model(input_values)`` whose result exposes
            ``logits``.
        dataloader: Iterable of batches; each batch is a mapping with
            ``'input_values'`` and ``'label'`` tensors.
        criterion: Loss called as ``criterion(logits, labels)``.
        device: Device the batch tensors are moved to.

    Returns:
        ``(avg_loss, accuracy, all_preds, all_labels)``. ``avg_loss`` is the
        mean of the per-batch loss values, ``accuracy`` the fraction of
        correctly classified samples, and the two lists hold the predicted
        and true class ids in iteration order. For a dataloader that yields
        nothing the result is ``(0.0, 0.0, [], [])`` instead of a
        ``ZeroDivisionError``.
    """
    model.eval()
    total_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0  # counted while iterating, so len(dataloader) is not required
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            input_values = batch['input_values'].to(device)
            labels = batch['label'].to(device)

            outputs = model(input_values)
            logits = outputs.logits
            loss = criterion(logits, labels)

            total_loss += loss.item()
            predicted = logits.argmax(dim=1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)
            num_batches += 1

            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Guard both denominators: an empty loader has no batches and no samples.
    avg_loss = total_loss / num_batches if num_batches else 0.0
    accuracy = correct / total if total else 0.0
    return avg_loss, accuracy, all_preds, all_labels

# Training loop
# NOTE(review): the held-out test split is used both for picking the best
# checkpoint and for reporting, so the reported numbers are optimistic;
# consider carving out a separate validation split.
num_epochs = 10
best_accuracy = 0

for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")

    # Train
    train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion, device)
    print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")

    # Evaluate
    val_loss, val_acc, val_preds, val_labels = evaluate(model, test_loader, criterion, device)
    print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    # Save best model (weights only) whenever the accuracy improves
    if val_acc > best_accuracy:
        best_accuracy = val_acc
        torch.save(model.state_dict(), "best_wav2vec2_emotion_model.pt")
        print("Saved best model!")

    # Print classification report.
    # `labels` pins the full set of class ids so the report always lines up
    # with `target_names`, even when a class is missing from this epoch's
    # labels and predictions (otherwise scikit-learn raises on the mismatch).
    # `zero_division=0` reports 0 for classes that were never predicted
    # instead of emitting an undefined-metric warning.
    print(classification_report(
        val_labels,
        val_preds,
        labels=[code - 1 for code in emotion_map],
        target_names=list(emotion_map.values()),
        zero_division=0,
    ))

In [None]:
def predict_emotion(audio_path, model, processor, device, max_length=16000*4):
    """Predict the emotion of a single audio file.

    The clip is loaded, resampled to 16 kHz if needed, mixed down to mono,
    peak-normalised and trimmed or zero-padded to ``max_length`` samples
    before being passed through ``processor`` and ``model``.

    Args:
        audio_path: Path of the audio file to classify.
        model: Module called as ``model(input_values)`` whose result exposes
            ``logits``; it is switched to eval mode.
        processor: Callable invoked as
            ``processor(audio, sampling_rate=16000, return_tensors="pt")``
            whose result exposes ``input_values``.
        device: Device the input is moved to.
        max_length: Fixed number of samples fed to the model
            (default 4 s at 16 kHz).

    Returns:
        ``(emotion, probabilities)``: the name looked up in the module-level
        ``emotion_map`` for the most likely class (class id ``i`` maps to
        emotion code ``i + 1``), and the per-class softmax probabilities as
        a NumPy array.
    """
    # Load and preprocess audio
    waveform, sample_rate = torchaudio.load(audio_path)

    # Resample if necessary
    if sample_rate != 16000:
        resampler = torchaudio.transforms.Resample(sample_rate, 16000)
        waveform = resampler(waveform)

    # Convert stereo to mono if needed
    if waveform.shape[0] > 1:
        waveform = torch.mean(waveform, dim=0, keepdim=True)

    # Peak-normalise. A silent (all-zero) or empty clip is left untouched:
    # dividing by a zero peak would turn the input, and therefore every
    # probability, into NaN.
    if waveform.numel() > 0:
        peak = torch.max(torch.abs(waveform))
        if peak > 0:
            waveform = waveform / peak

    # Trim or pad
    if waveform.shape[1] > max_length:
        waveform = waveform[:, :max_length]
    else:
        padding = max_length - waveform.shape[1]
        waveform = torch.nn.functional.pad(waveform, (0, padding))

    # Process. squeeze(0) removes only the channel axis, so a length-1 time
    # axis is never dropped along with it.
    input_values = processor(
        waveform.squeeze(0).numpy(),
        sampling_rate=16000,
        return_tensors="pt"
    ).input_values.to(device)

    # Predict
    model.eval()
    with torch.no_grad():
        outputs = model(input_values)
        logits = outputs.logits
        probabilities = torch.softmax(logits, dim=1)
        predicted_class = torch.argmax(probabilities, dim=1).item()

    # Class ids are 0-based while emotion_map is keyed by 1-based codes.
    return emotion_map[predicted_class + 1], probabilities.cpu().numpy()[0]

# Example usage: classify one clip and show the per-emotion probabilities
audio_path = "path/to/test/audio.wav"
emotion, probabilities = predict_emotion(
    audio_path=audio_path,
    model=model,
    processor=processor,
    device=device,
)
print(f"Predicted emotion: {emotion}")
# Pair each emotion name with its probability rendered to four decimals
print(
    "Probabilities:",
    dict(zip(emotion_map.values(), (format(p, ".4f") for p in probabilities))),
)