In [None]:
import glob
import os
import random
from math import gcd
from pathlib import Path

import numpy as np
import soundfile as sf
import tensorflow as tf
from scipy.signal import resample_poly
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.utils import class_weight

In [None]:

# -----------------------
# CONFIG
# Very important: reuse these exact settings for training AND for every
# prediction, otherwise the features will not match what the model was fit on.
# -----------------------
SAMPLE_RATE = 16000                 # Hz; clips at other rates are resampled to this
DURATION_SECONDS = 1.00             # every clip is cropped / padded to this length
NUM_SAMPLES = int(SAMPLE_RATE * DURATION_SECONDS)
NUM_MELS = 40
FRAME_LENGTH = 400                  # 25 ms window at 16 kHz
FRAME_STEP = 160                    # 10 ms step
FFT_LENGTH = 512
FMIN = 80.0                         # lower edge of the mel filterbank (Hz)
FMAX = 7600.0                       # upper edge of the mel filterbank (Hz)
# Need to tweak to prevent overfitting
BATCH_SIZE = 64
N_EPOCHS = 30
SEED = 42
AUTOTUNE = tf.data.AUTOTUNE

# Seed every RNG the notebook relies on so a Restart & Run All is repeatable:
# `random` drives the crop offset in load_and_fix_length, TensorFlow drives
# weight initialisation, dropout and dataset shuffling.
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Change to your path, or set the AUDIO_MODEL_DATA_DIR environment variable
# so the notebook runs on another machine without editing this cell.
DATA_DIR = os.environ.get(
    "AUDIO_MODEL_DATA_DIR", "/Users/sethwright/Documents/audio-model/data")
POS_DIR = os.path.join(DATA_DIR, "Training_POS")
NEG_DIR = os.path.join(DATA_DIR, "Training_NEG")

OUT_DIR = "output"
os.makedirs(OUT_DIR, exist_ok=True)



In [None]:
# -----------------------
# 1️⃣ FILE COLLECTION & DATA SPLIT
# -----------------------
def _stratified_split(files, labels, holdout_fraction):
    """Split (files, labels) with a seeded, label-stratified hold-out.

    Returns the 4-tuple produced by ``train_test_split``:
    (kept_files, held_out_files, kept_labels, held_out_labels).
    """
    return train_test_split(
        files,
        labels,
        test_size=holdout_fraction,
        stratify=labels,
        random_state=SEED,
    )


# Sorted listings keep the input order to the seeded split stable between runs.
pos_files = sorted(glob.glob(os.path.join(POS_DIR, "*.wav")))
neg_files = sorted(glob.glob(os.path.join(NEG_DIR, "*.wav")))

if len(pos_files) == 0 or len(neg_files) == 0:
    raise SystemExit(f"No .wav files found in {POS_DIR} or {NEG_DIR}.")

# Label convention: 1 = positive (wake word), 0 = negative.
all_files = [*pos_files, *neg_files]
all_labels = [1 for _ in pos_files] + [0 for _ in neg_files]

# First carve out 15% of everything as the test set ...
train_files, test_files, train_labels, test_labels = _stratified_split(
    all_files, all_labels, 0.15)

# ... then 10% of what remains as the validation set.
train_files, val_files, train_labels, val_labels = _stratified_split(
    train_files, train_labels, 0.1)

print(f"Files: total={len(all_files)}, train={len(train_files)}, val={len(val_files)}, test={len(test_files)}")

# -----------------------
# 2️⃣ PREPROCESS HELPERS
# -----------------------

