In [7]:
import os
import random
import glob
import numpy as np
import tensorflow as tf
import soundfile as sf
from sklearn.utils import class_weight
from sklearn.model_selection import train_test_split
from pathlib import Path

In [9]:
"""
train_wakeword.py

Usage:
    python train_wakeword.py


Notes:
- Assumes data/positive/*.wav and data/negative/*.wav exist (16 kHz mono but code will resample/pad if needed).
- Adjust hyperparameters like N_EPOCHS, BATCH_SIZE, NUM_MELS to taste.
"""

# -----------------------
# CONFIG
# -----------------------
SAMPLE_RATE = 16000
DURATION_SECONDS = 1.25             # ≈1.25 s
NUM_SAMPLES = int(SAMPLE_RATE * DURATION_SECONDS)  # 20000
NUM_MELS = 40
FRAME_LENGTH = 512
FRAME_STEP = 160                    # 10 ms step
FFT_LENGTH = 512
FMIN = 80.0
FMAX = 7600.0

BATCH_SIZE = 32
N_EPOCHS = 50
SEED = 42
AUTOTUNE = tf.data.AUTOTUNE

DATA_DIR = "/Users/sethwright/Documents/audio-model/data"
POS_DIR = os.path.join(DATA_DIR, "positive")
NEG_DIR = os.path.join(DATA_DIR, "negative")

OUT_DIR = "output"
os.makedirs(OUT_DIR, exist_ok=True)



In [10]:
def load_and_fix_length(path, target_sr=SAMPLE_RATE, target_len=NUM_SAMPLES):
    wav, sr = sf.read(path, dtype='float32')
    if wav.ndim > 1:
        wav = np.mean(wav, axis=1)
    if sr != target_sr:
        wav = tf.signal.resample(wav, int(len(wav) * target_sr / sr)).numpy()
    if len(wav) > target_len:
        start = random.randint(0, len(wav) - target_len)
        wav = wav[start:start + target_len]
    elif len(wav) < target_len:
        pad = target_len - len(wav)
        left = pad // 2
        right = pad - left
        wav = np.pad(wav, (left, right), mode='constant')
    return wav.astype(np.float32)

def waveform_to_log_mel(waveform):
    x = tf.convert_to_tensor(waveform, dtype=tf.float32)
    x = tf.reshape(x, [NUM_SAMPLES])
    x = tf.concat([x[:1], x[1:] - 0.97 * x[:-1]], 0)
    stft = tf.signal.stft(
        x, frame_length=FRAME_LENGTH, frame_step=FRAME_STEP,
        fft_length=FFT_LENGTH, window_fn=tf.signal.hann_window)
    mag = tf.abs(stft)
    num_spectrogram_bins = mag.shape[-1]
    mel_weight = tf.signal.linear_to_mel_weight_matrix(
        NUM_MELS, num_spectrogram_bins, SAMPLE_RATE, FMIN, FMAX)
    mel = tf.matmul(mag, mel_weight)
    log_mel = tf.math.log(mel + 1e-6)
    mean = tf.math.reduce_mean(log_mel)
    std = tf.math.reduce_std(log_mel) + 1e-6
    log_mel = (log_mel - mean) / std
    return log_mel  # (time, mels)

# -----------------------
# DATA SPLIT
# -----------------------
pos_files = sorted(glob.glob(os.path.join(POS_DIR, "*.wav")))
neg_files = sorted(glob.glob(os.path.join(NEG_DIR, "*.wav")))

if not pos_files or not neg_files:
    raise SystemExit(f"Need .wav files in {POS_DIR} and {NEG_DIR}.")

all_files = pos_files + neg_files
all_labels = [1]*len(pos_files) + [0]*len(neg_files)

train_files, test_files, train_labels, test_labels = train_test_split(
    all_files, all_labels, test_size=0.15, stratify=all_labels, random_state=SEED)
train_files, val_files, train_labels, val_labels = train_test_split(
    train_files, train_labels, test_size=0.1, stratify=train_labels, random_state=SEED)

print(f"Files: total={len(all_files)}  train={len(train_files)}  val={len(val_files)}  test={len(test_files)}")

