In [None]:
!pip install python_speech_features

In [None]:
!pip install image-classifiers

In [None]:
import os
import pathlib
from tensorflow.keras.layers.experimental import preprocessing
from IPython import display
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import tensorflow as tf
import os
from scipy.io import wavfile
import pandas as pd
import matplotlib.pyplot as plt
from keras.layers import Conv2D,MaxPooling2D,Flatten,LSTM,BatchNormalization,GlobalAveragePooling2D
from keras.layers import Dropout,Dense,TimeDistributed
from keras.models import Sequential
from keras.applications.resnet import ResNet50
from keras.utils.np_utils import to_categorical
from sklearn.utils.class_weight import compute_class_weight
from tqdm import tqdm
from python_speech_features import mfcc
import pickle
from keras.callbacks import ModelCheckpoint
 
import librosa as lr

In [None]:
# Display the installed TensorFlow version as this cell's output.
tf.__version__

In [None]:
# Seed NumPy's and TensorFlow's global random generators with one fixed value.
SEED = 1
np.random.seed(SEED)
tf.random.set_seed(SEED)

In [None]:
# Locate the Mini Speech Commands data, downloading and extracting the archive
# only when the directory is not already present.
data_dir = pathlib.Path('data/mini_speech_commands')
if not data_dir.exists():
  tf.keras.utils.get_file(
      'mini_speech_commands.zip',
      origin="http://storage.googleapis.com/download.tensorflow.org/data/mini_speech_commands.zip",
      extract=True,
      cache_dir='.',
      cache_subdir='data')

# Every entry of the data directory other than the README is a command label.
dir_entries = np.array(tf.io.gfile.listdir(str(data_dir)))
commands = dir_entries[dir_entries != 'README.md']
print('Commands:', commands)

# Collect every <data_dir>/<command>/<clip> path and shuffle the list once.
filenames = tf.random.shuffle(tf.io.gfile.glob(f'{data_dir}/*/*'))
num_samples = len(filenames)
first_command_dir = data_dir / commands[0]
print('Number of total examples:', num_samples)
# Only the first command's directory is counted here.
print('Number of examples per label:',
      len(tf.io.gfile.listdir(str(first_command_dir))))
print('Example file tensor:', filenames[0])

In [None]:
# Carve the shuffled file list into train / validation / test partitions.
# The test slice is taken from the end of the list, so the three slices are
# disjoint only when there are at least 8000 files (6400 + 1000 + 600).
n_train, n_val, n_test = 6400, 1000, 600
train_files = filenames[:n_train]
val_files = filenames[n_train:n_train + n_val]
test_files = filenames[-n_test:]

print('Training set size', len(train_files))
print('Validation set size', len(val_files))
print('Test set size', len(test_files))
 
 
def decode_audio(audio_binary):
  """Decode WAV-encoded bytes and return the samples without the last axis.

  Only the first element of the `tf.audio.decode_wav` result (the audio) is
  used; the second element is discarded. The last axis is squeezed away, so
  it must have size 1.
  """
  decoded = tf.audio.decode_wav(audio_binary)
  samples = decoded[0]
  return tf.squeeze(samples, axis=-1)
 
def get_label(file_path):
  """Return the second-to-last path component of `file_path` (its parent directory)."""
  # Indexing is used instead of tuple unpacking so that this also works when
  # traced into a TensorFlow graph.
  path_components = tf.strings.split(input=file_path, sep=os.path.sep)
  return path_components[-2]

In [None]:
def get_waveform_and_label(file_path):
  """Load one audio clip and derive its label from the file's parent directory.

  Args:
    file_path: Scalar string tensor with the path of a WAV file whose parent
      directory name is the label (`<...>/<label>/<clip>`).

  Returns:
    A `(waveform, label)` tuple: the waveform produced by `decode_audio` and
    the parent-directory name as a string tensor.
  """
  # The label is parsed inline rather than through the module-level
  # `get_label`: a later cell rebinds that name to a two-argument
  # (spectrogram, label) function, which would make any re-trace of this
  # function (for example through `preprocess_dataset`) fail.
  label = tf.strings.split(file_path, os.path.sep)[-2]
  audio_binary = tf.io.read_file(file_path)
  waveform = decode_audio(audio_binary)
  # Deliberately no print() calls: inside `Dataset.map` they execute once at
  # trace time and only show symbolic tensors, not per-example values.
  return waveform, label
 
 
 
