In [None]:
!pip -q install torch torchaudio
!pip -q install scikit-learn matplotlib tqdm

In [None]:
import os, random, math, time
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
from torch.utils.data import Dataset, DataLoader
from tqdm.auto import tqdm

from sklearn.metrics import confusion_matrix, classification_report, f1_score
import matplotlib.pyplot as plt

In [None]:
import os, random, math, time
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchaudio
from torch.utils.data import Dataset, DataLoader
from tqdm.auto import tqdm
from sklearn.metrics import confusion_matrix, classification_report, f1_score, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
device  # bare expression so the notebook echoes the selected device

In [None]:
from torchaudio.datasets import SPEECHCOMMANDS

# Directory handed to SPEECHCOMMANDS as the dataset root.
root = "./data_speechcommands"
# exist_ok=True keeps this cell safe to re-run when the directory already exists.
os.makedirs(root, exist_ok=True)

In [None]:
!pip -q install torchcodec

In [None]:
# The words kept from Speech Commands; a label's position in this list is its class index.
TARGET_LABELS = ["up", "down", "left", "right"]
idx_to_label = dict(enumerate(TARGET_LABELS))
label_to_idx = {name: idx for idx, name in idx_to_label.items()}

# The three predefined subsets, downloaded into `root` when not already present.
train_base, val_base, test_base = (
    SPEECHCOMMANDS(root, subset=split, download=True)
    for split in ("training", "validation", "testing")
)

# Filtering

In [None]:
class FilteredSpeechCommands(Dataset):
    """View over a Speech Commands-style dataset restricted to a set of labels.

    Args:
        base_ds: indexable dataset whose items are tuples with the label at position 2.
        allowed_labels: iterable of label strings to keep.
        max_items: optional cap; when more items match, a random subset of this
            size is drawn without replacement.
        seed: seed of the NumPy generator used for that subsampling.
    """

    def __init__(self, base_ds, allowed_labels, max_items=None, seed=42):
        self.allowed = set(allowed_labels)
        self.base = base_ds

        # Indexing base_ds[i] loads the whole item just to read its label. Datasets
        # that expose get_metadata(i) (recent torchaudio SPEECHCOMMANDS does) return
        # the same fields with a file path in place of the waveform, so the scan
        # needs no audio decoding. Anything else falls back to plain indexing.
        read_fields = getattr(base_ds, "get_metadata", None)
        if not callable(read_fields):
            read_fields = base_ds.__getitem__

        self.idxs = [i for i in range(len(base_ds)) if read_fields(i)[2] in self.allowed]
        if max_items is not None and len(self.idxs) > max_items:
            rng = np.random.default_rng(seed)
            # .tolist() converts NumPy integers back to plain Python ints.
            self.idxs = rng.choice(self.idxs, size=max_items, replace=False).tolist()

    def __len__(self):
        """Number of retained items."""
        return len(self.idxs)

    def __getitem__(self, i):
        """Return the i-th retained item exactly as the base dataset yields it."""
        return self.base[self.idxs[i]]

# Keep only the target words in every split, capped at 3000 clips each.
train_set, val_set, test_set = (
    FilteredSpeechCommands(split, TARGET_LABELS, max_items=3000)
    for split in (train_base, val_base, test_base)
)

print("Using labels:", TARGET_LABELS)
print("Train/Val/Test sizes:", len(train_set), len(val_set), len(test_set))

# Creating Spectrogram Data

In [None]:
TARGET_SR = 16000
TARGET_LEN = 16000

def pad_or_crop_1d(x, target_len=TARGET_LEN):
    """Force the last axis of `x` to exactly `target_len` samples.

    Shorter inputs are right-padded with zeros, longer inputs are truncated, and
    inputs that already have the right length are returned unchanged.

    Args:
        x: tensor whose last axis is time.
        target_len: required length of the last axis.

    Returns:
        Tensor with the same leading dimensions as `x` and `target_len` samples
        on the last axis.
    """
    T = x.shape[-1]
    if T == target_len:
        return x
    if T < target_len:
        return F.pad(x, (0, target_len - T))
    # Crop along the same (last) axis that T was measured on. A plain
    # x[:target_len] slices the first axis and leaves inputs with leading
    # dimensions uncropped.
    return x[..., :target_len]

