In [None]:
!pip install pydub



In [None]:
import os
import pathlib

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import tensorflow as tf

from tensorflow.keras import layers
from tensorflow.keras import models
from IPython import display

# Fix the NumPy and TensorFlow random seeds so that repeated runs of the
# notebook shuffle and initialise in the same way.
seed = 42
np.random.seed(seed)
tf.random.set_seed(seed)

In [None]:
#import tensorflow_datasets as tfds
#(train, val, test), data_info = tfds.load('speech_commands', split=['train', 'validation', 'test'], shuffle_files=True, as_supervised=False,
#    with_info=True) 

In [None]:
# Download the Speech Commands v0.02 archive below ./data and ask Keras to
# extract it. `ds` receives whatever `get_file` returns for the download.
ds = tf.keras.utils.get_file(
    fname='speech_commands_v0.02.tar.gz',
    origin="http://download.tensorflow.org/data/speech_commands_v0.02.tar.gz",
    cache_subdir='data',
    cache_dir='.',
    extract=True)

In [None]:
'''DATASET_PATH = 'data/mini_speech_commands'

data_dir = pathlib.Path(DATASET_PATH)
if not data_dir.exists():
  tf.keras.utils.get_file(
      'mini_speech_commands.zip',
      origin="http://storage.googleapis.com/download.tensorflow.org/data/mini_speech_commands.zip",
      extract=True,
      cache_dir='.', cache_subdir='data')'''

In [None]:
#!unzip './data/mini_speech_commands.zip'

In [None]:
#commands = np.array(tf.io.gfile.listdir(str(DATASET_PATH)))
#commands = commands[commands != 'README.md']
#print('Commands:', commands)

In [None]:
# Class names are the entries of the extracted dataset directory, minus
# everything that is not a spoken-command folder (metadata files, the
# downloaded archive, the background-noise folder, notebook checkpoints).
NON_COMMAND_ENTRIES = [
    'README.md',
    '.DS_Store',
    'testing_list.txt',
    '.ipynb_checkpoints',
    '_background_noise_',
    'LICENSE',
    'speech_commands_v0.02.tar.gz',
    'validation_list.txt',
]

commands = np.array(tf.io.gfile.listdir(str('/content/data')))
# One vectorised exclusion instead of one boolean-mask line per entry.
commands = commands[~np.isin(commands, NON_COMMAND_ENTRIES)]
# Directory listing order is not guaranteed; sorting keeps the
# label -> integer id mapping identical from run to run.
commands = np.sort(commands)
print('Commands:', commands)

In [None]:
# Root directory that holds one sub-folder of clips per command.
# NOTE(review): absolute path; the download cell writes to './data', so this
# only matches when the working directory is /content - confirm.
data_dir = '/content/data'

In [None]:
# Collect every clip stored as <data_dir>/<command>/<file>.wav.
#
# A plain '/*/*' glob also returns files from folders that `commands`
# deliberately excludes (e.g. `_background_noise_`). Such files have no valid
# label, so `tf.argmax(label == commands)` would silently assign them class 0,
# and any non-WAV file among them cannot be decoded by `tf.audio.decode_wav`.
known_commands = set(commands.tolist())
filenames = [
    path
    for path in tf.io.gfile.glob(str(data_dir) + '/*/*.wav')
    if os.path.basename(os.path.dirname(path)) in known_commands
]
filenames = tf.random.shuffle(filenames)
num_samples = len(filenames)
print('Number of total examples:', num_samples)

print('Example file tensor:', filenames[0])

In [None]:
# Fixed-size split of the shuffled file list: the first 85,000 files train the
# models, the next 10,000 validate them, and whatever is left is the test set.
n_train = 85000
n_val = 10000

train_files = filenames[:n_train]
val_files = filenames[n_train:n_train + n_val]
test_files = filenames[n_train + n_val:]

for split_name, split_files in (('Training', train_files),
                                ('Validation', val_files),
                                ('Test', test_files)):
  print(split_name + ' set size', len(split_files))