def load_and_fix_length(path, target_sr=SAMPLE_RATE, target_len=NUM_SAMPLES):
    """Load an audio file as a mono float32 waveform of exactly ``target_len`` samples.

    The clip is down-mixed to mono, resampled to ``target_sr`` when its native
    rate differs, and then cropped or zero-padded to ``target_len``.

    Args:
        path: Path of the audio file, passed straight to ``soundfile.read``.
        target_sr: Sample rate (Hz) the returned waveform should have.
        target_len: Number of samples in the returned waveform.

    Returns:
        1-D ``np.float32`` array of length ``target_len``.

    Notes:
        * Best-effort by design: a file that cannot be read or resampled is
          reported with a ``[WARN]`` line and replaced by silence, so one bad
          file does not abort a whole training run.
        * Clips longer than ``target_len`` are cropped at an offset drawn with
          ``random.randint``, so repeated calls on the same long file can
          return different windows.
    """
    try:
        wav, sr = sf.read(path, dtype='float32')
    except Exception as e:
        print(f"[WARN] Failed to read {path}: {e}. Replacing with silence.")
        wav = np.zeros(target_len, dtype=np.float32)
        sr = target_sr

    # soundfile returns (frames, channels) for multi-channel audio: mix to mono.
    if wav.ndim > 1:
        wav = np.mean(wav, axis=1)

    # Resample if needed. TensorFlow's tf.signal module has no resampling op,
    # so SciPy's polyphase resampler is used; the up/down factors are the
    # target/source rates reduced by their greatest common divisor.
    if sr != target_sr and len(wav) > 0:
        try:
            divisor = gcd(int(target_sr), int(sr))
            wav = resample_poly(wav, int(target_sr) // divisor, int(sr) // divisor)
        except Exception as e:
            print(f"[WARN] Resample failed for {path}: {e}. Using silence.")
            wav = np.zeros(target_len, dtype=np.float32)

    # Fix length (crop or pad)
    if len(wav) > target_len:
        # Random window of target_len samples.
        start = random.randint(0, len(wav) - target_len)
        wav = wav[start:start + target_len]
    elif len(wav) < target_len:
        # Centre the clip and zero-pad both sides (extra sample goes right).
        pad = target_len - len(wav)
        left = pad // 2
        right = pad - left
        wav = np.pad(wav, (left, right), mode='constant')

    return wav.astype(np.float32)

def waveform_to_log_mel(waveform):
    """Convert a fixed-length waveform into a normalised log-mel spectrogram.

    Pipeline: pre-emphasis -> Hann-windowed STFT magnitude -> mel filterbank
    -> natural log -> normalisation by the mean and standard deviation taken
    over the whole spectrogram of this clip.

    Args:
        waveform: Array-like holding exactly NUM_SAMPLES samples; it is
            converted to a float32 tensor and reshaped to ``[NUM_SAMPLES]``.

    Returns:
        Tensor laid out as (time, mels).
    """
    signal = tf.reshape(
        tf.convert_to_tensor(waveform, dtype=tf.float32), [NUM_SAMPLES])

    # Pre-emphasis: y[0] = x[0], y[n] = x[n] - 0.97 * x[n-1].
    first_sample = signal[:1]
    emphasised_rest = signal[1:] - 0.97 * signal[:-1]
    signal = tf.concat([first_sample, emphasised_rest], 0)

    # Short-time Fourier transform; only the magnitude is kept.
    spectrum = tf.signal.stft(
        signal,
        frame_length=FRAME_LENGTH,
        frame_step=FRAME_STEP,
        fft_length=FFT_LENGTH,
        window_fn=tf.signal.hann_window,
    )
    magnitude = tf.abs(spectrum)

    # Project the linear-frequency bins onto NUM_MELS mel bands.
    mel_filterbank = tf.signal.linear_to_mel_weight_matrix(
        NUM_MELS, magnitude.shape[-1], SAMPLE_RATE, FMIN, FMAX)
    mel_magnitude = tf.matmul(magnitude, mel_filterbank)

    # The small constants guard against log(0) and division by zero.
    log_mel = tf.math.log(mel_magnitude + 1e-6)
    centre = tf.math.reduce_mean(log_mel)
    spread = tf.math.reduce_std(log_mel) + 1e-6
    return (log_mel - centre) / spread  # (time, mels)
    
# Lazily yield (path, label) pairs; no audio is read here, which keeps memory usage low
def gen(files, labels):
    """Yield ``(utf-8 encoded path, np.int64 label)`` pairs, one per file.

    Iteration stops at the shorter of ``files`` and ``labels``.
    """
    yield from (
        (file_path.encode("utf-8"), np.int64(target))
        for file_path, target in zip(files, labels)
    )

# Runs the preprocessing logic for a single (path, label) example
def _parse(path_bytes, label):
    """Eager-mode preprocessing of one example (called through ``tf.py_function``).

    Args:
        path_bytes: Scalar string tensor holding the utf-8 encoded file path.
        label: Label for the example; returned unchanged.

    Returns:
        ``(spec, label)`` where ``spec`` is the float32 log-mel spectrogram
        with a trailing channel axis of size 1.
    """
    wav_path = path_bytes.numpy().decode("utf-8")
    log_mel = waveform_to_log_mel(load_and_fix_length(wav_path))
    # Append the channel axis expected by the Conv2D layers.
    features = log_mel.numpy()[..., np.newaxis].astype(np.float32)
    return features, label
    
# --- TENSORFLOW PIPELINE WRAPPER ---
# Wraps the Python preprocessing logic into a TensorFlow-compatible graph.
def tf_parse(path, label):
    """Graph-side wrapper around ``_parse`` for use in ``Dataset.map``.

    ``tf.py_function`` outputs carry no static shape, so the shapes are set
    explicitly here.

    Returns:
        ``(spec, lab)``: float32 spectrogram of shape
        ``[time_frames, NUM_MELS, 1]`` and a scalar int64 label.
    """
    spec, lab = tf.py_function(
        func=_parse, inp=[path, label], Tout=[tf.float32, tf.int64])

    # Number of STFT frames for a NUM_SAMPLES-long clip.
    n_frames = (NUM_SAMPLES - FRAME_LENGTH) // FRAME_STEP + 1
    spec.set_shape((n_frames, NUM_MELS, 1))

    lab = tf.cast(lab, tf.int64)
    lab.set_shape(())
    return spec, lab



# -----------------------
# 3️⃣ DATASET BUILDERS
# -----------------------
def make_train_dataset(files, labels, batch_size):
    """Build the shuffled training pipeline.

    Args:
        files: Sequence of audio file paths.
        labels: Sequence of integer labels aligned with ``files``.
        batch_size: Number of examples per batch.

    Returns:
        A ``tf.data.Dataset`` of ``(spec, label)`` batches. Paths are shuffled
        (buffer spanning the full file list, seeded with SEED) before the
        audio is decoded by ``tf_parse``.
    """
    int_labels = tf.cast(labels, tf.int64)
    return (
        tf.data.Dataset.from_tensor_slices((files, int_labels))
        .shuffle(buffer_size=len(files), seed=SEED)
        .map(tf_parse, num_parallel_calls=AUTOTUNE)
        .batch(batch_size)
        .prefetch(AUTOTUNE)
    )


def make_dataset(files, labels, shuffle=False, batch_size=None):
    """Build a generator-backed pipeline (used for validation and test data).

    Args:
        files: Sequence of audio file paths.
        labels: Sequence of integer labels aligned with ``files``.
        shuffle: When True, shuffle the paths (buffer spanning the full file
            list, seeded with SEED) before decoding.
        batch_size: Examples per batch. ``None`` (the default) falls back to
            the notebook-wide ``BATCH_SIZE``, so existing calls behave as
            before.

    Returns:
        A ``tf.data.Dataset`` of ``(spec, label)`` batches.
    """
    if batch_size is None:
        batch_size = BATCH_SIZE

    ds = tf.data.Dataset.from_generator(
        lambda: gen(files, labels),
        output_signature=(
            tf.TensorSpec(shape=(), dtype=tf.string),
            tf.TensorSpec(shape=(), dtype=tf.int64))
    )
    if shuffle:
        ds = ds.shuffle(buffer_size=len(files), seed=SEED)
    ds = ds.map(tf_parse, num_parallel_calls=AUTOTUNE)
    ds = ds.batch(batch_size).prefetch(AUTOTUNE)
    return ds

# -----------------------
# 4️⃣ BUILD TRAIN / VAL / TEST DATASETS
# -----------------------
train_ds = make_train_dataset(files=train_files, labels=train_labels, batch_size=BATCH_SIZE)
val_ds = make_dataset(files=val_files, labels=val_labels)
test_ds = make_dataset(files=test_files, labels=test_labels)

# -----------------------
# 5️⃣ CLASS WEIGHTS (3:1 in favour of the wake word)
# label 1 = wake word, 0 = negative
# SMOTE gave too many false positives
# -----------------------
class_weights = {
    0: 1.0,  # negative
    1: 3.0,  # wake word
}
print(f"Class weights: {class_weights}")

In [None]:

# -----------------------
# MODEL
# -----------------------
# Same frame-count formula that tf_parse uses for the spectrogram's static
# shape; the trailing 1 is the channel axis.
time_frames = (NUM_SAMPLES - FRAME_LENGTH) // FRAME_STEP + 1
input_shape = (time_frames, NUM_MELS, 1)

def build_model(input_shape):
    """Build a small CNN for binary (wake word vs. negative) classification.

    Architecture: three Conv2D(3x3, relu) -> BatchNorm -> MaxPool(2x2) stages
    with 8, 16 and 32 filters, then global average pooling, a 64-unit relu
    layer, dropout (0.3) and a single sigmoid output unit.

    Args:
        input_shape: Shape of one input example, without the batch axis.

    Returns:
        An uncompiled ``tf.keras.Model``.
    """
    layers = tf.keras.layers
    inputs = tf.keras.Input(shape=input_shape)

    # Convolutional feature extractor: filter count doubles at each stage.
    x = inputs
    for n_filters in (8, 16, 32):
        x = layers.Conv2D(n_filters, (3, 3), padding='same', activation='relu')(x)
        x = layers.BatchNormalization()(x)
        x = layers.MaxPool2D((2, 2))(x)

    # Classification head.
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(64, activation='relu')(x)
    x = layers.Dropout(0.3)(x)
    outputs = layers.Dense(1, activation='sigmoid')(x)
    return tf.keras.Model(inputs, outputs)

model = build_model(input_shape)

# Binary cross-entropy pairs with the single sigmoid output unit; AUC is
# tracked alongside accuracy.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    loss='binary_crossentropy',
    metrics=['accuracy', tf.keras.metrics.AUC(name='auc')],
)
model.summary()