# STFT settings forwarded to torchaudio.transforms.Spectrogram below.
N_FFT = 1024
HOP = 512
WIN = 1024

# Single shared transform instance, reused by make_stft_features for every clip.
# power=2.0 matches the "log-power spectrogram" described in make_stft_features.
spectrogram_tf=torchaudio.transforms.Spectrogram(n_fft=N_FFT,
                                                 hop_length=HOP,
                                                 win_length=WIN,
                                                 power=2.0,
                                                 center=True)


def make_stft_features(wave_1d):
    """Turn a mono waveform into a log-compressed spectrogram.

    Args:
        wave_1d: (T,) float tensor.

    Returns:
        (1, F, TT) tensor holding log(1 + spectrogram).
    """
    batched = wave_1d.unsqueeze(0)  # add the leading axis expected in the output
    return torch.log1p(spectrogram_tf(batched))

In [None]:
class SpecDataset(Dataset):
    """Wraps a labelled audio dataset and yields (waveform, spectrogram, class index)."""

    def __init__(self, base_ds, label_to_idx):
        self.base = base_ds
        self.label_to_idx = label_to_idx
        # Resample transforms keyed by source sample rate, created on first use.
        self.resamplers = {}

    def __len__(self):
        return len(self.base)

    def _resample(self, wav, sr):
        """Return `wav` at TARGET_SR, reusing one Resample transform per source rate."""
        if sr != TARGET_SR:
            if sr not in self.resamplers:
                self.resamplers[sr] = torchaudio.transforms.Resample(sr, TARGET_SR)
            wav = self.resamplers[sr](wav)
        return wav

    def __getitem__(self, idx):
        waveform, sample_rate, word, *_ = self.base[idx]   # waveform: (1, T)
        mono = self._resample(waveform, sample_rate)[0]    # (T,)

        # Per-clip standardisation; the epsilon avoids dividing by a zero std.
        mono = (mono - mono.mean()) / (mono.std() + 1e-6)

        # Fixed length, then the STFT features: (1, F, TT).
        mono = pad_or_crop_1d(mono, TARGET_LEN)
        features = make_stft_features(mono)

        target = torch.tensor(self.label_to_idx[word], dtype=torch.long)  # 0..3
        return mono.float(), features.float(), target

# Wrap each filtered split so that items come out as (waveform, spectrogram, class index).
train_ds, val_ds, test_ds = (
    SpecDataset(split, label_to_idx) for split in (train_set, val_set, test_set)
)

# Quick look at one training example.
x_raw, x_spec, y = train_ds[0]
print("raw:", x_raw.shape, "spec:", x_spec.shape, "y:", y.item(), idx_to_label[y.item()])

In [None]:
BATCH = 128
NUM_WORKERS = 0
# Pinned host memory only speeds up host-to-GPU copies. Requesting it on a
# CPU-only runtime does nothing useful and makes PyTorch emit a warning.
PIN_MEMORY = device == "cuda"

# Only the training loader shuffles; validation and test keep a fixed order.
train_loader = DataLoader(train_ds, batch_size=BATCH, shuffle=True,
                          num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY)
val_loader   = DataLoader(val_ds, batch_size=BATCH, shuffle=False,
                          num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY)
test_loader  = DataLoader(test_ds, batch_size=BATCH, shuffle=False,
                          num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY)

In [None]:
# One output unit per target word; derived from the label list so the two cannot drift apart.
num_classes = len(TARGET_LABELS)