In [None]:
def decode_audio(audio_binary):
  """Decode WAV-encoded bytes into a 1-D `float32` waveform.

  Args:
    audio_binary: scalar string tensor holding the contents of a WAV file.

  Returns:
    The decoded audio, normalized to the [-1.0, 1.0] range, with the trailing
    `channels` axis removed.

  Raises:
    Whatever `tf.audio.decode_wav` raises for invalid WAV data. The error is
    intentionally not swallowed: the previous bare `except: pass` left `audio`
    unassigned, so a failed decode surfaced as an unrelated unbound-variable
    error on the `return` line and hid the real cause.
  """
  # The sample rate returned alongside the audio is not needed here.
  audio, _ = tf.audio.decode_wav(contents=audio_binary)
  # Since all the data is single channel (mono), drop the `channels`
  # axis from the array.
  return tf.squeeze(audio, axis=-1)

In [None]:
def get_label(file_path):
  """Return the label of a clip: the name of the directory that contains it.

  Args:
    file_path: string tensor with a path of the form `.../<label>/<file>`.

  Returns:
    String tensor with the second-to-last path component.
  """
  path_components = tf.strings.split(file_path, os.path.sep)
  # Index from the end rather than tuple-unpacking so that this also works
  # inside a TensorFlow graph (e.g. under `Dataset.map`).
  return path_components[-2]

In [None]:
def get_waveform_and_label(file_path):
  """Load one audio file and pair its waveform with its label.

  Args:
    file_path: string tensor with the path of a WAV file.

  Returns:
    `(waveform, label)` as produced by `decode_audio` and `get_label`.
  """
  raw_bytes = tf.io.read_file(file_path)
  return decode_audio(raw_bytes), get_label(file_path)

In [None]:
# Let tf.data choose the level of parallelism at run time.
AUTOTUNE = tf.data.AUTOTUNE

# Training file paths -> (waveform, label) pairs.
files_ds = tf.data.Dataset.from_tensor_slices(train_files)
waveform_ds = files_ds.map(get_waveform_and_label, num_parallel_calls=AUTOTUNE)

In [None]:
# Plot the first rows x cols training waveforms on a grid, titled by label.
rows, cols = 3, 3
n = rows * cols
fig, axes = plt.subplots(rows, cols, figsize=(10, 12))

for i, (audio, label) in enumerate(waveform_ds.take(n)):
  # Row-major position of the i-th example on the grid.
  r, c = divmod(i, cols)
  ax = axes[r][c]
  ax.plot(audio.numpy())
  ax.set_yticks(np.arange(-1.2, 1.2, 0.2))
  label = label.numpy().decode('utf-8')
  ax.set_title(label)

plt.show()

In [None]:
def get_spectrogram(waveform, input_len=16000):
  """Convert a 1-D waveform into a fixed-size magnitude spectrogram.

  The waveform is truncated or zero-padded at the end to exactly `input_len`
  samples so that every clip yields a spectrogram of the same shape.

  Args:
    waveform: 1-D tensor of audio samples.
    input_len: number of samples every clip is brought to. The pad length
      used to be a separate hard-coded 16000 that could drift away from the
      truncation length; both now use this single value.

  Returns:
    `float32` tensor of shape `(frames, frequency_bins, 1)`; with the default
    `input_len` and the STFT settings below that is `(124, 129, 1)`.
  """
  waveform = waveform[:input_len]
  # Zero-padding for an audio waveform with fewer than `input_len` samples.
  zero_padding = tf.zeros(
      [input_len] - tf.shape(waveform),
      dtype=tf.float32)
  # Cast the waveform to float32 so it can be concatenated with the padding.
  waveform = tf.cast(waveform, dtype=tf.float32)
  equal_length = tf.concat([waveform, zero_padding], 0)
  # Short-time Fourier transform, keeping only the magnitude.
  spectrogram = tf.signal.stft(
      equal_length, frame_length=255, frame_step=128)
  spectrogram = tf.abs(spectrogram)
  # Add a `channels` dimension, so that the spectrogram can be used
  # as image-like input data with convolution layers (which expect
  # shape (`batch_size`, `height`, `width`, `channels`)).
  return spectrogram[..., tf.newaxis]

