<a href="https://colab.research.google.com/github/chesseraf/Audio-emotion-recognition/blob/main/Audio_Emotion_Regression.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Set to True (on Colab) to read data from and save checkpoints to Google Drive.
DRIVE_MOUNTED = False
if DRIVE_MOUNTED:
    # Imported lazily: `google.colab` only exists on Colab, so importing it at the
    # top of the cell would break local runs even when Drive is not requested.
    from google.colab import drive
    drive.mount('/content/drive')
    SAVE_DIR = "/content/drive/MyDrive/w2v2_temporal_head"
    # Folder with audio clips plus a matching .json per clip.
    # NOTE(review): collect_pairs only scans for *.mp3, so .wav files are currently ignored.
    DATA_DIR = "/content/drive/MyDrive/Colab_Drive_Files"
else:
    # Local fallbacks, relative to the notebook's working directory.
    SAVE_DIR = "./w2v2_temporal_head"
    DATA_DIR = "./Data"


In [2]:
!pip -q install torchcodec --index-url "https://download.pytorch.org/whl/cu126"

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/2.4 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━[0m [32m1.6/2.4 MB[0m [31m39.3 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m2.3/2.4 MB[0m [31m34.6 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.4/2.4 MB[0m [31m24.2 MB/s[0m eta [36m0:00:00[0m
[?25h

In [3]:
# imports
import os, gc, torch, json, random, math, torchaudio, joblib
import numpy as np

from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from torch import nn
from torch.utils.data import Dataset, DataLoader
from transformers import AutoFeatureExtractor, AutoConfig, Wav2Vec2Model
from sklearn.metrics import mean_absolute_error, mean_squared_error

In [4]:
# Process-wide environment switches, set before any model is instantiated.
# NOTE(review): `transformers` is already imported in the imports cell above; presumably
# USE_TF is only honoured when set before that import — confirm and move if needed.
os.environ["USE_TF"] = "0"
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
os.environ["WANDB_DISABLED"] = "true"
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# Release cached CUDA blocks and run a garbage-collection pass (useful when re-running cells).
torch.cuda.empty_cache(); gc.collect()

# Quick sanity print of the runtime the notebook is executing on.
print("PyTorch:", torch.__version__, "| CUDA available:", torch.cuda.is_available())


PyTorch: 2.9.0+cu126 | CUDA available: True


In [5]:
# ==== USER SETTINGS ====

# Annotation keys the head regresses; every key must have an entry in ALL_KEYS_WITH_RANGE
# because labels are normalised through normalize_range(value, key).
TARGET_KEYS  = ["Valence_best","Arousal_best","Submissive_vs._Dominant_best", "Serious_vs._Humorous_best"]
MODEL_NAME   = "facebook/wav2vec2-base-960h"  # small & stable; upgrade later if needed
TARGET_SR    = 16_000            # sample rate (Hz) every clip is resampled to
MAX_SECONDS  = 12.0              # clip length used per file; keep modest, try more later
SEED         = 42                # seed applied to random, numpy and torch below

# Training
# NOTE(review): the recorded training output shows "/1000", i.e. it was produced with a
# different EPOCHS value than the one set here.
EPOCHS       = 50                # start small
LR           = 1e-3             # higher LR since we train only a tiny head
WEIGHT_DECAY = 0.007             # regularisation; the head overfits with the current data set size
BATCH_SIZE   = 1                # keep at 1 for stability
NUM_WORKERS  = 0                # 0 = no multiprocessing (stable on Colab)
VAL_SPLIT    = 0.1              # if N>1, take ~10% for val
MAX_FILES    = None             # set an int (e.g., 200) for a smoke test; None = all

FEATURE_FILE_EXTENSION = '.npy' # encoder outputs are saved next to the audio as numpy arrays

# Seed every RNG in use so shuffling and head initialisation are repeatable.
random.seed(SEED); np.random.seed(SEED)
torch.manual_seed(SEED); torch.cuda.manual_seed_all(SEED)
# Device shared by the encoder, the head and all cached tensors.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [6]:
# Label range of every annotation attribute; used to rescale raw labels so that
# the attribute's minimum maps to 0 and its maximum maps to 1 (and back again).
ALL_KEYS_WITH_RANGE = {
    "Valence_best": {"min": -3, "max": 3},
    "Arousal_best": {"min": 0, "max": 4},
    "Submissive_vs._Dominant_best": {"min": -3, "max": 3},
    "Age_best": {"min": 0, "max": 6},
    "Gender_best": {"min": -2, "max": 2},
    "Serious_vs._Humorous_best": {"min": 0, "max": 4},
    "Vulnerable_vs._Emotionally_Detached_best": {"min": 0, "max": 4},
    "Confident_vs._Hesitant_best": {"min": 0, "max": 4},
    "Warm_vs._Cold_best": {"min": -2, "max": 2},
    "Monotone_vs._Expressive_best": {"min": 0, "max": 4},
    "High-Pitched_vs._Low-Pitched_best": {"min": 0, "max": 4},
    "Soft_vs._Harsh_best": {"min": -2, "max": 2},
    "Authenticity_best": {"min": 0, "max": 4},
    "Recording_Quality_best": {"min": 0, "max": 4},
    "Background_Noise_best": {"min": 0, "max": 3},
}


def _attribute_bounds(attribute):
    """Return the (min, max) label bounds registered for `attribute` (KeyError if unknown)."""
    bounds = ALL_KEYS_WITH_RANGE[attribute]
    return bounds["min"], bounds["max"]


def normalize_range(value, attribute):
    """Rescale `value` from the attribute's [min, max] label range onto [0, 1]."""
    lower, upper = _attribute_bounds(attribute)
    return (value - lower) / (upper - lower)


def denormalize_range(value, attribute):
    """Inverse of normalize_range: map a [0, 1] value back to the attribute's label range."""
    lower, upper = _attribute_bounds(attribute)
    return value * (upper - lower) + lower

In [7]:
# Module-level counters, used only for progress/diagnostic printing.
num_mp3s_encoded = 0          # clips pushed through the encoder in this session
reused_encodings = 0          # accesses answered from the in-memory feature cache
presaved_encoding_found = 0   # clips whose features were loaded from a saved feature file

# Labels, waveform and encoder output are loaded lazily and cached on the instance.
# The class relies on names defined in other cells, looked up at call time:
# normalize_range, load_first_n_seconds, TARGET_KEYS, TARGET_SR, MAX_SECONDS,
# FEATURE_FILE_EXTENSION, fe, encoder and device.


class LazyAudioData:
    """One (audio, annotation) pair whose labels, waveform and encoder features load on demand.

    Args:
        audio_path: Path to the audio clip.
        json_path: Path to the annotation JSON; labels are read from its
            "emotion_annotation" object.
        existing_feature_path: Path to a previously saved feature array for this clip,
            or None when no such file exists.
    """

    def __init__(self, audio_path: str, json_path: str, existing_feature_path: Optional[str]):
        self._audio_path = audio_path
        self._json_path = json_path
        self._precalculated_feature_file = existing_feature_path
        self._emotions = None          # cached label tensor (or [] after a failed load)
        self._wav = None               # cached waveform, dropped again once encoded
        self._encoded_features = None  # cached encoder output

    @property
    def emotions(self) -> Any:
        """Normalised labels as a float32 tensor of shape (1, len(TARGET_KEYS)) on `device`.

        Values are read for each key in TARGET_KEYS and rescaled with normalize_range.
        If anything fails (missing file, missing key, non-finite value) a warning is
        printed and an empty list is cached and returned instead of a tensor.
        """
        if self._emotions is None:
            try:
                raw_text = Path(self._json_path).read_text(encoding="utf-8")
                annotation = json.loads(raw_text).get("emotion_annotation", {})
                labels = [normalize_range(float(annotation[k]), k) for k in TARGET_KEYS]
                if not all(np.isfinite(labels)):
                    raise ValueError("Non-finite labels found after normalization")
                # (D,) -> (1, D) so the labels line up with the head output for one clip
                self._emotions = torch.tensor(labels, dtype=torch.float32).unsqueeze(0).to(device)
            except Exception as e:
                print(f"Warning: Error loading emotions from {self._json_path}: {e}")
                self._emotions = []  # failure sentinel (kept for backward compatibility)
        return self._emotions

    @property
    def wav(self) -> torch.Tensor:
        """Waveform from load_first_n_seconds; an empty tensor (with a warning) if loading fails."""
        if self._wav is None:
            try:
                self._wav = load_first_n_seconds(self._audio_path, TARGET_SR, MAX_SECONDS)
            except Exception as e:
                print(f"Warning: Error loading audio from {self._audio_path}: {e}")
                self._wav = torch.empty(0)  # failure sentinel: zero-length waveform
        return self._wav

    @property
    def encoded_features(self) -> Optional[torch.Tensor]:
        """Encoder output (`last_hidden_state`) for this clip, on `device`.

        Resolution order: in-memory cache, then the saved feature file (if one was
        given), then a fresh encoder pass whose result is also written next to the
        audio file with FEATURE_FILE_EXTENSION. Returns None when the audio could not
        be loaded.

        NOTE(review): saved feature files are not tied to MODEL_NAME or MAX_SECONDS,
        so stale files should be deleted after changing either setting.
        """
        global num_mp3s_encoded, reused_encodings, presaved_encoding_found

        # 1) Already computed or loaded during this session.
        if self._encoded_features is not None:
            reused_encodings += 1
            if reused_encodings == 1:
                print("Reused an encoding!")
            return self._encoded_features

        # 2) Features saved by an earlier run.
        if self._precalculated_feature_file is not None:
            try:
                arr = np.load(Path(self._precalculated_feature_file))
                # Move to the shared device rather than hard-coding CUDA, so cached
                # features also load on CPU-only machines.
                self._encoded_features = torch.from_numpy(arr).to(device)
                presaved_encoding_found += 1
                if presaved_encoding_found % 100 == 0:
                    print(f"Found {presaved_encoding_found} presaved encodings")
                return self._encoded_features
            except Exception as e:
                print(f"Warning: Error loading precalculated features from {self._precalculated_feature_file}: {e}")

        # 3) Encode from the raw waveform.
        wav = self.wav
        if wav is not None and wav.numel() > 0:
            # These are the encoder INPUTS (normalised waveform), not the features yet.
            feat = fe(wav, sampling_rate=TARGET_SR, return_tensors="pt", padding="do_not_pad")
            inputs = {k: v.to(device) for k, v in feat.items()}

            with torch.no_grad():
                out = encoder(input_values=inputs["input_values"])
            self._encoded_features = out.last_hidden_state  # (B, T', d_model)

            # Persist for future runs; a failed write must not abort training.
            feature_path = Path(self._audio_path).with_suffix(FEATURE_FILE_EXTENSION)
            try:
                np.save(feature_path, self._encoded_features.cpu().numpy())
            except OSError as e:
                print(f"Warning: Could not save features to {feature_path}: {e}")

            # Don't keep the large waveform in RAM once it has been encoded.
            self._wav = None
            num_mp3s_encoded += 1
            if num_mp3s_encoded % 100 == 0:
                print(f"Processed {num_mp3s_encoded} mp3s")

        return self._encoded_features

print("LazyAudioData class defined.")

LazyAudioData class defined.


In [8]:
def collect_pairs(data_dir: str, target_keys: List[str], limit: Optional[int]=None):
    """Build LazyAudioData records for every .mp3 under `data_dir` that has a sibling .json.

    Files are visited in sorted path order, so a given `limit` always selects the
    same clips.

    Args:
        data_dir: Root folder, searched recursively for "*.mp3".
        target_keys: Accepted for interface compatibility; not used by this function.
        limit: Maximum number of pairs to return; None returns all of them.

    Returns:
        List of LazyAudioData. Each record also receives the path of an existing
        saved-feature file (same stem, FEATURE_FILE_EXTENSION) or None.

    Raises:
        RuntimeError: If no usable pair is found.
    """
    root = Path(data_dir)
    items = []
    for mp3 in sorted(root.rglob("*.mp3")):
        # Honour the smoke-test cap (MAX_FILES); previously the argument was ignored.
        if limit is not None and len(items) >= limit:
            break
        annotation = mp3.with_suffix(".json")
        if not annotation.exists():
            continue  # audio without labels is unusable for training
        feature_file = mp3.with_suffix(FEATURE_FILE_EXTENSION)
        cached_features = str(feature_file) if feature_file.exists() else None
        items.append(LazyAudioData(str(mp3), str(annotation), cached_features))
    if not items:
        raise RuntimeError("No usable (audio,json) pairs found.")
    return items

# Gather every usable clip and randomise the order before splitting
items = collect_pairs(DATA_DIR, TARGET_KEYS, limit=MAX_FILES)
random.shuffle(items)

# Hold out the first n_val shuffled clips for validation (at least one, never all);
# a single clip cannot be split, so it is used for training only.
n = len(items)
if n != 1:
    n_val = min(max(1, int(n * VAL_SPLIT)), n-1)
    val_items, train_items = items[:n_val], items[n_val:]
else:
    train_items, val_items = items, []

print(f"pairs total={n}  train={len(train_items)}  val={len(val_items)}")

pairs total=23  train=21  val=2


In [9]:
# Number of samples in a MAX_SECONDS-long clip at TARGET_SR.
MAX_LEN = int(TARGET_SR * MAX_SECONDS)
# Cache of Resample transforms keyed by (source_sr, target_sr); filled on demand
# by load_first_n_seconds so each rate pair is built only once.
_resamplers: Dict[Tuple[int,int], torchaudio.transforms.Resample] = {}

def load_first_n_seconds(path: str, target_sr: int, max_seconds: float) -> torch.Tensor:
    """Load the first `max_seconds` of an audio file as a fixed-length mono waveform.

    The clip is down-mixed to mono, resampled to `target_sr`, truncated or
    zero-padded to exactly int(target_sr * max_seconds) samples and peak-normalised.

    Args:
        path: Audio file to read.
        target_sr: Sample rate of the returned waveform.
        max_seconds: Length of the returned window in seconds.

    Returns:
        1-D float tensor with int(target_sr * max_seconds) samples.
    """
    # Find the source sample rate; fall back to decoding a short chunk when
    # torchaudio.info is unavailable or fails for this file.
    try:
        orig_sr = torchaudio.info(path).sample_rate
    except Exception:
        _, orig_sr = torchaudio.load(path, frame_offset=0, num_frames=1024)
    frames = int(orig_sr * max_seconds)

    # Read only the window that is needed.
    wav, sr = torchaudio.load(path, frame_offset=0, num_frames=frames)  # (C, T<=frames)

    # Down-mix multi-channel audio to mono.
    if wav.shape[0] > 1:
        wav = wav.mean(0, keepdim=True)
    # Resample just this window, reusing one transform per (source, target) rate pair.
    if sr != target_sr:
        key = (sr, target_sr)
        if key not in _resamplers:
            _resamplers[key] = torchaudio.transforms.Resample(sr, target_sr)
        wav = _resamplers[key](wav)
    wav = wav.squeeze(0)

    # Truncate/pad to an exact length (so callers can use padding="do_not_pad").
    # The length is derived from this call's own arguments rather than a module
    # constant, so it stays correct when settings are reloaded with other values.
    max_len = int(target_sr * max_seconds)
    if wav.numel() > max_len:
        wav = wav[:max_len]
    elif wav.numel() < max_len:
        wav = torch.nn.functional.pad(wav, (0, max_len - wav.numel()))

    # Peak normalise; the epsilon keeps an all-zero clip from dividing by zero.
    wav = wav / (wav.abs().max() + 1e-9)
    return wav


In [10]:
class PathDataset(Dataset):
    """Map-style dataset over clip records.

    Each record must expose `encoded_features` and `emotions` attributes; both
    are read every time the record is indexed.
    """

    def __init__(self, items: List[Any]):
        self.items = items

    def __len__(self):
        return len(self.items)

    def __getitem__(self, idx):
        record = self.items[idx]
        # Features are read before labels, mirroring the key order of the sample.
        return dict(encodedFeatures=record.encoded_features, labels=record.emotions)

# Wrap both splits; the validation objects stay None when nothing was held out
train_ds = PathDataset(train_items)
val_ds = None if not len(val_items) else PathDataset(val_items)

train_loader = DataLoader(
    train_ds,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS,
    pin_memory=False,
    drop_last=False,
)
if val_ds:
    # Validation always runs one clip at a time, in a fixed order
    val_loader = DataLoader(
        val_ds,
        batch_size=1,
        shuffle=False,
        num_workers=NUM_WORKERS,
        pin_memory=False,
        drop_last=False,
    )
else:
    val_loader = None

print("Loaders ready.")

Loaders ready.


In [24]:
# Pretrained feature extractor and wav2vec2 encoder; the encoder only serves as a
# fixed feature source, so it is put in eval mode and all of its weights are frozen
fe = AutoFeatureExtractor.from_pretrained(MODEL_NAME, sampling_rate=TARGET_SR)
enc_cfg = AutoConfig.from_pretrained(MODEL_NAME, output_hidden_states=False)
encoder = Wav2Vec2Model.from_pretrained(MODEL_NAME, config=enc_cfg)
encoder = encoder.to(device)
encoder.eval()
encoder.requires_grad_(False)

# Tiny temporal head: GRU + attention pooling -> one output per target key
class TemporalHead(nn.Module):
    """Bidirectional GRU followed by attention pooling and a linear regressor.

    Args:
        d_model: Feature size of the incoming encoder states.
        hidden: GRU hidden size per direction (the pooled vector has 2 * hidden features).
        out_dim: Number of regression outputs. When None, len(TARGET_KEYS) is used,
            looked up when the head is constructed so the default follows the
            current TARGET_KEYS instead of the value at class-definition time.
    """

    def __init__(self, d_model=768, hidden=128, out_dim=None):
        super().__init__()
        if out_dim is None:
            out_dim = len(TARGET_KEYS)
        self.gru = nn.GRU(d_model, hidden, num_layers=1, batch_first=True, bidirectional=True)
        # Scores each time step with a single scalar
        self.att = nn.Sequential(
            nn.Linear(2*hidden, hidden), nn.Tanh(),
            nn.Linear(hidden, 1)
        )
        self.out = nn.Sequential(
            nn.LayerNorm(2*hidden),
            nn.Linear(2*hidden, out_dim)
        )

    def forward(self, hs):                 # hs: (B, T', d_model)
        """Return one prediction vector of shape (B, out_dim) per sequence in `hs`."""
        z, _ = self.gru(hs)                # (B, T', 2H)
        a = self.att(z).squeeze(-1)        # (B, T') attention logits
        w = torch.softmax(a, dim=1).unsqueeze(-1)   # weights sum to 1 over time
        pooled = (w * z).sum(dim=1)        # (B, 2H) attention-weighted average
        return self.out(pooled)            # (B, out_dim)

# One regression output per target key
num_labels = len(TARGET_KEYS)
head = TemporalHead(d_model=encoder.config.hidden_size, hidden=128, out_dim=num_labels).to(device)

# Only the head is trainable, so the optimizer sees just its parameters
opt = torch.optim.AdamW(head.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
mse = nn.MSELoss()
# Device-agnostic GradScaler API; torch.cuda.amp.GradScaler is deprecated and emits
# a FutureWarning. Scaling is active only when running on CUDA.
scaler = torch.amp.GradScaler("cuda", enabled=(device.type=="cuda"))

print("Encoder frozen. Trainable head params:",
      sum(p.numel() for p in head.parameters() if p.requires_grad))


Some weights of Wav2Vec2Model were not initialized from the model checkpoint at facebook/wav2vec2-base-960h and are newly initialized: ['masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Encoder frozen. Trainable head params: 724229


  scaler = torch.cuda.amp.GradScaler(enabled=(device.type=="cuda"))


In [25]:
# Settings and state for optional partial validation reports during training
training_count_report = 410   # a report is meant to be made every this many training samples
val_count_to_report = 400     # default number of validation samples evaluated per report
report_num = 0                # how many reports have been printed so far


def run_val_report(numsamples=val_count_to_report, mini_train_loss=None):
    """Print a numbered validation report, optionally followed by the recent train loss.

    Evaluates up to `numsamples` validation samples via run_epoch on val_loader.
    When `mini_train_loss` is given it is treated as a loss sum over
    training_count_report samples and printed as their average.
    """
    global report_num
    report_num += 1
    with torch.no_grad():
        va_loss = run_epoch(val_loader, train=False, limit_samples=numsamples)
        print(f"report {report_num} | val MSE={va_loss:.4f}")
        if mini_train_loss is None:
            return
        print(f"report {report_num} | train MSE={mini_train_loss/training_count_report:.4f}")


In [26]:
def run_epoch(loader, train=True, limit_samples=None):
    """Run one pass over `loader` and return the mean per-sample MSE.

    Uses the notebook-level `head`, `mse`, `device` and, when training, `opt` and
    `scaler`. Gradients are not disabled here; evaluation callers wrap the call in
    torch.no_grad().

    Args:
        loader: Iterable of batches with "encodedFeatures" (..., T', d_model) and
            "labels" (..., D); leading dimensions are flattened into the batch axis.
        train: If True the head is put in train mode and updated after every batch;
            otherwise it is put in eval mode and left unchanged.
        limit_samples: Stop once at least this many samples have been processed;
            None processes the whole loader.

    Returns:
        Sum of per-batch losses weighted by batch size, divided by the number of
        samples actually processed (0.0 for an empty loader).
    """
    if train:
        head.train()
    else:
        head.eval()

    total_loss = 0.0
    samples_seen = 0
    for batch in loader:
        if limit_samples is not None and samples_seen >= limit_samples:
            break

        # Collation adds a batch axis on top of each item's own leading 1, giving
        # (B, 1, T', d_model) and (B, 1, D). Flatten both to (B, T', d_model) and
        # (B, D) so predictions and labels have matching shapes for any batch size.
        hs = batch["encodedFeatures"]
        hs = hs.reshape(-1, hs.shape[-2], hs.shape[-1])
        labels = batch["labels"]
        labels = labels.reshape(-1, labels.shape[-1])

        with torch.amp.autocast('cuda', enabled=(device.type=="cuda")):
            preds = head(hs)
            # mean squared error over all target keys of the batch
            loss = mse(preds, labels)

        if train:
            opt.zero_grad(set_to_none=True)
            scaler.scale(loss).backward()
            scaler.step(opt)
            scaler.update()

        # Weight by batch size and count only processed samples, so the returned
        # average is exact even when the sample limit ends the loop early.
        batch_size = labels.size(0)
        total_loss += loss.item() * batch_size
        samples_seen += batch_size

    return total_loss / max(1, samples_seen)
# Training stops early once the validation MSE drops below this value.
# NOTE(review): hand-picked threshold; the rationale for the exact value is not documented.
EARLY_STOP_VAL_MSE = 0.0041

# Baseline: validation error of the untrained head (skipped when there is no validation split)
if val_loader:
    with torch.no_grad():
        va_loss = run_epoch(val_loader, train=False)
    print(f"Epoch {0}/{EPOCHS} | val MSE={va_loss:.4f}")

for epoch in range(1, EPOCHS+1):
    tr_loss = run_epoch(train_loader, train=True)
    if val_loader:
        with torch.no_grad():
            va_loss = run_epoch(val_loader, train=False)
        print(f"Epoch {epoch}/{EPOCHS} | train MSE={tr_loss:.4f} | val MSE={va_loss:.4f}")
        if va_loss < EARLY_STOP_VAL_MSE:
            break
    else:
        print(f"Epoch {epoch}/{EPOCHS} | train MSE={tr_loss:.4f}")

  return F.mse_loss(input, target, reduction=self.reduction)


Epoch 0/1000 | val MSE=0.1127
Reused an encoding!
Epoch 1/1000 | train MSE=0.4230 | val MSE=0.0501
Epoch 2/1000 | train MSE=0.0467 | val MSE=0.0044
Epoch 3/1000 | train MSE=0.0129 | val MSE=0.0231
Epoch 4/1000 | train MSE=0.0141 | val MSE=0.0048
Epoch 5/1000 | train MSE=0.0092 | val MSE=0.0442
Epoch 6/1000 | train MSE=0.0096 | val MSE=0.0065
Epoch 7/1000 | train MSE=0.0046 | val MSE=0.0062
Epoch 8/1000 | train MSE=0.0043 | val MSE=0.0048
Epoch 9/1000 | train MSE=0.0044 | val MSE=0.0113
Epoch 10/1000 | train MSE=0.0035 | val MSE=0.0084
Epoch 11/1000 | train MSE=0.0022 | val MSE=0.0052
Epoch 12/1000 | train MSE=0.0015 | val MSE=0.0037


In [27]:
# Diagnostics, in order: clips encoded this session, in-memory cache hits,
# and clips whose features were loaded from a saved feature file.
print(num_mp3s_encoded)
print(reused_encodings)
print(presaved_encoding_found)

19
210
0


In [18]:
def ccc(y_true, y_pred):
    """Concordance correlation coefficient between targets and predictions.

    Computed as 2*cov / (var_pred + var_true + (mean_pred - mean_true)**2) with
    population statistics in float64. Returns 0.0 when the denominator is not
    positive (e.g. both inputs are constant and equal).
    """
    truth = np.asarray(y_true, np.float64)
    pred = np.asarray(y_pred, np.float64)

    mean_pred, mean_truth = pred.mean(), truth.mean()
    covariance = ((pred - mean_pred) * (truth - mean_truth)).mean()
    denom = pred.var() + truth.var() + (mean_pred - mean_truth)**2

    if denom > 0:
        return float(2 * covariance / denom)
    return 0.0

def evaluate_full(loader):
    """Evaluate `head` on every sample of `loader` and return per-key regression metrics.

    Prints the per-key mean and standard deviation of targets and predictions, then
    computes MAE, MSE and CCC for each key in TARGET_KEYS plus macro averages. All
    values are in the normalised label space.

    Args:
        loader: Iterable of batches with "encodedFeatures" (..., T', d_model) and
            "labels" (..., D); leading dimensions are flattened into the batch axis,
            so batches of any (even varying) size are accepted.

    Returns:
        Dict with "MAE_macro", "MSE_macro" and "MAE_<key>", "MSE_<key>", "CCC_<key>"
        for every target key.

    Raises:
        ValueError: If `loader` yields no batches.
    """
    head.eval()
    true_chunks, pred_chunks = [], []
    with torch.no_grad():
        for batch in loader:
            # Flatten collated shapes to (B, T', d_model) and (B, D).
            hs = batch["encodedFeatures"]
            hs = hs.reshape(-1, hs.shape[-2], hs.shape[-1])
            labels = batch["labels"]
            labels = labels.reshape(-1, labels.shape[-1])
            preds = head(hs)
            true_chunks.append(labels.detach().cpu().numpy())
            pred_chunks.append(preds.detach().cpu().numpy())

    if not true_chunks:
        raise ValueError("evaluate_full received an empty loader")

    # Stack into (N, D) matrices: one row per sample, one column per target key.
    Y = np.concatenate(true_chunks, axis=0)
    P = np.concatenate(pred_chunks, axis=0)

    # Labelled "Target" rather than "Validation": this is also run on the training split.
    print("Keys:            ", TARGET_KEYS)
    print("Target mean:     ", Y.mean(axis=0))
    print("Predicted mean:  ", P.mean(axis=0))
    print("Target std:      ", Y.std(axis=0))
    print("Predicted std:   ", P.std(axis=0))

    mae_per_key = mean_absolute_error(Y, P, multioutput="raw_values")
    mse_per_key = mean_squared_error(Y, P, multioutput="raw_values")
    metrics = {
        "MAE_macro": float(mae_per_key.mean()),
        "MSE_macro": float(mse_per_key.mean()),
    }
    for i, k in enumerate(TARGET_KEYS):
        metrics[f"MAE_{k}"] = float(mae_per_key[i])
        metrics[f"MSE_{k}"] = float(mse_per_key[i])
        metrics[f"CCC_{k}"] = ccc(Y[:, i], P[:, i])
    return metrics

# Report metrics for the validation split first, then for the training split;
# a split without a loader or without samples only gets a "skipped" notice
for split_header, split_skipped, split_loader, split_ds in (
    ("Validation metrics:", "No validation split; skipped metrics.", val_loader, val_ds),
    ("Training metrics:", "No training split; skipped metrics.", train_loader, train_ds),
):
    if split_loader and len(split_ds) > 0:
        metrics = evaluate_full(split_loader)
        print(split_header)
        for k,v in metrics.items():
            print(f"  {k}: {v:.4f}")
    else:
        print(split_skipped)


Reused an encoding!
Keys:             ['Valence_best', 'Arousal_best', 'Submissive_vs._Dominant_best', 'Serious_vs._Humorous_best']
Validation mean:  [0.4989522  0.3203125  0.53027344 0.11364746]
Predicted mean:   [0.49259228 0.33597976 0.5918721  0.21141511]
Validation std:  [0.00186156 0.0625     0.04329427 0.06506348]
Predicted std:   [0.0148968  0.0493543  0.03088683 0.00143539]
Validation metrics:
  MAE_macro: 0.0742
  MSE_macro: 0.0090
  MAE_Valence_best: 0.0130
  MSE_Valence_best: 0.0002
  CCC_Valence_best: 0.2086
  MAE_Arousal_best: 0.1119
  MSE_Arousal_best: 0.0128
  CCC_Arousal_best: -0.9365
  MAE_Submissive_vs._Dominant_best: 0.0742
  MSE_Submissive_vs._Dominant_best: 0.0093
  CCC_Submissive_vs._Dominant_best: -0.4038
  MAE_Serious_vs._Humorous_best: 0.0978
  MSE_Serious_vs._Humorous_best: 0.0136
  CCC_Serious_vs._Humorous_best: 0.0135
Keys:             ['Valence_best', 'Arousal_best', 'Submissive_vs._Dominant_best', 'Serious_vs._Humorous_best']
Validation mean:  [0.5003225 

In [29]:
Path(SAVE_DIR).mkdir(parents=True, exist_ok=True)

# Save the trained head weights
torch.save(head.state_dict(), f"{SAVE_DIR}/temporal_head.pt")

# Save everything needed to rebuild the pipeline later. The file is written inside a
# `with` block so the handle is flushed and closed before anything reads it back.
with open(f"{SAVE_DIR}/config.json", "w") as cfg_file:
    json.dump({
        "model_name": MODEL_NAME,
        "target_sr": TARGET_SR,
        "max_seconds": MAX_SECONDS,
        "target_keys": TARGET_KEYS,
        # Hidden size is read from the head itself so the saved value cannot drift
        # from the architecture that produced the weights.
        "head": {
            "d_model": int(encoder.config.hidden_size),
            "hidden": int(head.gru.hidden_size),
            "out_dim": len(TARGET_KEYS),
        },
    }, cfg_file)

print("Saved to", SAVE_DIR)


Saved to ./w2v2_temporal_head


In [14]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 1) Locate the saved artefacts and read the pipeline config
cfg_path = f"{SAVE_DIR}/config.json"
state_path = f"{SAVE_DIR}/temporal_head.pt"

with open(cfg_path, "r") as f:
    cfg = json.load(f)

# Restore the settings the head was trained with (overrides the values set earlier)
MODEL_NAME, TARGET_SR, MAX_SECONDS, TARGET_KEYS, head_cfg = (
    cfg[key] for key in ("model_name", "target_sr", "max_seconds", "target_keys", "head")
)


In [15]:
# Reload the base feature extractor and encoder named in the saved config;
# the encoder is put in eval mode and all of its weights are frozen
fe = AutoFeatureExtractor.from_pretrained(MODEL_NAME, sampling_rate=TARGET_SR)
enc_cfg = AutoConfig.from_pretrained(MODEL_NAME, output_hidden_states=False)
encoder = Wav2Vec2Model.from_pretrained(MODEL_NAME, config=enc_cfg)
encoder = encoder.to(device)
encoder.eval()
encoder.requires_grad_(False)

# Recreate the TemporalHead so saved weights can be loaded into it.
# NOTE(review): this duplicates the class defined in the training section; the two
# definitions must stay identical for the saved state dict to load.
class TemporalHead(nn.Module):
    """Bidirectional GRU followed by attention pooling and a linear regressor.

    Args:
        d_model: Feature size of the incoming encoder states.
        hidden: GRU hidden size per direction (the pooled vector has 2 * hidden features).
        out_dim: Number of regression outputs. When None, len(TARGET_KEYS) is used,
            looked up when the head is constructed so the default follows the
            current TARGET_KEYS instead of the value at class-definition time.
    """

    def __init__(self, d_model=768, hidden=128, out_dim=None):
        super().__init__()
        if out_dim is None:
            out_dim = len(TARGET_KEYS)
        self.gru = nn.GRU(d_model, hidden, num_layers=1, batch_first=True, bidirectional=True)
        # Scores each time step with a single scalar
        self.att = nn.Sequential(
            nn.Linear(2*hidden, hidden), nn.Tanh(),
            nn.Linear(hidden, 1)
        )
        self.out = nn.Sequential(
            nn.LayerNorm(2*hidden),
            nn.Linear(2*hidden, out_dim)
        )

    def forward(self, hs):                 # hs: (B, T', d_model)
        """Return one prediction vector of shape (B, out_dim) per sequence in `hs`."""
        z, _ = self.gru(hs)                # (B, T', 2H)
        a = self.att(z).squeeze(-1)        # (B, T') attention logits
        w = torch.softmax(a, dim=1).unsqueeze(-1)   # weights sum to 1 over time
        pooled = (w * z).sum(dim=1)        # (B, 2H) attention-weighted average
        return self.out(pooled)            # (B, out_dim)


# Rebuild the head with the dimensions recorded in the saved config
head = TemporalHead(
    d_model=head_cfg["d_model"],
    hidden=head_cfg["hidden"],
    out_dim=head_cfg["out_dim"],
).to(device)

# Load trained weights. The checkpoint is a plain state dict, so restrict
# unpickling to tensors/containers instead of arbitrary Python objects.
state = torch.load(state_path, map_location=device, weights_only=True)
head.load_state_dict(state)
head.eval()

print("Model reloaded and ready!")


Some weights of Wav2Vec2Model were not initialized from the model checkpoint at facebook/wav2vec2-base-960h and are newly initialized: ['masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Model reloaded and ready!


In [32]:
def predict_attributes(audio_path: str):
    """Predict the normalised target attributes for one audio file.

    Preprocessing mirrors the training path (LazyAudioData.encoded_features): the
    waveform from load_first_n_seconds is passed to the feature extractor as a CPU
    tensor with padding="do_not_pad", and only the extractor's outputs are moved to
    `device`.

    Args:
        audio_path: Audio file to analyse.

    Returns:
        Dict mapping each key in TARGET_KEYS to the head's prediction, in the
        normalised label space (use denormalize_range for the original scale).
    """
    # 1) Load & preprocess audio (fixed length, mono, peak-normalised); stays on the CPU
    wav = load_first_n_seconds(audio_path, TARGET_SR, MAX_SECONDS)  # (T,)

    with torch.no_grad():
        # 2) Feature extraction: these are the encoder INPUTS, not features yet
        feat = fe(wav, sampling_rate=TARGET_SR, return_tensors="pt", padding="do_not_pad")
        inputs = {k: v.to(device) for k, v in feat.items()}

        # 3) Encoder
        hs = encoder(input_values=inputs["input_values"]).last_hidden_state  # (1, T', d_model)

        # 4) Temporal head
        preds = head(hs)  # (1, num_targets)
        preds = preds.squeeze(0).cpu().numpy().tolist()
    return dict(zip(TARGET_KEYS, preds))

# Try the model on an example clip (here: a recording of my own voice).
# NOTE(review): the recorded error output below names a different file (meLaugh.mp3),
# so it was produced by an earlier version of this cell.
example_audio_path = "/content/drive/MyDrive/meHappy.mp3"
result = predict_attributes(example_audio_path)
print(result)


RuntimeError: Failed to create AudioDecoder for /content/drive/MyDrive/meLaugh.mp3: Could not open input file: /content/drive/MyDrive/meLaugh.mp3 No such file or directory

In [None]:
# Print each predicted attribute mapped back from [0, 1] to its original label range
for attribute in (
    "Valence_best",
    "Arousal_best",
    "Submissive_vs._Dominant_best",
    "Serious_vs._Humorous_best",
):
    print(denormalize_range(result[attribute], attribute))