# Build a dataset of (waveform, label) pairs from the training file paths.
AUTOTUNE = tf.data.AUTOTUNE
files_ds = tf.data.Dataset.from_tensor_slices(train_files)
waveform_ds = files_ds.map(get_waveform_and_label, num_parallel_calls=AUTOTUNE)

# Plot the first rows*cols waveforms on a grid, one clip per panel, filling
# the grid row by row.
rows = 3
cols = 3
n = rows*cols
fig, axes = plt.subplots(rows, cols, figsize=(10, 12))
y_ticks = np.arange(-1.2, 1.2, 0.2)
for ax, (audio, label) in zip(axes.flat, waveform_ds.take(n)):
  label = label.numpy().decode('utf-8')
  ax.plot(audio.numpy())
  ax.set_yticks(y_ticks)
  ax.set_title(label)

plt.show()
 
 
 
def get_spectrogram(waveform):
  """Convert a 1-D waveform into a magnitude spectrogram of fixed length.

  The waveform is cast to float32, truncated to at most 16000 samples and
  zero-padded up to exactly 16000 samples, so every clip yields a
  spectrogram of the same shape.

  Args:
    waveform: 1-D tensor of audio samples.

  Returns:
    The magnitude (absolute value) of the short-time Fourier transform,
    computed with `frame_length=255` and `frame_step=128`.
  """
  target_len = 16000

  # Truncate first: without this, a clip longer than `target_len` would make
  # the padding length negative and `tf.zeros` would fail.
  waveform = tf.cast(waveform, tf.float32)[:target_len]

  # Padding for files with fewer than `target_len` samples.
  zero_padding = tf.zeros([target_len] - tf.shape(waveform), dtype=tf.float32)

  # Concatenate audio with padding so that all clips have the same length.
  equal_length = tf.concat([waveform, zero_padding], 0)
  spectrogram = tf.signal.stft(
      equal_length, frame_length=255, frame_step=128)

  return tf.abs(spectrogram)
 
 
# Take a single (waveform, label) example and compute its spectrogram.
waveform, label = next(iter(waveform_ds.take(1)))
label = label.numpy().decode('utf-8')
spectrogram = get_spectrogram(waveform)

# Report the example's label and shapes, then embed an audio player for it.
print('Label:', label)
print('Waveform shape:', waveform.shape)
print('Spectrogram shape:', spectrogram.shape)
print('Audio playback')
display.display(display.Audio(waveform, rate=16000))
 
 
def plot_spectrogram(spectrogram, ax):
  """Draw a log-scaled spectrogram on `ax` with the first axis along x.

  Args:
    spectrogram: 2-D NumPy array of non-negative magnitudes; it is transposed
      before plotting so its first axis (time, for STFT output) becomes the
      columns.
    ax: Object exposing `pcolormesh(X, Y, C)`, e.g. a Matplotlib axes.
  """
  # A tiny epsilon is added before taking the log so that exact zeros (such
  # as zero-padded silence) map to a large negative number instead of -inf.
  log_spec = np.log(spectrogram.T + np.finfo(float).eps)
  height = log_spec.shape[0]
  width = log_spec.shape[1]
  X = np.linspace(0, np.size(spectrogram), num=width, dtype=int)
  Y = range(height)
  ax.pcolormesh(X, Y, log_spec)
 
 
# Show the example clip's waveform in the top panel and its spectrogram below.
fig, axes = plt.subplots(2, figsize=(12, 8))
wave_ax, spec_ax = axes

timescale = np.arange(waveform.shape[0])
wave_ax.plot(timescale, waveform.numpy())
wave_ax.set_title('Waveform')
wave_ax.set_xlim([0, 16000])

plot_spectrogram(spectrogram.numpy(), spec_ax)
spec_ax.set_title('Spectrogram')
plt.show()
 
 
def get_spectrogram_and_label_id(audio, label):
  """Map a (waveform, label string) pair to (spectrogram, integer label id).

  The spectrogram gets a trailing axis of size 1, and the id is the index at
  which `label` matches an entry of the module-level `commands` array.
  """
  spectrogram = get_spectrogram(audio)[..., tf.newaxis]
  matches = label == commands
  return spectrogram, tf.argmax(matches)
 
 
