Configurations

In [None]:
# Sampling rate (Hz) that every audio file is resampled to when loaded.
SAMPLE_RATE: int = 16_000
# Length of one training segment, in seconds (multiplied by SAMPLE_RATE to get samples).
DURATION: int = 3
# Directory whose entries are loaded as training audio.
AUDIO_DIR_PATH: str = "./data/train"
# Hop size, in samples, between consecutive pitch/energy analysis frames.
HOP_LENGTH: int = 512
# Number of distinct speakers. The author's data has only 3; the original note
# says to use 200 instead (presumably for the full dataset -- TODO confirm).
LABELS: int = 3

Load audio and split it into raw waveform segments

In [34]:
import os
import librosa

# Number of samples in one fixed-length segment. Loop-invariant, so it is
# computed once instead of once per file.
total_samples = DURATION * SAMPLE_RATE

segments = []
# os.listdir returns entries in arbitrary order; sorting makes the segment
# order (and everything downstream of it) reproducible across runs/machines.
for audio_file_name in sorted(os.listdir(AUDIO_DIR_PATH)):
    audio_path = os.path.join(AUDIO_DIR_PATH, audio_file_name)
    if not os.path.isfile(audio_path):
        # Sub-directories cannot be decoded as audio; skip them instead of crashing.
        continue

    # `sr` is intentionally left as a module-level name: later cells may read it.
    waveform, sr = librosa.load(audio_path, sr=SAMPLE_RATE)

    # Non-overlapping windows of exactly `total_samples` samples; the trailing
    # partial window (if any) is dropped, and files shorter than one window
    # contribute nothing.
    for i in range(0, len(waveform) - total_samples + 1, total_samples):
        segments.append(waveform[i:i + total_samples])

In [35]:
# Sanity check: how many fixed-length segments were collected across all files.
len(segments)

7

Prosodic Features

In [36]:
import numpy as np

def get_prosodic_features(segment):
    """Extract frame-level pitch and energy from one waveform segment.

    Args:
        segment: Waveform samples for one segment, expected to be sampled at
            SAMPLE_RATE (presumably a mono 1-D array as produced by the
            segmentation cell -- TODO confirm for other callers).

    Returns:
        np.ndarray of shape [T, 2]. Each row is one analysis frame
        (HOP_LENGTH samples apart, i.e. 512 / 16000 s = 32 ms with the
        notebook's settings); column 0 is the YIN pitch estimate and
        column 1 is the RMS energy of that frame.
    """
    # Use the configured SAMPLE_RATE rather than a variable leaked from the
    # loading loop, so this function works on a fresh kernel and does not
    # depend on which file happened to be loaded last.
    pitch = librosa.yin(
        segment, fmin=50, fmax=300, sr=SAMPLE_RATE, hop_length=HOP_LENGTH
    )
    energy = librosa.feature.rms(y=segment, hop_length=HOP_LENGTH)[0]

    # Frame-align pitch & energy by truncating both to the shorter track.
    num_frames = min(len(pitch), len(energy))
    pitch = pitch[:num_frames]
    energy = energy[:num_frames]

    # Combine frame-wise: one row per frame, one column per feature.
    features = np.stack([pitch, energy], axis=1)  # [T x 2]

    return features

In [37]:
import random
from scipy.ndimage import zoom