# -----------------------
# TF DATASET PIPELINE
# -----------------------
def gen(files, labels):
    for p, l in zip(files, labels):
        yield p.encode("utf-8"), np.int64(l)

def _parse(path_bytes, label):
    path = path_bytes.numpy().decode("utf-8")
    wav = load_and_fix_length(path)
    spec = waveform_to_log_mel(wav).numpy()
    spec = np.expand_dims(spec, axis=-1).astype(np.float32)
    return spec, label

def tf_parse(path, label):
    spec, lab = tf.py_function(_parse, [path, label], [tf.float32, tf.int64])
    time_frames = 1 + (NUM_SAMPLES - FRAME_LENGTH) // FRAME_STEP
    spec.set_shape([time_frames, NUM_MELS, 1])
    lab.set_shape([])
    return spec, lab

def make_dataset(files, labels, shuffle=False):
    ds = tf.data.Dataset.from_generator(
        lambda: gen(files, labels),
        output_signature=(
            tf.TensorSpec(shape=(), dtype=tf.string),
            tf.TensorSpec(shape=(), dtype=tf.int64)))
    if shuffle:
        ds = ds.shuffle(buffer_size=len(files), seed=SEED)
    ds = ds.map(tf_parse, num_parallel_calls=AUTOTUNE)
    ds = ds.batch(BATCH_SIZE).prefetch(AUTOTUNE)
    return ds

train_ds = make_dataset(train_files, train_labels, shuffle=True)
val_ds = make_dataset(val_files, val_labels)
test_ds = make_dataset(test_files, test_labels)
# Extract all labels from train_ds into a flat array
y_train_all = np.concatenate([y.numpy() for _, y in train_ds])

# Compute weights inversely proportional to class frequency
class_weights = class_weight.compute_class_weight(
    class_weight='balanced',
    classes=np.unique(y_train_all),
    y=y_train_all
)

# Convert to dictionary for Keras
class_weights = dict(enumerate(class_weights))
print("Computed class weights:", class_weights)


Files: total=2712  train=2074  val=231  test=407
Computed class weights: {0: 0.5822571588994947, 1: 3.539249146757679}


In [11]:

# -----------------------
# MODEL
# -----------------------
time_frames = 1 + (NUM_SAMPLES - FRAME_LENGTH) // FRAME_STEP
input_shape = (time_frames, NUM_MELS, 1)

def build_model(input_shape):
    inputs = tf.keras.Input(shape=input_shape)
    x = tf.keras.layers.Conv2D(8, (3,3), padding='same', activation='relu')(inputs)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.MaxPool2D((2,2))(x)

    x = tf.keras.layers.Conv2D(16, (3,3), padding='same', activation='relu')(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.MaxPool2D((2,2))(x)

    x = tf.keras.layers.Conv2D(32, (3,3), padding='same', activation='relu')(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.MaxPool2D((2,2))(x)

    x = tf.keras.layers.GlobalAveragePooling2D()(x)
    x = tf.keras.layers.Dense(64, activation='relu')(x)
    x = tf.keras.layers.Dropout(0.3)(x)
    outputs = tf.keras.layers.Dense(1, activation='sigmoid')(x)
    return tf.keras.Model(inputs, outputs)

model = build_model(input_shape)
model.compile(optimizer=tf.keras.optimizers.Adam(1e-3),
              loss='binary_crossentropy',
              metrics=['accuracy', tf.keras.metrics.AUC(name='auc')])
model.summary()

# -----------------------
# TRAIN
# -----------------------
# -----------------------
# TRAIN
# -----------------------
callbacks = [
    tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=10,
        restore_best_weights=True
    ),

    # ✅ Updated to save in .keras format (no unsupported args)
    tf.keras.callbacks.ModelCheckpoint(
        filepath=os.path.join(OUT_DIR, "best_model"),
        monitor='val_loss',
        save_best_only=True,
        save_weights_only=False,   # must be False to save full model
    )
]
class_weights = {0: 2.0, 1: 1.0}

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=N_EPOCHS,
    callbacks=callbacks,
    class_weight=class_weights
)