# Convert every (waveform, label) pair into (spectrogram, label id).
spectrogram_ds = waveform_ds.map(
    get_spectrogram_and_label_id, num_parallel_calls=AUTOTUNE)

# Plot the first rows*cols spectrograms on a grid, titled with their command
# names and with the axes hidden.
rows = 3
cols = 3
n = rows*cols
fig, axes = plt.subplots(rows, cols, figsize=(10, 10))
for ax, (spectrogram, label_id) in zip(axes.flat, spectrogram_ds.take(n)):
  plot_spectrogram(np.squeeze(spectrogram.numpy()), ax)
  ax.set_title(commands[label_id.numpy()])
  ax.axis('off')

plt.show()
 
 
def preprocess_dataset(files):
  """Build a dataset of (spectrogram, label id) pairs from audio file paths.

  Each path is first mapped through `get_waveform_and_label` and the result
  through `get_spectrogram_and_label_id`, both with parallel calls.
  """
  return (
      tf.data.Dataset.from_tensor_slices(files)
      .map(get_waveform_and_label, num_parallel_calls=AUTOTUNE)
      .map(get_spectrogram_and_label_id, num_parallel_calls=AUTOTUNE)
  )
 
 
# The training pipeline already exists as `spectrogram_ds`; build the
# validation and test pipelines from their file lists.
train_ds = spectrogram_ds
val_ds = preprocess_dataset(val_files)
test_ds = preprocess_dataset(test_files)
# NOTE(review): the label printed is "test_ds" but the type shown is that of
# train_ds - confirm which one was intended.
print("test_ds")
print(type(train_ds))

# Batch each split, then cache the batched elements and prefetch them.
batch_size = 64
train_ds = train_ds.batch(batch_size).cache().prefetch(AUTOTUNE)
val_ds = val_ds.batch(batch_size).cache().prefetch(AUTOTUNE)
test_ds = test_ds.batch(batch_size).cache().prefetch(AUTOTUNE)

In [None]:
# Pull the first training batch; print the spectrogram batch's shape and the
# label ids it contains.
iterator = iter(train_ds)
next_element = next(iterator)
pt, en = next_element
print(pt.numpy().shape)
print(en.numpy())

In [None]:
# Pull the first validation batch; print the shapes of its spectrograms and
# of its label ids.
iterator1 = iter(val_ds)
next_element1 = next(iterator1)
pt1, en1 = next_element1
print(pt1.numpy().shape)
print(en1.numpy().shape)

In [None]:
# Record the shape of one spectrogram and the number of distinct commands.
spectrogram, _ = next(iter(spectrogram_ds.take(1)))
input_shape = spectrogram.shape
print('Input shape:', input_shape)
num_labels = len(commands)

# Fit a Normalization layer on the spectrograms alone (labels dropped).
# NOTE(review): `norm_layer` is not referenced by any model cell visible in
# this notebook section - confirm whether it is still needed.
norm_layer = preprocessing.Normalization()
spectrograms_only = spectrogram_ds.map(lambda x, _: x)
norm_layer.adapt(spectrograms_only)

In [None]:
from keras import layers
from keras import models
from keras.callbacks import EarlyStopping

In [None]:
# Display the current value of `input_shape` as this cell's output.
input_shape

In [None]:
def preprocess(spectrogram, label):
    """Repeat the spectrogram's last axis three times; pass the label through.

    Args:
        spectrogram: Tensor whose last axis is the channel axis.
        label: Returned unchanged.

    Returns:
        A `(spectrogram, label)` tuple where the spectrogram's last axis has
        been repeated 3x (a single channel becomes three identical channels).
    """
    spectrogram = tf.repeat(spectrogram, repeats=3, axis=-1)
    return spectrogram, label

# Only expand while the dataset is still single-channel. Without this guard,
# re-running the cell would map `preprocess` over its own output and keep
# multiplying the channel count by three.
if spectrogram_ds.element_spec[0].shape[-1] == 1:
    spectrogram_ds = spectrogram_ds.map(preprocess)


