In [None]:
import os
import pathlib

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import tensorflow as tf

from tensorflow.keras import layers
from tensorflow.keras import models
from IPython import display

# Fix the NumPy and TensorFlow global seeds so that shuffling and weight
# initialisation are repeatable between runs.
seed = 42
np.random.seed(seed)
tf.random.set_seed(seed)

In [None]:
DATASET_PATH = '../data/mini_speech_commands'

data_dir = pathlib.Path(DATASET_PATH)

# Download and unpack the archive only when the dataset folder is missing.
# The cache location is derived from data_dir (rather than hard-coded to
# './data') so the archive is extracted into the same parent folder that
# DATASET_PATH points into.
if not data_dir.exists():
    tf.keras.utils.get_file(
        'mini_speech_commands.zip',
        origin="http://storage.googleapis.com/download.tensorflow.org/data/mini_speech_commands.zip",
        extract=True,
        cache_dir=str(data_dir.parent.parent),
        cache_subdir=data_dir.parent.name)

In [None]:
# Every entry of the dataset folder is treated as a candidate command name.
dataset_entries = tf.io.gfile.listdir(str(data_dir))
commands = np.array(dataset_entries)
print('Commands:', commands)

In [None]:
# README.md sits next to the command folders but is not a command itself.
is_command = commands != 'README.md'
commands = commands[is_command]

In [None]:
# Collect every file path and, in the same pass, remember how many files
# each command contributed. num_samples_per_command holds one count per
# entry of `commands` (same order) so it can be plotted bar-by-bar.
filenames = []
num_samples_per_command = []
for command in commands:
    command_files = tf.io.gfile.glob(str(data_dir/command) + '/*')
    filenames += command_files
    num_samples_per_command.append(len(command_files))

filenames = tf.random.shuffle(filenames)
num_samples = len(filenames)

print('Number of total examples:', num_samples)
for command, command_count in zip(commands, num_samples_per_command):
    print(f'Number of examples for {command}: {command_count}')

In [None]:
# Count the files of each command here so every bar gets its own height,
# independent of whatever value num_samples_per_command currently holds.
samples_per_command = [
    len(tf.io.gfile.glob(str(data_dir/command_name) + '/*'))
    for command_name in commands]

plt.figure(figsize=(10, 6))
plt.bar(commands, samples_per_command, color='skyblue')
plt.xlabel('Commands')
plt.ylabel('Number of Samples')
plt.title('Distribution of Samples per Command')
plt.show()

In [None]:
# Drop the commands that are not used by this model.
excluded_commands = ['go', 'stop', 'yes']
commands = commands[~np.isin(commands, excluded_commands)]

print('Commands:', commands)

In [None]:
# Rebuild the shuffled file list for the remaining commands only.
filenames = []
for command in commands:
    filenames.extend(tf.io.gfile.glob(str(data_dir/command) + '/*'))
filenames = tf.random.shuffle(filenames)

num_samples = len(filenames)
# `command` still refers to the last command of the loop above, so this is
# the number of entries in that one folder.
last_command_dir = str(data_dir/command)
num_samples_per_command = len(tf.io.gfile.listdir(last_command_dir))

print('Number of total examples:', num_samples)
print('Number of examples per label:', num_samples_per_command)
print('Example file tensor:', filenames[0])

In [None]:
# num_samples_per_command is a single number at this point, so the counts
# are recomputed per command to give every bar its real height.
samples_per_command = [
    len(tf.io.gfile.glob(str(data_dir/command_name) + '/*'))
    for command_name in commands]

plt.figure(figsize=(10, 6))
plt.bar(commands, samples_per_command, color='skyblue')
plt.xlabel('Commands')
plt.ylabel('Number of Samples')
plt.title('Distribution of Samples per Command')
plt.show()

In [None]:
# Split boundaries inside the shuffled file list:
# [0, 4000) train, [4000, 4500) validation, [4500, end) test.
train_end, val_end = 4000, 4500

train_files = filenames[:train_end]
val_files = filenames[train_end:val_end]
test_files = filenames[val_end:]

print('Training set size', len(train_files))
print('Validation set size', len(val_files))
print('Test set size', len(test_files))

In [None]:
# Decode one known file to inspect the shape of a decoded clip.
sample_path = DATASET_PATH+'/down/0a9f9af7_nohash_0.wav'
test_file = tf.io.read_file(sample_path)
test_audio, _ = tf.audio.decode_wav(contents=test_file)

test_audio.shape

