In [None]:
import os
import pathlib

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import tensorflow as tf

from tensorflow.keras import layers
from tensorflow.keras import models
import tensorflow_io as tfio

from IPython import display
from IPython.display import Audio
from time import time


In [None]:
# Root folder of the mini Speech Commands dataset (Kaggle input mount).
DATASET_PATH = '/kaggle/input/mini-speech2/mini_speech_commands'
# pathlib form of the same folder; used for loading and for building clip paths later on.
data_dir = pathlib.Path(DATASET_PATH)

In [None]:
# Build the training and validation datasets in one call (subset='both').
# Every clip is padded/truncated to 16000 samples; the fixed seed keeps the
# 80/20 split reproducible.
train_ds, val_ds = tf.keras.utils.audio_dataset_from_directory(
    directory=data_dir,
    validation_split=0.2,
    subset='both',
    seed=0,
    batch_size=128,
    output_sequence_length=16000,
)

# Class names as reported by the dataset; an integer label indexes into this array.
label_names = np.array(train_ds.class_names)

In [None]:
def squeeze(audio, labels):
    """Remove the last axis of `audio` and pass `labels` through unchanged.

    Args:
        audio: Tensor whose last axis has size 1 (presumably the mono channel
            axis produced by the dataset loader - confirm).
        labels: Labels belonging to `audio`; returned as-is.

    Returns:
        Tuple `(audio_without_last_axis, labels)`.
    """
    return tf.squeeze(audio, axis=-1), labels

# Apply `squeeze` to both splits.
# NOTE: this rebinds train_ds/val_ds, so the cell must only be run once per kernel.
train_ds, val_ds = (
    split.map(squeeze, num_parallel_calls=tf.data.AUTOTUNE)
    for split in (train_ds, val_ds)
)

In [None]:
# Split the validation batches in two: even-indexed batches become the test
# set, odd-indexed batches remain the validation set. Both shards are taken
# from the original val_ds before either name is rebound.
test_ds, val_ds = (
    val_ds.shard(num_shards=2, index=0),
    val_ds.shard(num_shards=2, index=1),
)

**Convert waveforms to spectrograms**

In [None]:
def get_spectrogram(waveform):
    """Turn a waveform into a magnitude spectrogram with a trailing channel axis.

    The spectrogram is computed by `tfio.audio.spectrogram` (an STFT) with
    nfft=512, window=512 and stride=130.

    Args:
        waveform: Audio samples. The notebook passes both single clips and
            batched clips with samples on the last axis.

    Returns:
        The absolute value of the spectrogram with one extra axis of size 1
        appended, so image-style layers can treat it as a single channel.
    """
    magnitudes = tf.abs(
        tfio.audio.spectrogram(waveform, nfft=512, window=512, stride=130)
    )
    # Append the channel axis expected by the convolutional layers.
    return tf.expand_dims(magnitudes, axis=-1)

In [None]:
# Pull a single batch of waveforms and labels to inspect.
example_audio, example_labels = next(iter(train_ds.take(1)))

# Show label, shapes and a playback widget for the first three clips of the batch.
for i in range(3):
    waveform = example_audio[i]
    label = label_names[example_labels[i]]
    spectrogram = get_spectrogram(waveform)

    print('Label:', label)
    print('Waveform shape:', waveform.shape)
    print('Spectrogram shape:', spectrogram.shape)
    print('Audio playback')
    display.display(display.Audio(waveform, rate=16000))

**Mel-Spectrogram**

In [None]:
def timeMasking(audio):
    """Return `audio` with `tfio.audio.time_mask` applied (param=8).

    NOTE(review): tensorflow-io documents the input of `time_mask` as a
    spectrogram rather than a raw waveform - confirm what callers pass.
    """
    return tfio.audio.time_mask(audio, param=8)

In [None]:
def specAugment(audio):
    """Return `audio` with `tfio.audio.freq_mask` applied (param=8).

    Despite the name, only frequency masking is performed here; time masking
    lives in `timeMasking`.

    NOTE(review): tensorflow-io documents the input of `freq_mask` as a
    spectrogram rather than a raw waveform - confirm what callers pass.
    """
    return tfio.audio.freq_mask(audio, param=8)