def augment_features(features, method):
    """Return an augmented version of a feature array without modifying the input.

    Args:
        features: NumPy array of features whose first axis is time
            (e.g. the [T, 2] pitch/energy matrix built in this notebook).
        method: Which augmentation to apply:
            'mask'  - zero out a random run of 5-15 frames that starts in the
                      first half of the sequence (clipped at the end).
            'noise' - add i.i.d. Gaussian noise (mean 0, std 0.1) to every
                      value, imitating small jitter in pitch/energy.
            'warp'  - linearly resample the time axis by a random factor in
                      [0.8, 1.2]; the output length therefore differs from T.
                      The zoom factors are given per axis, so this branch
                      expects a 2-D array.
            Any other value returns `features` unchanged.

    Returns:
        The augmented array. For 'mask', 'noise' and 'warp' this is always a
        new array, so the caller's original features stay intact. The 'noise'
        result follows NumPy type promotion (float64 noise is added).
    """
    if method == 'mask':
        # Random time masking on a private copy.
        features = features.copy()
        T = len(features)
        start = random.randint(0, T // 2)
        end = min(T, start + random.randint(5, 15))
        features[start:end] = 0
    elif method == 'noise':
        # Out-of-place addition: an in-place `+=` would also corrupt the
        # caller's array, making the "clean" and "noisy" views identical.
        noise = np.random.normal(0, 0.1, features.shape)
        features = features + noise
    elif method == 'warp':
        # Time-warp (stretch or shrink) along axis 0 only; feature axis is kept.
        scale = random.uniform(0.8, 1.2)
        features = zoom(features, [scale, 1], order=1)
    return features

Model Definition and Training

In [38]:
import torch
import torch.nn as nn

class ProsodyRNN(nn.Module):
    """Bidirectional GRU encoder that maps a feature sequence to one embedding.

    The final hidden states of the forward and backward directions are
    concatenated and linearly projected to `output_dim`.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: GRU hidden size per direction.
        output_dim: Size of the returned embedding.
    """

    def __init__(self, input_dim=2, hidden_dim=64, output_dim=128):
        super().__init__()
        self.rnn = nn.GRU(
            input_size=input_dim,
            hidden_size=hidden_dim,
            batch_first=True,
            bidirectional=True,
        )
        # Two directions are concatenated, hence 2 * hidden_dim inputs.
        self.proj = nn.Linear(in_features=hidden_dim * 2, out_features=output_dim)

    def forward(self, x):
        """Encode a batch-first sequence batch into embeddings of shape [B, output_dim]."""
        _, final_states = self.rnn(x)  # final_states: [2, B, H]
        forward_state = final_states[0]
        backward_state = final_states[1]
        summary = torch.cat((forward_state, backward_state), dim=1)  # [B, 2H]
        return self.proj(summary)

In [39]:
import torch.nn.functional as F

def contrastive_loss(z1, z2, temperature=0.1):
    """Cross-entropy contrastive loss between two batches of embeddings.

    Row i of `z1` is treated as matching row i of `z2`; every other row of
    `z2` acts as a negative for it.

    Args:
        z1: Embeddings of the first view, shape [B, D].
        z2: Embeddings of the second view, shape [B, D].
        temperature: Divisor applied to the cosine similarities.

    Returns:
        Scalar tensor with the mean cross-entropy over the batch. With B == 1
        there are no negatives and the result is 0.
    """
    anchors = F.normalize(z1, dim=1)
    candidates = F.normalize(z2, dim=1)
    # Pairwise cosine similarities, sharpened by the temperature: [B, B].
    similarity = torch.matmul(anchors, candidates.T) / temperature
    # The matching pair for row i sits on the diagonal, i.e. class index i.
    targets = torch.arange(len(z1), device=z1.device)
    return F.cross_entropy(similarity, targets)

In [None]:
model = ProsodyRNN()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Maximum number of segments per optimisation step. The contrastive loss uses
# the *other* items in the batch as negatives, so a batch must hold >= 2 items:
# with a single item the logits are 1x1 and the loss is identically 0.
BATCH_SIZE = 32

if len(segments) < 2:
    raise ValueError(
        "Contrastive training needs at least 2 segments to provide negatives"
    )

# Feature extraction does not depend on the model, so run it once up front
# instead of once per epoch. Every segment has the same number of samples,
# hence the same number of frames, so the features stack into [N, T, 2].
clean_features = np.stack([get_prosodic_features(segment) for segment in segments])
num_segments = len(clean_features)

for epoch in range(20):
    # Fresh shuffle each epoch so batches (and therefore negatives) vary.
    order = torch.randperm(num_segments).tolist()
    epoch_losses = []

    for batch_start in range(0, num_segments, BATCH_SIZE):
        batch_indices = order[batch_start:batch_start + BATCH_SIZE]
        if len(batch_indices) < 2:
            # A leftover batch of one has no negatives; skip it.
            continue

        f1 = clean_features[batch_indices]
        # Augment a copy so the clean view is never modified, whether or not
        # augment_features works in place.
        f2 = augment_features(f1.copy(), method='noise')

        x1 = torch.tensor(f1, dtype=torch.float32)  # [B, T, 2]
        x2 = torch.tensor(f2, dtype=torch.float32)  # [B, T, 2]

        z1 = model(x1)
        z2 = model(x2)

        loss = contrastive_loss(z1, z2)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_losses.append(loss.item())

    # Report the mean loss over the epoch rather than only the last step.
    print(f"Epoch {epoch+1} Loss: {np.mean(epoch_losses):.4f}")

In [None]:
# Persist the trained encoder weights. Create the target directory first:
# torch.save does not create missing parent directories.
weights_path = './model/weights/prosody_rnn.pt'
os.makedirs(os.path.dirname(weights_path), exist_ok=True)
torch.save(model.state_dict(), weights_path)