# WakeWord - Audio Keyword Spotting Training

Training a DS-CNN model for keyword spotting on the Google Speech Commands dataset.

**Features:**
- MFCC feature extraction
- Data augmentation
- DS-CNN architecture (optimized for edge)
- TFLite export with INT8 quantization

In [None]:
# Install dependencies
!pip install -q librosa soundfile

In [None]:
import os
import numpy as np
import tensorflow as tf
import librosa
from pathlib import Path
from tqdm.notebook import tqdm
import json
from sklearn.model_selection import train_test_split

# Record the TensorFlow build and the GPUs it can see in the notebook output.
print(f"TensorFlow: {tf.__version__}")
visible_gpus = tf.config.list_physical_devices('GPU')
print(f"GPU Available: {visible_gpus}")

## Configuration

In [None]:
# Audio front-end and training hyper-parameters read by the cells below.
CONFIG = dict(
    # --- audio / MFCC front-end ---
    sample_rate=16000,   # Hz, passed to librosa.load
    duration=1.0,        # seconds per clip -> int(sample_rate * duration) samples
    n_mfcc=40,           # MFCC coefficients kept per frame
    n_fft=512,           # FFT window, in samples
    hop_length=160,      # frame step, in samples (10 ms at 16 kHz)
    n_mels=80,           # mel bands fed to the DCT
    fmin=20,             # NOTE(review): not referenced by the visible cells
    fmax=8000,           # NOTE(review): not referenced by the visible cells
    # --- training ---
    batch_size=64,
    epochs=50,
    learning_rate=0.001,
    dropout=0.3,
)

# Target keywords; list position is the integer label of each keyword.
CLASSES = 'yes no up down left right on off stop go'.split()

# One extra label for the catch-all "unknown" class; no silence class is used.
NUM_CLASSES = 1 + len(CLASSES)
print(f"Training on {len(CLASSES)} keywords + unknown = {NUM_CLASSES} classes")

## Dataset Loading

In [None]:
import shutil
import tarfile
import urllib.request

# Source archive and working locations for the Speech Commands v0.02 dataset.
# NOTE(review): the archive is fetched over plain HTTP with no checksum; the
# extraction filter below limits what a tampered archive could do.
url = 'http://download.tensorflow.org/data/speech_commands_v0.02.tar.gz'
DATA_PATH = '/kaggle/working/speech_commands'
archive_path = '/kaggle/working/speech_commands.tar.gz'

if not os.path.exists(DATA_PATH):
    print('Downloading Speech Commands dataset...')
    urllib.request.urlretrieve(url, archive_path)

    # Extract into a scratch directory and rename it at the end, so an
    # interrupted run never leaves a half-populated DATA_PATH that the
    # existence check above would accept on the next run.
    partial_path = DATA_PATH + '.partial'
    shutil.rmtree(partial_path, ignore_errors=True)
    os.makedirs(partial_path)

    print('Extracting...')
    with tarfile.open(archive_path, 'r:gz') as tar:
        if hasattr(tarfile, 'data_filter'):
            # Reject absolute paths, '..' traversal, links that escape the
            # destination, and device nodes.
            tar.extractall(partial_path, filter='data')
        else:
            # Older Python without extraction filters: unfiltered fallback.
            tar.extractall(partial_path)
    os.replace(partial_path, DATA_PATH)
    print('Done!')
print(f'Data path: {DATA_PATH}')

In [None]:
def load_audio(path, sr=16000, duration=1.0):
    """Load a clip and force it to exactly ``int(sr * duration)`` samples.

    Args:
        path: Audio file to read; passed to ``librosa.load``.
        sr: Sample rate requested from ``librosa.load``.
        duration: Clip length in seconds; also caps how much audio is read.

    Returns:
        A 1-D array of ``int(sr * duration)`` samples. Shorter clips are
        zero-padded at the end; longer ones are truncated.
    """
    samples, _ = librosa.load(path, sr=sr, duration=duration)
    n_samples = int(sr * duration)
    shortfall = n_samples - len(samples)
    if shortfall > 0:
        # Clip is too short: append zeros up to the target length.
        return np.pad(samples, (0, shortfall))
    return samples[:n_samples]