In [None]:
def create_mel_specto(audio):
    """Turn a waveform into a dB-scaled mel spectrogram with a trailing channel axis.

    Pipeline: spectrogram (nfft=512, window=300, stride=130) -> mel scale
    (128 bands, 0-14000 Hz, rate=32000) -> dB scale (top_db=85) -> absolute
    value -> extra last axis.

    NOTE(review): the rest of the notebook handles the clips as 16 kHz audio
    (playback rate 16000, 16000 samples per clip), while the mel scale here is
    built with rate=32000 and fmax=14000. An earlier variant used rate=16000,
    mels=129, fmax=7000. Confirm which setting is intended before using this
    for training.

    Args:
        audio: Audio samples with the sample axis last.

    Returns:
        Tensor laid out as (..., frames, mels, 1).
    """
    linear_spec = tfio.audio.spectrogram(audio, nfft=512, window=300, stride=130)

    # Project the linear-frequency bins onto the mel scale.
    mel_spec = tfio.audio.melscale(
        linear_spec, rate=32000, mels=128, fmin=0, fmax=14000)

    # Convert to decibels.
    db_mel_spec = tfio.audio.dbscale(mel_spec, top_db=85)

    # Negative dB values are flipped to positive (multiplied by -1 when < 0).
    db_mel_spec = tf.abs(db_mel_spec)

    return tf.expand_dims(db_mel_spec, axis=-1)

**Display Spectrogram**

In [None]:
def plot_spectrogram(spectrogram, ax):
    """Draw a log-scaled spectrogram on `ax` with `pcolormesh`.

    Args:
        spectrogram: 2-D array, or 3-D array whose last axis has size 1 (that
            axis is squeezed away before plotting).
        ax: Object providing `pcolormesh(X, Y, C)`, e.g. a matplotlib Axes.
    """
    rank = len(spectrogram.shape)
    if rank > 2:
        assert rank == 3
        spectrogram = np.squeeze(spectrogram, axis=-1)

    # Transpose so the first input axis runs along x; the epsilon keeps log() finite at 0.
    log_spec = np.log(spectrogram.T + np.finfo(float).eps)
    n_rows = log_spec.shape[0]
    n_cols = log_spec.shape[1]

    # x coordinates span 0..(total number of spectrogram elements), one per column.
    x_coords = np.linspace(0, np.size(spectrogram), num=n_cols, dtype=int)
    ax.pcolormesh(x_coords, range(n_rows), log_spec)

In [None]:
# Feature function used for every split:
#   linear spectrogram -> get_spectrogram(audio)
#   mel spectrogram    -> create_mel_specto(audio)
def make_spec_ds(ds):
    """Map a dataset of `(audio, label)` pairs to `(spectrogram, label)` pairs.

    Args:
        ds: Dataset yielding `(audio, label)` elements.

    Returns:
        The mapped dataset; spectrograms come from `get_spectrogram` and the
        mapping runs with `tf.data.AUTOTUNE` parallelism.
    """
    def _to_spectrogram(audio, label):
        return get_spectrogram(audio), label

    return ds.map(map_func=_to_spectrogram, num_parallel_calls=tf.data.AUTOTUNE)

In [None]:
# Convert all three splits from waveforms to spectrograms.
train_spectrogram_ds, val_spectrogram_ds, test_spectrogram_ds = (
    make_spec_ds(split) for split in (train_ds, val_ds, test_ds)
)

In [None]:
# One batch of spectrograms; also reused later to derive the model's input shape.
example_spectrograms, example_spect_labels = next(iter(train_spectrogram_ds.take(1)))

# 3x3 grid showing the first nine spectrograms of the batch with their labels.
rows, cols = 3, 3
n = rows * cols
fig, axes = plt.subplots(rows, cols, figsize=(16, 9))

for i in range(n):
    r, c = divmod(i, cols)
    ax = axes[r][c]
    plot_spectrogram(example_spectrograms[i], ax)
    ax.set_title(label_names[example_spect_labels[i]])

plt.show()

**Build and Train**

In [None]:
# Cache the computed spectrograms and overlap input preparation with training.
# Only the training split is shuffled.
train_spectrogram_ds = (
    train_spectrogram_ds
    .cache()
    .shuffle(10000)
    .prefetch(tf.data.AUTOTUNE)
)
val_spectrogram_ds = (
    val_spectrogram_ds
    .cache()
    .prefetch(tf.data.AUTOTUNE)
)
test_spectrogram_ds = (
    test_spectrogram_ds
    .cache()
    .prefetch(tf.data.AUTOTUNE)
)