In [None]:
def decode_audio(audio_binary):
    """Decode WAV file contents into a waveform tensor.

    Args:
        audio_binary: The raw contents of a WAV file, as passed to
            `tf.audio.decode_wav`.

    Returns:
        The decoded audio with its last axis squeezed away. The squeeze
        only succeeds when that axis has size 1 (presumably mono clips —
        confirm against the dataset).
    """
    decoded = tf.audio.decode_wav(contents=audio_binary)
    waveform = decoded[0]  # second element (sample rate) is not needed
    return tf.squeeze(waveform, axis=-1)

In [None]:
def get_label(file_path):
    """Return the translated label for an audio file path.

    The label is the second-to-last path component (the file's parent
    folder). Known English folder names are replaced by their Indonesian
    label; any other name is returned unchanged.

    Args:
        file_path: Path of the audio file, split on `os.path.sep`.

    Returns:
        A scalar string tensor holding the label.
    """
    label = tf.strings.split(input=file_path, sep=os.path.sep)[-2]

    translations = (
        ('down', 'bawah'),
        ('left', 'kiri'),
        ('right', 'kanan'),
        ('up', 'atas'),
        ('no', 'tidak dikenal'),
    )
    # Apply the replacements one after another, in the listed order.
    for english, indonesian in translations:
        label = tf.cond(
            label == english,
            lambda translated=indonesian: tf.constant(translated),
            lambda current=label: current)
    return label


In [None]:
# Translated labels, in the same order as the English commands
# 'down' 'left' 'no' 'right' 'up'.
# Every label that get_label can produce must be listed here: a label that
# is missing matches nothing, and argmax over an all-False mask returns 0,
# which would silently file those samples under the first command.
commands = np.array(['bawah', 'kiri', 'tidak dikenal', 'kanan', 'atas'])
commands

In [None]:
# Visualise the sample distribution for the translated labels.
# The translated names are not folder names, so the counts are taken from
# the shuffled file list instead: files are grouped by their parent folder
# and each folder name is translated with get_label.
source_dirs = [
    os.path.basename(os.path.dirname(path.decode('utf-8')))
    for path in filenames.numpy()]
dir_names, dir_counts = np.unique(source_dirs, return_counts=True)

label_counts = {}
for dir_name, dir_count in zip(dir_names, dir_counts):
    # get_label reads the parent folder of the path it is given, so a
    # placeholder file name is enough to translate the folder name.
    placeholder_path = os.path.join(str(dir_name), 'sample.wav')
    translated = get_label(tf.constant(placeholder_path)).numpy().decode('utf-8')
    label_counts[translated] = label_counts.get(translated, 0) + int(dir_count)

plt.figure(figsize=(10, 6))
plt.bar(list(label_counts.keys()), list(label_counts.values()), color='skyblue')
plt.xlabel('Commands')
plt.ylabel('Number of Samples')
plt.title('Distribution of Samples per Command')
plt.show()

In [None]:
def get_waveform_and_label(file_path):
    """Load one audio file and pair its waveform with its label.

    Args:
        file_path: Path of the audio file.

    Returns:
        A `(waveform, label)` tuple: the result of `decode_audio` on the
        file contents and the result of `get_label` on the path.
    """
    waveform = decode_audio(tf.io.read_file(file_path))
    return waveform, get_label(file_path)

In [None]:
AUTOTUNE = tf.data.AUTOTUNE

# Training pipeline, step 1: file paths -> (waveform, label) pairs.
files_ds = tf.data.Dataset.from_tensor_slices(train_files)
waveform_ds = files_ds.map(get_waveform_and_label, num_parallel_calls=AUTOTUNE)

In [None]:
# Show the first rows*cols training waveforms in a grid, titled by label.
rows, cols = 3, 3
n = rows * cols
fig, axes = plt.subplots(rows, cols, figsize=(10, 12))

y_ticks = np.arange(-1.2, 1.2, 0.2)
for i, (audio, label) in enumerate(waveform_ds.take(n)):
    r, c = divmod(i, cols)
    ax = axes[r, c]
    ax.plot(audio.numpy())
    ax.set_yticks(y_ticks)
    label = label.numpy().decode('utf-8')
    ax.set_title(label)

plt.show()

In [None]:
def get_spectrogram(waveform, input_len=16000):
    """Convert a waveform into a fixed-size magnitude spectrogram.

    The waveform is truncated or zero-padded to exactly `input_len`
    samples so that every clip yields a spectrogram of the same shape.

    Args:
        waveform: 1-D waveform tensor; it is cast to float32.
        input_len: Number of samples the waveform is trimmed/padded to.
            Defaults to 16000.

    Returns:
        The magnitude of the short-time Fourier transform
        (frame_length=255, frame_step=128) with a trailing axis of size 1
        appended.
    """
    # Keep at most input_len samples; longer clips are cut off.
    waveform = tf.cast(waveform[:input_len], dtype=tf.float32)

    # Pad shorter clips with zeros up to the same input_len used above.
    zero_padding = tf.zeros(
        [input_len] - tf.shape(waveform),
        dtype=tf.float32)
    equal_length = tf.concat([waveform, zero_padding], 0)

    spectrogram = tf.signal.stft(
        equal_length, frame_length=255, frame_step=128)

    # The STFT is complex-valued; keep only the magnitude.
    spectrogram = tf.abs(spectrogram)

    # Append a trailing axis so the result can be fed to image-style layers.
    return spectrogram[..., tf.newaxis]