def extract_mfcc(audio, sr=16000, n_mfcc=40, n_fft=512, hop_length=160, n_mels=80):
    """Compute MFCCs for one clip and return them transposed.

    Args:
        audio: Waveform samples, passed to ``librosa.feature.mfcc`` as ``y``.
        sr: Sample rate of ``audio``.
        n_mfcc: Number of cepstral coefficients to keep.
        n_fft: FFT window size in samples.
        hop_length: Step between successive frames, in samples.
        n_mels: Number of mel bands.

    Returns:
        The transpose of librosa's MFCC matrix (frames on the leading axis for
        a 1-D input, per librosa's documented coefficient-by-frame layout).
    """
    mfcc_kwargs = dict(
        sr=sr,
        n_mfcc=n_mfcc,
        n_fft=n_fft,
        hop_length=hop_length,
        n_mels=n_mels,
    )
    coeffs = librosa.feature.mfcc(y=audio, **mfcc_kwargs)
    return coeffs.T

In [None]:
def get_file_list(data_path, classes, max_per_class=2000, max_unknown_per_dir=200, seed=None):
    """Collect ``.wav`` paths and integer labels for keywords plus an unknown class.

    Each name in ``classes`` is looked up as a sub-directory of ``data_path``.
    Every other sub-directory whose name does not start with ``_`` feeds a
    shared ``_unknown_`` class. Directory listings are sorted before being
    capped, so the chosen files do not depend on filesystem enumeration order.

    Args:
        data_path: Root directory holding one sub-directory per spoken word.
        classes: Keyword directory names; list position becomes the label.
        max_per_class: Cap on files per keyword and on the unknown class.
        max_unknown_per_dir: Cap on files taken from each non-keyword directory.
        seed: Seed for shuffling the unknown pool. ``None`` uses NumPy's global
            random state, so the unknown subset then varies between runs.

    Returns:
        ``(files, labels, class_to_idx)``: parallel lists of ``Path`` objects
        and integer labels, and the name-to-label mapping (with ``_unknown_``
        mapped to ``len(classes)``).
    """
    root = Path(data_path)
    class_to_idx = {name: idx for idx, name in enumerate(classes)}
    unknown_idx = len(classes)
    class_to_idx['_unknown_'] = unknown_idx

    files, labels = [], []
    for class_name in classes:
        class_dir = root / class_name
        if not class_dir.exists():
            # A missing keyword directory contributes no samples.
            continue
        class_files = sorted(class_dir.glob('*.wav'))[:max_per_class]
        files.extend(class_files)
        labels.extend([class_to_idx[class_name]] * len(class_files))

    # Pool a capped slice of every non-keyword word directory; names starting
    # with an underscore are skipped.
    keyword_names = set(classes)
    unknown_files = []
    for word_dir in sorted(p for p in root.iterdir() if p.is_dir()):
        if word_dir.name in keyword_names or word_dir.name.startswith('_'):
            continue
        unknown_files.extend(sorted(word_dir.glob('*.wav'))[:max_unknown_per_dir])

    # Shuffle so the cap below samples across directories, not just the first.
    rng = np.random if seed is None else np.random.RandomState(seed)
    rng.shuffle(unknown_files)
    unknown_files = unknown_files[:max_per_class]
    files.extend(unknown_files)
    # Label count comes from the same slice, so the two lists stay aligned.
    labels.extend([unknown_idx] * len(unknown_files))

    print(f'Total files: {len(files)}')
    return files, labels, class_to_idx

# Build the sample list, then invert the name -> index map for decoding labels.
files, labels, class_to_idx = get_file_list(data_path=DATA_PATH, classes=CLASSES)
idx_to_class = dict((index, name) for name, index in class_to_idx.items())

In [None]:
def process_files(files, labels):
    """Convert audio files to MFCC features, skipping files that fail.

    Loading and feature parameters are read from the module-level ``CONFIG``.
    This is a best-effort pass: a file that cannot be loaded or featurised is
    skipped rather than aborting the run, and a summary of skipped files is
    printed at the end.

    Args:
        files: Paths of the audio clips.
        labels: Integer labels, parallel to ``files``.

    Returns:
        ``(features, valid_labels)`` as NumPy arrays, covering only the files
        that were processed successfully.
    """
    features, valid_labels = [], []
    skipped = []
    for f, label in tqdm(zip(files, labels), total=len(files)):
        try:
            audio = load_audio(str(f), CONFIG['sample_rate'], CONFIG['duration'])
            mfcc = extract_mfcc(
                audio,
                CONFIG['sample_rate'],
                CONFIG['n_mfcc'],
                CONFIG['n_fft'],
                CONFIG['hop_length'],
                CONFIG['n_mels'],
            )
        except Exception as exc:
            # Catch Exception, not everything: KeyboardInterrupt and
            # SystemExit must still stop the loop.
            skipped.append((f, exc))
            continue
        features.append(mfcc)
        valid_labels.append(label)

    if skipped:
        first_path, first_error = skipped[0]
        print(f'Skipped {len(skipped)} file(s); first failure: {first_path}: {first_error!r}')
    return np.array(features), np.array(valid_labels)