In [None]:
# Advance to the 270th training example and use it as the sample to inspect.
# Only the last (waveform, label) pair is needed, so the label decoding and
# the STFT are done once after the loop instead of on every iteration.
for waveform, label in waveform_ds.take(270):
  pass

label = label.numpy().decode('utf-8')
spectrogram = get_spectrogram(waveform)

print('Label:', label)
print('Waveform shape:', waveform.shape)
print('Spectrogram shape:', spectrogram.shape)
print('Audio playback')
display.display(display.Audio(waveform, rate=16000))

In [None]:
def plot_spectrogram(spectrogram, ax):
  """Draw a log-scaled spectrogram on `ax` with time along the x-axis.

  Args:
    spectrogram: array of shape `(time, frequency)` or
      `(time, frequency, 1)`; a trailing singleton axis is removed.
    ax: object providing `pcolormesh(X, Y, C)`, e.g. a matplotlib `Axes`.
  """
  rank = len(spectrogram.shape)
  if rank > 2:
    assert rank == 3
    spectrogram = np.squeeze(spectrogram, axis=-1)
  # Transpose so that time runs along the columns, and add an epsilon before
  # the log so that zero magnitudes do not produce -inf.
  eps = np.finfo(float).eps
  log_spec = np.log(spectrogram.T + eps)
  n_rows, n_cols = log_spec.shape[0], log_spec.shape[1]
  x_coords = np.linspace(0, np.size(spectrogram), num=n_cols, dtype=int)
  y_coords = range(n_rows)
  ax.pcolormesh(x_coords, y_coords, log_spec)

In [None]:
# Two stacked panels for the inspected example: raw waveform on top,
# log-spectrogram underneath.
fig, axes = plt.subplots(2, figsize=(12, 8))

timescale = np.arange(waveform.shape[0])
top_panel = axes[0]
top_panel.plot(timescale, waveform.numpy())
top_panel.set_title('Waveform')
top_panel.set_xlim([0, 16000])

bottom_panel = axes[1]
plot_spectrogram(spectrogram.numpy(), bottom_panel)
bottom_panel.set_title('Spectrogram')
plt.show()

In [None]:
def get_spectrogram_and_label_id(audio, label):
  """Turn a `(waveform, label)` pair into `(spectrogram, integer label id)`.

  Args:
    audio: waveform tensor accepted by `get_spectrogram`.
    label: string tensor holding the command name.

  Returns:
    `(spectrogram, label_id)`, where `label_id` is the position of the first
    entry of the global `commands` array that equals `label`.
  """
  spectrogram = get_spectrogram(audio)
  matches_command = label == commands
  return spectrogram, tf.argmax(matches_command)

In [None]:
# (waveform, label) pairs -> (spectrogram, label id) pairs for the training set.
spectrogram_ds = waveform_ds.map(
    get_spectrogram_and_label_id, num_parallel_calls=AUTOTUNE)

In [None]:
# Plot the first rows x cols training spectrograms, titled by command name.
rows, cols = 3, 3
n = rows*cols
fig, axes = plt.subplots(rows, cols, figsize=(10, 10))

for i, (spectrogram, label_id) in enumerate(spectrogram_ds.take(n)):
  # Row-major position of the i-th example on the grid.
  r, c = divmod(i, cols)
  ax = axes[r][c]
  plot_spectrogram(spectrogram.numpy(), ax)
  ax.set_title(commands[label_id.numpy()])
  ax.axis('off')

plt.show()

In [None]:
def preprocess_dataset(files):
  """Build a dataset of `(spectrogram, label id)` pairs from file paths.

  Args:
    files: file paths accepted by `tf.data.Dataset.from_tensor_slices`.

  Returns:
    An unbatched `tf.data.Dataset` in which every path has been passed through
    `get_waveform_and_label` and then `get_spectrogram_and_label_id`.
  """
  return (
      tf.data.Dataset.from_tensor_slices(files)
      .map(get_waveform_and_label, num_parallel_calls=AUTOTUNE)
      .map(get_spectrogram_and_label_id, num_parallel_calls=AUTOTUNE)
  )