In [None]:
# Take a single training example and compute its spectrogram.
for waveform, label in waveform_ds.take(1):
    spectrogram = get_spectrogram(waveform)
    label = label.numpy().decode('utf-8')

print('Label: ', label)
print('Waveform shape: ', waveform.shape)
print('Spectrogram shape: ', spectrogram.shape)
print('Audio playback')
playback = display.Audio(waveform, rate=16000)
display.display(playback)


In [None]:
def plot_spectrogram(spectrogram, ax):
    """Draw the log of a spectrogram on `ax` with `pcolormesh`.

    Args:
        spectrogram: 2-D array, or 3-D array whose last axis has size 1
            (that axis is squeezed away before plotting).
        ax: Object providing `pcolormesh(X, Y, C)`.
    """
    rank = len(spectrogram.shape)
    if rank > 2:
        assert rank == 3
        spectrogram = np.squeeze(spectrogram, axis=-1)

    # Transpose so the first input axis runs along x; the epsilon keeps
    # the logarithm finite where the spectrogram is zero.
    eps = np.finfo(float).eps
    log_spec = np.log(spectrogram.T + eps)

    height, width = log_spec.shape[0], log_spec.shape[1]
    # x coordinates are spread evenly from 0 to the total element count.
    X = np.linspace(0, np.size(spectrogram), num=width, dtype=int)
    Y = range(height)
    ax.pcolormesh(X, Y, log_spec)

In [None]:
fig, axes = plt.subplots(2, figsize=(12, 8))

# Top panel: the raw waveform, one point per sample.
timescale = np.arange(waveform.shape[0])
axes[0].plot(timescale, waveform)
axes[0].set(
    title='Waveform',
    xlabel='Time (samples)',
    ylabel='Amplitude',
    xlim=[0, 16000])

# Bottom panel: the log spectrogram of the same clip.
plot_spectrogram(spectrogram.numpy(), axes[1])
axes[1].set(
    title='Spectrogram',
    xlabel='Time (samples)',
    ylabel='Frequency (bins)')

# Adjust the vertical spacing between the two subplots.
plt.subplots_adjust(hspace=0.5)

plt.show()


In [None]:
def get_spectrogram_and_label_id(audio, label):
    """Turn a `(waveform, label)` pair into `(spectrogram, label_id)`.

    Args:
        audio: Waveform passed to `get_spectrogram`.
        label: Label string compared against the module-level `commands`.

    Returns:
        The spectrogram and the argmax of the element-wise comparison
        between `label` and `commands`, i.e. the position of the label
        within `commands`.
    """
    matches = label == commands
    return get_spectrogram(audio), tf.math.argmax(matches)

In [None]:
# Training pipeline, step 2: (waveform, label) -> (spectrogram, label id).
spectrogram_ds = waveform_ds.map(
    get_spectrogram_and_label_id, num_parallel_calls=AUTOTUNE)

In [None]:
# Show the first rows*cols training spectrograms in a grid, titled by command.
rows, cols = 3, 3
n = rows * cols
fig, axes = plt.subplots(rows, cols, figsize=(10, 10))

for i, (spectrogram, label_id) in enumerate(spectrogram_ds.take(n)):
    r, c = divmod(i, cols)
    ax = axes[r, c]
    plot_spectrogram(spectrogram.numpy(), ax)
    ax.set(
        title=commands[label_id.numpy()],
        xlabel='Time (frames)',
        ylabel='Frequency (bins)')

# Adjust the spacing between subplots.
plt.subplots_adjust(hspace=0.5, wspace=0.5)

plt.show()


In [None]:
def preprocess_dataset(files):
    """Build a `(spectrogram, label_id)` dataset from file paths.

    Args:
        files: File paths accepted by
            `tf.data.Dataset.from_tensor_slices`.

    Returns:
        A dataset obtained by mapping `get_waveform_and_label` and then
        `get_spectrogram_and_label_id` over the paths.
    """
    return (
        tf.data.Dataset.from_tensor_slices(files)
        .map(get_waveform_and_label, num_parallel_calls=AUTOTUNE)
        .map(get_spectrogram_and_label_id, num_parallel_calls=AUTOTUNE)
    )

