In [1]:
# Complete script for speech emotion recognition training

import os
import json
import numpy as np
import torch
import torchaudio
from dataclasses import dataclass
from typing import Dict, List, Optional, Union, Any, Tuple
from torch import nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
from packaging import version
import torchaudio

from transformers import (
    Wav2Vec2Processor,
    Wav2Vec2Model,
    Wav2Vec2PreTrainedModel,
    Trainer,
    TrainingArguments,
    AutoConfig,
    EvalPrediction,
    is_apex_available
)
from datasets import load_dataset
from transformers.file_utils import ModelOutput

# Check for APEX
if is_apex_available():
    from apex import amp

# Check for native AMP
if version.parse(torch.__version__) >= version.parse("1.6"):
    _is_native_amp_available = True
    from torch.cuda.amp import autocast

# Define SpeechClassifierOutput class
@dataclass
class SpeechClassifierOutput(ModelOutput):
    loss: Optional[torch.FloatTensor] = None
    logits: torch.FloatTensor = None
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    attentions: Optional[Tuple[torch.FloatTensor]] = None

# Define the classification head
class Wav2Vec2ClassificationHead(nn.Module):
    """Head for wav2vec classification task."""

    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.dropout = nn.Dropout(config.final_dropout)
        self.out_proj = nn.Linear(config.hidden_size, config.num_labels)

    def forward(self, features, **kwargs):
        x = features
        x = self.dropout(x)
        x = self.dense(x)
        x = torch.tanh(x)
        x = self.dropout(x)
        x = self.out_proj(x)
        return x