In [None]:
# Per-example input shape (batch axis dropped) and number of output classes.
num_labels = len(label_names)
input_shape = example_spectrograms.shape[1:]
print('Input shape:', input_shape)

# Normalization layer whose statistics are fitted on the training
# spectrograms only (labels are dropped for `adapt`).
norm_layer = layers.Normalization()
norm_layer.adapt(train_spectrogram_ds.map(lambda spec, label: spec))

model = models.Sequential([
    layers.Input(shape=input_shape),
    # Shrink every spectrogram to 32x32 to keep the network small.
    layers.Resizing(32, 32),
    # Apply the statistics fitted above.
    norm_layer,
    # Convolutional feature extractor.
    layers.Conv2D(32, 3, activation='relu'),
    layers.Conv2D(64, 3, activation='relu'),
    layers.MaxPooling2D(),
    layers.Dropout(0.25),
    # Classifier head; the last layer has no activation, so it outputs logits.
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(num_labels),
])

model.summary()

In [None]:
# Integer class labels + raw logits from the final Dense layer, hence
# SparseCategoricalCrossentropy with from_logits=True.
model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'],
)

In [None]:
# Upper bound on the number of training epochs. With EPOCHS = 0 the model is
# never trained and `history.history` stays empty, which breaks the training
# curve cell and makes the evaluation cells report an untrained model.
# EarlyStopping ends the run sooner once val_loss stops improving for 2 epochs.
EPOCHS = 10
history = model.fit(
    train_spectrogram_ds,
    validation_data=val_spectrogram_ds,
    epochs=EPOCHS,
    callbacks=[tf.keras.callbacks.EarlyStopping(verbose=1, patience=2)],
)

**Test the Models**

In [None]:
# Optional: load a pre-trained model instead of using the one trained above
#classicModel = tf.keras.models.load_model("/kaggle/input/mylittlemodells/teLiteMod.h5")
#print("Done")

In [None]:
# Run one clip through the Keras model. `genSpecto` is reused by the TFLite cells below.
selectedAudio = "/kaggle/input/mini-speech2/mini_speech_commands/right/0132a06d_nohash_1.wav" # Audio File

# Decode to a mono clip of 16000 samples, drop the channel axis, add a batch axis.
x = tf.io.read_file(selectedAudio)
x, _ = tf.audio.decode_wav(x, desired_channels=1, desired_samples=16000,)
x = tf.squeeze(x, axis=-1)
x = x[tf.newaxis, :]

genSpecto = get_spectrogram(x)

# Use the pre-trained model when the optional loading cell was executed;
# otherwise fall back to the model trained in this notebook, so the cell also
# works after "Restart Kernel -> Run All".
try:
    classifier = classicModel
except NameError:
    classifier = model

time_before = time()
keras_predic = classifier.predict(genSpecto)
time_after = time()
total_time = time_after - time_before

class_ids = tf.argmax(keras_predic, axis=-1)
class_names = tf.gather(label_names, class_ids)

# The network outputs logits, so softmax is the right way to turn them into
# percentages. The previous clamp-and-normalise loop overwrote `x` and divided
# by zero whenever every logit was negative.
# NOTE(review): assumes a loaded `classicModel` also outputs logits - confirm.
keras_percent = 100 * tf.nn.softmax(keras_predic, axis=-1).numpy()
for class_label, percent in zip(label_names, keras_percent[0]):
    print(f"{class_label}: {percent:.2f} %")

print("The Prediction is:", keras_predic)
print("The Prediction is:", class_names.numpy()[0])
print("Total time:", total_time)


In [None]:
# Test the TensorFlow Lite model.

# Load the converted model file. The "_optim" file is active; the plain export is kept for comparison
# (presumably "_optim" is an optimized/quantized conversion - confirm).
#tfLiteSimple = tf.lite.Interpreter(model_path="/kaggle/input/mylittlemodells/teLiteMod.tflite")
tfLiteSimple = tf.lite.Interpreter(model_path="/kaggle/input/mylittlemodells/teLiteMod_optim.tflite")

