In [1]:
from joblib import dump
import os

from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.cluster import KMeans
import torch.nn as nn
import numpy as np
import torchaudio
import torch


In [2]:
def audio_to_spectrogram(file_path: str):
    waveform, sr = torchaudio.load(file_path)

    transformer = torchaudio.transforms.MelSpectrogram(
        sample_rate=sr, n_fft=2048, hop_length=512, n_mels=64
        )

    spectrogram = transformer(waveform)
    spectrogram = torchaudio.transforms.AmplitudeToDB()(spectrogram)
    return spectrogram.squeeze(0).transpose(0, 1)


def vector_quantize(features, n_clusters: int = 100):
    kmeans = KMeans(n_clusters=n_clusters)
    all_data = np.vstack([f.numpy() for f in features])
    kmeans.fit(all_data)

    quantized_features = [
        torch.tensor(kmeans.predict(f.numpy()), dtype=torch.long)
        for f in features
        ]

    return quantized_features, kmeans


def load_and_quantize_data(
        directory: str,
        target_labels: list[str] = ["up", "down", "left", "right"],
        n_clusters: int = 100
        ) -> tuple:

    features = []
    labels = []
    for label in os.listdir(directory):
        if label in target_labels:
            class_dir = os.path.join(directory, label)
            for fname in os.listdir(class_dir):
                file_path = os.path.join(class_dir, fname)
                spectrogram = audio_to_spectrogram(file_path)
                features.append(spectrogram)
                labels.append(label)

    quantized_features, kmeans = vector_quantize(features, n_clusters)

    return quantized_features, labels, kmeans


def pad_sequences(sequences, pad_value: int = 0):
    max_len = max([s.size(0) for s in sequences])

    padded_sequences = [
        torch.nn.functional.pad(s, (0, max_len - s.size(0)), value=pad_value)
        for s in sequences
        ]

    return torch.stack(padded_sequences)


class AudioTransformer(nn.Module):
    def __init__(
            self,
            num_tokens: int, dim_model: int, num_heads: int,
            num_classes: int, dim_feedforward: int = 2048,
            num_layers: int = 1, dropout: int = 0.1
            ) -> None:

        super().__init__()
        self.embedding = nn.Embedding(num_tokens, dim_model)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=dim_model, nhead=num_heads,
            dim_feedforward=dim_feedforward, dropout=dropout
            )

        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer, num_layers=num_layers
            )

        self.fc = nn.Linear(dim_model, num_classes)

    def forward(self, src):
        src = self.embedding(src)  # Replace tokens with embeddings
        output = self.transformer_encoder(src)
        output = output.mean(dim=1)
        output = self.fc(output)
        return output


In [3]:
def eval_model(model, test_loader, criterion) -> tuple[float, float]:
    model.eval()
    test_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            labels = labels.long()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            test_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    test_loss /= len(test_loader.dataset)
    test_accuracy = 100 * correct / total
    # print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%")
    return test_loss, test_accuracy


In [4]:
features, labels, kmeans = load_and_quantize_data("data/train")

dump(kmeans, "models/kmeans_model.joblib")


  super()._check_params_vs_input(X, default_n_init=10)


['models/kmeans_model.joblib']

In [5]:
features_padded = pad_sequences(features)

train_features_padded, test_features_padded, train_labels, test_labels = train_test_split(
    features_padded, labels, test_size=0.2, random_state=42, stratify=labels
)

label_encoder = LabelEncoder()
train_labels_encoded = torch.tensor(label_encoder.fit_transform(train_labels))
test_labels_encoded = torch.tensor(label_encoder.transform(test_labels))
dump(label_encoder, "models/label_encoder.joblib")

train_dataset = TensorDataset(train_features_padded, train_labels_encoded)
test_dataset = TensorDataset(test_features_padded, test_labels_encoded)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [6]:
num_tokens = 100  # Same as number of clusters

model = AudioTransformer(
    num_tokens=num_tokens,
    dim_model=256,
    num_heads=8,
    num_classes=len(np.unique(train_labels))
    )

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

loss_dict = {
    "test": [],
    "train": []
}

for epoch in range(50):
    model.train()
    loss_sum = []
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        loss_sum.append(loss.item())
    
    test_loss, test_accuracy = eval_model(model, test_loader, criterion)

    loss_ = np.mean(np.array(loss_sum))

    loss_dict["test"].append(test_loss)
    loss_dict["train"].append(loss_)

    print(f"Epoch {epoch+1}, Loss: {round(loss_, 4)} Test loss: {round(test_loss, 4)} Test acc: {round(test_accuracy, 2):4}")
    if len(loss_dict["test"]) > 1 and loss_dict["test"][-2] <= loss_dict["test"][-1]:
        break




Epoch 1, Loss: 1.1424 Test loss: 1.0072 Test acc: 59.65
Epoch 2, Loss: 0.9723 Test loss: 0.9144 Test acc: 66.26
Epoch 3, Loss: 0.9285 Test loss: 0.9607 Test acc: 63.09


In [8]:
torch.save(model.state_dict(), "models/model.pth")