# -----------------------
# EVALUATE
# -----------------------
print("\nEvaluation on test set:")
res = model.evaluate(test_ds)
print("Test results (loss, accuracy, auc):", res)

y_true, y_pred = [], []
for x_batch, y_batch in test_ds:
    preds = model.predict(x_batch)
    y_true.extend(y_batch.numpy())
    y_pred.extend(preds.flatten())

y_pred_bin = [1 if p >= 0.5 else 0 for p in y_pred]
from sklearn.metrics import confusion_matrix, classification_report
print("\nConfusion Matrix:")
print(confusion_matrix(y_true, y_pred_bin))
print("\nClassification Report:")
print(classification_report(y_true, y_pred_bin, digits=4))

# -----------------------
# SAVE MODELS
# -----------------------
saved_model_dir = os.path.join(OUT_DIR, "saved_model.keras")
model.save(saved_model_dir)  # ✅ new API for TensorFlow SavedModel export
print("✅ Exported TensorFlow SavedModel to:", saved_model_dir)






Model: "model_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_3 (InputLayer)        [(None, 122, 40, 1)]      0         
                                                                 
 conv2d_6 (Conv2D)           (None, 122, 40, 8)        80        
                                                                 
 batch_normalization_6 (Bat  (None, 122, 40, 8)        32        
 chNormalization)                                                
                                                                 
 max_pooling2d_6 (MaxPoolin  (None, 61, 20, 8)         0         
 g2D)                                                            
                                                                 
 conv2d_7 (Conv2D)           (None, 61, 20, 16)        1168      
                                                                 
 batch_normalization_7 (Bat  (None, 61, 20, 16)        64  

INFO:tensorflow:Assets written to: output/best_model/assets


Epoch 2/50


INFO:tensorflow:Assets written to: output/best_model/assets


Epoch 3/50


INFO:tensorflow:Assets written to: output/best_model/assets


Epoch 4/50


INFO:tensorflow:Assets written to: output/best_model/assets


Epoch 5/50


INFO:tensorflow:Assets written to: output/best_model/assets


Epoch 6/50
Epoch 7/50


INFO:tensorflow:Assets written to: output/best_model/assets


Epoch 8/50
Epoch 9/50


INFO:tensorflow:Assets written to: output/best_model/assets


Epoch 10/50


INFO:tensorflow:Assets written to: output/best_model/assets


Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50

Evaluation on test set:
Test results (loss, accuracy, auc): [0.056574806571006775, 0.9852579832077026, 0.9965912103652954]

Confusion Matrix:
[[347   2]
 [  5  53]]

Classification Report:
              precision    recall  f1-score   support

           0     0.9858    0.9943    0.9900       349
           1     0.9636    0.9138    0.9381        58

    accuracy                         0.9828       407
   macro avg     0.9747    0.9540    0.9640       407
weighted avg     0.9826    0.9828    0.9826       407

✅ Exported TensorFlow SavedModel to: output/saved_model.keras


In [None]:
# -----------------------
# CONVERT TO TFLITE (INT8)
# -----------------------
def representative_dataset_gen():
    for i, path in enumerate(train_files[:200]):
        wav = load_and_fix_length(path)
        spec = waveform_to_log_mel(wav).numpy().astype(np.float32)
        spec = np.expand_dims(spec, (0, -1))
        yield [spec]

converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset_gen
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

tflite_model = converter.convert()
tflite_path = os.path.join(OUT_DIR, "heychef_int8.tflite")
with open(tflite_path, "wb") as f:
    f.write(tflite_model)
print("Saved quantized TFLite model to:", tflite_path)

print("\n✅ All done. Models and logs are in:", OUT_DIR)

In [None]:
# -----------------------
# CONVERT TO TFLITE (FLOAT32)
# -----------------------
converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir)

# Keep it as float32 for now (no quantization)
converter.optimizations = []  # no quantization
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS]

tflite_model = converter.convert()

tflite_path = os.path.join(OUT_DIR, "heychef_float32.tflite")
with open(tflite_path, "wb") as f:
    f.write(tflite_model)

print("✅ Saved FLOAT32 TFLite model to:", tflite_path)