In [None]:
class RawOnlyModel(nn.Module):
    """1-D CNN classifier that works directly on the waveform.

    Four Conv1d -> BatchNorm1d -> ReLU stages: the first three end in MaxPool1d(4),
    the last in a global average pool. A linear layer maps the resulting
    512 features to `n_classes` logits.
    """

    def __init__(self, in_ch=1, n_classes=num_classes):
        super().__init__()

        def stage(c_in, c_out, pool, **conv_kwargs):
            # Conv -> BatchNorm -> ReLU -> pooling, in that order.
            return nn.Sequential(
                nn.Conv1d(c_in, c_out, **conv_kwargs),
                nn.BatchNorm1d(c_out),
                nn.ReLU(),
                pool,
            )

        # Wide, strided first filter; the later stages use small kernels.
        self.block1 = stage(in_ch, 64, nn.MaxPool1d(4), kernel_size=80, stride=4)
        self.block2 = stage(64, 128, nn.MaxPool1d(4), kernel_size=3, padding=1)
        self.block3 = stage(128, 256, nn.MaxPool1d(4), kernel_size=3, padding=1)
        self.block4 = stage(256, 512, nn.AdaptiveAvgPool1d(1), kernel_size=3, padding=1)
        self.fc = nn.Linear(512, n_classes)

    def forward(self, raw, spec):
        """Return class logits for `raw`; `spec` is accepted but not used."""
        # A (batch, time) input gets the channel axis Conv1d expects.
        x = raw.unsqueeze(1) if raw.dim() == 2 else raw
        for block in (self.block1, self.block2, self.block3, self.block4):
            x = block(x)
        return self.fc(x.flatten(1))

# Untrained raw-waveform model, printed to inspect its layer stack.
raw_model = RawOnlyModel(n_classes=num_classes).to(device)
print("Raw Model:", raw_model, sep="\n")

In [None]:
class SpecOnlyModel(nn.Module):
    """2-D CNN classifier that works on the spectrogram.

    Five Conv2d -> BatchNorm2d -> ReLU stages: the first four end in MaxPool2d(2),
    the last in a global average pool. A linear layer maps the resulting
    512 features to `n_classes` logits.
    """

    def __init__(self, in_ch=1, n_classes=num_classes):
        super().__init__()

        def stage(c_in, c_out, kernel_size, padding, pool=None):
            # Conv -> BatchNorm -> ReLU -> pooling (2x2 max pool unless overridden).
            return nn.Sequential(
                nn.Conv2d(c_in, c_out, kernel_size=kernel_size, padding=padding),
                nn.BatchNorm2d(c_out),
                nn.ReLU(),
                nn.MaxPool2d(2) if pool is None else pool,
            )

        self.block1 = stage(in_ch, 32, kernel_size=5, padding=2)
        self.block2 = stage(32, 64, kernel_size=5, padding=2)
        self.block3 = stage(64, 128, kernel_size=3, padding=1)
        # padding=2 with a 3x3 kernel enlarges the feature map by 2 before pooling.
        self.block_custom = stage(128, 256, kernel_size=3, padding=2)
        self.block4 = stage(256, 512, kernel_size=3, padding=1,
                            pool=nn.AdaptiveAvgPool2d((1, 1)))
        self.fc = nn.Linear(512, n_classes)

    def forward(self, raw, spec):
        """Return class logits for `spec`; `raw` is accepted but not used."""
        x = spec
        for block in (self.block1, self.block2, self.block3,
                      self.block_custom, self.block4):
            x = block(x)
        return self.fc(x.flatten(1))

# Untrained spectrogram model, printed to inspect its layer stack.
spec_model = SpecOnlyModel(n_classes=num_classes).to(device)
print("Updated Spec Model with Batch Normalization:", spec_model, sep="\n")