# Define the model for speech classification
class Wav2Vec2ForSpeechClassification(Wav2Vec2PreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.pooling_mode = config.pooling_mode
        self.config = config

        self.wav2vec2 = Wav2Vec2Model(config)
        self.classifier = Wav2Vec2ClassificationHead(config)

        self.init_weights()

    def freeze_feature_extractor(self):
        self.wav2vec2.feature_extractor._freeze_parameters()

    def merged_strategy(
            self,
            hidden_states,
            mode="mean"
    ):
        if mode == "mean":
            outputs = torch.mean(hidden_states, dim=1)
        elif mode == "sum":
            outputs = torch.sum(hidden_states, dim=1)
        elif mode == "max":
            outputs = torch.max(hidden_states, dim=1)[0]
        else:
            raise Exception(
                "The pooling method hasn't been defined! Your pooling mode must be one of these ['mean', 'sum', 'max']")

        return outputs

    def forward(
            self,
            input_values,
            attention_mask=None,
            output_attentions=None,
            output_hidden_states=None,
            return_dict=None,
            labels=None,
    ):
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        outputs = self.wav2vec2(
            input_values,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        hidden_states = outputs[0]
        hidden_states = self.merged_strategy(hidden_states, mode=self.pooling_mode)
        logits = self.classifier(hidden_states)

        loss = None
        if labels is not None:
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SpeechClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

# Data collator for padding
@dataclass
class DataCollatorCTCWithPadding:
    """
    Data collator that will dynamically pad the inputs received.
    """
    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [feature["labels"] for feature in features]

        d_type = torch.long if isinstance(label_features[0], int) else torch.float

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )

        batch["labels"] = torch.tensor(label_features, dtype=d_type)

        return batch

# # Custom trainer class
# class CTCTrainer(Trainer):
#     def training_step(self, model: nn.Module, inputs: Dict[str, Union[torch.Tensor, Any]]) -> torch.Tensor:
#         """
#         Perform a training step on a batch of inputs.
#         """
#         model.train()
#         inputs = self._prepare_inputs(inputs)

#         if self.use_amp:
#             with autocast():
#                 loss = self.compute_loss(model, inputs)
#         else:
#             loss = self.compute_loss(model, inputs)

#         if self.args.gradient_accumulation_steps > 1:
#             loss = loss / self.args.gradient_accumulation_steps

#         if self.use_amp:
#             self.scaler.scale(loss).backward()
#         elif self.use_apex:
#             with amp.scale_loss(loss, self.optimizer) as scaled_loss:
#                 scaled_loss.backward()
#         elif self.deepspeed:
#             self.deepspeed.backward(loss)
#         else:
#             loss.backward()

#         return loss.detach()

# class CTCTrainer(Trainer):
#     def training_step(
#         self, 
#         model: nn.Module, 
#         inputs: Dict[str, Union[torch.Tensor, Any]], 
#         num_items_in_batch: Optional[int] = None  # Add the new argument
#     ) -> torch.Tensor:
#         """
#         Perform a training step on a batch of inputs.
#         """
#         model.train()
#         inputs = self._prepare_inputs(inputs)

#         if self.use_amp:
#             with autocast():
#                 loss = self.compute_loss(model, inputs)
#         else:
#             loss = self.compute_loss(model, inputs)

#         if self.args.gradient_accumulation_steps > 1:
#             loss = loss / self.args.gradient_accumulation_steps

#         if self.use_amp:
#             self.scaler.scale(loss).backward()
#         elif self.use_apex:
#             with amp.scale_loss(loss, self.optimizer) as scaled_loss:
#                 scaled_loss.backward()
#         elif self.deepspeed:
#             self.deepspeed.backward(loss)
#         else:
#             loss.backward()

#         return loss.detach()
class CTCTrainer(Trainer):
    pass

# Main training function
def train_emotion_recognition_model():
    print("Starting speech emotion recognition model training...")
    
    # Check for GPU
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    
    # Load dataset
    data_files = {
        "train": "dataset/train.csv",
        "validation": "dataset/test.csv",
    }
    
    dataset = load_dataset("csv", data_files=data_files, delimiter="\t")
    train_dataset = dataset["train"]
    eval_dataset = dataset["validation"]
    
    print(f"Train dataset: {train_dataset}")
    print(f"Evaluation dataset: {eval_dataset}")
    
    # Define input and output columns
    input_column = "path"
    output_column = "emotion"
    
    # Get unique labels
    label_list = train_dataset.unique(output_column)
    label_list.sort()  # Sort for determinism
    num_labels = len(label_list)
    print(f"Classification problem with {num_labels} classes: {label_list}")
    
    # Setup model configuration
    model_name_or_path = "jonatasgrosman/wav2vec2-large-xlsr-53-english"
    pooling_mode = "mean"
    
    # Create config
    config = AutoConfig.from_pretrained(
        model_name_or_path,
        num_labels=num_labels,
        label2id={label: i for i, label in enumerate(label_list)},
        id2label={i: label for i, label in enumerate(label_list)},
        finetuning_task="wav2vec2_clf",
    )
    setattr(config, 'pooling_mode', pooling_mode)
    
    # Create processor
    processor = Wav2Vec2Processor.from_pretrained(model_name_or_path, force_download=True)
    target_sampling_rate = processor.feature_extractor.sampling_rate
    print(f"Target sampling rate: {target_sampling_rate}")
    
    # # Define preprocessing functions
    # def speech_file_to_array_fn(path):
    #     import torch
    #     speech_array, sampling_rate = torchaudio.load(path)
    #     resampler = torchaudio.transforms.Resample(sampling_rate, target_sampling_rate)
    #     speech = resampler(speech_array).squeeze().numpy()
    #     return speech
    def speech_file_to_array_fn(path):
        import torchaudio
        import torch 
    
        try:
            speech_array, sampling_rate = torchaudio.load(path)
    
            # --- Convert to mono by averaging channels if necessary ---
            if speech_array.shape[0] > 1:  # Check if number of channels > 1
                print(f"Warning: Converting stereo audio to mono: {path}") # Optional: log this
                speech_array = torch.mean(speech_array, dim=0, keepdim=True)
            # --------------------------------------------------------
    
            # Ensure sampling rate matches target
            if sampling_rate != target_sampling_rate:
                 # Initialize resampler only if needed
                resampler = torchaudio.transforms.Resample(orig_freq=sampling_rate, new_freq=target_sampling_rate)
                speech_array = resampler(speech_array) # Resample
    
            # Squeeze and convert to numpy
            # Ensure it's float32 for consistency
            speech = speech_array.squeeze().to(torch.float32).numpy()
    
            # Optional: Check for empty arrays after processing
            if speech.size == 0:
                print(f"Warning: Empty audio array after processing: {path}")
                # Return a small non-empty array to avoid downstream errors,
                # though this might impact training slightly.
                # A better approach might be to filter these files beforehand.
                return np.zeros(1, dtype=np.float32)
    
            return speech
    
        except Exception as e:
            print(f"Error processing file {path}: {e}")
            # Return a placeholder or raise the exception depending on desired behavior
            # Returning a dummy array might be safer for .map() not to fail entirely
            return np.zeros(1, dtype=np.float32) # Or handle more gracefully
    
    def label_to_id(label, label_list):
        if len(label_list) > 0:
            return label_list.index(label) if label in label_list else -1
        return label
    
    def preprocess_function(examples):
        speech_list = [speech_file_to_array_fn(path) for path in examples[input_column]]
        target_list = [label_to_id(label, label_list) for label in examples[output_column]]
        
        result = processor(speech_list, sampling_rate=target_sampling_rate)
        result["labels"] = list(target_list)
        
        return result
    
    # Preprocess datasets
    print("Preprocessing train dataset...")
    train_dataset = train_dataset.map(
        preprocess_function,
        batch_size=100,
        batched=True,
        num_proc=4
    )
    
    print("Preprocessing evaluation dataset...")
    eval_dataset = eval_dataset.map(
        preprocess_function,
        batch_size=100,
        batched=True,
        num_proc=4
    )
    
    # Create data collator
    data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)
    
    # Define compute metrics function
    def compute_metrics(p: EvalPrediction):
        preds = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions
        preds = np.argmax(preds, axis=1)
        return {"accuracy": (preds == p.label_ids).astype(np.float32).mean().item()}
    
    # Load pretrained model
    model = Wav2Vec2ForSpeechClassification.from_pretrained(
        model_name_or_path,
        config=config,
    )
    
    # Freeze feature extractor
    model.freeze_feature_extractor()
    
    # Define training arguments
    output_dir = "./emotion_recognition_model"
    training_args = TrainingArguments(
        output_dir=output_dir,
        per_device_train_batch_size=10,
        per_device_eval_batch_size=10,
        gradient_accumulation_steps=4,
        evaluation_strategy="steps",
        num_train_epochs=8.0,
        fp16=torch.cuda.is_available(),
        # save_steps=100,
        eval_steps=20,
        logging_steps=10,
        learning_rate=1e-4,
        save_total_limit=2,
        # dataloader_num_workers=2,
        report_to="none",  # Disable wandb or other reporting
    )
    
    # Initialize trainer
    trainer = CTCTrainer(
        model=model,
        data_collator=data_collator,
        args=training_args,
        compute_metrics=compute_metrics,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        tokenizer=processor.feature_extractor,
    )
    
    # Train the model
    print("Starting model training...")
    train_result = trainer.train()
    print(f"Training completed. Training metrics: {train_result.metrics}")
    
    # Evaluate the model
    print("Evaluating model...")
    eval_result = trainer.evaluate()
    print(f"Evaluation metrics: {eval_result}")
    
    # Save the model
    print(f"Saving model to {output_dir}")
    model.save_pretrained(output_dir)
    processor.save_pretrained(output_dir)
    
    # Save training args and other configurations
    with open(f"{output_dir}/training_args.json", "w") as f:
        json.dump(training_args.to_dict(), f)
    
    # Save label mappings
    with open(f"{output_dir}/label_mappings.json", "w") as f:
        json.dump({
            "label_list": label_list,
            "num_labels": num_labels,
            "label2id": {label: i for i, label in enumerate(label_list)},
            "id2label": {i: label for i, label in enumerate(label_list)}
        }, f)
    
    print("Model training and saving completed successfully!")
    return output_dir