In [None]:
# The training spectrograms already exist; validation and test files go
# through the same preprocessing pipeline.
train_ds = spectrogram_ds
val_ds, test_ds = (
    preprocess_dataset(split_files) for split_files in (val_files, test_files))

In [None]:
# Batch the training and validation sets; the test set is left unbatched.
batch_size = 32
train_ds, val_ds = (split.batch(batch_size) for split in (train_ds, val_ds))

In [None]:
# Cache the batched data after the first pass and prefetch upcoming batches
# while the current one is being consumed.
train_ds, val_ds = (
    split.cache().prefetch(AUTOTUNE) for split in (train_ds, val_ds))

In [None]:
from tensorflow.keras import layers

In [None]:

# Take the per-example input shape from the first training spectrogram.
for spectrogram, _ in spectrogram_ds.take(1):
  input_shape = spectrogram.shape

print('Input shape:', input_shape)
num_labels = len(commands)

# Instantiate the `tf.keras.layers.Normalization` layer.
norm_layer = layers.Normalization()
# Fit the state of the layer to the spectrograms
# with `Normalization.adapt`.
norm_layer.adapt(data=spectrogram_ds.map(map_func=lambda spec, label: spec))

# AlexNet-style stack that outputs one logit per command (no softmax).
# NOTE(review): every convolution uses strides=4, so by the third convolution
# the feature map is already 1x1; AlexNet itself strides only the first
# convolution - confirm that this is intended.
model = models.Sequential([
    layers.Input(shape=input_shape),
    # Resize the input to 224x224.
    layers.Resizing(224, 224),
    # Normalize. The adapted layer used to be fitted above but was never
    # added to the model, so the inputs reached the first convolution
    # unnormalized.
    norm_layer,
    layers.Conv2D(96, 11, strides=4, padding='same'),
    layers.Lambda(tf.nn.local_response_normalization),
    layers.Activation('relu'),
    layers.MaxPooling2D(3, strides=2),
    layers.Conv2D(256, 5, strides=4, padding='same'),
    layers.Lambda(tf.nn.local_response_normalization),
    layers.Activation('relu'),
    layers.MaxPooling2D(3, strides=2),
    layers.Conv2D(384, 3, strides=4, padding='same'),
    layers.Activation('relu'),
    layers.Conv2D(384, 3, strides=4, padding='same'),
    layers.Activation('relu'),
    layers.Conv2D(256, 3, strides=4, padding='same'),
    layers.Activation('relu'),
    layers.Flatten(),
    layers.Dense(4096, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(4096, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(num_labels)
])

model.summary()

In [None]:
# The final Dense layer emits raw logits, hence `from_logits=True`.
model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer=tf.keras.optimizers.Adam(),
    metrics=['accuracy'])

In [None]:
# Train for at most EPOCHS epochs; stop early once the monitored metric has
# not improved for two consecutive epochs.
EPOCHS = 15
history = model.fit(
    train_ds,
    epochs=EPOCHS,
    validation_data=val_ds,
    callbacks=[tf.keras.callbacks.EarlyStopping(verbose=1, patience=2)],
)

In [None]:
# Training and validation loss per epoch for the AlexNet-style model.
metrics = history.history
plt.plot(history.epoch, metrics['loss'])
# Without explicit x values the curve is drawn against 0..len-1.
plt.plot(metrics['val_loss'])
plt.legend(['loss', 'val_loss'])
plt.title('AlexNet')
plt.show()

In [None]:
# Materialise the (unbatched) test set as two NumPy arrays:
# spectrograms in `test_audio`, integer label ids in `test_labels`.
test_audio, test_labels = [], []

for test_spec, test_label_id in test_ds:
  test_audio.append(test_spec.numpy())
  test_labels.append(test_label_id.numpy())

test_audio = np.array(test_audio)
test_labels = np.array(test_labels)

In [None]:
# Predicted class = index of the largest output; accuracy = share of matches.
model_outputs = model.predict(test_audio)
y_pred = np.argmax(model_outputs, axis=1)
y_true = test_labels

is_correct = y_pred == y_true
test_acc = sum(is_correct) / len(y_true)
print(f'Test set accuracy: {test_acc:.0%}')

In [None]:
# Confusion matrix of true labels (rows) against predictions (columns),
# with both axes annotated by command name.
confusion_mtx = tf.math.confusion_matrix(y_true, y_pred)

plt.figure(figsize=(10, 8))
sns.heatmap(
    confusion_mtx,
    annot=True,
    fmt='g',
    xticklabels=commands,
    yticklabels=commands)
plt.ylabel('Label')
plt.xlabel('Prediction')
plt.show()

In [None]:
# Materialise the test set as NumPy arrays.
test_audio = []
test_labels = []

for audio, label in test_ds:
  test_audio.append(audio.numpy())
  test_labels.append(label.numpy())

test_audio = np.array(test_audio)
test_labels = np.array(test_labels)


y_pred = model.predict(test_audio)
y_true = test_labels

# `model` ends in `Dense(num_labels)` without a softmax and was compiled with
# `from_logits=True`, so `y_pred` holds logits. The loss must be told so;
# with the default `from_logits=False` the logits would be treated as
# probabilities and the reported test loss would be wrong.
cce = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
cce(y_true, y_pred).numpy()

In [None]:
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, Add, MaxPooling2D, AveragePooling2D, Dense, Flatten
import tensorflow.keras.activations as activations


In [None]:
from tensorflow.keras.applications import resnet50

In [None]:
# ResNet50 trained from scratch on single-channel 224x224 inputs.
# `classes` must be given explicitly: with `include_top=True` it defaults to
# 1000, which would put a 1000-way classifier on top of a problem that only
# has `num_labels` classes.
model_resnet = resnet50.ResNet50(
    include_top=True,
    weights=None,
    input_shape=(224, 224, 1),
    classes=num_labels,
)

# Resize the spectrograms to the network's input size before feeding them in.
model_2 = models.Sequential()
model_2.add(layers.Resizing(224, 224))
model_2.add(model_resnet)

In [None]:
# Hyper-parameters; only `lr` is consumed by the compile call below.
lr = 0.001
num_epochs = 10
batch_size = 256

model_2.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
    metrics=['accuracy'])