# Tensors have to be allocated before set_tensor()/invoke() can be used.
tfLiteSimple.allocate_tensors()

# Input and output tensor descriptions; their 'index' entries are used by the inference cell.
input_ten = tfLiteSimple.get_input_details()
output_ten = tfLiteSimple.get_output_details()
#print(input_ten)

In [None]:
# Feed the spectrogram computed in the Keras test cell into the TFLite interpreter.
# set_tensor is documented to take a NumPy array, so convert the tensor explicitly.
tfLiteSimple.set_tensor(input_ten[0]['index'], genSpecto.numpy())

# Time only the inference call. invoke() returns None; the result is read
# back afterwards with get_tensor().
time_before = time()
tfLiteSimple.invoke()
time_after = time()
total_time = time_after - time_before
out_data = tfLiteSimple.get_tensor(output_ten[0]['index'])

# Turn the outputs into percentages with softmax. The previous
# clamp-and-normalise loop divided by zero whenever every output was negative.
# NOTE(review): assumes float logits; a fully integer-quantized model would
# need dequantizing with the scale/zero-point from output_ten first - confirm.
lite_percent = 100 * tf.nn.softmax(np.asarray(out_data, dtype=np.float32), axis=-1).numpy()
for class_label, percent in zip(label_names, lite_percent[0]):
    print(f"{class_label}: {percent:.2f} %")

print(out_data)
print("Total time:", total_time)

In [None]:
# Training curves: loss on the left, accuracy on the right.
metrics = history.history

if not metrics:
    # model.fit() with zero epochs leaves the history empty; indexing
    # metrics['loss'] would then raise KeyError.
    print("No training history to plot (the model was fitted for 0 epochs).")
else:
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(16, 6))

    # Plot each series explicitly against the epoch index.
    loss_ax.plot(history.epoch, metrics['loss'], label='loss')
    loss_ax.plot(history.epoch, metrics['val_loss'], label='val_loss')
    loss_ax.legend()
    loss_ax.set_ylim(0, max(loss_ax.get_ylim()))
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss [CrossEntropy]')

    acc_ax.plot(history.epoch, 100 * np.array(metrics['accuracy']), label='accuracy')
    acc_ax.plot(history.epoch, 100 * np.array(metrics['val_accuracy']), label='val_accuracy')
    acc_ax.legend()
    acc_ax.set_ylim(0, 100)
    acc_ax.set_xlabel('Epoch')
    acc_ax.set_ylabel('Accuracy [%]')

    plt.show()

**Evaluate Model**

In [None]:
# Loss and accuracy on the held-out test split; the returned dict is shown as the cell output.
model.evaluate(test_spectrogram_ds, return_dict=True)

In [None]:
# Predicted class per test example (argmax over the logits) and the matching
# true labels, gathered batch by batch in the same order.
y_pred = tf.argmax(model.predict(test_spectrogram_ds), axis=1)
y_true = tf.concat([labels for _, labels in test_spectrogram_ds], axis=0)

# Rows are true labels, columns are predictions.
confusion_mtx = tf.math.confusion_matrix(y_true, y_pred)

fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(
    confusion_mtx,
    annot=True,
    fmt='g',
    xticklabels=label_names,
    yticklabels=label_names,
    ax=ax,
)
ax.set_xlabel('Prediction')
ax.set_ylabel('Label')
plt.show()

In [None]:
# Classify a single "no" clip and show the predicted class probabilities.
clip_path = data_dir/'no/01bb6a2a_nohash_0.wav'
waveform, sample_rate = tf.audio.decode_wav(
    tf.io.read_file(str(clip_path)), desired_channels=1, desired_samples=16000,)
waveform = tf.squeeze(waveform, axis=-1)

# Use the same feature function as make_spec_ds(); the model was fitted on
# get_spectrogram() output, so mel features would not match its input.
x = get_spectrogram(waveform)
x = x[tf.newaxis, ...]

prediction = model(x)

# Bar labels must follow the dataset's class order (label_names); a
# hand-written list in a different order would mislabel every bar.
x_labels = label_names
plt.bar(x_labels, tf.nn.softmax(prediction[0]).numpy())
plt.title('No')
plt.show()

display.display(display.Audio(waveform, rate=16000))