# Chapter 2: Transfer Learning
## Step-by-Step Guide to Adding a New Speech Command
1. Access the model on Google Drive
2. Load the Pretrained Model
If you already have a saved model from the micro speech notebook, load it. Loading a SavedModel directory restores the entire model, both the architecture (saved_model.pb) and the weights (the variables folder), from the directory where your saved_model.pb resides.

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!saved_model_cli show --dir /content/drive/MyDrive/Notebooks/models --tag_set serve --signature_def serving_default



In [None]:
import tensorflow as tf
import numpy as np

# TensorFlow 2.x runs eagerly by default, which is what Keras training needs,
# so there is no need to toggle eager execution; just confirm that it is on.
print("Eager execution enabled:", tf.executing_eagerly())

# Load the pretrained TFLite model to inspect its input and output tensors
model_path = "/content/drive/MyDrive/Colab/models/float_model.tflite"
pretrained_model = tf.lite.Interpreter(model_path=model_path)
pretrained_model.allocate_tensors()

# Get input and output details
input_details = pretrained_model.get_input_details()
print("Input Shape:", input_details[0]['shape'])
output_details = pretrained_model.get_output_details()

# Print summary
print("Model Summary:")
print("Input Shape:", input_details[0]['shape'])
print("Input Type:", input_details[0]['dtype'])
print("Output Shape:", output_details[0]['shape'])
print("Output Type:", output_details[0]['dtype'])

# Create a dummy input that matches the reported input shape and dtype
dummy_input = np.random.random_sample(
    tuple(input_details[0]['shape'])).astype(input_details[0]['dtype'])

# Path to the SavedModel (.pb) directory used for transfer learning
pb_model = "/content/drive/MyDrive/Colab/models/saved_model"
# Wrap the SavedModel as a Keras layer using TFSMLayer
keras_model = tf.keras.layers.TFSMLayer(pb_model, call_endpoint='serving_default')



Eager execution enabled: True
Input Shape: [   1 1960]
Model Summary:
Input Shape: [   1 1960]
Input Type: <class 'numpy.float32'>
Output Shape: [1 4]
Output Type: <class 'numpy.float32'>


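3. Freeze the Existing Layers and Modify the Final Layer for Transfer Learning
Assuming the last layer is a Dense layer for classification, the goal is a classifier with one additional class. The code in this step keeps the pretrained model frozen and stacks a new 5-unit Dense layer on top of its 4-class output, rather than removing the original final layer.

For transfer learning:

- Ensure your preprocessing pipeline produces features of length 1960 in the type the model expects: float32 for the float model inspected above, int8 for a quantized model.

- If you're adding a new word, you'll likely need to modify the final layer(s) of the model to accommodate the new class.

- When fine-tuning a quantized model, make sure to maintain the input quantization scheme (scale and zero point).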
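
To make "maintaining the input quantization scheme" more concrete, here is a minimal sketch of the affine int8 mapping used by TensorFlow Lite, where real_value = scale * (quantized_value - zero_point). The scale and zero point below are made-up placeholders, and the helper names quantize and dequantize are only for illustration; in practice the values would come from the quantized model's input_details[0]['quantization'].

```python
import numpy as np

def quantize(x, scale, zero_point, dtype=np.int8):
    """Map float values to integers, rounding and clipping to the dtype range."""
    info = np.iinfo(dtype)
    q = np.round(x / scale) + zero_point
    return np.clip(q, info.min, info.max).astype(dtype)

def dequantize(q, scale, zero_point):
    """Map quantized integers back to approximate float values."""
    return scale * (q.astype(np.float32) - zero_point)

# Hypothetical quantization parameters (assumption, not read from the real model)
scale, zero_point = 0.1, -128

features = np.array([0.0, 1.0, 12.7, 25.5, 100.0], dtype=np.float32)
q = quantize(features, scale, zero_point)
restored = dequantize(q, scale, zero_point)

print("quantized:", q)
print("restored: ", restored)
```

Values that fall outside the representable range (100.0 in this example) are clipped, which is one reason new training data should be preprocessed the same way as the original data.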

To understand the 1960-length input better:

- In the micro speech pipeline it is a flattened spectrogram: a 1-second clip is cut into 30 ms windows taken every 20 ms, which gives 49 frames, and each frame produces 40 features.

**49 frames * 40 features = 1960**


You should investigate the preprocessing steps used in the original model training to replicate them for your new data.
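
As a quick check of where 1960 comes from, the sketch below recomputes the input length from the clip duration, window size, window stride, and feature bin count that this chapter later passes to models.prepare_model_settings. The helper name spectrogram_length is an assumption for illustration, not a function from the TensorFlow example code.

```python
def spectrogram_length(clip_ms, window_ms, stride_ms):
    """Number of analysis windows that fit inside one clip."""
    if clip_ms < window_ms:
        return 0
    return 1 + int((clip_ms - window_ms) // stride_ms)

# Values used for model_settings later in this chapter
CLIP_DURATION_MS = 1000
WINDOW_SIZE_MS = 30.0
WINDOW_STRIDE_MS = 20
FEATURE_BIN_COUNT = 40

frames = spectrogram_length(CLIP_DURATION_MS, WINDOW_SIZE_MS, WINDOW_STRIDE_MS)
fingerprint_size = frames * FEATURE_BIN_COUNT

print(frames, fingerprint_size)  # 49 1960
```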

In [None]:
# Load the SavedModel
pretrained_model = tf.saved_model.load(pb_model)
print(pretrained_model.signatures)

# Extract the inference function
infer = pretrained_model.signatures["serving_default"]

# Get the input shape (without the batch dimension) from the signature
input_shape = infer.structured_input_signature[1]["input"].shape[1:]

# Define a Keras wrapper for the SavedModel inference function
class SavedModelWrapper(tf.keras.layers.Layer):
    def __init__(self, infer_function):
        super().__init__()
        self.infer_function = infer_function

    def call(self, inputs):
        return self.infer_function(input=inputs)["output"]

# Create a new input layer
input_tensor = tf.keras.Input(shape=input_shape, dtype=tf.float32, name="input")

# Pass the input through the wrapped SavedModel layer
wrapped_model = SavedModelWrapper(infer)
pretrained_output = wrapped_model(input_tensor)
print(pretrained_output.shape)

# Freeze the pretrained part so that only the new output layer is trained
wrapped_model.trainable = False

# Add a new dense output layer with 5 classes (the 4 original ones plus "computer")
new_output = tf.keras.layers.Dense(5, activation="softmax", name="new_output")(pretrained_output)
new_label_map_index = 4  # Zero-based index of the new "computer" class

# Create a new Keras model
transfer_model = tf.keras.Model(inputs=input_tensor, outputs=new_output)

# Compile for training
transfer_model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

# Show model summary
transfer_model.summary()

_SignatureMap({'serving_default': <ConcreteFunction () -> Dict[['output', TensorSpec(shape=(1, 4), dtype=tf.float32, name=None)]] at 0x7C1568459ED0>})
(1, 4)


4. Prepare Your New Data
Ensure you have new training data for the additional command, along with a mix of existing data (to avoid forgetting the old commands). Preprocess this data in the same way as during the original model training.

```
dataset/

├── computer/
│   ├── sample1.wav
│   ├── sample2.wav
│   └── ...
├── stop/
│   ├── sample1.wav
│   ├── sample2.wav
│   └── ...
```
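
One way to check the mix of new and existing data is to count the clips in each class folder before preprocessing. The sketch below assumes the folder layout shown above; the helper names count_clips and report_balance are hypothetical and only serve the illustration.

```python
import os

def count_clips(dataset_path):
    """Return {class_name: number of .wav files} for every class folder."""
    counts = {}
    for name in sorted(os.listdir(dataset_path)):
        class_path = os.path.join(dataset_path, name)
        if os.path.isdir(class_path):
            counts[name] = sum(
                1 for f in os.listdir(class_path) if f.lower().endswith(".wav")
            )
    return counts

def report_balance(counts):
    """Print the number of clips per class and its share of the dataset."""
    total = sum(counts.values())
    for name, n in counts.items():
        share = 100.0 * n / total if total else 0.0
        print(f"{name:>12}: {n:5d} clips ({share:.1f}%)")

# Example usage (the path is a placeholder):
# report_balance(count_clips("dataset"))
```

A class that dominates the counts, or a class that is missing entirely, is worth fixing before training.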




In [None]:
!pip install librosa scipy



## Prepare the Dataset for Training on the New Data

1. Constants & Setup:

- Defines constants such as the sample rate (16 kHz), window size (1960 samples), and stride (480 samples, i.e. 30 ms between window starts).
- Specifies the dataset path (DATASET_PATH) where audio files are stored.
- Sets the output file path (OUTPUT_PATH) for the processed dataset.

2. Audio Processing Functions:

load_and_preprocess_audio(file_path):

- Loads a .wav file using librosa.
- Ensures all audio clips have a fixed length (1 second / 16,000 samples).
- Trims excess samples if too long or pads with zeros if too short.
- Normalizes audio to the [-1, 1] range.

extract_windows(audio, label, stride, window_size):

- Slides a window of 1960 samples over the 16,000-sample audio.
- Extracts overlapping windows with a stride of 480 samples.
- Assigns the same label to each extracted window.

Note that each window holds 1960 raw audio samples. This matches the model's input length, but it is not the same representation as the 1960 micro frontend features (49 frames * 40 bins) that the pretrained model was trained on.

3. Processing the Entire Dataset:

- Iterates through subdirectories in audio_dataset, treating each directory name as a label (the code in the next cell processes only the "computer" directory).
- Looks up the numeric label for each word category in label_map.
- For each .wav file:
  - Loads and processes the audio.
  - Extracts overlapping windows and assigns labels.
  - Stores extracted windows and labels in lists.

4. Saving the Processed Data:

- Converts collected windows (X) and labels (y) into NumPy arrays.
- Saves them into processed_data.npz along with the label_map for later training.
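
To see how many windows the sliding scheme described above yields per clip, here is a minimal sketch using NumPy's sliding_window_view on a placeholder clip of zeros. It only illustrates the windowing arithmetic and is not the notebook's own extraction function.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

AUDIO_LENGTH = 16000  # 1 second at 16 kHz
WINDOW_SIZE = 1960    # samples per window
STRIDE = 480          # samples between window starts

# Placeholder clip (assumption: real audio would be loaded with librosa)
audio = np.zeros(AUDIO_LENGTH, dtype=np.float32)

# Closed-form count of full windows that fit in the clip
expected = 1 + (AUDIO_LENGTH - WINDOW_SIZE) // STRIDE

# Every possible window as a read-only view, then keep every STRIDE-th one
windows = sliding_window_view(audio, WINDOW_SIZE)[::STRIDE]

print(expected, windows.shape)  # 30 (30, 1960)
```

Because the result is a view into the original array, call windows.copy() before modifying it.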

In [None]:
import os
import librosa
import numpy as np

# Constants
SAMPLE_RATE = 16000  # Resample all audio files to 16 kHz
AUDIO_LENGTH = 16000  # 1-second clips (16,000 samples)
WINDOW_SIZE = 1960  # Model's input size
STRIDE = 480  # 30 ms stride for the sliding window

DATASET_PATH = "/content/drive/MyDrive/Colab/audio_dataset"  # Update if needed
OUTPUT_PATH = "processed_data.npz"

def load_and_preprocess_audio(file_path):
    """Loads an audio file, fixes its length to 1 second, and peak-normalizes it."""
    audio, _ = librosa.load(file_path, sr=SAMPLE_RATE)  # Load and resample
    if len(audio) > AUDIO_LENGTH:
        audio = audio[:AUDIO_LENGTH]  # Trim if too long
    else:
        audio = np.pad(audio, (0, AUDIO_LENGTH - len(audio)), 'constant')  # Pad if too short

    # Normalize audio to [-1, 1] range (silent clips are left unchanged)
    peak = np.max(np.abs(audio))
    return audio / peak if peak > 0 else audio

def extract_windows(audio, label, stride=STRIDE, window_size=WINDOW_SIZE):
    """Splits 16,000-sample audio into overlapping 1960-sample windows."""
    windows = []
    labels = []

    # "+ 1" so that a window ending exactly at the last sample is included
    for start in range(0, len(audio) - window_size + 1, stride):
        window = audio[start:start + window_size]
        windows.append(window)
        labels.append(label)  # Every window inherits the label of its clip

    return np.array(windows), np.array(labels)

# Step 1: Load dataset and process audio files
all_windows = []
all_labels = []
# "computer" becomes class index 4, after the four original classes (0-3)
label_map = {'computer': 4}

for label in sorted(os.listdir(DATASET_PATH)):
    class_path = os.path.join(DATASET_PATH, label)
    if os.path.isdir(class_path) and label == "computer":  # Only process "computer"
        for file_name in os.listdir(class_path):
            if file_name.lower().endswith(".wav"):
                file_path = os.path.join(class_path, file_name)
                audio = load_and_preprocess_audio(file_path)
                windows, labels = extract_windows(audio, label_map[label])
                all_windows.append(windows)
                all_labels.append(labels)

# Step 2: Convert to NumPy arrays
X = np.concatenate(all_windows, axis=0)
y = np.concatenate(all_labels, axis=0)

# Step 3: Save processed data (the dict is stored as a pickled object array)
np.savez(OUTPUT_PATH, X=X, y=y, label_map=label_map)

print(f"Processed data saved as {OUTPUT_PATH}")
print(f"Label map: {label_map}")

Processed data saved as processed_data.npz
Label map: {'computer': 4}


This saves processed_data.npz, which contains:

- X: the extracted audio windows
- y: the numeric label of each window
- label_map: the class mapping

The train, validation, and test splits are not stored in the file; they are created in the next step, when the dataset is loaded for training.
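
If you want to confirm what an .npz archive holds, you can list its keys. The sketch below writes a tiny placeholder archive to a temporary directory (the array contents are dummy values, not real audio) and reads it back; note that a dictionary such as label_map is stored as a pickled object, so loading it requires allow_pickle=True.

```python
import os
import tempfile
import numpy as np

# Dummy stand-ins for the real windows and labels (assumption for illustration)
X = np.zeros((3, 1960), dtype=np.float32)
y = np.full(3, 4)
label_map = {"computer": 4}

path = os.path.join(tempfile.mkdtemp(), "example.npz")
np.savez(path, X=X, y=y, label_map=label_map)

with np.load(path, allow_pickle=True) as data:
    keys = sorted(data.files)
    restored_map = data["label_map"].item()  # unwrap the 0-d object array
    shapes = (data["X"].shape, data["y"].shape)

print(keys)
print(restored_map)
print(shapes)
```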

5. Load the New Dataset

In [None]:

# Load the processed dataset
data = np.load("processed_data.npz", allow_pickle=True)
print("Unique labels in saved dataset:", np.unique(data["y"]))
print("Stored label_map in processed_data.npz:", data["label_map"].item())

# Extract the data and labels
X = data["X"]
y = data["y"]
label_map = data["label_map"].item()  # Unwrap the 0-d object array into a dictionary

print(f"Loaded data: {X.shape}, {y.shape}")
print(f"Label map: {label_map}")

# Step 2: Split the data into train (80%), validation (10%), and test (10%) sets
from sklearn.model_selection import train_test_split

X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.2, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

print(f"Train set: {X_train.shape}, {y_train.shape}")
print(f"Validation set: {X_val.shape}, {y_val.shape}")
print(f"Test set: {X_test.shape}, {y_test.shape}")

# Step 3: Optionally normalize or reshape the data as required by your model
# Example: add a trailing channel axis, giving (num_windows, 1960, 1)
X_train = X_train.reshape((-1, X_train.shape[1], 1))
X_val = X_val.reshape((-1, X_val.shape[1], 1))
X_test = X_test.reshape((-1, X_test.shape[1], 1))

# Example: Peak-normalize each window (if needed). The small floor avoids
# dividing by zero for all-silent windows created by zero padding.
def peak_normalize(x):
    return x / np.maximum(np.max(np.abs(x), axis=1, keepdims=True), 1e-8)

X_train = peak_normalize(X_train)
X_val = peak_normalize(X_val)
X_test = peak_normalize(X_test)

# Now you're ready to train your model!
# model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=10)


Unique labels in saved dataset: [4]
Stored label_map in processed_data.npz: {'computer': 4}
Loaded data: (570, 1960), (570,)
Label map: {'computer': 4}
Train set: (456, 1960), (456,)
Validation set: (57, 1960), (57,)
Test set: (57, 1960), (57,)


## Adjusting the One-Hot Encoding
1. Ensure correct class indices: After modifying the model's output to have 5 units, ensure that y_train, y_val, and y_test contain labels in the range 0 to 4. The label map is {'computer': 4}, so the new samples already use the last index.

2. Add the other classes: The dataset prepared above contains only "computer" and no other labels (like "yes" and "no"). A model trained on a single class can reach perfect accuracy by always predicting that class, so mix in samples of the original classes, as recommended in step 4.

3. One-Hot Encoding: With 5 possible classes, one-hot encode the labels as shown below:

In [None]:
from tensorflow.keras.utils import to_categorical

# One-hot encode the labels for 5 classes.
# The count is fixed to match the model's 5 output units rather than derived
# from the data, which would break if the highest label were missing.
num_classes = 5
print(f"Number of classes: {num_classes}")  # Debugging print
print("Unique labels in y_train:", np.unique(y_train))
print("Max label:", np.max(y_train))

y_train_cat = to_categorical(y_train, num_classes=num_classes)
y_val_cat = to_categorical(y_val, num_classes=num_classes)
y_test_cat = to_categorical(y_test, num_classes=num_classes)

# Reverse the dictionary to map numbers to words
label_map_reverse = {v: k for k, v in label_map.items()}

print("Label map:", label_map)  # Check the original mapping of labels
print("Reversed Label Map:", label_map_reverse)  # Check the index-to-word mapping

print("Unique labels in y_train:", np.unique(y_train))  # Check actual labels in y_train
print("Max label:", np.max(y_train))  # Double-check that the max label is below num_classes

Number of classes: 5
Unique labels in y_train: [4]
Max label: 4
Label map: {'computer': 4}
Reversed Label Map: {4: 'computer'}
Unique labels in y_train: [4]
Max label: 4
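
To make the encoding concrete, here is a small NumPy sketch of what one-hot encoding produces for 5 classes. It is an illustrative equivalent, not the implementation of to_categorical, and the one_hot helper is a hypothetical name.

```python
import numpy as np

def one_hot(labels, num_classes):
    """Turn integer class labels into rows with a single 1 at the label index."""
    labels = np.asarray(labels, dtype=np.int64)
    if labels.min() < 0 or labels.max() >= num_classes:
        raise ValueError("labels must lie in [0, num_classes)")
    return np.eye(num_classes, dtype=np.float32)[labels]

encoded = one_hot([4, 4, 2], num_classes=5)
print(encoded)
```

Each row has exactly one non-zero entry, in the column given by the label, which is why a label of 4 needs at least 5 output units.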


## Training with Your Transfer Model
You can then proceed to train the model with the one-hot encoded labels.

In [None]:
print(type(X_train))
print(type(y_train_cat))

print("Eager execution enabled:", tf.executing_eagerly())

history = transfer_model.fit(
    X_train, y_train_cat,
    validation_data=(X_val, y_val_cat),
    epochs=10,
    batch_size=32
)

<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
Eager execution enabled: True
Epoch 1/10
15/15 ━━━━━━━━━━━━━━━━━━━━ 1s 28ms/step - accuracy: 0.0000e+00 - loss: 45.7004 - val_accuracy: 0.0000e+00 - val_loss: 40.6239
Epoch 2/10
15/15 ━━━━━━━━━━━━━━━━━━━━ 0s 16ms/step - accuracy: 0.0000e+00 - loss: 44.2599 - val_accuracy: 0.0000e+00 - val_loss: 39.3605
Epoch 3/10
15/15 ━━━━━━━━━━━━━━━━━━━━ 0s 13ms/step - accuracy: 9.0419e-04 - loss: 42.8765 - val_accuracy: 0.0351 - val_loss: 38.1229
Epoch 4/10
15/15 ━━━━━━━━━━━━━━━━━━━━ 0s 12ms/step - accuracy: 0.3823 - loss: 41.5847 - val_accuracy: 0.9298 - val_loss: 36.9118
Epoch 5/10
15/15 ━━━━━━━━━━━━━━━━━━━━ 0s 13ms/step - accuracy: 0.9827 - loss: 40.1987 - val_accuracy: 1.0000 - val_loss: 35.7287
Epoch 6/10
15/15 ━━━━━━━━━━━━━━━━━━━━ 0s 13ms/step - 
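
The validation accuracy in the log above reaches 1.0 within a few epochs, but the training set printed earlier contains only label 4. A quick sanity check is to compare the model against a majority-class baseline, that is, the accuracy obtained by always predicting the most frequent label. The sketch below uses a hypothetical helper, majority_baseline, and a label list that mirrors the 456 training windows reported earlier.

```python
from collections import Counter

def majority_baseline(labels):
    """Return (most frequent label, accuracy of always predicting it)."""
    counts = Counter(int(label) for label in labels)
    most_common_label, n = counts.most_common(1)[0]
    return most_common_label, n / len(labels)

# 456 training windows, all labeled 4 ("computer"), as in the output above
label, accuracy = majority_baseline([4] * 456)
print(label, accuracy)  # 4 1.0
```

When the baseline is already 1.0, a perfect accuracy score says nothing about whether the model can tell "computer" apart from the other commands.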

7. Convert & Optimize for Deployment

After fine-tuning, export the model as a SavedModel and convert it to a float TensorFlow Lite model; quantization is applied in the next step:

In [None]:
import tensorflow as tf

transfer_model.export("custom_keyword_spotter_finetuned")

# Convert to TensorFlow Lite
converter = tf.lite.TFLiteConverter.from_saved_model("custom_keyword_spotter_finetuned")
tflite_model = converter.convert()

# Save the TFLite model
with open("custom_keyword_spotter_finetuned.tflite", "wb") as f:
    f.write(tflite_model)

print("Fine-tuned model saved as custom_keyword_spotter_finetuned.tflite")


Saved artifact at 'custom_keyword_spotter_finetuned'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 1960), dtype=tf.float32, name='input')
Output Type:
  TensorSpec(shape=(1, 5), dtype=tf.float32, name=None)
Captures:
  136431388531536: TensorSpec(shape=(), dtype=tf.resource, name=None)
  136431388530768: TensorSpec(shape=(), dtype=tf.resource, name=None)
  136431388534032: TensorSpec(shape=(), dtype=tf.resource, name=None)
  136431388534224: TensorSpec(shape=(), dtype=tf.resource, name=None)
  136431388532688: TensorSpec(shape=(), dtype=tf.resource, name=None)
  136431388538064: TensorSpec(shape=(), dtype=tf.resource, name=None)
Fine-tuned model saved as custom_keyword_spotter_finetuned.tflite


8. Convert and Quantize the Model

In [None]:
import tensorflow as tf
import numpy as np

# Start from a fresh converter so this cell does not depend on the previous one
converter = tf.lite.TFLiteConverter.from_saved_model("custom_keyword_spotter_finetuned")

# Enable default quantization optimizations with int8 input and output
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

# Representative dataset generator used to calibrate the quantization ranges.
# Real training samples give far better ranges than random data.
def representative_dataset_gen():
    for i in range(min(100, len(X_train))):
        sample = X_train[i].reshape(1, 1960).astype(np.float32)
        yield [sample]

# Set the representative dataset for calibration during quantization
converter.representative_dataset = representative_dataset_gen

# Convert and save the quantized model
quantized_tflite_model = converter.convert()

with open("custom_keyword_spotter_finetuned_quantized.tflite", "wb") as f:
    f.write(quantized_tflite_model)

print("Quantized model saved as custom_keyword_spotter_finetuned_quantized.tflite")

Quantized model saved as custom_keyword_spotter_finetuned_quantized.tflite


## Testing the Transfer Learned model's accuracy

---



Verify that the model we've exported is still accurate, using the TF Lite Python API and our test set.

In [None]:
# Helper function to run inference
def run_tflite_inference(tflite_model_path, model_type="Float"):
  # Load test data
  np.random.seed(0) # set random seed for reproducible test results.
  with tf.compat.v1.Session() as sess:
    test_data, test_labels = audio_processor.get_data(
        -1, 0, model_settings, BACKGROUND_FREQUENCY, BACKGROUND_VOLUME_RANGE,
        TIME_SHIFT_MS, 'testing', sess)
  test_data = np.expand_dims(test_data, axis=1).astype(np.float32)

  # Initialize the interpreter
  interpreter = tf.lite.Interpreter(tflite_model_path,
                                    experimental_op_resolver_type=tf.lite.experimental.OpResolverType.BUILTIN_REF)
  interpreter.allocate_tensors()

  input_details = interpreter.get_input_details()[0]
  output_details = interpreter.get_output_details()[0]

  # For quantized models, manually quantize the input data from float to integer
  if model_type == "Quantized":
    input_scale, input_zero_point = input_details["quantization"]
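    # Round to the nearest integer and clip to the dtype range before casting,
    # so that out-of-range values saturate instead of wrapping around
    test_data = np.round(test_data / input_scale + input_zero_point)
    dtype_info = np.iinfo(input_details["dtype"])
    test_data = np.clip(test_data, dtype_info.min, dtype_info.max)
    test_data = test_data.astype(input_details["dtype"])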

  correct_predictions = 0
  for i in range(len(test_data)):
    interpreter.set_tensor(input_details["index"], test_data[i])
    interpreter.invoke()
    output = interpreter.get_tensor(output_details["index"])[0]
    top_prediction = output.argmax()
    correct_predictions += (top_prediction == test_labels[i])

  print('%s model accuracy is %f%% (Number of test samples=%d)' % (
      model_type, (correct_predictions * 100) / len(test_data), len(test_data)))

Clone the TensorFlow GitHub repository, which contains the code required to run this test.

In [None]:
!git clone -q --depth 1 https://github.com/tensorflow/tensorflow

In [None]:
import sys
# We add this path so we can import the speech processing modules.
sys.path.append("/content/tensorflow/tensorflow/examples/speech_commands/")
import input_data
import models

LOGS_DIR = 'logs/'
SAMPLE_RATE = 16000
CLIP_DURATION_MS = 1000
WINDOW_SIZE_MS = 30.0
WINDOW_STRIDE = 20
FEATURE_BIN_COUNT = 40
PREPROCESS = 'micro'
SILENT_PERCENTAGE = 10
UNKNOWN_PERCENTAGE = 10
BACKGROUND_FREQUENCY = 0.8
BACKGROUND_VOLUME_RANGE = 0.1
TIME_SHIFT_MS = 100.0


#DATA_URL = 'https://storage.googleapis.com/download.tensorflow.org/data/speech_commands_v0.02.tar.gz'
#DATASET_DIR =  'dataset/'

DATA_URL = ""
DATASET_DIR = '/content/drive/MyDrive/Colab/audio_dataset'  # Update if needed


VALIDATION_PERCENTAGE = 10
TESTING_PERCENTAGE = 10

# Combine the original and new labels to create the full list of `WANTED_WORDS`
WANTED_WORDS = "yes,no,computer"

# Prepare model settings
model_settings = models.prepare_model_settings(
    len(input_data.prepare_words_list(WANTED_WORDS.split(','))),
    SAMPLE_RATE, CLIP_DURATION_MS, WINDOW_SIZE_MS,
    WINDOW_STRIDE, FEATURE_BIN_COUNT, PREPROCESS)

# Set up audio processor
audio_processor = input_data.AudioProcessor(
    DATA_URL, DATASET_DIR,
    SILENT_PERCENTAGE, UNKNOWN_PERCENTAGE,
    WANTED_WORDS.split(','), VALIDATION_PERCENTAGE,
    TESTING_PERCENTAGE, model_settings, LOGS_DIR)

# Now you can run inference with the prepared model settings and audio processor

# Test the float model
run_tflite_inference("custom_keyword_spotter_finetuned.tflite", model_type="Float")

# Test the quantized model (if you have one)
run_tflite_inference("custom_keyword_spotter_finetuned_quantized.tflite", model_type="Quantized")




ValueError: Background sample is too short! Need more than 16000 samples but only 16000 were found
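
The error above appears to come from the speech_commands input_data module, which mixes background noise into silence samples and requires every background clip to be strictly longer than one clip (16,000 samples here). A likely cause is a recording of exactly 1 second in the dataset's _background_noise_ folder. The sketch below is a hypothetical helper, find_short_background_clips, that lists such files using only the standard library; it assumes mono WAV files that are already sampled at 16 kHz.

```python
import os
import wave

def find_short_background_clips(background_dir, min_samples=16000):
    """Return (file_name, n_frames) for WAV files with min_samples frames or fewer."""
    too_short = []
    for name in sorted(os.listdir(background_dir)):
        if not name.lower().endswith(".wav"):
            continue
        with wave.open(os.path.join(background_dir, name), "rb") as wav_file:
            n_frames = wav_file.getnframes()
        if n_frames <= min_samples:
            too_short.append((name, n_frames))
    return too_short

# Example usage (assumes DATASET_DIR from the cell above):
# print(find_short_background_clips(os.path.join(DATASET_DIR, "_background_noise_")))
```

Any file it reports would need to be replaced with a longer recording, or removed from the background folder, before the test can run.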