In [None]:
# Display the dataset object's repr as this cell's output.
spectrogram_ds

In [None]:
# Hold out the first `num_val_samples` elements of spectrogram_ds and train on
# the rest.
# NOTE(review): this rebinds train_ds / val_ds / test_ds, replacing the
# pipelines an earlier cell built from val_files and test_files - confirm
# that is intended.
num_val_samples = 1000
holdout_ds = spectrogram_ds.take(num_val_samples)
train_ds = spectrogram_ds.skip(num_val_samples)

# The leading `test_split` fraction of the hold-out set becomes the test set;
# what remains is the validation set.
test_split = 0.6
num_test_samples = int(num_val_samples * test_split)
test_ds = holdout_ds.take(num_test_samples)
val_ds = holdout_ds.skip(num_test_samples)

# Only the training set is shuffled; all three splits are batched.
batch_size = 64
train_ds = train_ds.shuffle(buffer_size=1000).batch(batch_size)
test_ds = test_ds.batch(batch_size)
val_ds = val_ds.batch(batch_size)


In [None]:
# Define a function to extract the labels from the dataset
# NOTE(review): this rebinds the name `get_label`, which an earlier cell
# defined as a one-argument function taking a file path. Consider a distinct
# name so the earlier cells can be re-run after this one.
def get_label(spectrogram, label):
    """Return `label` unchanged, discarding `spectrogram`."""
    return label

# Map the get_label function to the train_ds to extract the labels
# train_ds is shuffled and batched at this point, so each element of
# train_labels_ds is one batch of labels.
train_labels_ds = train_ds.map(get_label)


In [None]:
# Label-only view of the batched test set; the bare name on the last line
# displays the resulting dataset's repr.
test_labels_ds = test_ds.map(get_label)
test_labels_ds


In [None]:
from tensorflow.keras import layers
from tensorflow.keras import models
from tensorflow.keras import optimizers
from tensorflow.keras.applications import VGG19
from tensorflow.keras.applications import VGG16
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications import InceptionV3
from tensorflow.keras.applications import Xception


In [None]:
# VGG19
# Input shape fed to the network (three-channel spectrograms)
input_shape = (124, 129, 3)

# ImageNet-pretrained VGG19 base without its classification head
base_model = VGG19(weights='imagenet', include_top=False, input_shape=input_shape)

# Freeze all layers in the base model so only the new head is trained
for layer in base_model.layers:
    layer.trainable = False

# Add a custom head to the model
x = layers.Flatten()(base_model.output)
x = layers.Dense(256, activation='relu')(x)
x = layers.Dropout(0.5)(x)
output = layers.Dense(num_labels, activation='softmax')(x)

# Build and compile the model (labels are integer ids, hence the sparse loss)
vgg19_model = models.Model(inputs=base_model.input, outputs=output)
vgg19_model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=optimizers.Adam(learning_rate=0.001),
    metrics=['accuracy'])

print("VGG19")
# Train the model on the train dataset. `callbacks` is documented as a list
# of callbacks, so the single EarlyStopping instance is wrapped in one.
vgg19_model.fit(
    train_ds,
    epochs=50,
    validation_data=val_ds,
    callbacks=[tf.keras.callbacks.EarlyStopping(verbose=1, patience=5)])


In [None]:
# VGG16
# Input shape fed to the network (three-channel spectrograms)
input_shape = (124, 129, 3)

# ImageNet-pretrained VGG16 base without its classification head
base_model = VGG16(weights='imagenet', include_top=False, input_shape=input_shape)

# Freeze all layers in the base model so only the new head is trained
for layer in base_model.layers:
    layer.trainable = False

# Add a custom head to the model
x = layers.Flatten()(base_model.output)
x = layers.Dense(256, activation='relu')(x)
x = layers.Dropout(0.5)(x)
output = layers.Dense(num_labels, activation='softmax')(x)

# Build and compile the model (labels are integer ids, hence the sparse loss)
vgg16_model = models.Model(inputs=base_model.input, outputs=output)
vgg16_model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=optimizers.Adam(learning_rate=0.001),
    metrics=['accuracy'])