# Featurise every listed clip; X holds the MFCC matrices, y the matching labels.
X, y = process_files(files=files, labels=labels)
print(f'Features: {X.shape}, Labels: {y.shape}')

In [None]:
# Stratified 80/10/10 split: hold out 20%, then halve the hold-out into
# validation and test. The fixed random_state keeps the split repeatable.
X_train, X_temp, y_train, y_temp = train_test_split(
    X,
    y,
    test_size=0.2,
    stratify=y,
    random_state=42,
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp,
    y_temp,
    test_size=0.5,
    stratify=y_temp,
    random_state=42,
)

def normalize(X):
    """Standardise each sample of a batch independently.

    Statistics are taken over axes 1 and 2, so every item along axis 0 is
    shifted and scaled by its own mean and standard deviation.

    Args:
        X: Array with at least three axes; axis 0 indexes samples.

    Returns:
        Array of the same shape as ``X``, standardised per sample.
    """
    per_sample_axes = (1, 2)
    centered = X - np.mean(X, axis=per_sample_axes, keepdims=True)
    # The small constant keeps the division finite for constant samples.
    scale = np.std(X, axis=per_sample_axes, keepdims=True) + 1e-8
    return centered / scale

# Standardise each split per sample, then append a trailing channel axis so the
# arrays match the 2-D convolution input used by the model.
X_train = np.expand_dims(normalize(X_train), axis=-1)
X_val = np.expand_dims(normalize(X_val), axis=-1)
X_test = np.expand_dims(normalize(X_test), axis=-1)
print(f'Train: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}')

## DS-CNN Model

In [None]:
from tensorflow.keras import layers, Model

def create_ds_cnn(input_shape, num_classes, dropout=0.3):
    """Build the depthwise-separable CNN classifier.

    Layout: a 3x3 convolution stem with 64 filters, then four blocks of
    depthwise 3x3 + pointwise 1x1 convolutions with 64, 64, 128 and 128
    filters. Every convolution is bias-free and followed by batch
    normalisation and ReLU. The two 64-filter blocks end with 2x2 max pooling.
    The head is global average pooling, a 128-unit dense layer with ReLU,
    dropout, and a softmax output.

    Args:
        input_shape: Shape of one input sample, without the batch axis.
        num_classes: Width of the softmax output layer.
        dropout: Dropout rate applied before the output layer.

    Returns:
        An uncompiled Keras ``Model`` named ``'ds_cnn'``.
    """
    def bn_relu(tensor):
        # Shared tail of every convolution: BatchNorm then ReLU.
        normed = layers.BatchNormalization()(tensor)
        return layers.ReLU()(normed)

    inputs = layers.Input(shape=input_shape)
    stem = layers.Conv2D(64, (3, 3), padding='same', use_bias=False)(inputs)
    net = bn_relu(stem)

    for width in (64, 64, 128, 128):
        depthwise = layers.DepthwiseConv2D((3, 3), padding='same', use_bias=False)(net)
        net = bn_relu(depthwise)
        pointwise = layers.Conv2D(width, (1, 1), padding='same', use_bias=False)(net)
        net = bn_relu(pointwise)
        if width > 64:
            # The wider blocks keep their spatial resolution.
            continue
        net = layers.MaxPooling2D((2, 2))(net)

    pooled = layers.GlobalAveragePooling2D()(net)
    hidden = layers.Dense(128)(pooled)
    hidden = layers.ReLU()(hidden)
    hidden = layers.Dropout(dropout)(hidden)
    outputs = layers.Dense(num_classes, activation='softmax')(hidden)
    return Model(inputs, outputs, name='ds_cnn')

# Instantiate the network for the feature shape produced above (batch axis
# dropped) and print its layer summary.
model = create_ds_cnn(
    input_shape=X_train.shape[1:],
    num_classes=NUM_CLASSES,
    dropout=CONFIG['dropout'],
)
model.summary()

In [None]:
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau

# Labels are integer class ids, hence the sparse cross-entropy loss.
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=Adam(learning_rate=CONFIG['learning_rate']),
    metrics=['accuracy'],
)

