In [2]:
# INFO:
# Der Code basiert auf TensorFlow-Modulen und beinhaltet Funktionen, welche in den zur Verfügung gestellten
# Beispiel-Colabs von TensorFlow zu finden waren.
# Jedoch wurden Änderungen vorgenommen, weswegen der Code nicht 1 zu 1 kopiert werden sollte.

In [None]:
# Environment setup: upgrade (-U) TensorFlow and TensorFlow Datasets quietly (-q).
# NOTE(review): no versions are pinned here, so a re-run may pull a newer TensorFlow
# than the one this notebook was written against.
!pip install -U -q tensorflow tensorflow_datasets
# Install the cuDNN 8.1.0.77 build for CUDA 11.2, overriding held packages.
# NOTE(review): cuDNN is pinned while TensorFlow above is not — confirm the two remain compatible.
!apt install --allow-change-held-packages libcudnn8=8.1.0.77-1+cuda11.2
# NOTE(review): tensorflow_addons is not imported by any cell visible here — confirm it is still needed.
!pip install -U -q tensorflow_addons

In [None]:
import os
import pathlib

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import tensorflow as tf

from tensorflow.keras import layers
from tensorflow.keras import models
from IPython import display



In [None]:
# Fix the random seeds of NumPy and TensorFlow so that the selection is reproducible.
seed = 42
np.random.seed(seed)
tf.random.set_seed(seed)

In [None]:
# Importieren der Trainingsdaten aus privatem Google-Drive Speicher 
from google.colab import drive
import shutil

# "G:\My Drive\second_augmented_dataset.zip"
drive.mount('/content/gdrive/', force_remount=True)
%cd gdrive/MyDrive/
shutil.copy("/testing.zip", "/content/" )


In [None]:
# Relative location of the dataset; the dataset folder is expected under the name "dataset".
DATASET_PATH = "data/speech_commands/dataset"
# makedirs also creates the missing parent directory (/content/data), and
# exist_ok=True keeps the cell re-runnable instead of raising FileExistsError.
os.makedirs('/content/data/speech_commands', exist_ok=True)

data_dir = pathlib.Path(DATASET_PATH)

In [None]:
%cd /content 
commands = np.array(tf.io.gfile.listdir(str(data_dir))) # Erstellung einer python list mit allen Symbolnamen 
print('Commands:', commands)

# OUT: ['Ahornblatt' 'Klecks' 'Tropfen' 'Kleeblatt' 'Clown' 'Iglu' 'Hand'
#  'Fragezeichen' 'Uhr' 'KДse' 'Flasche' 'Stoppschild' 'SchildkrФte'
#  'EiswБrfel' 'Kaktus' 'Kerze' 'Schloss' 'Mond' 'Anker' 'Auto' 'Blitz'
#  'Geist' 'NotenschlБssel' 'Katze' 'Baum' 'Drache' 'Spinnennetz' 'Spinne'
#  'Vogel' 'Mann' 'Zebra' 'Sonne' 'Ausrufezeichen' 'Schneeflocke' 'Delfin'
#  'Apfel' 'Herz' 'MarienkДfer' 'Totenkopf' 'Pferd' 'Auge' 'GlБhbirne'
#  'Dino' 'Schneemann' 'Lippen' 'Feuer' 'Stift' 'Bombe' 'Hund' 'YingYang'
#  'Schere' 'Hammer' 'Fadenkreuz' 'Blume' 'Karotte' 'SchlБssel' 'background'
#  'Brille']

In [None]:
# Load the audio files found under data_dir as batched (audio, label) datasets.
# subset='both' returns the training and the validation split at once; 20 % of
# the data goes to validation, and every clip is brought to 16000 samples.
train_ds, val_ds = tf.keras.utils.audio_dataset_from_directory(
    directory=data_dir,
    subset='both',
    validation_split=0.2,
    seed=0,
    batch_size=64,
    output_sequence_length=16000,
)

