<a href="https://colab.research.google.com/github/DrJinHoChoi/NanoMamba-Interspeech2026/blob/main/colab/SmartEar_KWS_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# NanoMamba - Interspeech 2026 Full Training (GPU)

**NanoMamba: Noise-Robust KWS with SA-SSM**

| Cell | 내용 | 예상 시간 |
|:----:|------|:---------:|
| 1 | 환경설정 + GSC V2 다운로드 | ~5분 |
| 2 | **전체 학습 + 평가 한번에** | ~8-12시간 |
| 2b | (선택) 학습 완료된 모델만 노이즈 평가 | ~30-45분 |
| 2c | (선택) Latency & MACs 측정 (CPU + GPU) | - |
| 3 | 결과 확인 + 다운로드 | 즉시 |

⚠️ **런타임 → 런타임 유형 변경 → GPU (T4)** 선택 필수!

In [None]:
#@title Cell 1: 환경 설정 + 데이터 다운로드
import torch
print(f"PyTorch: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"VRAM: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
else:
    raise RuntimeError("GPU not available! Change runtime type to GPU.")

# Clone repo
!git clone https://github.com/DrJinHoChoi/NanoMamba-Interspeech2026.git
%cd NanoMamba-Interspeech2026

# Download Google Speech Commands V2
import os
DATA_DIR = './data'
os.makedirs(DATA_DIR, exist_ok=True)

if not os.path.exists(os.path.join(DATA_DIR, 'speech_commands_v0.02')):
    print("\n Downloading Google Speech Commands V2...")
    !wget -q http://download.tensorflow.org/data/speech_commands_v0.02.tar.gz -O /tmp/gsc_v2.tar.gz
    !mkdir -p {DATA_DIR}/speech_commands_v0.02
    !tar -xzf /tmp/gsc_v2.tar.gz -C {DATA_DIR}/speech_commands_v0.02
    !rm /tmp/gsc_v2.tar.gz
    print("Download complete!")
else:
    print("Data already exists.")

# Verify
classes = [d for d in os.listdir(f'{DATA_DIR}/speech_commands_v0.02')
           if os.path.isdir(f'{DATA_DIR}/speech_commands_v0.02/{d}') and not d.startswith('_')]
print(f"\nFound {len(classes)} keyword classes")
print("Ready to train!")

In [None]:
#@title Cell 2: 전체 학습 + 노이즈 평가 (한번에 실행)
#@markdown ### 학습 모델 (9종)
#@markdown - NanoMamba-Tiny (4,634), Small (12,032)
#@markdown - BC-ResNet-1 (7,464), BC-ResNet-3 (43,200), DS-CNN-S (23,756)
#@markdown - SA-SSM Ablation: Full, dt_only, b_only, Standard
#@markdown ### 노이즈 평가
#@markdown - 3 noise types (factory, white, babble) x 7 SNR (-15~+15dB)

import subprocess, sys

# Comma-separated model list handed to train_all_models.py through --models.
ALL_MODELS = ",".join([
    # Proposed (Tiny is already trained, so start from Small)
    "NanoMamba-Small",
    # Baselines
    "BC-ResNet-1", "BC-ResNet-3", "DS-CNN-S",
    # SA-SSM Ablation
    "NanoMamba-Tiny-Full", "NanoMamba-Tiny-dtOnly",
    "NanoMamba-Tiny-bOnly", "NanoMamba-Tiny-Standard",
])

# The script is launched with the kernel's own interpreter; "-u" keeps its
# output unbuffered so it can be streamed below.
cmd = [
    sys.executable, "-u", "train_all_models.py",
    "--data_dir", "./data",
    "--checkpoint_dir", "./checkpoints_full",
    "--epochs", "30",
    "--batch_size", "64",
    "--seed", "42",
    "--models", ALL_MODELS,
    "--noise_types", "factory,white,babble",
    "--snr_range=-15,-10,-5,0,5,10,15", # Use "=" to avoid argparse confusion with negative numbers
    "--per_class",
]

print(f"Running: {' '.join(cmd)}\n")

# Merge stderr into stdout and stream it line by line so errors show up live.
process = subprocess.Popen(
    cmd,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True,
    bufsize=1
)

try:
    for line in process.stdout:
        print(line, end='')
    process.wait()
finally:
    # When the cell is interrupted (Ctrl+C / "Interrupt execution") or printing
    # fails, stop the child as well so that training does not keep running
    # unattended in the background.
    if process.poll() is None:
        process.terminate()
        try:
            process.wait(timeout=30)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()
    process.stdout.close()

if process.returncode == 0:
    print("\n" + "="*60)
    print("  ALL TRAINING + EVALUATION COMPLETE!")
    print("="*60)
else:
    print(f"\nProcess exited with code {process.returncode}")

Running: /usr/bin/python3 -u train_all_models.py --data_dir ./data --checkpoint_dir ./checkpoints_full --epochs 30 --batch_size 64 --seed 42 --models NanoMamba-Small,BC-ResNet-1,BC-ResNet-3,DS-CNN-S,NanoMamba-Tiny-Full,NanoMamba-Tiny-dtOnly,NanoMamba-Tiny-bOnly,NanoMamba-Tiny-Standard --noise_types factory,white,babble --snr_range=-15,-10,-5,0,5,10,15 --per_class


  SmartEar KWS - Complete Training Pipeline
  Device: cuda
  Data: ./data
  Epochs: 30, Seed: 42
  Noise types: factory,white,babble
  SNR range: -15,-10,-5,0,5,10,15
  Time: 2026-02-21 02:33:41

  Loading Google Speech Commands V2...
  [training] 86843 samples, 12 classes
  [validation] 10481 samples, 12 classes
  [testing] 11505 samples, 12 classes

  Train: 86843, Val: 10481, Test: 11505

  Model Summary:
  Name                   |     Params |  Size (KB)
  --------------------------------------------------
  NanoKWS-Tiny           |      1,354 |       5.3
  NanoKWS-Small          |      2,144 |       8.4
  NanoKWS-Base  

In [None]:
#@title Cell 2b: Noise Evaluation Only (학습 완료된 모델만)
#@markdown ### BC-ResNet-1 학습 완료 후 Cell 2를 Ctrl+C로 중단하고 이 셀 실행
#@markdown - NanoMamba-Small (95.24%), NanoMamba-Tiny (92.94%)
#@markdown - DS-CNN-S (96.60%), BC-ResNet-1 (~95.7%)
#@markdown - 3 noise types × 7 SNR levels = 84 evaluations

import subprocess, sys, os
# Relative paths below are resolved against the repository checkout.
os.chdir('/content/NanoMamba-Interspeech2026')

# Only models that have trained checkpoints
TRAINED_MODELS = "NanoMamba-Small,NanoMamba-Tiny,DS-CNN-S,BC-ResNet-1"

# Same script as the training cell, run with --eval_only and without the
# training hyper-parameter flags.
cmd = [
    sys.executable, "-u", "train_all_models.py",
    "--data_dir", "./data",
    "--checkpoint_dir", "./checkpoints_full",
    "--eval_only",
    "--models", TRAINED_MODELS,
    "--noise_types", "factory,white,babble",
    "--snr_range=-15,-10,-5,0,5,10,15",  # "=" keeps argparse from reading -15 as a flag
    "--per_class",
]

print(f"Running: {' '.join(cmd)}\n")
print("=" * 60)
print("  NOISE EVALUATION - 4 models × 3 noise × 7 SNR")
print("  Estimated time: ~30-45 minutes")
print("=" * 60 + "\n")

# Merge stderr into stdout and stream it line by line so errors show up live.
process = subprocess.Popen(
    cmd,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True,
    bufsize=1
)

try:
    for line in process.stdout:
        print(line, end='')
    process.wait()
finally:
    # If the cell is interrupted or printing fails, stop the child too so the
    # evaluation does not keep running unattended in the background.
    if process.poll() is None:
        process.terminate()
        try:
            process.wait(timeout=30)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()
    process.stdout.close()

if process.returncode == 0:
    print("\n" + "=" * 60)
    print("  ✅ NOISE EVALUATION COMPLETE!")
    print("  Results saved to: checkpoints_full/results/")
    print("=" * 60)
else:
    print(f"\n❌ Process exited with code {process.returncode}")

In [None]:
#@title Cell 2c: Latency & MACs 측정 (CPU + GPU)
#@markdown Hook 기반 MACs 카운팅 + CPU/GPU latency
#@markdown - NanoMamba: raw audio (1, 16000)
#@markdown - CNN models: mel spectrogram (1, 40, 98)

import torch, time, sys, os, json
import torch.nn as nn
os.chdir('/content/NanoMamba-Interspeech2026')
sys.path.insert(0, '/content/NanoMamba-Interspeech2026')
from train_all_models import create_all_models

# --- MACs counter using forward hooks (no thop needed) ---
def count_macs(model, input_tensor):
    """Count multiply-accumulate operations of one forward pass of ``model``.

    Forward hooks are attached to every ``nn.Linear``, ``nn.Conv1d`` and
    ``nn.Conv2d`` submodule, ``model(input_tensor)`` is run once under
    ``torch.no_grad()``, and the hooks are removed again.

    Only those three module types contribute. Any other computation (other
    layer types, functional ops, element-wise math, biases) is not counted,
    so the result is a lower bound for models that rely on such operations.
    The batch dimension is not multiplied in: counts are per sample.

    Args:
        model: the ``nn.Module`` to profile.
        input_tensor: tensor passed as the single positional argument.

    Returns:
        Total number of MACs as a Python ``int``.
    """
    total_macs = 0
    hooks = []

    def linear_hook(m, inp, out):
        nonlocal total_macs
        # The layer is applied once per position, i.e. once for every index
        # combination of the dims between the leading (batch) dim and the
        # trailing feature dim. For 2-D input that is 1, for 3-D input it is
        # shape[1]; higher-rank inputs are handled the same way.
        positions = 1
        for size in inp[0].shape[1:-1]:
            positions *= size
        total_macs += positions * m.in_features * m.out_features

    def conv1d_hook(m, inp, out):
        nonlocal total_macs
        # (input channels per group) x kernel width, per output channel and
        # per output time step.
        total_macs += (m.in_channels // m.groups) * m.kernel_size[0] * m.out_channels * out.shape[2]

    def conv2d_hook(m, inp, out):
        nonlocal total_macs
        # Same as Conv1d, with a 2-D kernel and a 2-D output map.
        total_macs += (m.in_channels // m.groups) * m.kernel_size[0] * m.kernel_size[1] * m.out_channels * out.shape[2] * out.shape[3]

    try:
        for m in model.modules():
            if isinstance(m, nn.Linear):
                hooks.append(m.register_forward_hook(linear_hook))
            elif isinstance(m, nn.Conv1d):
                hooks.append(m.register_forward_hook(conv1d_hook))
            elif isinstance(m, nn.Conv2d):
                hooks.append(m.register_forward_hook(conv2d_hook))
        with torch.no_grad():
            model(input_tensor)
    finally:
        # Always detach the hooks, even when the forward pass raises, so the
        # model is not left with stale counting hooks.
        for h in hooks:
            h.remove()
    return total_macs

# --- Input shape per model type ---
def get_input(model, device):
    """Build a random dummy input of the shape ``model`` is benchmarked with.

    A model exposing an ``snr_estimator`` attribute is treated as a NanoMamba
    model and gets raw audio of shape (1, 16000); every other model is treated
    as a CNN and gets a mel spectrogram of shape (1, 40, 98).

    Args:
        model: model object; only the presence of ``snr_estimator`` is checked.
        device: device the tensor is created on.

    Returns:
        A ``torch.randn`` tensor on ``device``.
    """
    takes_raw_audio = hasattr(model, 'snr_estimator')
    shape = (1, 16000) if takes_raw_audio else (1, 40, 98)
    return torch.randn(*shape, device=device)

# Models to benchmark; names must be keys of the dict returned by
# create_all_models().
models_to_test = ['NanoMamba-Small', 'NanoMamba-Tiny', 'DS-CNN-S', 'BC-ResNet-1']
all_models = create_all_models()
results = {}

# Warm-up and timed iteration counts for the latency loops.
CPU_WARMUP_ITERS, CPU_TIMED_ITERS = 50, 200
GPU_WARMUP_ITERS, GPU_TIMED_ITERS = 200, 1000

has_gpu = torch.cuda.is_available()
gpu_name = torch.cuda.get_device_name(0) if has_gpu else 'N/A'
print(f"GPU: {gpu_name}")

header = f"{'Model':<22} {'Params':>8} {'INT8(KB)':>9} {'MACs(M)':>9} {'CPU(ms)':>10}"
if has_gpu:
    header += f" {'GPU(ms)':>10}"
print(f"\n{header}")
print("=" * len(header))

for name in models_to_test:
    if name not in all_models:
        print(f"{name:<22} NOT FOUND")
        continue
    model = all_models[name].eval()
    params = sum(p.numel() for p in model.parameters())
    int8_kb = params / 1024  # size estimate at one byte per parameter

    # MACs (CPU)
    model_cpu = model.to('cpu')
    inp_cpu = get_input(model_cpu, torch.device('cpu'))
    macs = count_macs(model_cpu, inp_cpu)

    # CPU latency: warm up first, then average over the timed iterations.
    # perf_counter() is a monotonic high-resolution clock meant for measuring
    # intervals; time.time() can jump when the system clock is adjusted.
    with torch.no_grad():
        for _ in range(CPU_WARMUP_ITERS):
            model_cpu(inp_cpu)
        t0 = time.perf_counter()
        for _ in range(CPU_TIMED_ITERS):
            model_cpu(inp_cpu)
        lat_cpu = (time.perf_counter() - t0) / CPU_TIMED_ITERS * 1000

    # GPU latency: CUDA kernels launch asynchronously, so synchronize before
    # starting and before stopping the clock.
    lat_gpu = None
    if has_gpu:
        model_gpu = model.to('cuda')
        inp_gpu = get_input(model_gpu, torch.device('cuda'))
        with torch.no_grad():
            for _ in range(GPU_WARMUP_ITERS):
                model_gpu(inp_gpu)
            torch.cuda.synchronize()
            t0 = time.perf_counter()
            for _ in range(GPU_TIMED_ITERS):
                model_gpu(inp_gpu)
            torch.cuda.synchronize()
            lat_gpu = (time.perf_counter() - t0) / GPU_TIMED_ITERS * 1000

    # Field widths match the header columns (8, 9, 9, 10, 10) so the table
    # lines up.
    row = f"{name:<22} {params/1e3:>7.1f}K {int8_kb:>9.1f} {macs/1e6:>8.2f}M {lat_cpu:>10.2f}"
    if lat_gpu is not None:
        row += f" {lat_gpu:>10.2f}"
    print(row)

    results[name] = {
        'params': int(params), 'int8_kb': round(int8_kb, 1),
        'macs': int(macs), 'macs_m': round(macs/1e6, 2),
        'latency_cpu_ms': round(lat_cpu, 2),
        # Explicit None check: a measured 0.0 must not be stored as "missing".
        'latency_gpu_ms': round(lat_gpu, 2) if lat_gpu is not None else None,
        'gpu': gpu_name
    }

# Save
os.makedirs('checkpoints_full/results', exist_ok=True)
with open('checkpoints_full/results/efficiency.json', 'w') as f:
    json.dump(results, f, indent=2)
print("\n✅ Saved to checkpoints_full/results/efficiency.json")

In [None]:
#@title Cell 3: 결과 확인 + 다운로드
import json
import glob

# Find result files
result_files = glob.glob('checkpoints_full/results/*.json')
print(f"Found {len(result_files)} result files:")
for f in sorted(result_files):
    print(f"  {f}")

# Show latest results
if result_files:
    latest = sorted(result_files)[-1]
    with open(latest) as f:
        results = json.load(f)

    print(f"\n{'='*70}")
    print(f"  Results from: {latest}")
    print(f"{'='*70}")

    # Clean accuracy table
    if 'model_results' in results:
        print(f"\n{'Model':<30} {'Params':>8} {'Val':>8} {'Test':>8}")
        print('-' * 58)
        for name, data in results['model_results'].items():
            val = data.get('best_val_acc', '-')
            test = data.get('test_acc', '-')
            params = data.get('params', '-')
            val_str = f"{val:.2f}%" if isinstance(val, (int, float)) else str(val)
            test_str = f"{test:.2f}%" if isinstance(test, (int, float)) else str(test)
            print(f"{name:<30} {str(params):>8} {val_str:>8} {test_str:>8}")

    # Noise robustness table
    if 'noise_results' in results:
        print(f"\n{'='*70}")
        print("  Noise Robustness Results")
        print(f"{'='*70}")
        noise_data = results['noise_results']
        for model_name, model_noise in noise_data.items():
            print(f"\n  {model_name}:")
            for noise_type, snr_results in model_noise.items():
                snr_str = ", ".join([f"{snr}dB:{acc:.1f}%"
                                     for snr, acc in sorted(snr_results.items(),
                                     key=lambda x: float(x[0]) if x[0] != 'clean' else 999)])
                print(f"    {noise_type}: {snr_str}")

# Zip all results for download
!zip -r /content/smartear_results.zip checkpoints_full/

from google.colab import files
files.download('/content/smartear_results.zip')
print("\nResults downloaded!")