In [None]:
class TwoStreamFusionModel(nn.Module):
    """Late-fusion classifier combining a raw-waveform stream and a spectrogram stream.

    Each stream runs through a copy of the corresponding donor model's
    convolutional blocks, is projected to `embedding_dim`, and the two embeddings
    are concatenated and classified by a small MLP head.

    Args:
        raw_model: module exposing block1..block4 for the waveform stream.
        spec_model: module exposing block1, block2, block3, block_custom and
            block4 for the spectrogram stream.
        num_classes: number of output logits.
        embedding_dim: width of each per-stream embedding.
    """

    def __init__(self, raw_model, spec_model, num_classes, embedding_dim=128):
        # Local import so the class stays self-contained within its notebook cell.
        import copy

        super().__init__()

        # Deep-copy the donor blocks instead of reusing the module objects. With
        # shared modules, training this model (and reloading its best checkpoint)
        # silently rewrites the donors' backbones while their own `fc` heads stay
        # as they were, which invalidates any later evaluation of the donors.
        # The copies still start from the donors' current weights.
        self.raw_features = nn.Sequential(
            copy.deepcopy(raw_model.block1),
            copy.deepcopy(raw_model.block2),
            copy.deepcopy(raw_model.block3),
            copy.deepcopy(raw_model.block4),
            nn.Flatten()
        )
        # 512 is the channel count produced by the final block of each stream.
        self.raw_proj = nn.Linear(512, embedding_dim)

        self.spec_features = nn.Sequential(
            copy.deepcopy(spec_model.block1),
            copy.deepcopy(spec_model.block2),
            copy.deepcopy(spec_model.block3),
            copy.deepcopy(spec_model.block_custom),
            copy.deepcopy(spec_model.block4),
            nn.Flatten()
        )
        self.spec_proj = nn.Linear(512, embedding_dim)

        self.fusion_head = nn.Sequential(
            nn.Linear(embedding_dim * 2, 64),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(64, num_classes)
        )

    def forward(self, raw, spec):
        """Return class logits for a batch of waveforms and their spectrograms."""
        # A (batch, time) waveform gets the channel axis the 1-D convolutions expect.
        if raw.dim() == 2:
            raw = raw.unsqueeze(1)
        raw_feat = self.raw_features(raw)
        zr = self.raw_proj(raw_feat)
        spec_feat = self.spec_features(spec)
        zs = self.spec_proj(spec_feat)
        z = torch.cat([zr, zs], dim=1)
        logits = self.fusion_head(z)
        return logits


# Untrained backbones used only to build and print a fusion model here.
raw_model_instance = RawOnlyModel(n_classes=4).to(device)
spec_model_instance = SpecOnlyModel(n_classes=4).to(device)

# Positional arguments: raw_model, spec_model, num_classes, embedding_dim.
fusion_model = TwoStreamFusionModel(raw_model_instance, spec_model_instance, 4, 128)
fusion_model = fusion_model.to(device)

print(fusion_model)

In [None]:
import numpy as np
from tqdm.auto import tqdm
from sklearn.metrics import f1_score
import torch.nn.functional as F

@torch.no_grad()
def evaluate(model, loader):
    """Score `model` on every batch of `loader` without tracking gradients.

    Returns:
        Tuple (mean cross-entropy loss, accuracy, macro F1, true labels, predictions);
        the last two are NumPy arrays covering the whole loader.
    """
    model.eval()
    pred_chunks, target_chunks = [], []
    loss_sum, n_seen = 0.0, 0

    for raw, spec, y in loader:
        raw = raw.to(device)
        spec = spec.to(device)
        y = y.to(device)

        logits = model(raw, spec)
        batch_size = y.size(0)

        # cross_entropy returns the batch mean; weighting by the batch size gives
        # a per-sample average over the whole loader.
        loss_sum += F.cross_entropy(logits, y).item() * batch_size
        n_seen += batch_size

        pred_chunks.append(logits.argmax(dim=1).cpu().numpy())
        target_chunks.append(y.cpu().numpy())

    y_pred = np.concatenate(pred_chunks)
    y_true = np.concatenate(target_chunks)

    accuracy = (y_pred == y_true).mean()
    macro_f1 = f1_score(y_true, y_pred, average="macro")
    return loss_sum / n_seen, accuracy, macro_f1, y_true, y_pred

def train_model(model, train_loader, val_loader, epochs=15, lr=1e-3, model_name="Model"):
    """Train `model` with Adam and keep the weights of the best validation epoch.

    After every epoch the model is scored on `val_loader`; the state dict with the
    highest validation macro F1 is remembered and loaded back at the end.

    Returns:
        Tuple (model, history) where history maps 'train_loss', 'train_acc',
        'val_loss' and 'val_acc' to per-epoch lists.
    """
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=0.0001)

    history = {key: [] for key in ('train_loss', 'train_acc', 'val_loss', 'val_acc')}
    best_val_f1, best_state = -1, None

    for ep in range(1, epochs + 1):
        model.train()
        loss_sum = 0.0
        n_correct = 0
        n_seen = 0

        progress = tqdm(train_loader, desc=f"{model_name} Epoch {ep}/{epochs}")
        for raw, spec, y in progress:
            raw, spec, y = raw.to(device), spec.to(device), y.to(device)

            optimizer.zero_grad()
            logits = model(raw, spec)
            loss = F.cross_entropy(logits, y)
            loss.backward()
            optimizer.step()

            batch_size = y.size(0)
            loss_sum += loss.item() * batch_size
            n_correct += (logits.argmax(dim=1) == y).sum().item()
            n_seen += batch_size

        train_loss = loss_sum / n_seen
        train_acc = n_correct / n_seen
        val_loss, val_acc, val_f1, _, _ = evaluate(model, val_loader)

        epoch_metrics = (
            ('train_loss', train_loss),
            ('train_acc', train_acc),
            ('val_loss', val_loss),
            ('val_acc', val_acc),
        )
        for key, value in epoch_metrics:
            history[key].append(value)

        print(f"Epoch {ep}: train_loss={train_loss:.4f} | val_loss={val_loss:.4f} | val_acc={val_acc:.4f} | val_macroF1={val_f1:.4f}")

        # Snapshot on CPU so the saved tensors are independent of further training.
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            best_state = {
                name: tensor.detach().cpu().clone()
                for name, tensor in model.state_dict().items()
            }

    if best_state:
        model.load_state_dict(best_state)

    return model, history