# Class names as a NumPy array, in the order reported by the dataset.
label_names = np.array(train_ds.class_names)
print("\nlabel names:", label_names)

In [None]:
# The training data is single-channel, so the extra trailing axis is not needed.
def squeeze(audio, labels):
  """Remove the last axis of `audio` and pass `labels` through unchanged.

  Args:
    audio: audio tensor whose last axis has size 1.
    labels: labels belonging to `audio`.

  Returns:
    Tuple `(squeezed_audio, labels)`.
  """
  mono = tf.squeeze(audio, axis=-1)
  # Debug output of the squeezed tensor and the labels.
  print("audio", mono)
  print("labels", labels)
  return mono, labels

# Drop the channel axis in both splits; AUTOTUNE lets tf.data choose the parallelism.
train_ds = train_ds.map(squeeze, num_parallel_calls=tf.data.AUTOTUNE)
val_ds = val_ds.map(squeeze, num_parallel_calls=tf.data.AUTOTUNE)

In [None]:
# Split the validation data into two shards: shard 0 becomes the test set,
# shard 1 stays as the validation set. Both shards are taken from the
# original val_ds because the right-hand side is evaluated before assignment.
test_ds, val_ds = (
    val_ds.shard(num_shards=2, index=0),
    val_ds.shard(num_shards=2, index=1),
)

print(train_ds, val_ds, sep="\n")

In [None]:
# Look at a single batch of the training data to check the tensor shapes.
first_batch = train_ds.take(1)
print("train_ds.take(1)", first_batch)

for example_audio, example_labels in first_batch:
  print("example_audio.shape", example_audio.shape)
  print(example_labels.shape)

In [None]:
# Converts raw data (waveforms) into spectrograms using the STFT.
def get_spectrogram(waveform, frame_length=253, frame_step=128):
  """Compute the magnitude spectrogram of `waveform`.

  Args:
    waveform: float tensor of audio samples; the STFT runs over the last axis.
    frame_length: STFT window length in samples. Defaults to the value that
      was previously hard-coded (253).
    frame_step: hop between consecutive windows in samples. Defaults to the
      value that was previously hard-coded (128).

  Returns:
    The STFT magnitude with an extra trailing axis of size 1.
  """
  stft = tf.signal.stft(
      waveform, frame_length=frame_length, frame_step=frame_step)
  # The magnitude of the complex STFT is its absolute value.
  magnitude = tf.abs(stft)

  # Add a `channels` axis so the spectrogram can be fed to convolution layers
  # like an image (`batch_size`, `height`, `width`, `channels`).
  return magnitude[..., tf.newaxis]

# Draws a spectrogram as a colour mesh onto the given axes.
def plot_spectrogram(spectrogram, ax):
  """Plot the natural log of `spectrogram` on `ax` with time along the x-axis.

  Args:
    spectrogram: 2-D array, or 3-D array whose last axis has size 1.
    ax: object providing `pcolormesh(X, Y, C)`, e.g. a Matplotlib Axes.

  Returns:
    None; the mesh is drawn on `ax`.
  """
  rank = len(spectrogram.shape)
  if rank > 2:
    # Only a single trailing channel axis is supported.
    assert rank == 3
    spectrogram = np.squeeze(spectrogram, axis=-1)
  # Transpose so that the first input axis runs along the columns, and add an
  # epsilon so that the log of zero is never taken.
  log_spec = np.log(spectrogram.T + np.finfo(float).eps)
  n_rows, n_cols = log_spec.shape[0], log_spec.shape[1]
  x_coords = np.linspace(0, np.size(spectrogram), num=n_cols, dtype=int)
  y_coords = range(n_rows)
  ax.pcolormesh(x_coords, y_coords, log_spec)