# -----------------------
# TRAIN
# -----------------------
# Stop when val_loss has not improved for 3 epochs and roll the model back to
# the best weights seen.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss', patience=3, restore_best_weights=True)

# Keep the best full model (not just the weights) on disk.
# NOTE(review): Keras 3 rejects checkpoint paths without a .keras/.h5 suffix —
# confirm the targeted Keras version before upgrading TensorFlow.
best_model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath=os.path.join(OUT_DIR, "best_model"),
    monitor='val_loss',
    save_best_only=True,
    save_weights_only=False,
)

callbacks = [early_stopping, best_model_checkpoint]

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=N_EPOCHS,
    class_weight=class_weights,   # weights taken from the class_weights dict
    callbacks=callbacks,          # early stopping + best-model checkpoint
)

# -----------------------
# EVALUATE
# -----------------------
print("\nEvaluation on test set:")
res = model.evaluate(test_ds)
print("Test results (loss, accuracy, auc):", res)

# Collect per-example labels and scores for the confusion matrix / report.
# predict_on_batch runs a single forward pass on the batch; calling
# model.predict inside a Python loop sets up its own data pipeline and
# progress bar on every iteration.
# Note: this is a second pass over test_ds. Clips longer than the target
# length are cropped at a random offset, so these scores can differ slightly
# from the ones behind `res`.
y_true, y_pred = [], []
for x_batch, y_batch in test_ds:
    preds = model.predict_on_batch(x_batch)
    y_true.extend(y_batch.numpy())
    y_pred.extend(preds.flatten())