# Raw Audio Model

In [None]:
# Parameters
EPOCHS = 15
LR = 1e-3
# TARGET_LABELS is deliberately not reassigned here. label_to_idx was built from
# its original order, so replacing it with a differently ordered list would make
# every later report attach the wrong word to each class index.

# Raw Audio Model
raw_model, raw_hist = train_model(RawOnlyModel(n_classes=num_classes), train_loader, val_loader, EPOCHS, LR, "RawOnly")

# Spectrogram and Feature Fusion Model

In [None]:
# Parameters
EPOCHS = 15
LR = 1e-3
# TARGET_LABELS is deliberately not reassigned here. label_to_idx was built from
# its original order, so replacing it with a differently ordered list would make
# every later report attach the wrong word to each class index.

# Spec Model
spec_model, spec_hist = train_model(SpecOnlyModel(n_classes=num_classes), train_loader, val_loader, EPOCHS, LR, "SpecOnly")

# Fusion model, built from the two single-stream models trained above.
fusion_model, fusion_hist = train_model(TwoStreamFusionModel(raw_model, spec_model, num_classes), train_loader, val_loader, EPOCHS, LR, "Fusion")

# Evaluation and Reports

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import ConfusionMatrixDisplay

def plot_training_history(history, model_name):
    """Plot training and validation curves side by side: loss on the left, accuracy on the right.

    Args:
        history: dict with per-epoch lists under 'train_loss', 'val_loss',
            'train_acc' and 'val_acc'.
        model_name: prefix used in both panel titles.
    """
    fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(12, 4))

    panels = ((ax_loss, 'loss', 'Loss'), (ax_acc, 'acc', 'Accuracy'))
    for ax, metric, pretty in panels:
        for split, legend in (('train', 'Train'), ('val', 'Val')):
            values = history[f'{split}_{metric}']
            # Plot against 1-based epoch numbers so the x axis matches the epoch
            # numbers printed during training.
            ax.plot(range(1, len(values) + 1), values, label=legend)
        ax.set_title(f'{model_name} {pretty}')
        ax.set_xlabel('Epoch')
        ax.set_ylabel(pretty)
        ax.legend()

    plt.show()

def run_final_evaluation(models_dict, test_loader, target_names):
    """Evaluate every model on `test_loader`, printing a report and a confusion matrix for each.

    Args:
        models_dict: mapping from display name to model.
        test_loader: loader passed to `evaluate`.
        target_names: class names in class-index order, i.e. target_names[i] must
            name class index i.

    Returns:
        List of dicts with keys 'Model', 'Accuracy' and 'Macro F1', one per model.
    """
    results = []
    # Fix the label set explicitly. Without it scikit-learn infers the classes from
    # the data, so a class absent from y_true/y_pred would misalign `target_names`.
    class_ids = list(range(len(target_names)))

    for name, model in models_dict.items():
        _, acc, f1, y_true, y_pred = evaluate(model, test_loader)
        results.append({'Model': name, 'Accuracy': acc, 'Macro F1': f1})

        print(f"\n--- {name} Classification Report ---")
        print(classification_report(y_true, y_pred, labels=class_ids, target_names=target_names))

        # Confusion Matrix
        fig, ax = plt.subplots(figsize=(5, 5))
        ConfusionMatrixDisplay.from_predictions(
            y_true, y_pred,
            labels=class_ids, display_labels=target_names,
            cmap='Blues', ax=ax,
        )
        ax.set_title(f"Confusion Matrix: {name}")
        plt.show()

    return results