# Builds a spectrogram dataset from a waveform dataset.
def make_spec_ds(ds):
  """Map every `(audio, label)` element of `ds` to `(spectrogram, label)`.

  Args:
    ds: dataset yielding `(audio, label)` pairs.

  Returns:
    The result of `ds.map(...)` with the audio replaced by its spectrogram.
  """
  def _to_spectrogram(audio, label):
    return get_spectrogram(audio), label

  return ds.map(
      map_func=_to_spectrogram,
      num_parallel_calls=tf.data.AUTOTUNE)

In [None]:
# Create the spectrogram datasets for the training, validation and test splits.
train_spectrogram_ds, val_spectrogram_ds, test_spectrogram_ds = [
    make_spec_ds(split) for split in (train_ds, val_ds, test_ds)
]

In [None]:
# Inspect the spectrograms of a few examples from the dataset.
print("train_spectrogram_ds", train_spectrogram_ds)
# Bind the first batch to example_spectrograms / example_spect_labels; the loop
# exists only for this binding, hence the immediate break. The model-building
# cell later reads example_spectrograms.shape to derive the input shape.
for example_spectrograms, example_spect_labels in train_spectrogram_ds.take(1):
  break
print("example_spectrograms", example_spectrograms)
print("example_spect_labels", example_spect_labels)

In [None]:
# Cache the datasets up front and prefetch to reduce latency during training.
# Only the training split is shuffled (buffer of 10000 elements).
train_spectrogram_ds = (
    train_spectrogram_ds
    .cache()
    .shuffle(buffer_size=10000)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)
val_spectrogram_ds = (
    val_spectrogram_ds
    .cache()
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)
test_spectrogram_ds = (
    test_spectrogram_ds
    .cache()
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

In [None]:
# Build the classifier as a Keras Sequential model.

# Shape of a single example: the sample batch shape without its leading batch axis.
input_shape = example_spectrograms.shape[1:]
print('Input shape:', input_shape)
# One output unit per class name.
num_labels = len(label_names)
print("num_labels", num_labels)

# Normalization layer; adapt() fits its statistics to the training
# spectrograms (labels are dropped by the map beforehand).
norm_layer = layers.Normalization()
spectrograms_only = train_spectrogram_ds.map(map_func=lambda spec, label: spec)
norm_layer.adapt(data=spectrograms_only)

# Input, downsampling to 64x64 and normalization.
preprocessing = [
    layers.Input(shape=input_shape),
    layers.Resizing(height=64, width=64),
    norm_layer,
]
# Two 3x3 convolutions followed by pooling and dropout.
conv_stack = [
    layers.Conv2D(filters=32, kernel_size=3, activation='relu'),
    layers.Conv2D(filters=64, kernel_size=3, activation='relu'),
    layers.MaxPooling2D(),
    layers.Dropout(rate=0.25),
]
# Dense head; the last layer has no activation, so it outputs raw scores.
classifier_head = [
    layers.Flatten(),
    layers.Dense(units=128, activation='relu'),
    layers.Dropout(rate=0.5),
    layers.Dense(units=num_labels),
]

model = models.Sequential(preprocessing + conv_stack + classifier_head)

model.summary()

# OUT:
# Input shape: (124, 129, 1)
# num_labels 58
# Model: "sequential"
# _________________________________________________________________
#  Layer (type)                Output Shape              Param #   
# =================================================================
#  resizing (Resizing)         (None, 64, 64, 1)         0         
                                                                 
#  normalization (Normalizatio  (None, 64, 64, 1)        3         
#  n)                                                              
                                                                 
#  conv2d (Conv2D)             (None, 62, 62, 32)        320       
                                                                 
#  conv2d_1 (Conv2D)           (None, 60, 60, 64)        18496     
                                                                 
#  max_pooling2d (MaxPooling2D  (None, 30, 30, 64)       0         
#  )                                                               
                                                                 
#  dropout (Dropout)           (None, 30, 30, 64)        0         
                                                                 
#  flatten (Flatten)           (None, 57600)             0         
                                                                 
#  dense (Dense)               (None, 128)               7372928   
                                                                 
#  dropout_1 (Dropout)         (None, 128)               0         
                                                                 
#  dense_1 (Dense)             (None, 58)                7482      
                                                                 
# =================================================================
# Total params: 7,399,229
# Trainable params: 7,399,226
# Non-trainable params: 3
# _________________________________________________________________

In [None]:
# Configure training: Adam optimizer, sparse categorical cross-entropy on
# logits (from_logits=True), and accuracy as the reported metric.
logit_loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=logit_loss,
    metrics=['accuracy'],
)