callbacks = [
    # Keep the weights with the best validation accuracy on disk.
    ModelCheckpoint(
        '/kaggle/working/best_model.keras',
        monitor='val_accuracy',
        mode='max',
        save_best_only=True,
        verbose=1,
    ),
    # Stop after 10 epochs without a validation-accuracy gain and roll back
    # to the best weights seen.
    EarlyStopping(
        monitor='val_accuracy',
        patience=10,
        restore_best_weights=True,
        verbose=1,
    ),
    # Halve the learning rate when validation loss stalls for 5 epochs.
    ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=5,
        min_lr=1e-6,
        verbose=1,
    ),
]

history = model.fit(
    X_train,
    y_train,
    validation_data=(X_val, y_val),
    batch_size=CONFIG['batch_size'],
    epochs=CONFIG['epochs'],
    callbacks=callbacks,
)

In [None]:
import matplotlib.pyplot as plt
# Side-by-side training curves: loss on the left, accuracy on the right.
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
for ax, metric, title in zip(axes, ('loss', 'accuracy'), ('Loss', 'Accuracy')):
    ax.plot(history.history[metric], label='Train')
    ax.plot(history.history[f'val_{metric}'], label='Val')
    ax.set_title(title)
    ax.legend()
# Save before show() so the written file contains the rendered figure.
plt.savefig('/kaggle/working/training_history.png', dpi=150)
plt.show()

In [None]:
# Held-out performance of the trained model.
test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
print(f'Test Loss: {test_loss:.4f}')
print(f'Test Accuracy: {test_acc:.4f}')

# Hard predictions, plus the names of every class that appears in either the
# ground truth or the predictions.
y_pred = model.predict(X_test).argmax(axis=1)
unique_labels = np.union1d(y_test, y_pred)
class_names = [idx_to_class[label] for label in unique_labels]

In [None]:
import time

# Float TFLite export: convert the Keras model with default converter
# settings and write the flatbuffer to disk.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
Path('/kaggle/working/wakeword_float.tflite').write_bytes(tflite_model)
print(f'Float model: {len(tflite_model) / 1024:.2f} KB')

def representative_dataset():
    """Yield up to 500 single-sample float32 batches from ``X_train``.

    Each yielded item is a one-element list holding an array with a leading
    batch axis of size 1. It is assigned to the INT8 converter's
    ``representative_dataset`` in the export cell.
    """
    for sample in X_train[:500]:
        yield [sample[np.newaxis].astype(np.float32)]

# Integer-only TFLite export: INT8 built-in ops with int8 input and output
# tensors, calibrated on the representative samples.
converter_int8 = tf.lite.TFLiteConverter.from_keras_model(model)
converter_int8.optimizations = [tf.lite.Optimize.DEFAULT]
converter_int8.representative_dataset = representative_dataset
converter_int8.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
# Quantised I/O: callers must feed and read int8 tensors.
converter_int8.inference_input_type = tf.int8
converter_int8.inference_output_type = tf.int8

tflite_int8 = converter_int8.convert()
Path('/kaggle/working/wakeword_int8.tflite').write_bytes(tflite_int8)
print(f'INT8 model: {len(tflite_int8) / 1024:.2f} KB')

In [None]:
# Deployment-side description of the exported models: label order, input
# shape, feature configuration and headline numbers.
metadata = dict(
    classes=[idx_to_class[label] for label in range(NUM_CLASSES)],
    input_shape=list(X_train.shape[1:]),
    config=CONFIG,
    # Cast to built-in types so the values serialise as plain JSON numbers.
    test_accuracy=float(test_acc),
    model_params=int(model.count_params()),
    tflite_float_kb=len(tflite_model) / 1024,
    tflite_int8_kb=len(tflite_int8) / 1024,
)
with open('/kaggle/working/metadata.json', 'w') as meta_file:
    json.dump(metadata, meta_file, indent=2)
print('Saved metadata.json')

In [None]:
# CLEANUP - drop the extracted dataset and its archive so that only the
# training artefacts remain in the output directory.
import shutil

for stale_path, remover in (
    ('/kaggle/working/speech_commands', shutil.rmtree),
    ('/kaggle/working/speech_commands.tar.gz', os.remove),
):
    if os.path.exists(stale_path):
        remover(stale_path)

# List what is left, with sizes in KB.
print('Final output files:')
for entry in os.listdir('/kaggle/working'):
    size_kb = os.path.getsize(f"/kaggle/working/{entry}") / 1024
    print(f'  {entry}: {size_kb:.2f} KB')