# Threshold the sigmoid scores at 0.5 to get hard class decisions.
y_pred_bin = [1 if p >= 0.5 else 0 for p in y_pred]
print("\nConfusion Matrix:")
print(confusion_matrix(y_true, y_pred_bin))
print("\nClassification Report:")
print(classification_report(y_true, y_pred_bin, digits=4))

# -----------------------
# SAVE MODELS
#  Keras model for easy tflite conversion
# -----------------------
# Despite the variable name, this is a single-file `.keras` archive, not a
# SavedModel directory; the name is kept because a later cell reassigns it.
saved_model_dir = os.path.join(OUT_DIR, "saved_model3.keras")
model.save(saved_model_dir)
print("✅ Saved Keras model (.keras) to:", saved_model_dir)




In [None]:
# -----------------------
# EXPORT TO TFLITE (float32)
# Uses the trained `model` still in memory from the training cell (nothing is
# loaded from disk here): it is written out as a TensorFlow SavedModel, which
# the TFLite converter then reads.
# -----------------------
saved_model_dir = os.path.join(OUT_DIR, "saved_model3")  # SavedModel directory, so no .keras suffix

# Keras 3 rejects `save_format="tf"` and writes SavedModels through
# `Model.export` instead; Keras 2 keeps the original call.
if int(getattr(tf.keras, "__version__", "2").split(".")[0]) >= 3:
    model.export(saved_model_dir)
else:
    model.save(saved_model_dir, save_format="tf")

converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir)
print("✅ SavedModel written; TFLite converter created from:", saved_model_dir)

# No optimizations -> weights stay float32; restrict to built-in TFLite ops.
converter.optimizations = []  # float32
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS]

tflite_model = converter.convert()

tflite_path = os.path.join(OUT_DIR, "sheila_float32.tflite")
with open(tflite_path, "wb") as f:
    f.write(tflite_model)

print("✅ Saved FLOAT32 TFLite model to:", tflite_path)