In [None]:
# Train the ResNet model for at most EPOCHS epochs with early stopping
# (patience of two epochs).
EPOCHS = 15
history = model_2.fit(
    train_ds,
    epochs=EPOCHS,
    validation_data=val_ds,
    callbacks=[tf.keras.callbacks.EarlyStopping(verbose=1, patience=2)],
)

In [None]:
# Training and validation loss per epoch for the ResNet model.
metrics = history.history
plt.plot(history.epoch, metrics['loss'])
# Without explicit x values the curve is drawn against 0..len-1.
plt.plot(metrics['val_loss'])
plt.legend(['loss', 'val_loss'])
plt.title('ResNet')
plt.show()

In [None]:
# Materialise the (unbatched) test set as two NumPy arrays.
test_audio, test_labels = [], []

for test_spec, test_label_id in test_ds:
  test_audio.append(test_spec.numpy())
  test_labels.append(test_label_id.numpy())

test_audio = np.array(test_audio)
test_labels = np.array(test_labels)

In [None]:
# Predicted class = index of the largest output; accuracy = share of matches.
model_outputs = model_2.predict(test_audio)
y_pred = np.argmax(model_outputs, axis=1)
y_true = test_labels

is_correct = y_pred == y_true
test_acc = sum(is_correct) / len(y_true)
print(f'Test set accuracy: {test_acc:.0%}')

In [None]:
# Materialise the test set as NumPy arrays.
test_audio, test_labels = [], []

for test_spec, test_label_id in test_ds:
  test_audio.append(test_spec.numpy())
  test_labels.append(test_label_id.numpy())

test_audio = np.array(test_audio)
test_labels = np.array(test_labels)

y_true = test_labels
y_pred = model_2.predict(test_audio)

