This is the PULSE quantized keyword spotting notebook.

To begin, make sure you have the following modules (the versions listed are the ones tested, may also work on others):
- tensorflow 2.13
- larq 0.13.3
- numpy 1.23.5
- librosa 0.10.0
- matplotlib 3.7.1
- scikit-learn 1.2.2

In [None]:
import tensorflow as tf
import librosa
import random
import larq as lq
import pathlib
import numpy as np
import matplotlib.pyplot as plt
import datetime
import os

import keras.callbacks as callbacks
from IPython.display import Audio
from numpy import ndarray

# Report the library versions and the devices TensorFlow can see, so the
# environment of a run can be compared with the tested versions listed above.
print("TensorFlow version: ", tf.__version__)
# The device list is printed explicitly: a bare expression that is not the
# last statement of a notebook cell is never displayed.
print("Physical devices: ", tf.config.list_physical_devices())
print("larq version: ", lq.__version__)

This model was trained on the Google Speech Commands v2 dataset, which can be downloaded from [here](https://www.tensorflow.org/datasets/catalog/speech_commands).

Since we are using only parts of the dataset in our use case, we suggest copying/moving the needed folders of the whole dataset to a new folder, and specifying that folder as the DATASET_PATH.

In [None]:
# Folder that contains one sub-folder per keyword class
DATASET_PATH = "./current_classes"
data_dir = pathlib.Path(DATASET_PATH)

# Every entry of the dataset folder is treated as a command, except the
# '.DS_Store' metadata file.
dir_entries = np.array(tf.io.gfile.listdir(str(data_dir)))
commands = dir_entries[dir_entries != '.DS_Store']
print('Commands:', commands)

In the next block of code, the dataset is loaded. The main parameter that could need changing is the BATCH_SIZE (default 32). The dataset is then split into training and validation sets, with the default split being 80% training and 20% validation.

In [None]:
# Load the audio files and split them into training and validation sets.
BATCH_SIZE = 32   # number of clips per batch
SAMPLE_RATE = 16000  # also used as the fixed output length of every clip, in samples

# The annotations use the public tf.data.Dataset type. The concrete batch
# dataset class lives in a private TensorFlow module that is not part of the
# public API, so importing it is not reliable across TensorFlow versions.
train_ds: tf.data.Dataset
val_ds: tf.data.Dataset
train_ds, val_ds = tf.keras.utils.audio_dataset_from_directory(
    directory=data_dir,
    batch_size=BATCH_SIZE,
    validation_split=0.2,                 # 80% training / 20% validation
    output_sequence_length=SAMPLE_RATE,   # every clip has SAMPLE_RATE samples
    seed=123,                             # fixed seed for the split/shuffle
    subset="both",                        # return (training, validation)
    label_mode="categorical",             # one-hot encoded labels
    shuffle=True)

Next, we start to preprocess the audio samples. First we remove the last empty dimension and plot a random sample as a waveform. Then we define 3 functions to be used in the preprocessing pipeline:

- add_noises

- time_shift

- pitch_shift

In [None]:
# Remove the last axis of the audio tensor

def squeeze(audio, labels):
  """Return `audio` with its last axis squeezed out, paired with the untouched `labels`."""
  return tf.squeeze(audio, axis=-1), labels

# Keep the class names exposed by the loaded dataset; they are used later to
# turn one-hot labels back into readable names.
label_names = train_ds.class_names

# Squeeze both datasets, keeping element order deterministic.
train_ds = train_ds.map(squeeze, num_parallel_calls=tf.data.AUTOTUNE, deterministic=True)
val_ds = val_ds.map(squeeze, num_parallel_calls=tf.data.AUTOTUNE, deterministic=True)

In [None]:
# Plot an audio clip as a waveform

def plot_waveform(audio):
    """Draw the values of `audio` (flattened to 1-D) as a line plot.

    `audio` must provide a `.numpy()` method.
    """
    samples = audio.numpy().flatten()
    plt.figure(figsize=(12, 3))
    plt.plot(samples)
    plt.show()

In [None]:
#Function for adding noises

# Background-noise recordings mixed into the audio by add_noises
pink_noise_file = './_background_noise_/pink_noise.wav'
white_noise_file = './_background_noise_/white_noise.wav'

# librosa.load returns (samples, sample_rate); only the samples are kept.
pink_noise_audio = librosa.load(pink_noise_file, sr=16000)[0]
white_noise_audio = librosa.load(white_noise_file, sr=16000)[0]

# Float32 tensor versions of the noise recordings
pink_noise_tensor = tf.convert_to_tensor(pink_noise_audio, dtype=tf.float32)
white_noise_tensor = tf.convert_to_tensor(white_noise_audio, dtype=tf.float32)

# Add noise to audio
def add_noises(audio_tensor, noise_types=('pink',), noise_probs=(1,), noise_levels=(0.01,)):
    """Mix randomly positioned background noise into a batch of waveforms.

    For every (type, probability, level) triple a random excerpt of the
    matching noise recording is cut for each clip of the batch and, with the
    given probability, added to the whole batch scaled by the level.

    Args:
        audio_tensor: float tensor indexed as (batch, samples).
        noise_types: iterable of noise names; each must be 'white' or 'pink'.
        noise_probs: per-type probability that the noise is applied.
        noise_levels: per-type gain applied to the noise before mixing.

    Returns:
        A tensor with the same shape as `audio_tensor`.

    Raises:
        ValueError: if a noise type is not 'white' or 'pink'.

    Note:
        The noise recording must be longer than the audio clips, otherwise
        no valid start offset exists and the random draw fails.
    """
    noise_sources = {'white': white_noise_tensor, 'pink': pink_noise_tensor}

    batch_size = tf.shape(audio_tensor)[0]
    audio_len = tf.shape(audio_tensor)[1]
    # Offsets 0..audio_len-1, broadcast against the per-clip start positions.
    sample_offsets = tf.range(audio_len)[tf.newaxis, :]

    for noise_type, noise_prob, noise_level in zip(noise_types, noise_probs, noise_levels):
        if noise_type not in noise_sources:
            raise ValueError(
                f"Unknown noise type {noise_type!r}; expected one of {sorted(noise_sources)}")
        noise_tensor = noise_sources[noise_type]
        noise_len = tf.shape(noise_tensor)[0]

        # One random excerpt of the noise recording per clip, cut in a single
        # vectorized gather.
        start = tf.random.uniform((batch_size,), 0, noise_len - audio_len, dtype=tf.int32)
        noise = tf.gather(noise_tensor, start[:, tf.newaxis] + sample_offsets)

        # The decision is drawn with a TensorFlow op so that it is re-drawn on
        # every execution. A Python-level random call is evaluated only once
        # when the function is traced by Dataset.map, which would freeze the
        # decision for the whole dataset.
        apply_noise = tf.random.uniform(()) < noise_prob
        audio_tensor = tf.where(apply_noise, audio_tensor + noise_level * noise, audio_tensor)
    return audio_tensor

In [None]:
#Function for time shifting

def time_shift(audio_tensor, shift_range=0.1):
    """Shift every clip of a batch in time by a random amount, zero-filling the gap.

    Args:
        audio_tensor: tensor indexed as (batch, samples).
        shift_range: maximum shift as a fraction of the clip length.

    Returns:
        A tensor with the same shape as `audio_tensor`. For a positive shift
        the clip starts later in the original signal and zeros fill the end;
        for a negative shift zeros fill the beginning.
    """
    batch_size = tf.shape(audio_tensor)[0]
    audio_len = tf.shape(audio_tensor)[1]
    shift_amount = tf.cast(tf.cast(audio_len, tf.float32) * shift_range, tf.int32)

    # maxval is exclusive: the +1 makes the range symmetric ([-s, s]) and keeps
    # the draw valid when shift_amount is 0 (e.g. shift_range=0 to disable).
    shift = tf.random.uniform(
        (batch_size,), minval=-shift_amount, maxval=shift_amount + 1, dtype=tf.int32)

    # Zero-pad both ends by the maximum shift, then read a window of the
    # original length whose start is moved by the per-clip shift. This handles
    # the whole batch with one gather.
    padded = tf.pad(audio_tensor, [[0, 0], [shift_amount, shift_amount]])
    source_idx = tf.range(audio_len)[tf.newaxis, :] + shift_amount + shift[:, tf.newaxis]
    return tf.gather(padded, source_idx, batch_dims=1)

In [None]:
#Function for pitch shifting

def pitch_shift(audio_tensor, pitch_range=(0.3, 0.3), sample_rate=16000):
    """Pitch-shift audio by a random number of steps using librosa.

    Args:
        audio_tensor: float tensor holding the audio to shift.
        pitch_range: (low, high) bounds of the uniformly drawn number of steps.
        sample_rate: sample rate of the audio in Hz.

    Returns:
        A float32 tensor with the same static shape as `audio_tensor`.

    Note:
        The number of steps is drawn once per call, so everything contained in
        `audio_tensor` is shifted by the same amount.
    """
    bins_per_octave = 12  # steps per octave; 12 makes one step a semitone

    def shift_pitch(x):
        # Runs eagerly inside tf.py_function, so NumPy/librosa can be used.
        n_steps = np.random.uniform(pitch_range[0], pitch_range[1])
        shifted_audio = librosa.effects.pitch_shift(
            x.numpy(), sr=sample_rate, n_steps=n_steps, bins_per_octave=bins_per_octave)
        return tf.convert_to_tensor(shifted_audio, dtype=tf.float32)

    shifted = tf.py_function(shift_pitch, [audio_tensor], tf.float32)
    # tf.py_function outputs carry no static shape. Pitch shifting keeps the
    # signal length, so the input's static shape is restored for later steps.
    shifted.set_shape(audio_tensor.shape)
    return shifted


In [None]:
#Function for applying all the preprocessing steps
def preprocess_audio(audio, label, noise_colors=('pink',), noise_probs=(1,), noise_levels=(0.01,),
                     shift_range=0.1,
                     pitch_range=(-1, 1)):
    """Apply time shift, noise mixing and pitch shift to the audio, in that order.

    Args:
        audio: audio tensor passed through time_shift, add_noises and pitch_shift.
        label: returned unchanged.
        noise_colors: noise types forwarded to add_noises.
        noise_probs: per-type probabilities forwarded to add_noises.
        noise_levels: per-type gains forwarded to add_noises.
        shift_range: maximum time shift forwarded to time_shift.
        pitch_range: (low, high) step bounds forwarded to pitch_shift.

    Returns:
        Tuple of (processed audio, label).
    """
    # Defaults are immutable tuples so no default object can be shared and
    # modified between calls.
    audio = time_shift(audio, shift_range)
    audio = add_noises(audio, noise_colors, noise_probs, noise_levels)
    audio = pitch_shift(audio, pitch_range)
    return audio, label

In the next block, we apply the preprocessing to train and validation datasets. Here you can change the following parameters:
- **NOISE_COLORS**: a list with available noise types. Currently 'pink' and 'white' are supported.
- **NOISE_PROBS**: a list with the probabilities of each noise type being applied. Should be between 0 and 1.
- **NOISE_LEVELS**: a list with the noise intensity for each noise type. Default is 0.01.
- **SHIFT_RANGE**: indicates how much the audio will be shifted in time. Default is 0.1.
- **PITCH_RANGE**: indicates how much the audio will be shifted in pitch. Default is (-1, 1).

In [None]:
NOISE_COLORS = ['pink', 'white']
NOISE_PROBS = [0.5, 0.5]
NOISE_LEVELS = [0.01, 0.01]
SHIFT_RANGE = 0.1
PITCH_RANGE = (-1, 1)


def _augment(audio, label):
    """Run preprocess_audio with the parameters configured above."""
    return preprocess_audio(audio, label, NOISE_COLORS, NOISE_PROBS, NOISE_LEVELS, SHIFT_RANGE, PITCH_RANGE)


# Apply the preprocessing function to train_ds and val_ds
train_ds = train_ds.map(_augment, num_parallel_calls=tf.data.AUTOTUNE, deterministic=True)
val_ds = val_ds.map(_augment, num_parallel_calls=tf.data.AUTOTUNE, deterministic=True)

Here you can hear a sample to check if the preprocessing is working as expected. Take note that in order to preview the data, the preprocessing effectively gets applied here for the first time and this block will take some time to execute. This is due to how TensorFlow internally maps transformations on the dataset (lazy mapping).

In [None]:
# Listen to one preprocessed clip.
with tf.device('CPU: 0'):
    # Only a single batch is pulled. Converting the whole dataset to a list
    # would run the preprocessing on every batch and keep all of them in
    # memory just to pick one. The dataset was loaded with shuffle=True, so
    # the first batch is already a shuffled selection of clips.
    random_batch = next(iter(train_ds.take(1)))
    random_audio_data, _ = random_batch
    random_audio_data = random_audio_data.numpy()

# Pick one clip of the batch at random.
random_sample_index = random.randrange(random_audio_data.shape[0])
random_sample = random_audio_data[random_sample_index]
audio_player = Audio(random_sample, rate=16000)

# Last expression of the cell: displays the audio player widget.
audio_player

In [None]:
#Half of the validation dataset is moved to the test dataset

# Both halves are cut from the same source: shard 0 becomes the test set,
# shard 1 stays as the validation set.
held_out_ds = val_ds
test_ds = held_out_ds.shard(num_shards=2, index=0)
val_ds = held_out_ds.shard(num_shards=2, index=1)

Here you can specify the parameters for computing the 2D representation of the audio data. The default method used is the Short Time Fourier Transform (STFT).

Alternatively you can use Mel-Frequency Cepstral Coefficients (MFCC) by setting MFCC=True, but this is not recommended as it is not as robust as STFT.

You can modify the available parameters for each method, but the default ones should work fine.

In [None]:
#Spectrogram parameters (passed to tf.signal.stft by get_spectrogram)
frame_length = 255 # window length in samples (320 noted as an alternative value)
frame_step = 128 # hop between consecutive windows in samples (160 noted as an alternative value)
fft_lenght = 256 # FFT size; the misspelled name is the one get_spectrogram reads

MFCC = False # Set to True to use MFCCs; False uses the STFT spectrogram

#MFCC parameters (only used when MFCC is True)
num_mel_bins = 40 # number of mel bands of the mel filter bank
num_mfccs = 13 # number of leading MFCC coefficients that are kept

In [None]:
#Define and compute the 2D representation of the audio

def get_spectrogram(waveform):
  """Return the STFT magnitude of `waveform` with a trailing channels axis.

  The STFT uses the module-level `frame_length`, `frame_step` and
  `fft_lenght` settings.
  """
  stft = tf.signal.stft(
      waveform, frame_length=frame_length, frame_step=frame_step, fft_length=fft_lenght)
  # Only the magnitude of the complex STFT is kept.
  magnitude = tf.abs(stft)
  # Convolution layers expect image-like input of shape
  # (`batch_size`, `height`, `width`, `channels`), so a `channels` axis is
  # appended.
  return tf.expand_dims(magnitude, axis=-1)

def compute_mfccs(pcm, sample_rate, num_mel_bins, num_mfccs):
    """Compute the first `num_mfccs` MFCCs of the waveform(s) in `pcm`.

    Args:
        pcm: waveform tensor passed to tf.signal.stft.
        sample_rate: sample rate in Hz, used to build the mel filter bank.
        num_mel_bins: number of mel bands.
        num_mfccs: number of leading coefficients to return.

    Returns:
        Tensor of MFCCs whose last axis has `num_mfccs` entries.
    """
    # STFT with 1024-sample frames and a 256-sample hop (75% overlap); at
    # 16 kHz a frame lasts 64 ms.
    stfts = tf.signal.stft(pcm, frame_length=1024, frame_step=256)
    magnitudes = tf.abs(stfts)

    # Mel filter bank covering 80 Hz - 7600 Hz, applied to the linear-scale bins.
    mel_lower_hz, mel_upper_hz = 80.0, 7600.0
    n_freq_bins = tf.shape(stfts)[-1]
    mel_matrix = tf.signal.linear_to_mel_weight_matrix(
        num_mel_bins, n_freq_bins, sample_rate, mel_lower_hz, mel_upper_hz)
    mel_spectrograms = tf.tensordot(magnitudes, mel_matrix, 1)
    # tensordot drops static shape information; restore it from the operands.
    static_shape = magnitudes.shape[:-1].concatenate(mel_matrix.shape[-1:])
    mel_spectrograms.set_shape(static_shape)

    # The small offset keeps the logarithm finite for empty bands.
    log_mel = tf.math.log(mel_spectrograms + 1e-6)

    all_mfccs = tf.signal.mfccs_from_log_mel_spectrograms(log_mel)
    return all_mfccs[..., :num_mfccs]



def get_mfcc(waveform, sample_rate, num_mel_bins, num_mfccs):
    """Return the MFCCs of `waveform` with a trailing channels axis.

    The arguments are forwarded unchanged to compute_mfccs.
    """
    coefficients = compute_mfccs(waveform, sample_rate, num_mel_bins, num_mfccs)
    # Convolution layers expect input of shape
    # (`batch_size`, `height`, `width`, `channels`), so a `channels` axis is
    # appended.
    return tf.expand_dims(coefficients, axis=-1)


def make_spec_ds(ds: tf.data.Dataset):
  """Map a dataset of (audio, label) to (spectrogram, label), keeping order."""
  def _to_spectrogram(audio, label):
    return get_spectrogram(audio), label

  return ds.map(
      _to_spectrogram,
      num_parallel_calls=tf.data.AUTOTUNE,
      deterministic=True)

def make_mfcc_ds(ds: tf.data.Dataset):
    """Map a dataset of (audio, label) to (MFCC, label), keeping order.

    Uses the module-level SAMPLE_RATE, num_mel_bins and num_mfccs settings.
    """
    def _to_mfcc(audio, label):
        features = get_mfcc(
            audio, sample_rate=SAMPLE_RATE, num_mel_bins=num_mel_bins, num_mfccs=num_mfccs)
        return features, label

    return ds.map(
        _to_mfcc,
        num_parallel_calls=tf.data.AUTOTUNE,
        deterministic=True)

# Choose the feature extractor once and apply it to all three datasets.
_make_feature_ds = make_mfcc_ds if MFCC else make_spec_ds

train_spectrogram_ds = _make_feature_ds(train_ds)
val_spectrogram_ds = _make_feature_ds(val_ds)
test_spectrogram_ds = _make_feature_ds(test_ds)

The next block of code plots 9 samples after the transformation. 

In [None]:
#Spectrogram plotting function

def plot_spectrogram(spectrogram, ax):
  """Draw the log-scaled, transposed `spectrogram` on `ax` with pcolormesh.

  A 3-D input must have a trailing axis of size 1, which is removed first.
  """
  rank = len(spectrogram.shape)
  if rank > 2:
    assert rank == 3
    spectrogram = np.squeeze(spectrogram, axis=-1)
  # Log scale; transposed so that time runs along the x-axis (columns).
  # The epsilon avoids taking the log of zero.
  eps = np.finfo(float).eps
  log_spec = np.log(spectrogram.T + eps)
  n_rows, n_cols = log_spec.shape[0], log_spec.shape[1]
  time_axis = np.linspace(0, np.size(spectrogram), num=n_cols, dtype=int)
  ax.pcolormesh(time_axis, range(n_rows), log_spec)

def plot_mfcc(mfcc, ax):
    """Draw `mfcc` on `ax` as an image with its first two axes swapped.

    A 3-D input must have a trailing axis of size 1, which is removed first.
    """
    rank = len(mfcc.shape)
    if rank > 2:
        assert rank == 3
        mfcc = np.squeeze(mfcc, axis=-1)
    swapped = np.swapaxes(mfcc, 0, 1)
    ax.imshow(
        swapped,
        interpolation='nearest',
        cmap=plt.cm.coolwarm,
        origin='lower',
        aspect='auto',
    )
    ax.set_title('MFCC')


# Fetch one batch of features and labels from the training set.
for example_spectrograms, example_spect_labels in train_spectrogram_ds.take(1):
  pass

shape_caption = "Shape of MFCC: " if MFCC else "Shape of Spectrogram: "
print(shape_caption, example_spectrograms.shape)

# 3 x 3 grid showing the first nine samples of the batch
rows, cols = 3, 3
n = rows * cols
fig, axes = plt.subplots(rows, cols, figsize=(16, 9))

draw_feature = plot_mfcc if MFCC else plot_spectrogram
for i in range(n):
    r, c = divmod(i, cols)
    ax = axes[r][c]
    draw_feature(example_spectrograms[i].numpy(), ax)
    # Title each panel with the class name decoded from the one-hot label.
    ax.set_title(label_names[example_spect_labels[i].numpy().argmax()])

plt.show()

In the next block of code you can toggle normalization of the images. This is disabled by default: with a quantized model we did not see any improvements by normalizing the images. 

In [None]:
normalization = False # Set to True to normalize the spectrogram

# Normalization limits: target range of the normalized values, used as the
# default bounds of normalize_spectrogram
minval = 0
maxval = 1

#Normalization

def normalize_spectrogram(x, min_val = minval, max_val = maxval):
    """Rescale every sample of `x` linearly to the range [min_val, max_val].

    The minimum and maximum are taken over axes 1 and 2, separately for every
    index along the remaining axes.

    Args:
        x: tensor to normalize.
        min_val: lower bound of the output range.
        max_val: upper bound of the output range.

    Returns:
        Tensor with the same shape as `x`. Where the input is constant
        (maximum equals minimum) the result is `min_val` instead of NaN.
    """
    min_x = tf.reduce_min(x, axis=[1, 2], keepdims=True)
    max_x = tf.reduce_max(x, axis=[1, 2], keepdims=True)
    # divide_no_nan yields 0 where the denominator is 0, so a constant sample
    # does not turn into NaNs.
    scaled = tf.math.divide_no_nan((x - min_x) * (max_val - min_val), max_x - min_x)
    return scaled + min_val

if normalization:
    def _normalize_features(audio, label):
        """Normalize the features and pass the label through."""
        return normalize_spectrogram(audio), label

    train_spectrogram_ds = train_spectrogram_ds.map(_normalize_features, tf.data.AUTOTUNE)
    val_spectrogram_ds = val_spectrogram_ds.map(_normalize_features, tf.data.AUTOTUNE)
    test_spectrogram_ds = test_spectrogram_ds.map(_normalize_features, tf.data.AUTOTUNE)

# Shape of a single sample: the example batch shape without its first axis
input_shape = example_spectrograms.shape[1:]


Here we define 3 custom objects needed for the quantized model.
- **level_quant_wrapper**: custom weights quantization function
- **CustomLevelClip**: custom constraint function for the weights
- **CustomQuantizeLayer**: this layer is a wrapper for quantizing the activations of each layer. We implemented it as a separate layer rather than as a function in the Larq layers to be able to observe its effect on the data.

In [None]:
#Custom quantization function

def level_quant_wrapper(levels, is_activation=True):
    """Build a quantizer that snaps every value to the nearest entry of `levels`.

    Args:
        levels: allowed quantization values.
        is_activation: if True the gradient is passed only where the input lies
            within [min(levels), max(levels)] and is zero elsewhere; if False
            the gradient is passed through unchanged.

    Returns:
        A function with a custom gradient that quantizes its input tensor.
    """
    def _snap_to_levels(values):
        # Distance of every value to every level, then pick the closest level.
        levels_tensor = tf.constant(levels, dtype=tf.float32)
        distances = tf.abs(tf.expand_dims(values, -1) - levels_tensor)
        nearest = tf.argmin(distances, axis=-1)
        return tf.gather(levels_tensor, nearest)

    @tf.custom_gradient
    def level_quant(x):

        def grad(dy):
            if not is_activation:
                return dy
            inside_range = tf.math.logical_and(x >= min(levels), x <= max(levels))
            return tf.where(inside_range, dy, 0)

        return _snap_to_levels(x), grad

    return level_quant

In [None]:
#Custom Clip function

from keras.constraints import Constraint
import keras.backend as k

class CustomLevelClip(Constraint):
    """Weight constraint that clips values to the span covered by `levels`."""

    def __init__(self, levels):
        # Allowed quantization levels; only their minimum and maximum are used.
        self.levels = levels

    def __call__(self, w):
        """Return `w` clipped to [min(levels), max(levels)]."""
        lowest, highest = min(self.levels), max(self.levels)
        return k.clip(w, lowest, highest)

    def get_config(self):
        """Return the constructor arguments of this constraint."""
        return dict(levels=self.levels)

In [None]:
#Custom quantization layer

class CustomQuantizeLayer(tf.keras.layers.Layer):
    """Layer that applies a quantizer to its inputs and has no weights of its own.

    Args:
        quantizer: a callable quantizer, or a name / serialized config that
            larq can resolve to one (this is what `get_config` emits).
        input_shape: optional input shape (without batch axis), forwarded to
            the base layer so the layer can be the first one of a model.
        **kwargs: forwarded to `tf.keras.layers.Layer`.
    """

    def __init__(self, quantizer, input_shape=None, **kwargs):
        if input_shape is not None:
            kwargs["input_shape"] = input_shape
        super().__init__(**kwargs)
        # Resolving through larq lets the layer be rebuilt from its config,
        # where the quantizer arrives in serialized form instead of as an object.
        self.quantizer = lq.quantizers.get(quantizer)

    def call(self, inputs):
        """Return the quantized `inputs`."""
        return self.quantizer(inputs)

    def get_config(self):
        """Return a serializable config that `from_config` can rebuild from.

        The quantizer is stored in serialized form. The input shape is not
        added here: reading `self.input_shape` raises before the layer is
        connected to an input, and the base config already records the batch
        input shape when one was given.
        """
        config = super().get_config()
        config["quantizer"] = lq.quantizers.serialize(self.quantizer)
        return config

The weights of the model are quantized using a custom quantization function. In the LEVELS list you can specify the values that the weights can assume. You can specify as many values as you want.
Do note that the more the values are concentrated around 0, the better the training accuracy is.

In [None]:
#Custom weight quantization parameters

# Values the quantized weights may take, kept in ascending order
LEVELS = sorted([-1, -0.8, -0.6, -0.4, -0.2, 0, 0.2, 0.4, 0.6, 0.8, 1])
print("Custom quantizer levels: ", LEVELS)

# The list is sorted, so the extremes are its first and last entries
min_value, max_value = LEVELS[0], LEVELS[-1]

custom_constraint = CustomLevelClip(levels=LEVELS)

# Arguments shared by all quantized layers: custom weight quantizer, clipping
# constraint, uniform initialization within the level range, and no bias
kwargs_no_input_quant = {
    "kernel_quantizer": level_quant_wrapper(levels=LEVELS, is_activation=False),
    "kernel_constraint": custom_constraint,
    "kernel_initializer": tf.keras.initializers.RandomUniform(
        minval=min_value, maxval=max_value, seed=np.random.randint(0, 1000)),
    "use_bias": False,
}

#quantization of activations is done in a separate layer
quantizer = lq.quantizers.DoReFa(k_bit=3, mode="activations")

#batch normalization arguments
kwargs_BN = {"scale": True}

Here we define the model architecture.

This architecture is well tested for 5 keywords (95% accuracy), but you can modify the parameters to fit your use case.

**DROPOUT_RATE**: dropout rate for Conv2D layers

**DROPOUT_RATE_DENSE**: dropout rate for Dense layers

Note that the larq model summary function does not show the correct precision of the weights because standard quantizers work with a fixed number of bits instead of an arbitrary number of levels.

In [None]:
DROPOUT_RATE = 0.15        # dropout rate after the convolutional blocks
DROPOUT_RATE_DENSE = 0.3   # dropout rate after the dense block

model = tf.keras.models.Sequential()

# Block 1: quantize input -> 11x11 conv -> 3x3 max pooling -> batch norm -> dropout
model.add(CustomQuantizeLayer(quantizer, input_shape=input_shape))
model.add(lq.layers.QuantConv2D(32, (11, 11), **kwargs_no_input_quant))
model.add(tf.keras.layers.MaxPooling2D(3, 3))
model.add(tf.keras.layers.BatchNormalization(**kwargs_BN))
model.add(tf.keras.layers.Dropout(DROPOUT_RATE))

# Block 2: quantize activations -> 5x5 conv -> 2x2 max pooling -> batch norm -> dropout
model.add(CustomQuantizeLayer(quantizer))
model.add(lq.layers.QuantConv2D(32, (5, 5), **kwargs_no_input_quant))
model.add(tf.keras.layers.MaxPooling2D((2, 2)))
model.add(tf.keras.layers.BatchNormalization(**kwargs_BN))
model.add(tf.keras.layers.Dropout(DROPOUT_RATE))

# Block 3: quantize activations -> 3x3 conv -> batch norm -> dropout
model.add(CustomQuantizeLayer(quantizer))
model.add(lq.layers.QuantConv2D(32, (3, 3), **kwargs_no_input_quant))
model.add(tf.keras.layers.BatchNormalization(**kwargs_BN))
model.add(tf.keras.layers.Dropout(DROPOUT_RATE))

# Classifier: flatten -> quantize -> dense (one unit per class) -> batch norm
# -> dropout -> softmax
model.add(tf.keras.layers.Flatten())
model.add(CustomQuantizeLayer(quantizer))
model.add(lq.layers.QuantDense(len(label_names), **kwargs_no_input_quant))
model.add(tf.keras.layers.BatchNormalization(**kwargs_BN))
model.add(tf.keras.layers.Dropout(DROPOUT_RATE_DENSE))
model.add(tf.keras.layers.Activation("softmax"))

model.summary()

# Print the number of trainable and non-trainable parameters.
# model.count_params() is the total of both, so the two groups are counted
# from the corresponding weight lists.
trainable_params = sum(w.shape.num_elements() for w in model.trainable_weights)
non_trainable_params = sum(w.shape.num_elements() for w in model.non_trainable_weights)
print("Total parameters:", model.count_params())
print("Trainable parameters:", trainable_params)
print("Non-trainable parameters:", non_trainable_params)

Finally we can compile and fit our model. We have tested with the Adam optimizer and a 0.005 learning rate. 

There is also some code to save tensorboard logs and to implement early stopping. 

Around 20 epochs should be enough to get good results with 5 classes.

In [None]:
# Adam optimizer with categorical cross-entropy; accuracy is tracked as metric
adam = tf.keras.optimizers.legacy.Adam(learning_rate=0.005)
model.compile(optimizer=adam,
              loss='categorical_crossentropy',
              metrics=['accuracy'])

# TensorBoard logs go to a folder named after the start time of the run
run_stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
log_dir = "logs/fit/" + run_stamp
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

# Stop when the validation loss has not improved for 5 epochs and restore the
# weights of the best epoch
earlystopping = callbacks.EarlyStopping(
    monitor="val_loss",
    mode="min",
    patience=5,
    restore_best_weights=True,
)

model.fit(
    x=train_spectrogram_ds,
    validation_data=val_spectrogram_ds,
    epochs=20,
    callbacks=[tensorboard_callback, earlystopping],
)

In [None]:
# Evaluate the model on the test dataset; with the configuration passed to
# model.compile this reports the loss and the accuracy.
model.evaluate(test_spectrogram_ds)

In [None]:
# Folder that receives the weights file
SAVED_WEIGHTS_PATH = "./saved_weights/"
# Folder that receives the full saved model
SAVED_MODELS_PATH = "./saved_models/"
# File name of the weights; the full model is saved under the same name
# without the ".h5" part
FILE_NAME = "model95.h5"

In [None]:
#Save weights

# exist_ok=True creates the folder when it is missing and does nothing when it
# already exists, without a separate check that could race with another process.
os.makedirs(SAVED_WEIGHTS_PATH, exist_ok=True)

model.save_weights(SAVED_WEIGHTS_PATH + FILE_NAME)

In [None]:
#Save model

# The model is written under the weights file name without its ".h5" part
saved_model_target = SAVED_MODELS_PATH + FILE_NAME.replace(".h5", "")
model.save(saved_model_target)

In [None]:
#Load saved weights

weights_file = SAVED_WEIGHTS_PATH + FILE_NAME
model.load_weights(weights_file)

# Compile with the training configuration if the model is not compiled yet
if not model._is_compiled:
    model.compile(
        optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=0.005),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )

