# Voice-Controlled LED: Model Training Notebook

**Workflow:**
1.  Load 2-second `.wav` files.
2.  Convert them to 2D MFCC "images".
3.  Train a Simple 2D CNN to classify them.
4.  Save the quantized model for the Raspberry Pi.


## Step 1: Setup and Dependencies

First, we'll install the necessary libraries and import all required modules.


In [None]:
!pip install librosa soundfile


In [None]:
import numpy as np
import librosa
import soundfile as sf
from pathlib import Path
from typing import Tuple, Optional
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
import torch.optim as optim
import matplotlib.pyplot as plt
import scipy.signal


## Step 2: Audio Preprocessing and Dataset Loading

This section handles loading 2-second audio clips, converting them to MFCCs, and wrapping them in a PyTorch `Dataset`.


In [None]:

class AudioPreprocessor:
    """Base class bundling generic waveform loading and clean-up helpers.

    The only state is ``sample_rate``: the rate (in Hz) that ``load_audio``
    asks librosa to resample every file to.
    """

    # Largest band edge, as a fraction of the Nyquist frequency, handed to the
    # filter designer. scipy requires digital cutoffs strictly inside (0, 1).
    _MAX_NORMALIZED_CUTOFF = 0.99

    def __init__(self, sample_rate: int = 16000):
        self.sample_rate = sample_rate

    def load_audio(self, file_path: str) -> np.ndarray:
        """Load an audio file, resampled to ``self.sample_rate``."""
        audio, _ = librosa.load(file_path, sr=self.sample_rate)
        return audio

    def normalize(self, audio: np.ndarray) -> np.ndarray:
        """Peak-normalize ``audio`` so its largest absolute sample is 1.

        Empty and all-zero signals are returned unchanged, since there is
        no peak to scale by.
        """
        if np.size(audio) == 0:
            # np.max raises on an empty array; nothing to scale anyway.
            return audio
        max_val = np.max(np.abs(audio))
        if max_val > 0:
            return audio / max_val
        return audio

    def trim_silence(self, audio: np.ndarray, top_db: int = 20) -> np.ndarray:
        """Strip leading and trailing silence using ``librosa.effects.trim``.

        Args:
            audio: Waveform to trim.
            top_db: Threshold forwarded to ``librosa.effects.trim``.

        Returns:
            The trimmed waveform (may be shorter than the input).
        """
        trimmed, _ = librosa.effects.trim(audio, top_db=top_db)
        return trimmed

    def butter_bandpass(self, audio, fs, order=5, lowcut=80, highcut=8000):
        """Apply a causal Butterworth band-pass filter to ``audio``.

        Args:
            audio: Waveform to filter.
            fs: Sampling rate of ``audio`` in Hz.
            order: Butterworth order passed to ``scipy.signal.butter``.
            lowcut: Lower band edge in Hz.
            highcut: Upper band edge in Hz. If it reaches or exceeds the
                Nyquist frequency (``fs / 2``) it is clamped just below it;
                e.g. the default 8000 Hz at ``fs=16000`` would otherwise be
                rejected by scipy.

        Returns:
            The filtered signal, same length as ``audio``.

        Raises:
            ValueError: If the band is empty after clamping, i.e. unless
                ``0 < lowcut < min(highcut, ~Nyquist)``.
        """
        nyq = 0.5 * fs
        low = lowcut / nyq
        high = min(highcut / nyq, self._MAX_NORMALIZED_CUTOFF)
        if not 0 < low < high:
            raise ValueError(
                f"Invalid pass band: lowcut={lowcut} Hz, highcut={highcut} Hz "
                f"for fs={fs} Hz (Nyquist {nyq} Hz)."
            )
        # Second-order sections stay numerically stable for a narrow low edge
        # (80 Hz is ~1% of Nyquist at 16 kHz), where the (b, a) polynomial
        # form of a high-order band-pass can lose precision.
        sos = scipy.signal.butter(order, [low, high], btype='band', output='sos')
        return scipy.signal.sosfilt(sos, audio)