# Same loss configuration as in `model_2.compile` (outputs taken as
# probabilities). The value is the cell's displayed result.
cce = tf.keras.losses.SparseCategoricalCrossentropy()
cce(y_true, y_pred).numpy()

In [None]:
from tensorflow.keras.applications import densenet

In [None]:
# DenseNet121 trained from scratch on single-channel 224x224 inputs.
# `classes` must be given explicitly: with `include_top=True` it defaults to
# 1000, which would put a 1000-way classifier on top of a problem that only
# has `num_labels` classes.
model_densenet = densenet.DenseNet121(
    include_top=True,
    weights=None,
    input_shape=(224, 224, 1),
    classes=num_labels,
)

# Resize the spectrograms to the network's input size before feeding them in.
model_5 = models.Sequential()
model_5.add(layers.Resizing(224, 224))
model_5.add(model_densenet)

In [None]:
# Loss on class probabilities (default `from_logits=False`).
model_5.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    optimizer=tf.keras.optimizers.Adam(),
    metrics=['accuracy'])

In [None]:
# Train the DenseNet model for at most EPOCHS epochs with early stopping
# (patience of two epochs).
EPOCHS = 15
history = model_5.fit(
    train_ds,
    epochs=EPOCHS,
    validation_data=val_ds,
    callbacks=[tf.keras.callbacks.EarlyStopping(verbose=1, patience=2)],
)

In [None]:
# Training and validation loss per epoch for the DenseNet model.
metrics = history.history
plt.plot(history.epoch, metrics['loss'])
# Without explicit x values the curve is drawn against 0..len-1.
plt.plot(metrics['val_loss'])
plt.legend(['loss', 'val_loss'])
plt.title('DenseNet')
plt.show()

# Materialise the test set as NumPy arrays.
test_audio, test_labels = [], []

for test_spec, test_label_id in test_ds:
  test_audio.append(test_spec.numpy())
  test_labels.append(test_label_id.numpy())

test_audio = np.array(test_audio)
test_labels = np.array(test_labels)

# Predicted class = index of the largest output; accuracy = share of matches.
model_outputs = model_5.predict(test_audio)
y_pred = np.argmax(model_outputs, axis=1)
y_true = test_labels

is_correct = y_pred == y_true
test_acc = sum(is_correct) / len(y_true)
print(f'Test set accuracy: {test_acc:.0%}')


In [None]:
# Materialise the test set as NumPy arrays.
test_audio, test_labels = [], []

for test_spec, test_label_id in test_ds:
  test_audio.append(test_spec.numpy())
  test_labels.append(test_label_id.numpy())

test_audio = np.array(test_audio)
test_labels = np.array(test_labels)

y_true = test_labels
y_pred = model_5.predict(test_audio)

# Same loss configuration as in `model_5.compile` (outputs taken as
# probabilities). The value is the cell's displayed result.
cce = tf.keras.losses.SparseCategoricalCrossentropy()
cce(y_true, y_pred).numpy()

In [None]:
# Rebuild the three splits from scratch: train/validation are batched, cached
# and prefetched; the test split stays unbatched.
batch_size = 32

test_ds = preprocess_dataset(test_files)
train_ds = spectrogram_ds.batch(batch_size).cache().prefetch(AUTOTUNE)
val_ds = (
    preprocess_dataset(val_files)
    .batch(batch_size)
    .cache()
    .prefetch(AUTOTUNE)
)

In [None]:
# Take the per-example input shape from the first training spectrogram.
for spectrogram, _ in spectrogram_ds.take(1):
  input_shape = spectrogram.shape
print('Input shape:', input_shape)
num_labels = len(commands)

# Instantiate the `tf.keras.layers.Normalization` layer.
norm_layer = layers.Normalization()
# Fit the state of the layer to the spectrograms
# with `Normalization.adapt`.
norm_layer.adapt(data=spectrogram_ds.map(map_func=lambda spec, label: spec))