In [None]:
#Load Model

# Load from the location the "Save model" cell writes to
# (SAVED_MODELS_PATH + FILE_NAME without ".h5"), so that saving and loading
# refer to the same model.
saved_model_dir = SAVED_MODELS_PATH + FILE_NAME.replace(".h5", "")

# Inside quantized_scope larq returns the quantized weights.
with lq.context.quantized_scope(True):
    model = tf.keras.models.load_model(saved_model_dir)
    weights = model.get_weights()

model.summary()

Weights are stored as 32-bit floating point numbers; to get the quantized weights we need to use larq's quantized_scope.

In [None]:
# Read the weights inside larq's quantized_scope so that the quantized values
# are returned, then print them.
with lq.context.quantized_scope(True):
    quantized_weights = model.get_weights()
print(quantized_weights)

To end we can calculate the confusion matrix of our model and all the intermediate outputs of each layer.

In [None]:
#Confusion matrix function
def get_true_and_predicted_labels(model, dataset):
    """Collect true and predicted class indices over a whole dataset.

    Args:
        model: Keras model returning class scores for a batch of inputs.
        dataset: iterable of (inputs, one-hot labels) batches.

    Returns:
        Tuple (correct_labels, predicted_labels) of 1-D int64 tensors with one
        entry per sample. Both are empty if the dataset yields no batch.
    """
    y_true = []
    y_pred = []

    for image_batch, label_batch in dataset:
        # Convert one-hot encoded labels to class indices.
        y_true.append(tf.argmax(label_batch, axis=-1))
        # The model is called directly in inference mode: Model.predict is
        # meant for whole datasets and adds overhead when it is invoked once
        # per batch inside a loop.
        scores = model(image_batch, training=False)
        y_pred.append(tf.argmax(scores, axis=-1))

    if not y_true:
        # tf.concat cannot concatenate an empty list.
        return tf.zeros([0], dtype=tf.int64), tf.zeros([0], dtype=tf.int64)

    correct_labels = tf.concat(y_true, axis=0)
    predicted_labels = tf.concat(y_pred, axis=0)

    return correct_labels, predicted_labels