In [None]:
# Train the compiled Sequential model for at most EPOCHS epochs.
EPOCHS = 20
# Early stopping with a patience of 2 epochs; verbose=1 reports when it triggers.
early_stopping = tf.keras.callbacks.EarlyStopping(verbose=1, patience=2)
history = model.fit(
    train_spectrogram_ds,
    epochs=EPOCHS,
    validation_data=val_spectrogram_ds,
    callbacks=[early_stopping],
)

In [None]:
# Evaluate the training run: loss and accuracy curves per epoch.
metrics = history.history
print("metrics", metrics)
fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(16, 6))

# Each series is paired with history.epoch explicitly. Passing the validation
# series as a bare third positional argument made matplotlib plot it against
# an implicit 0..n-1 index instead of the recorded epochs.
ax_loss.plot(history.epoch, metrics['loss'],
             history.epoch, metrics['val_loss'])
ax_loss.legend(['loss', 'val_loss'])
# Anchor the y-axis at zero while keeping the automatically chosen upper limit.
ax_loss.set_ylim([0, max(ax_loss.get_ylim())])
ax_loss.set_xlabel('Epoch')
ax_loss.set_ylabel('Loss [CrossEntropy]')

# Accuracies are scaled to percent.
ax_acc.plot(history.epoch, 100*np.array(metrics['accuracy']),
            history.epoch, 100*np.array(metrics['val_accuracy']))
ax_acc.legend(['accuracy', 'val_accuracy'])
ax_acc.set_ylim([0, 100])
ax_acc.set_xlabel('Epoch')
ax_acc.set_ylabel('Accuracy [%]')

In [None]:
class ExportModel(tf.Module):
  """Wraps the trained model together with its audio preprocessing.

  Calling an instance accepts either a scalar string tensor holding a WAV
  filename or a float32 batch of waveforms with 16000 samples each, and
  returns the raw model output, the arg-max class ids and the class names.
  """

  def __init__(self, model):
    self.model = model

    # Request a concrete function for each accepted input signature:
    # a string filename and a batch of waveforms. Further signatures (a single
    # wave, a ragged batch) could be added here.
    accepted_specs = (
        tf.TensorSpec(shape=(), dtype=tf.string),
        tf.TensorSpec(shape=[None, 16000], dtype=tf.float32),
    )
    for spec in accepted_specs:
      self.__call__.get_concrete_function(x=spec)

  @tf.function
  def __call__(self, x):
    # A string input is treated as a filename: read and decode the file as
    # mono audio with 16000 samples, then add a leading batch axis.
    if x.dtype == tf.string:
      raw_bytes = tf.io.read_file(x)
      samples, _ = tf.audio.decode_wav(
          raw_bytes, desired_channels=1, desired_samples=16000)
      x = tf.squeeze(samples, axis=-1)[tf.newaxis, :]

    logits = self.model(get_spectrogram(x), training=False)

    # Index of the highest score per example, then the matching label name.
    class_ids = tf.argmax(logits, axis=-1)
    return {
        'predictions': logits,
        'class_ids': class_ids,
        'class_names': tf.gather(label_names, class_ids),
    }

In [None]:
from keras.models import model_from_json

# Save the model: architecture as JSON, weights as HDF5.

# Serialize the model architecture to JSON and write it to model.json.
model_json = model.to_json()
pathlib.Path("model.json").write_text(model_json)

# Serialize the weights to HDF5.
model.save_weights("model.h5")
print("Saved model to disk")