class VoicePreprocessor(AudioPreprocessor):
    """Project pipeline: waveform clean-up followed by MFCC extraction."""

    def __init__(self, sample_rate: int = 16000, n_mfcc: int = 20):
        super().__init__(sample_rate)
        # Number of MFCC coefficients, i.e. rows of the feature "image".
        self.n_mfcc = n_mfcc

    def preprocess(self, audio: np.ndarray) -> np.ndarray:
        """Clean a raw waveform.

        Steps, in order: band-pass filter (at ``self.sample_rate``),
        peak normalization, then silence trimming. Because of the trim, the
        returned waveform may be shorter than the input.
        """
        band_limited = self.butter_bandpass(audio, self.sample_rate)
        cleaned = self.trim_silence(self.normalize(band_limited))

        # =================================================================
        # TODO - Signal Processing
        #
        # Experiment with additional signal processing techniques here.
        # Examples:
        # 1. Bandpass filter to focus on speech frequencies (e.g., 80-8000 Hz)
        #    - from scipy.signal import butter, lfilter
        # 2. Notch filter to remove powerline noise (e.g., 60 Hz)
        #    - from scipy.signal import iirnotch
        # 3. Noise reduction algorithms like spectral subtraction.
        #
        # You can add new methods to this class and apply them to `cleaned`
        # before it is returned.
        # =================================================================

        return cleaned

    def extract_features(self, audio: np.ndarray) -> np.ndarray:
        """Turn a waveform into a 2D MFCC 'image'.

        Returns:
            The array produced by ``librosa.feature.mfcc`` with
            ``self.n_mfcc`` coefficients.
        """

        # =================================================================
        # TODO - Feature Extraction
        #
        # The baseline below uses only MFCCs. Experiment with other feature
        # types, or combine several.
        #
        # 1. MEL-SPECTROGRAM (Good for CNNs):
        #    mel_spec = librosa.feature.melspectrogram(y=audio, sr=self.sample_rate, n_mels=128)
        #    mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
        #    return mel_spec_db
        #
        # 2. COMBINED FEATURES:
        #    - mfccs = librosa.feature.mfcc(...)
        #    - chroma = librosa.feature.chroma_stft(...)
        #    - combined = np.vstack((mfccs, chroma))
        #    - return combined
        #
        # Remember to adjust the model's input size if you change the
        # feature dimensions.
        # =================================================================

        # Baseline MFCC settings: at 16 kHz a 400-sample window is 25 ms and
        # a 160-sample hop is 10 ms.
        mfcc_options = {
            "n_mfcc": self.n_mfcc,
            "n_fft": 400,
            "hop_length": 160,
        }
        return librosa.feature.mfcc(y=audio, sr=self.sample_rate, **mfcc_options)