from sklearn.metrics import ConfusionMatrixDisplay

def print_confusion_matrix(model, val_spectrogram_ds):
    """Plot the confusion matrix of `model` on the given dataset.

    Args:
        model: model passed to get_true_and_predicted_labels.
        val_spectrogram_ds: dataset of (features, one-hot labels) batches.
    """
    correct_labels, predicted_labels = get_true_and_predicted_labels(model, val_spectrogram_ds)
    # num_classes fixes the matrix size to the number of class names. Without
    # it the size is derived from the largest index that occurs, which can be
    # smaller than the label list when a class is absent from the dataset.
    confusion_mtx = tf.math.confusion_matrix(
        correct_labels, predicted_labels, num_classes=len(label_names)).numpy()

    # Visualize the matrix with ConfusionMatrixDisplay from scikit-learn
    fig, ax = plt.subplots(figsize=(5, 5))
    matrix_display = ConfusionMatrixDisplay(confusion_mtx, display_labels=label_names)

    matrix_display.plot(xticks_rotation='vertical', ax=ax)
    plt.show()


# Confusion matrix of the model on the test dataset
print_confusion_matrix(model, test_spectrogram_ds)

In [None]:
#function to get the output of a layer

def get_layer_output(model, layer_index, test_ds: tf.data.Dataset):
    """Return the output of one layer for a randomly drawn sample.

    Args:
        model: model whose layer is inspected.
        layer_index: index into `model.layers`.
        test_ds: batched dataset of (features, one-hot labels).

    Returns:
        Tuple (layer output as returned by predict, class name of the sample).
    """
    # Sub-model that ends at the requested layer
    probe_model = tf.keras.Model(inputs=model.inputs, outputs=model.layers[layer_index].output)

    # Re-batch to single samples, shuffle, and draw one of them
    single_samples = test_ds.rebatch(1).shuffle(len(test_ds))
    example, one_hot = next(single_samples.take(1).as_numpy_iterator())

    # Decode the one-hot label into its class name
    label = label_names[np.argmax(one_hot)]

    return probe_model.predict(example), label