In [None]:
# The training pipeline was already built above; validation and test files
# go through the same preprocessing.
train_ds = spectrogram_ds
val_ds, test_ds = (
    preprocess_dataset(split_files) for split_files in (val_files, test_files))

In [None]:
# Batch the training and validation sets.
batch_size = 64
train_ds, val_ds = (
    dataset.batch(batch_size) for dataset in (train_ds, val_ds))

In [None]:
# Cache the batched datasets and prefetch upcoming batches.
train_ds, val_ds = (
    dataset.cache().prefetch(AUTOTUNE) for dataset in (train_ds, val_ds))

In [None]:
# The model's input shape is the shape of one (unbatched) spectrogram.
spectrogram, _ = next(iter(spectrogram_ds.take(1)))
input_shape = spectrogram.shape
print('Input shape: ', input_shape)

num_labels = len(commands)

In [None]:
# Adapt the normalization layer on the training spectrograms (labels dropped).
spectrograms_only = spectrogram_ds.map(map_func=lambda spec, label: spec)
norm_layer = layers.Normalization()
norm_layer.adapt(data=spectrograms_only)

model = models.Sequential([
    layers.Input(shape=input_shape),
    # Downsample the spectrogram to 32x32 before normalising it.
    layers.Resizing(32, 32),
    norm_layer,
    # Convolutional feature extractor.
    layers.Conv2D(32, 3, activation='relu'),
    layers.Conv2D(64, 3, activation='relu'),
    layers.MaxPooling2D(),
    layers.Dropout(0.25),
    # Classifier head; the last layer has one unit per label and no activation.
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(num_labels),
])

model.summary()

In [None]:
# The final Dense layer has no activation, so the loss is told to expect
# logits. The loss keeps its default reduction so that fit() optimises a
# single scalar per batch instead of an unreduced per-sample vector; this
# also avoids depending on tf.keras.losses.Reduction.
model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
)

In [None]:
# Train for a fixed number of epochs, evaluating on the validation set
# after each one.
EPOCHS = 10
history = model.fit(train_ds, validation_data=val_ds, epochs=EPOCHS)

In [None]:
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']

loss = history.history['loss']
val_loss = history.history['val_loss']

# One panel per metric: (metric name, training curve, validation curve,
# legend position).
panels = (
    ('Accuracy', acc, val_acc, 'lower right'),
    ('Loss', loss, val_loss, 'upper right'),
)

plt.figure(figsize=(15, 6))
for position, (metric, train_curve, val_curve, legend_loc) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(train_curve, label=f'Training {metric}')
    plt.plot(val_curve, label=f'Validation {metric}')
    plt.legend(loc=legend_loc)
    plt.title(f'Training and Validation {metric}')
    plt.xlabel('Epochs')
    plt.ylabel(metric)
plt.show()

In [None]:
# Materialise the test set in a single pass over the dataset, then split
# the pairs into one array of spectrograms and one array of label ids.
test_pairs = [
    (spectrogram.numpy(), label_id.numpy())
    for spectrogram, label_id in test_ds]

test_audio = np.array([spectrogram for spectrogram, _label_id in test_pairs])
test_labels = np.array([label_id for _spectrogram, label_id in test_pairs])

In [None]:
# The predicted class is the index of the largest output per example.
logits = model.predict(test_audio)
y_pred = logits.argmax(axis=1)
y_true = test_labels

# Fraction of test examples whose prediction matches the true label.
test_acc = np.mean(y_pred == y_true)
print(f'Test set accuracy: {test_acc:.0%}')

In [None]:
from sklearn.metrics import classification_report

# Report every class by name (the same names used on the confusion-matrix
# axes). Passing `labels` explicitly keeps the rows aligned with `commands`
# even if some class never occurs in y_true or y_pred.
print(classification_report(
    y_true,
    y_pred,
    labels=np.arange(len(commands)),
    target_names=commands))

In [None]:
# Fix the matrix size to the number of commands so its rows and columns
# always line up with the tick labels, even when the highest class id does
# not occur in y_true or y_pred.
confusion_mtx = tf.math.confusion_matrix(
    y_true, y_pred, num_classes=len(commands))

plt.figure(figsize=(10, 8))
sns.heatmap(confusion_mtx,
            xticklabels=commands,
            yticklabels=commands,
            annot=True, fmt='g')
plt.xlabel('Prediction')
plt.ylabel('Label')
plt.show()

In [None]:
# Save the trained model to an .h5 file in the parent of the current
# working directory.
model.save('../perintah_suara.h5')