def plot_comparison_histogram(results):
    """Draw grouped bars comparing accuracy and macro F1 across models.

    Args:
        results: list of dicts with keys 'Model', 'Accuracy' and 'Macro F1'.
    """
    names, acc_values, f1_values = [], [], []
    for row in results:
        names.append(row['Model'])
        acc_values.append(row['Accuracy'])
        f1_values.append(row['Macro F1'])

    positions = np.arange(len(names))
    bar_width = 0.35
    half = bar_width / 2

    fig, ax = plt.subplots(figsize=(10, 6))
    # Two bars per model, centred on the model's tick position.
    ax.bar(positions - half, acc_values, bar_width, label='Accuracy', color='skyblue')
    ax.bar(positions + half, f1_values, bar_width, label='Macro F1', color='salmon')

    ax.set_title('Performance Comparison of All Three Models')
    ax.set_ylabel('Scores')
    ax.set_xticks(positions)
    ax.set_xticklabels(names)
    ax.legend()
    plt.show()

In [None]:

# Loss and accuracy curves for each of the three training runs.
plot_training_history(raw_hist, "RawOnly")
plot_training_history(spec_hist, "SpecOnly")
plot_training_history(fusion_hist, "TwoStreamFusion")

In [None]:
# Class names in class-index order, taken from the same mapping that produced the
# integer targets. TARGET_LABELS is not used here because the training cells
# reassign it in a different order, which would attach the wrong word to every class.
class_names = [idx_to_label[i] for i in range(len(idx_to_label))]

comparison_results = run_final_evaluation(
    {"Raw Only": raw_model, "Spec Only": spec_model, "Two-Stream Fusion": fusion_model},
    test_loader,
    class_names
)

In [None]:
def verify_fusion_logic(model, loader, device):
    """Trace one batch through the fusion model stage by stage and check the tensor widths.

    Args:
        model: fusion model exposing raw_features, raw_proj, spec_features,
            spec_proj and fusion_head.
        loader: iterable yielding (raw, spec, y) batches; only the first batch is used.
        device: device the input tensors are moved to.

    Returns:
        bool: True when both stream embeddings have the same width and the
        concatenated vector is exactly as wide as the two embeddings together.
    """
    model.eval()
    raw, spec, _ = next(iter(loader))

    raw = raw.to(device)
    spec = spec.to(device)
    print("\nInput Data Shapes")
    print(f"Raw Signal Batch:   {raw.shape}  (Expected: [Batch, 16000])")
    print(f"Spectrogram Batch:  {spec.shape} (Expected: [Batch, 1, Freq, Time])")

    with torch.no_grad():
        # Same channel-axis handling as the model's own forward pass.
        x_raw_input = raw.unsqueeze(1) if raw.dim() == 2 else raw
        raw_feat = model.raw_features(x_raw_input)
        zr = model.raw_proj(raw_feat)
        spec_feat = model.spec_features(spec)
        zs = model.spec_proj(spec_feat)
        print(f"Raw Embedding (zr):  {zr.shape} (Expected: [Batch, emb_dim])")
        print(f"Spec Embedding (zs): {zs.shape} (Expected: [Batch, emb_dim])")
        z = torch.cat([zr, zs], dim=1)
        print(f"Concatenated (z):    {z.shape} (Expected: [Batch, 2 * emb_dim]) ")
        logits = model.fusion_head(z)
        print(f"Final Logits:        {logits.shape} (Expected: [Batch, Num_Classes])")

    # Report and return the checks rather than computing them and discarding the result.
    input_match = (zr.shape[1] == zs.shape[1])
    concat_match = (z.shape[1] == zr.shape[1] + zs.shape[1])
    print(f"Embedding widths match:      {input_match}")
    print(f"Concatenated width correct:  {concat_match}")
    return input_match and concat_match

# Walk one training batch through the trained fusion model and print the shape at each stage.
verify_fusion_logic(fusion_model, train_loader, device)