#functions to plot feature maps

# Default for the colorbar option of plot_conv_feature_maps
show_colorbar = True

def plot_feature_maps(model, layer_index, test_spectrogram_ds: tf.data.Dataset):
    """Plot the output of one layer, or the input itself when layer_index is -1.

    4-D outputs are drawn as image grids, 2-D outputs as a bar plot; for any
    other rank a message is printed instead.
    """
    # Index -1 means: show a randomly drawn input sample.
    if layer_index == -1:
        single_samples = test_spectrogram_ds.rebatch(1).shuffle(len(test_spectrogram_ds))
        example, one_hot = single_samples.take(1).as_numpy_iterator().next()
        label = label_names[np.argmax(one_hot)]
        plot_conv_feature_maps(example, 1, layer_index, model, label)
        return

    feature_maps, label = get_layer_output(model, layer_index, test_spectrogram_ds)

    rank = len(feature_maps.shape)
    if rank == 4:  # Conv2D or MaxPooling2D layers
        plot_conv_feature_maps(feature_maps, feature_maps.shape[-1], layer_index, model, label)
    elif rank == 2:  # Dense layer
        plot_dense_feature_maps(feature_maps, layer_index, model, label)
    else:
        print(f"Layer {layer_index} has an unsupported output shape. Cannot plot feature maps.")