class AudioDataset(Dataset):
    """Base dataset that indexes ``.wav`` files stored in per-class folders.

    Expected layout: ``data_dir/<class_name>/*.wav`` for each class name in
    ``CLASSES``. A file's integer label is the position of its class in
    ``CLASSES``.

    Args:
        data_dir: Root directory containing one sub-directory per class.
        preprocessor: Object whose ``load_audio(path)`` returns the waveform.
        transform: Optional callable applied to the loaded waveform in
            ``__getitem__``; ``None`` (default) leaves the waveform as loaded.
    """

    # Order matters: it defines the label index of each class.
    CLASSES = ('pi_on', 'pi_off', 'background')

    def __init__(self, data_dir: str, preprocessor: "AudioPreprocessor", transform=None):
        self.data_dir = Path(data_dir)
        self.preprocessor = preprocessor
        self.transform = transform
        self.samples = []  # file paths (str), parallel to self.labels
        self.labels = []   # integer class indices
        self._load_samples()

    def _load_samples(self):
        """Populate ``samples``/``labels`` from the class sub-directories.

        Class directories that do not exist are skipped. Files are visited in
        sorted order so that a given index always refers to the same file,
        regardless of the order the filesystem lists them in.
        """
        for label, class_name in enumerate(self.CLASSES):
            class_dir = self.data_dir / class_name
            if not class_dir.exists():
                continue
            for audio_file in sorted(class_dir.glob('*.wav')):
                self.samples.append(str(audio_file))
                self.labels.append(label)

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(audio, label)`` for sample ``idx``.

        The waveform is loaded through the preprocessor and, when a
        ``transform`` was supplied, passed through it.
        """
        audio = self.preprocessor.load_audio(self.samples[idx])
        if self.transform is not None:
            audio = self.transform(audio)
        return audio, self.labels[idx]

class VoiceDataset(AudioDataset):
    """Dataset that runs the full pipeline and yields fixed-size MFCC tensors.

    Each item is ``(features, label)``, where ``features`` is the output of
    ``preprocessor.extract_features`` with a leading channel dimension added.
    """

    def __init__(self, data_dir: str, preprocessor: VoicePreprocessor):
        super().__init__(data_dir, preprocessor)
        # Target samples for 2 seconds at 16kHz
        self.target_samples = 32000

    def __getitem__(self, idx):
        """Load, clean, length-normalize and featurize sample ``idx``."""
        audio, label = super().__getitem__(idx)

        # Clean the waveform first. preprocess() may shorten it (silence
        # trimming), so the length must be fixed afterwards, not before.
        # Otherwise the feature width varies per clip, which breaks batch
        # collation and the model's fixed-size linear layer.
        processed_audio = self.preprocessor.preprocess(audio)

        # Zero-pad at the end or crop to exactly target_samples.
        missing = self.target_samples - len(processed_audio)
        if missing > 0:
            processed_audio = np.pad(processed_audio, (0, missing))
        else:
            processed_audio = processed_audio[:self.target_samples]

        features = self.preprocessor.extract_features(processed_audio)

        # Convert to a float32 tensor and add a channel dimension for the
        # CNN: shape becomes (1, n_features, time_steps).
        features_tensor = torch.as_tensor(features, dtype=torch.float32).unsqueeze(0)

        return features_tensor, label


## Step 3: Defining the Simple 2D CNN Architecture

We define a standard 2D convolutional neural network (CNN) to process the MFCC images.


In [None]:
class SimpleCNN(nn.Module):
    """
    Simple 2D CNN for audio classification.
    Input shape: (batch, 1, n_mfcc, time_steps)

    Args:
        num_classes: Number of output logits.
        n_mfcc: Height of the input feature image (number of coefficients).
        time_steps: Width of the input feature image (number of frames).
            The default of 201 pools down to the 50 columns that the
            original fixed-size linear layer expected; presumably this is
            the frame count of a 2 s clip with the notebook's MFCC settings.

    Raises:
        ValueError: If the input is too small to survive both 2x2 poolings.
    """
    def __init__(self, num_classes: int = 3, n_mfcc: int = 20, time_steps: int = 201):
        super().__init__()

        # =================================================================
        # TODO - Model Architecture
        #
        # Experiment with the CNN architecture:
        # 1. Add more Conv2d layers.
        # 2. Change kernel sizes or padding.
        # 3. Adjust the size of the fully connected (Linear) layers.
        # =================================================================

        conv2_channels = 16

        # Layer 1: 3x3 conv (padding keeps the size) + 2x2 max-pool (halves it)
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=8, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )

        # Layer 2: same pattern, 8 -> 16 channels
        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=8, out_channels=conv2_channels, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )

        self.flatten = nn.Flatten()

        # Each MaxPool2d(2) floors the halved size, so two stages give
        # size // 2 // 2. With the defaults: 20 -> 5 rows, 201 -> 50 columns.
        pooled_height = n_mfcc // 2 // 2
        pooled_width = time_steps // 2 // 2
        if pooled_height < 1 or pooled_width < 1:
            raise ValueError(
                f"Input of size ({n_mfcc}, {time_steps}) is too small for two 2x2 poolings."
            )

        # Fully connected layers
        self.fc1 = nn.Sequential(
            nn.Linear(in_features=conv2_channels * pooled_height * pooled_width, out_features=128),
            nn.ReLU()
        )

        self.fc2 = nn.Linear(in_features=128, out_features=num_classes)

    def forward(self, x):
        """Return raw class logits of shape (batch, num_classes)."""
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.fc2(x)
        return x


## Step 4: Training Pipeline

This section includes the functions for training and validating the model.


In [None]:
def train_epoch(model, dataloader, criterion, optimizer, device):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        model: Network to optimize; switched to training mode.
        dataloader: Iterable of ``(data, target)`` batches.
        criterion: Loss function; assumed to return the batch-mean loss
            (the default ``reduction='mean'`` of the torch loss modules).
        optimizer: Optimizer stepping ``model``'s parameters.
        device: Device the batches are moved to.

    Returns:
        dict with ``'loss'`` (mean loss per sample) and ``'accuracy'``
        (percentage of correct predictions). For an empty dataloader the
        loss is NaN and the accuracy is 0.0.
    """
    model.train()
    loss_sum = 0.0  # sum of per-sample losses
    correct = 0
    total = 0

    for data, target in dataloader:
        data, target = data.to(device), target.to(device)

        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        batch_size = target.size(0)
        # Weight the batch mean by its size so that a shorter final batch
        # does not count as much as a full one.
        loss_sum += loss.item() * batch_size
        pred = output.argmax(dim=1)
        correct += pred.eq(target).sum().item()
        total += batch_size

    if total == 0:
        # Nothing was processed; avoid dividing by zero.
        return {'loss': float('nan'), 'accuracy': 0.0}

    return {
        'loss': loss_sum / total,
        'accuracy': 100. * correct / total
    }