# AlexNet-like layout built from depthwise convolutions; outputs one logit
# per command (no softmax).
# NOTE(review): none of the convolutions is strided and the depth multipliers
# compound (5*5*3*5*3 channels), so `Flatten` produces a very large vector
# ahead of `Dense(4096)` - check the parameter count in the summary.
model_3 = models.Sequential([
    layers.Input(shape=input_shape),
    # Resize the input to 224x224.
    layers.Resizing(224, 224),
    # Normalize. The adapted layer used to be fitted above but was never
    # added to the model, so the inputs reached the first convolution
    # unnormalized.
    norm_layer,
    layers.DepthwiseConv2D(kernel_size=(11, 11), padding='same', depth_multiplier=5),
    layers.Lambda(tf.nn.local_response_normalization),
    layers.Activation('relu'),
    layers.MaxPooling2D(3, strides=2),
    layers.DepthwiseConv2D(kernel_size=(3, 3), padding='same', depth_multiplier=5),
    layers.Lambda(tf.nn.local_response_normalization),
    layers.Activation('relu'),
    layers.MaxPooling2D(3, strides=2),
    layers.DepthwiseConv2D(kernel_size=(3, 3), padding='same', depth_multiplier=3),
    layers.Activation('relu'),
    layers.DepthwiseConv2D(kernel_size=(3, 3), padding='same', depth_multiplier=5),
    layers.Activation('relu'),
    layers.DepthwiseConv2D(3, depth_multiplier=3, padding='same'),
    layers.Activation('relu'),
    layers.Flatten(),
    layers.Dense(4096, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(512, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(num_labels)
])

model_3.summary()

In [None]:
# The final Dense layer emits raw logits, hence `from_logits=True`.
model_3.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer=tf.keras.optimizers.Adam(),
    metrics=['accuracy'])

In [None]:
# Train the depthwise-convolution model for at most EPOCHS epochs with early
# stopping (patience of two epochs).
EPOCHS = 10
history = model_3.fit(
    train_ds,
    epochs=EPOCHS,
    validation_data=val_ds,
    callbacks=[tf.keras.callbacks.EarlyStopping(verbose=1, patience=2)],
)

In [None]:
# Training and validation loss per epoch for `model_3`.
metrics = history.history
plt.plot(history.epoch, metrics['loss'], metrics['val_loss'])
plt.legend(['loss', 'val_loss'])
# `history` here comes from `model_3` (the depthwise-convolution network);
# the title used to be a copy-pasted 'DenseNet'.
plt.title('AlexNet (DepthwiseConv2D)')
plt.show()


# Materialise the test set as NumPy arrays.
test_audio = []
test_labels = []

for audio, label in test_ds:
  test_audio.append(audio.numpy())
  test_labels.append(label.numpy())

test_audio = np.array(test_audio)
test_labels = np.array(test_labels)


# Predicted class = index of the largest logit; accuracy = share of matches.
y_pred = np.argmax(model_3.predict(test_audio), axis=1)
y_true = test_labels

test_acc = sum(y_pred == y_true) / len(y_true)
print(f'Test set accuracy: {test_acc:.0%}')

In [None]:
# Materialise the test set as NumPy arrays.
test_audio = []
test_labels = []

for audio, label in test_ds:
  test_audio.append(audio.numpy())
  test_labels.append(label.numpy())

test_audio = np.array(test_audio)
test_labels = np.array(test_labels)


y_pred = model_3.predict(test_audio)
y_true = test_labels

# `model_3` ends in `Dense(num_labels)` without a softmax and was compiled
# with `from_logits=True`, so `y_pred` holds logits. The loss must be told
# so; with the default `from_logits=False` the logits would be treated as
# probabilities and the reported test loss would be wrong.
cce = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
cce(y_true, y_pred).numpy()


In [None]:
# Rebuild the three splits from scratch: train/validation are batched, cached
# and prefetched; the test split stays unbatched.
batch_size = 32

test_ds = preprocess_dataset(test_files)
train_ds = spectrogram_ds.batch(batch_size).cache().prefetch(AUTOTUNE)
val_ds = (
    preprocess_dataset(val_files)
    .batch(batch_size)
    .cache()
    .prefetch(AUTOTUNE)
)

In [None]:
# Take the per-example input shape from the first training spectrogram.
for spectrogram, _ in spectrogram_ds.take(1):
  input_shape = spectrogram.shape
print('Input shape:', input_shape)
num_labels = len(commands)

# Instantiate the `tf.keras.layers.Normalization` layer.
norm_layer = layers.Normalization()
# Fit the state of the layer to the spectrograms
# with `Normalization.adapt`.
norm_layer.adapt(data=spectrogram_ds.map(map_func=lambda spec, label: spec))

# AlexNet-like layout mixing regular and depthwise-separable convolutions;
# outputs one logit per command (no softmax).
model_4 = models.Sequential([
    layers.Input(shape=input_shape),
    # Resize the input to 224x224.
    layers.Resizing(224, 224),
    # Normalize. The adapted layer used to be fitted above but was never
    # added to the model, so the inputs reached the first convolution
    # unnormalized.
    norm_layer,
    layers.Conv2D(96, 11, strides=4, padding='same'),
    layers.Lambda(tf.nn.local_response_normalization),
    layers.Activation('relu'),
    layers.MaxPooling2D(3, strides=2),
    layers.SeparableConv2D(64, 5, padding='same'),
    layers.Lambda(tf.nn.local_response_normalization),
    layers.Activation('relu'),
    layers.MaxPooling2D(3, strides=2),
    layers.SeparableConv2D(128, 3, padding='same', depth_multiplier=3),
    layers.Activation('relu'),
    layers.Conv2D(64, 3, strides=4, padding='same'),
    layers.Activation('relu'),
    layers.SeparableConv2D(256, 3, padding='same'),
    layers.Activation('relu'),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(num_labels)
])

model_4.summary()

In [None]:
# The final Dense layer emits raw logits, hence `from_logits=True`.
model_4.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer=tf.keras.optimizers.Adam(),
    metrics=['accuracy'])