# Run the training function
if __name__ == "__main__":
    train_emotion_recognition_model()

Starting speech emotion recognition model training...
Using device: cuda
Train dataset: Dataset({
    features: ['name', 'path', 'emotion', 'text'],
    num_rows: 768
})
Evaluation dataset: Dataset({
    features: ['name', 'path', 'emotion', 'text'],
    num_rows: 192
})
Classification problem with 5 classes: ['anger', 'disgust', 'fear', 'happiness', 'sadness']


preprocessor_config.json:   0%|          | 0.00/262 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.53k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/300 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/85.0 [00:00<?, ?B/s]

Target sampling rate: 16000
Preprocessing train dataset...
Preprocessing evaluation dataset...


Some weights of Wav2Vec2ForSpeechClassification were not initialized from the model checkpoint at jonatasgrosman/wav2vec2-large-xlsr-53-english and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  trainer = CTCTrainer(


Starting model training...


Step,Training Loss,Validation Loss,Accuracy
20,5.4735,1.30483,0.46875
40,3.981,0.889456,0.703125
60,2.6401,0.729066,0.729167
80,1.6035,0.369323,0.911458
100,1.1028,0.315033,0.921875
120,0.8002,0.301259,0.9375
140,0.5486,0.248176,0.942708


Training completed. Training metrics: {'train_runtime': 457.1167, 'train_samples_per_second': 13.441, 'train_steps_per_second': 0.333, 'total_flos': 7.734316525162577e+17, 'train_loss': 2.384810395538807, 'epoch': 7.623376623376624}
Evaluating model...


Evaluation metrics: {'eval_loss': 0.26280614733695984, 'eval_accuracy': 0.9427083134651184, 'eval_runtime': 10.5812, 'eval_samples_per_second': 18.145, 'eval_steps_per_second': 1.89, 'epoch': 7.623376623376624}
Saving model to ./emotion_recognition_model
Model training and saving completed successfully!


In [1]:
# Code to upload your trained model to Huggingface Hub

from huggingface_hub import HfApi, HfFolder
from getpass import getpass
import os

def upload_model_to_hub(model_path):
    """
    Upload a trained model to the Huggingface Hub
    
    Args:
        model_path (str): Path to the saved model directory
    """

    # Set repository name
    model_name = "wav2vec2-emotion-recognition-for-ravdness"
    repo_name = "usamakenway/wav2vec2-large-xlsr-53-english-ravdess"# 
    
    # Login to Huggingface
    print("Logging in to Huggingface Hub...")
    token = getpass("Enter your Huggingface token: ")
    HfFolder.save_token(token)
    api = HfApi()
    
    # Create repository if it doesn't exist
    try:
        api.create_repo(repo_id=repo_name, exist_ok=True)
        print(f"Repository {repo_name} is ready")
    except Exception as e:
        print(f"Error creating repository: {e}")
        return
    
    # Upload model files
    print(f"Uploading model files from {model_path} to {repo_name}...")
    api.upload_folder(
        folder_path=model_path,
        repo_id=repo_name,
        repo_type="model",
        commit_message="Upload emotion recognition model"
    )
    
    print(f"Model has been uploaded to https://huggingface.co/{repo_name}")
    
    # Extract label information for the model card
    import json
    try:
        with open(os.path.join(model_path, "label_mappings.json"), "r") as f:
            label_info = json.load(f)
            label_list = label_info.get("label_list", ['happiness', 'disgust', 'anger', 'fear', 'sadness'])
    except:
        label_list = ['happiness', 'disgust', 'anger', 'fear', 'sadness']
    
    # Create model card
    print("Creating model card...")
    model_card = f"""---
language: en
license: apache-2.0
tags:
  - audio
  - speech
  - emotion-recognition
  - wav2vec2
datasets:
  - RAVDESS
model-index:
  - name: {model_name}
    results:
      - task:
          name: Speech Emotion Recognition
          type: audio-classification
        metrics:
          - name: Training Accuracy
            value: 0.9427  # Training accuracy from last epoch
          - name: Validation Accuracy
            value: 0.9427 
          - name: Training Loss
            value: 2.38  
          - name: Validation Loss
            value: 0.26  
---

# Speech Emotion Recognition Model

This model is fine-tuned for speech emotion recognition. It can detect emotions such as happiness, sadness, anger, fear, disgust, etc. in speech.

## Model Details

- Model type: Fine-tuned Wav2Vec2
- Base model: lighteternal/wav2vec2-large-xlsr-53-english
- Training data: RAVDESS dataset
- Supported emotions: {', '.join(label_list)}

## Usage

```python
from transformers import Wav2Vec2Processor, AutoModelForAudioClassification
import torchaudio
import torch

# Load model and processor
processor = Wav2Vec2Processor.from_pretrained("{repo_name}")
model = AutoModelForAudioClassification.from_pretrained("{repo_name}")

# Function to predict emotion from audio file
def predict_emotion(audio_path):
    # Load audio
    speech_array, sampling_rate = torchaudio.load(audio_path)
    
    # Resample if needed
    if sampling_rate != 16000:
        resampler = torchaudio.transforms.Resample(sampling_rate, 16000)
        speech = resampler(speech_array).squeeze().numpy()
    else:
        speech = speech_array.squeeze().numpy()
    
    # Process audio
    inputs = processor(speech, sampling_rate=16000, return_tensors="pt", padding=True)
    
    # Make prediction
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits
        predicted_class_id = torch.argmax(logits, dim=1).item()
        predicted_emotion = model.config.id2label[predicted_class_id]
    
    return predicted_emotion

# Example usage
emotion = predict_emotion("path/to/audio.wav")
print(f"Detected emotion: sadness")
```

## Training Details

The model was trained for 8 epochs using the following parameters:
- Learning rate: 1e-4
- Batch size: 20
- Gradient accumulation steps: 4

## Limitations

This model works best with clear speech recordings in quiet environments. Performance may vary with different accents, languages, or noisy backgrounds.
"""

    # Save and upload the model card
    with open("README.md", "w") as f:
        f.write(model_card)
        
    api.upload_file(
        path_or_fileobj="README.md",
        path_in_repo="README.md",
        repo_id=repo_name,
        repo_type="model",
        commit_message="Add model card"
    )
    
    print("Model card has been uploaded")
    print(f"Your model is now available at: https://huggingface.co/{repo_name}")

# To use this script in a notebook:
model_path = "./emotion_recognition_model"  # Path to your trained model
upload_model_to_hub(model_path)

Logging in to Huggingface Hub...


Enter your Huggingface token:  ········


Repository usamakenway/wav2vec2-large-xlsr-53-english-ravdess is ready
Uploading model files from ./emotion_recognition_model to usamakenway/wav2vec2-large-xlsr-53-english-ravdess...


model.safetensors:   0%|          | 0.00/1.27G [00:00<?, ?B/s]

Model has been uploaded to https://huggingface.co/usamakenway/wav2vec2-large-xlsr-53-english-ravdess
Creating model card...


ValueError: Invalid metadata in README.md.
- "model-index[0].results[0].metrics[0].type" is required
- "model-index[0].results[0].metrics[1].type" is required
- "model-index[0].results[0].metrics[2].type" is required
- "model-index[0].results[0].metrics[3].type" is required