print("VGG16")
# Train the model on the train dataset. `callbacks` is documented as a list
# of callbacks, so the single EarlyStopping instance is wrapped in one.
vgg16_model.fit(
    train_ds,
    epochs=50,
    validation_data=val_ds,
    callbacks=[tf.keras.callbacks.EarlyStopping(verbose=1, patience=5)])


In [None]:
# RESNET50
# Input shape fed to the network (three-channel spectrograms)
input_shape = (124, 129, 3)

# ImageNet-pretrained ResNet50 base without its classification head
base_model = ResNet50(weights='imagenet', include_top=False, input_shape=input_shape)

# Fine-tune the whole network: every base layer is explicitly left trainable
# (nothing is frozen here, unlike the VGG cells).
for layer in base_model.layers:
    layer.trainable = True

# Add a custom head to the model
x = layers.GlobalAveragePooling2D()(base_model.output)
x = layers.Dense(256, activation='relu')(x)
x = layers.Dropout(0.5)(x)
output = layers.Dense(num_labels, activation='softmax')(x)

# Build and compile the model (labels are integer ids, hence the sparse loss)
resnet_model = models.Model(inputs=base_model.input, outputs=output)
resnet_model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=optimizers.Adam(learning_rate=0.00015),
    metrics=['accuracy'])


print("ResNet50")
# Train the model on the train dataset. `callbacks` is documented as a list
# of callbacks, so the single EarlyStopping instance is wrapped in one.
resnet_model.fit(
    train_ds,
    epochs=100,
    validation_data=val_ds,
    callbacks=[tf.keras.callbacks.EarlyStopping(verbose=1, patience=1)])


In [None]:
# INCEPTIONV3

# Input shape fed to the network (three-channel spectrograms)
input_shape = (124, 129, 3)

# ImageNet-pretrained InceptionV3 base without its classification head
base_model = InceptionV3(weights='imagenet', include_top=False, input_shape=input_shape)

# Fine-tune the whole network: every base layer is explicitly left trainable
# (nothing is frozen here, unlike the VGG cells).
for layer in base_model.layers:
    layer.trainable = True

# Add a custom head to the model
x = layers.GlobalAveragePooling2D()(base_model.output)
x = layers.Dense(256, activation='relu')(x)
x = layers.Dropout(0.5)(x)
output = layers.Dense(num_labels, activation='softmax')(x)

# Build and compile the model (labels are integer ids, hence the sparse loss)
inception_model = models.Model(inputs=base_model.input, outputs=output)
inception_model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=optimizers.Adam(learning_rate=0.00017),
    metrics=['accuracy'])

print("InceptionV3")
# Train the model on the train dataset. `callbacks` is documented as a list
# of callbacks, so the single EarlyStopping instance is wrapped in one.
inception_model.fit(
    train_ds,
    epochs=100,
    validation_data=val_ds,
    callbacks=[tf.keras.callbacks.EarlyStopping(verbose=1, patience=1)])


In [None]:
# XCEPTION

# Input shape fed to the network (three-channel spectrograms)
input_shape = (124, 129, 3)

# ImageNet-pretrained Xception base without its classification head
base_model = Xception(weights='imagenet', include_top=False, input_shape=input_shape)

# Fine-tune the whole network: every base layer is explicitly left trainable
# (nothing is frozen here, unlike the VGG cells).
for layer in base_model.layers:
    layer.trainable = True

# Add a custom head to the model
x = layers.Flatten()(base_model.output)
x = layers.Dense(256, activation='relu')(x)
x = layers.Dropout(0.5)(x)
output = layers.Dense(num_labels, activation='softmax')(x)

# Build and compile the model (labels are integer ids, hence the sparse loss)
xception_model = models.Model(inputs=base_model.input, outputs=output)
xception_model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=optimizers.Adam(learning_rate=0.00001),
    metrics=['accuracy'])

print("Xception")
# Train the model on the train dataset. `callbacks` is documented as a list
# of callbacks, so the single EarlyStopping instance is wrapped in one.
xception_model.fit(
    train_ds,
    epochs=80,
    validation_data=val_ds,
    callbacks=[tf.keras.callbacks.EarlyStopping(verbose=1, patience=1)])