import numpy as np
import matplotlib.pyplot as plt

import numpy as np
import matplotlib.pyplot as plt

def plot_conv_feature_maps(feature_maps, num_feature_maps, layer_index, model, label, show_colorbar=show_colorbar):
    """Plot the channels of a 4-D layer output as a grid of grayscale images.

    Args:
        feature_maps: array indexed as [0, :, :, channel]; only the first
            entry along the first axis is drawn.
        num_feature_maps: number of channels to draw.
        layer_index: index of the layer; -1 marks the raw input and 0 the
            quantized input, which get their own titles.
        model: model used to look up the layer name.
        label: class name shown in the figure title.
        show_colorbar: whether to add a colorbar next to every image.
    """
    # A single map gets a rectangular figure, several maps a square 4-column grid.
    if num_feature_maps == 1:
        num_rows, num_cols = 1, 1
        figsize = (10, 7)
    else:
        num_cols = 4
        num_rows = -(-num_feature_maps // num_cols)  # ceiling division
        figsize = (15, 15)

    # squeeze=False always returns a 2-D array of axes, so grids with a single
    # row (2 to 4 feature maps) or a single cell are handled like any other.
    fig, axes = plt.subplots(num_rows, num_cols, figsize=figsize, squeeze=False)

    is_raw_input = layer_index == -1
    is_quantized_input = layer_index == 0
    if is_raw_input:
        fig.suptitle(f'Input Spectrogram\nLabel: {label}')
    elif is_quantized_input:
        fig.suptitle(f'Quantized Input Spectrogram\nLabel: {label}')
    else:
        fig.suptitle(f'Feature Maps of Layer {layer_index}: {model.layers[layer_index].name}\nOutput shape {feature_maps.shape}\nLabel: {label}')

    for idx, ax in enumerate(axes.flat):
        # Hide the frame and ticks of every cell, including the unused ones at
        # the end of the grid.
        ax.axis('off')
        if idx >= num_feature_maps:
            continue

        img = ax.imshow(feature_maps[0, :, :, idx], cmap='gray')
        if not (is_raw_input or is_quantized_input):
            ax.set_title(f'Feature Map {idx}')

        if show_colorbar:
            cbar = fig.colorbar(img, ax=ax)
            cbar.ax.tick_params(labelsize=8)

    plt.show()



def plot_dense_feature_maps(feature_maps, layer_index, model, label):
    """Plot a 2-D layer output as a bar chart of its flattened values.

    When the layer is the last one of the model, the x ticks show the class
    names instead of indices.
    """
    flat_values = np.reshape(feature_maps, (-1,))
    n_values = len(flat_values)

    fig, ax = plt.subplots(figsize=(15, 5))

    # Figure title: layer index and name, output shape and sample label
    layer_name = model.layers[layer_index].name
    fig.suptitle(f'Feature Maps of Layer {layer_index}: {layer_name}\nOutput shape {feature_maps.shape}\nLabel: {label}')

    ax.bar(range(n_values), flat_values)
    ax.set_xlabel('Feature Map Index')
    ax.set_ylabel('Value')

    is_last_layer = layer_index == len(model.layers) - 1
    if is_last_layer:
        ax.set_xticks(np.arange(n_values), label_names)
        plt.xticks(rotation=90)

    plt.show()

To view the output of a specific layer we select a random (but constant between executions) sample by setting a seed.

To select a different layer, change the layer index in the last line of code (-1 shows the input sample).

In [None]:
# Fix the random seed before the sample is drawn
tf.keras.utils.set_random_seed(399)
# Layer index -1 plots the input sample itself instead of a layer output
plot_feature_maps(model, -1, test_spectrogram_ds)

In [None]:
# Print the name and the quantized weights of every layer of the model
with lq.context.quantized_scope(True):
    for layer in model.layers:
        print(layer.name, layer.get_weights(), sep="\n")

In [None]:
#Function to print all layers activations

def print_layers_activations(model: tf.keras.Sequential, test_ds: tf.data.Dataset, mode, index=None):
    """Print layer outputs for randomly drawn samples of `test_ds`.

    Args:
        model: model whose layers are inspected.
        test_ds: batched dataset of (features, one-hot labels).
        mode: "all" prints every layer; "single" prints the layer at `index`.
        index: layer index for mode "single"; -1 prints an input sample
            instead of a layer output.

    Raises:
        ValueError: if `mode` is unknown, or `index` is missing in mode "single".

    Note:
        get_layer_output draws a new sample on every call, so in mode "all"
        each layer is shown for its own sample; the class name of that sample
        is printed after each layer.
    """
    def _print_layer(layer_idx):
        print(f"Layer {layer_idx}: {model.layers[layer_idx].name}")
        feature_maps, label = get_layer_output(model, layer_idx, test_ds)
        print(feature_maps)
        print(feature_maps.shape)
        print("\n")
        # get_layer_output already returns the class name, so it is printed
        # as is instead of being used as an index again.
        print(label)

    if mode == "all":
        for i in range(len(model.layers)):
            _print_layer(i)
    elif mode == "single":
        if index is None:
            raise ValueError('mode "single" requires a layer index')
        if index == -1:
            example, label = test_ds.rebatch(1).shuffle(len(test_ds)).take(1).as_numpy_iterator().next()
            print("Spectrogram with label " + label_names[np.argmax(label)])
            print("\n")
            print(example)
            print(example.shape)
            return
        # Print the full array for a single layer; the context manager
        # restores the previous NumPy print options afterwards.
        with np.printoptions(threshold=np.inf):
            _print_layer(index)
    else:
        raise ValueError(f'Unknown mode {mode!r}; expected "all" or "single"')

#Print all layers activations
print_layers_activations(model, test_spectrogram_ds, "all")