def validate(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` without updating its weights.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        dataloader: Iterable of ``(data, target)`` batches.
        criterion: Loss function; assumed to return the batch-mean loss
            (the default ``reduction='mean'`` of the torch loss modules).
        device: Device the batches are moved to.

    Returns:
        dict with ``'loss'`` (mean loss per sample) and ``'accuracy'``
        (percentage of correct predictions). For an empty dataloader the
        loss is NaN and the accuracy is 0.0.
    """
    model.eval()
    loss_sum = 0.0  # sum of per-sample losses
    correct = 0
    total = 0

    with torch.no_grad():
        for data, target in dataloader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)

            batch_size = target.size(0)
            # Weight the batch mean by its size so that a shorter final
            # batch does not count as much as a full one.
            loss_sum += loss.item() * batch_size
            pred = output.argmax(dim=1)
            correct += pred.eq(target).sum().item()
            total += batch_size

    if total == 0:
        # An empty validation split would otherwise divide by zero.
        return {'loss': float('nan'), 'accuracy': 0.0}

    return {
        'loss': loss_sum / total,
        'accuracy': 100. * correct / total
    }


## Step 5: Configuration

Set hyperparameters, define paths, and run training.

1.  Upload your `data` directory to Colab or mount Drive.
2.  Adjust `DATA_DIR` to match.


In [None]:
# Mount Google Drive at /content/drive so that paths under it can be used
# for the dataset and the saved model. Requires the google.colab package,
# so this cell is only expected to work inside a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')


In [None]:

# =================================================================
# TODO - Hyperparameter Tuning
#
# Experiment with epochs, batch size, and learning rate
# =================================================================

DATA_DIR = '/content/drive/MyDrive/my_recordings'  # Update this path!
MODEL_SAVE_PATH = '/content/drive/MyDrive/keyword_model.pth'
EPOCHS = 30
BATCH_SIZE = 32
LEARNING_RATE = 0.001
VALIDATION_SPLIT = 0.2
SPLIT_SEED = 42  # fixes the train/validation split so runs are comparable


# Setup device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Check data
if not os.path.isdir(DATA_DIR):
    print(f"Error: Data directory '{DATA_DIR}' not found.")
else:
    # Initialize preprocessing and dataset
    preprocessor = VoicePreprocessor(sample_rate=16000, n_mfcc=20)
    dataset = VoiceDataset(DATA_DIR, preprocessor)
    print(f"Total dataset size: {len(dataset)}")

    if len(dataset) < 2:
        # Need at least one training and one validation sample.
        print(f"Error: Found {len(dataset)} usable .wav file(s) in '{DATA_DIR}'; "
              "at least 2 are required. Check the class sub-directory names.")
    else:
        # Split dataset. Keep at least one validation sample (a small dataset
        # would otherwise round the validation split down to zero) and seed
        # the split so the same files land in each subset on every run.
        val_size = max(1, int(len(dataset) * VALIDATION_SPLIT))
        train_size = len(dataset) - val_size
        train_dataset, val_dataset = random_split(
            dataset,
            [train_size, val_size],
            generator=torch.Generator().manual_seed(SPLIT_SEED),
        )

        train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

        # Initialize model
        model = SimpleCNN(num_classes=3).to(device)

        # Setup training
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

        # Training loop
        best_val_acc = 0.0
        model_saved = False
        print(f"\nStarting training for {EPOCHS} epochs...")
        print("=" * 60)

        for epoch in range(EPOCHS):
            train_metrics = train_epoch(model, train_loader, criterion, optimizer, device)
            val_metrics = validate(model, val_loader, criterion, device)

            print(f"Epoch {epoch+1}/{EPOCHS} - "
                  f"Train Loss: {train_metrics['loss']:.4f}, "
                  f"Train Acc: {train_metrics['accuracy']:.2f}%, "
                  f"Val Loss: {val_metrics['loss']:.4f}, "
                  f"Val Acc: {val_metrics['accuracy']:.2f}%")

            # Always checkpoint the first epoch so a model file exists even
            # if validation accuracy never rises above 0%.
            if not model_saved or val_metrics['accuracy'] > best_val_acc:
                best_val_acc = val_metrics['accuracy']
                torch.save(model.state_dict(), MODEL_SAVE_PATH)
                model_saved = True
                print(f"  -> Saved best model (val acc: {best_val_acc:.2f}%)")

        print("=" * 60)
        print(f"Training complete! Best validation accuracy: {best_val_acc:.2f}%")
        if model_saved:
            print(f"Model saved to: {MODEL_SAVE_PATH}")
        else:
            print("No model was saved.")