In [None]:
# Train the separable-convolution model for at most EPOCHS epochs with early
# stopping (patience of two epochs).
EPOCHS = 15
history = model_4.fit(
    train_ds,
    epochs=EPOCHS,
    validation_data=val_ds,
    callbacks=[tf.keras.callbacks.EarlyStopping(verbose=1, patience=2)],
)

In [None]:
# Training and validation loss per epoch for `model_4`.
metrics = history.history
plt.plot(history.epoch, metrics['loss'], metrics['val_loss'])
plt.legend(['loss', 'val_loss'])
# `history` here comes from `model_4` (the separable-convolution network);
# the title used to be a copy-pasted 'DenseNet'.
plt.title('AlexNet (SeparableConv2D)')
plt.show()


# Materialise the test set as NumPy arrays.
test_audio = []
test_labels = []

for audio, label in test_ds:
  test_audio.append(audio.numpy())
  test_labels.append(label.numpy())

test_audio = np.array(test_audio)
test_labels = np.array(test_labels)


# Predicted class = index of the largest logit; accuracy = share of matches.
y_pred = np.argmax(model_4.predict(test_audio), axis=1)
y_true = test_labels

test_acc = sum(y_pred == y_true) / len(y_true)
print(f'Test set accuracy: {test_acc:.0%}')

In [None]:
# Materialise the test set as NumPy arrays and keep the raw outputs of
# `model_4` for it in `y_pred`.
test_audio, test_labels = [], []

for test_spec, test_label_id in test_ds:
  test_audio.append(test_spec.numpy())
  test_labels.append(test_label_id.numpy())

test_audio = np.array(test_audio)
test_labels = np.array(test_labels)

y_true = test_labels
y_pred = model_4.predict(test_audio)


In [None]:
# `y_pred` holds the raw outputs of `model_4`, whose last layer is
# `Dense(num_labels)` without a softmax and which was compiled with
# `from_logits=True`. The loss must therefore be computed on logits; with the
# default `from_logits=False` the reported test loss would be wrong.
cce = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
cce(y_true, y_pred).numpy()


In [None]:
!nvidia-smi -L
