# Imports

In [1]:
import itertools
import math
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import pickle
import pylab
import random
import shutil
import tensorflow as tf
import wave

from pathlib import Path
from PIL import Image
from scipy import signal
from scipy.io import wavfile
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, precision_recall_fscore_support

KeyboardInterrupt: ignored

# Variables & Hyperparameters

## Variables

In [None]:
# Label for this experiment; passed to Trainer as its name.
DATA_TYPE = "raw_audio"
# Project root under which generated artifacts are stored
# (looks like a mounted Google Drive path - confirm for your environment).
DIRECTORY = "/content/drive/MyDrive/Classes/CSCE 5222 Feature Engineering/Group Project/"
# DATA_DIR = DIRECTORY + "2k-files/" + DATA_TYPE + "/"
# Root of the AudioMNIST recordings that is walked for .wav files.
AUDIO_DIR = "/content/AudioMNIST/data/"
# Directory where spectrogram PNGs are written and later read by the generators.
SPEC_DIR = DIRECTORY + "spectrograms/"

## Hyperparameters

In [None]:
# Number of image channels decoded from each PNG and expected by the model input.
N_CHANNELS = 4
# Number of output classes of the classifier.
N_CLASSES = 10

# Number of training epochs.
EPOCHS = 2
# Samples per batch for the train/val/test datasets.
BATCH_SIZE = 10
# Side length, in pixels, of the square spectrogram images fed to the model.
MAX_LEN = 220
# Pixel count of one MAX_LEN x MAX_LEN image; not referenced elsewhere in the visible code.
MAX_SIZE = MAX_LEN * MAX_LEN

# Load Dataset

## Download Repository

In [None]:
# Clone repository
# Notebook shell escape: fetches the AudioMNIST repository into the current
# working directory (AUDIO_DIR assumes that directory is /content - TODO confirm).
! git clone https://github.com/soerenab/AudioMNIST

In [None]:
# Create folder
# exist_ok=True keeps the cell safe to re-run and avoids the race between
# checking for the directory and creating it.
os.makedirs(SPEC_DIR, exist_ok=True)

In [None]:
# Get raw audio data
def get_wav_info(wav_file):
    """Read every frame of a WAV file as 16-bit integer samples.

    Args:
        wav_file: Path (or file-like object) accepted by ``wave.open``.

    Returns:
        A ``(sound_info, frame_rate)`` tuple. ``sound_info`` is a 1-D
        ``int16`` array built from the raw frame bytes and ``frame_rate`` is
        the sampling rate reported by the file header.

    Note:
        The frame bytes are always interpreted as ``int16``, whatever sample
        width the file header declares.
    """
    # The context manager closes the file even when reading raises.
    with wave.open(wav_file, 'r') as wav:
        frames = wav.readframes(-1)  # negative count reads all remaining frames
        frame_rate = wav.getframerate()

    sound_info = np.frombuffer(frames, 'int16')
    return sound_info, frame_rate

# Generate a spectrogram image for every .wav file found under AUDIO_DIR.
# Files whose spectrogram already exists are skipped, so the cell can be
# re-run after an interruption without redoing finished work.
for (r, d, files) in os.walk(AUDIO_DIR):
    for f in files:

        # Match on the actual extension rather than a ".wav" substring
        # anywhere in the file name.
        if not f.endswith(".wav"):
            continue

        f_in_path = os.path.join(r, f)
        f_out_path = SPEC_DIR + f.split(".")[0] + "_spectrogram.png"
        if os.path.exists(f_out_path):
            continue

        # Get audio info
        sound_info, frame_rate = get_wav_info(f_in_path)

        # Generate spectrogram: a 2 in x 2 in figure at MAX_LEN/2 dpi is
        # intended to give a MAX_LEN x MAX_LEN pixel image.
        pylab.figure(figsize=(2, 2), dpi=int(MAX_LEN/2))
        try:
            pylab.specgram(sound_info, Fs=frame_rate)
            pylab.savefig(f_out_path)
        finally:
            # Always release the figure so a failed file does not leak it.
            pylab.close()

In [None]:
# Remove unneeded files
# Deletes the "AudioMNIST/" directory tree.
# NOTE(review): this path is relative to the current working directory, whereas
# AUDIO_DIR is absolute - confirm both refer to the same clone.
shutil.rmtree("AudioMNIST/")

## Train Generator

In [None]:
def gen_train():
    """Yield ``(image, label)`` pairs for the training split.

    The split covers ids whose middle field (presumably the speaker id -
    confirm against the dataset) is 01 through 48. Ids whose spectrogram file
    is missing, or cannot be read or decoded, are skipped.

    Yields:
        Tuples ``(img, y)`` where ``img`` is the decoded PNG as a NumPy array
        with ``N_CHANNELS`` channels and ``y`` is the integer taken from the
        first field of the id.
    """
    # Re-seeding on every call makes each pass over the data use the same order.
    random.seed(42)

    # Build ids "<first>_<middle>_<last>" with the middle field varying slowest,
    # so the seeded shuffle below always sees the same starting order.
    data = [
        "%d_%s_%d" % (i, str(j).rjust(2, "0"), k)
        for j in range(1, 49)
        for i in range(10)
        for k in range(50)
    ]
    random.shuffle(data)

    for n in data:
        f_name = SPEC_DIR + "/" + n + "_spectrogram.png"
        if not os.path.exists(f_name):
            continue

        try:
            # Load image
            img = tf.io.read_file(f_name)
            img = tf.io.decode_png(img, channels=N_CHANNELS)
        except tf.errors.OpError:
            # Skip unreadable or corrupt files, but let KeyboardInterrupt and
            # other non-TensorFlow errors propagate.
            continue

        # The label is the first field of the id.
        yield img.numpy(), int(n.split("_")[0])
# gen_train()

## Validation Generator

In [None]:
def gen_val():
    """Yield ``(image, label)`` pairs for the validation split.

    The split covers ids whose middle field (presumably the speaker id -
    confirm against the dataset) is 49 through 54. Ids whose spectrogram file
    is missing, or cannot be read or decoded, are skipped.

    Yields:
        Tuples ``(img, y)`` where ``img`` is the decoded PNG as a NumPy array
        with ``N_CHANNELS`` channels and ``y`` is the integer taken from the
        first field of the id.
    """
    # Re-seeding on every call makes each pass over the data use the same order.
    random.seed(42)

    # Build validation ids "<first>_<middle>_<last>" with the middle field
    # varying slowest, so the seeded shuffle always sees the same starting order.
    data = [
        "%d_%s_%d" % (i, str(j).rjust(2, "0"), k)
        for j in range(49, 55)
        for i in range(10)
        for k in range(50)
    ]
    random.shuffle(data)

    for n in data:
        f_name = SPEC_DIR + "/" + n + "_spectrogram.png"
        if not os.path.exists(f_name):
            continue

        try:
            # Load image
            img = tf.io.read_file(f_name)
            img = tf.io.decode_png(img, channels=N_CHANNELS)
        except tf.errors.OpError:
            # Skip unreadable or corrupt files, but let KeyboardInterrupt and
            # other non-TensorFlow errors propagate.
            continue

        # The label is the first field of the id.
        yield img.numpy(), int(n.split("_")[0])

## Test Generator

In [None]:
def gen_test():
    """Yield ``(image, label)`` pairs for the test split.

    The split covers ids whose middle field (presumably the speaker id -
    confirm against the dataset) is 55 through 60. Ids whose spectrogram file
    is missing, or cannot be read or decoded, are skipped.

    Yields:
        Tuples ``(img, y)`` where ``img`` is the decoded PNG as a NumPy array
        with ``N_CHANNELS`` channels and ``y`` is the integer taken from the
        first field of the id.
    """
    # Re-seeding on every call makes each pass over the data use the same
    # order, so separate passes over the test set line up sample by sample.
    random.seed(42)

    # Build test ids "<first>_<middle>_<last>" with the middle field varying
    # slowest, so the seeded shuffle always sees the same starting order.
    data = [
        "%d_%s_%d" % (i, str(j).rjust(2, "0"), k)
        for j in range(55, 61)
        for i in range(10)
        for k in range(50)
    ]
    random.shuffle(data)

    for n in data:
        f_name = SPEC_DIR + "/" + n + "_spectrogram.png"
        if not os.path.exists(f_name):
            continue

        try:
            # Load image
            img = tf.io.read_file(f_name)
            img = tf.io.decode_png(img, channels=N_CHANNELS)
        except tf.errors.OpError:
            # Skip unreadable or corrupt files, but let KeyboardInterrupt and
            # other non-TensorFlow errors propagate.
            continue

        # The label must be an int (not the raw string field) so it matches
        # the scalar integer label spec used when building the dataset.
        yield img.numpy(), int(n.split("_")[0])

In [None]:
# NOTE: calling a generator function only creates a generator object; nothing
# is read from disk until it is iterated, and the object is not stored here.
gen_test()

## Datasets from Generators

In [None]:
# Wrap each split's generator in a tf.data.Dataset. Every element is a
# (MAX_LEN, MAX_LEN, N_CHANNELS) uint8 image paired with a scalar uint8 label.
train, val, test = [
    tf.data.Dataset.from_generator(
        split_generator,
        output_signature=(
            tf.TensorSpec(shape=(MAX_LEN, MAX_LEN, N_CHANNELS), dtype=tf.uint8),
            tf.TensorSpec(shape=(), dtype=tf.uint8),
        ),
    )
    for split_generator in (gen_train, gen_val, gen_test)
]

In [None]:
# Batch
# Group every split into fixed-size batches; a trailing partial batch is dropped.
train, val, test = [
    split.batch(BATCH_SIZE, drop_remainder=True, num_parallel_calls=tf.data.AUTOTUNE)
    for split in (train, val, test)
]

# Model

## Class

In [None]:
class Trainer:
    """Small CNN image classifier with helpers for training, testing and plotting.

    Attributes:
        name: Free-form label given at construction.
        model: Compiled Keras model; created by ``build_model``.
        history: Object returned by ``model.fit``; set by ``train_model_dataset``.
    """

    def __init__(self, name):
        """Store the label used to identify this trainer."""
        self.name = name

    def build_model(self, img_height, img_width, n_channels, n_classes):
        """Build and compile the convolutional classifier.

        Args:
            img_height: Height of the input images in pixels.
            img_width: Width of the input images in pixels.
            n_channels: Number of channels of the input images.
            n_classes: Number of output classes (size of the softmax layer).
        """
        layers = tf.keras.layers
        stack = [
            layers.Input(shape=(img_height, img_width, n_channels)),
            layers.Conv2D(32, 3, strides=2, padding='same', activation='relu'),
            layers.BatchNormalization(),
            layers.MaxPooling2D(pool_size=(2, 2)),
            layers.BatchNormalization(),
            layers.Conv2D(64, 3, padding='same', activation='relu'),
            layers.BatchNormalization(),
            layers.MaxPooling2D(pool_size=(2, 2)),
            layers.BatchNormalization(),
            layers.Conv2D(128, 3, padding='same', activation='relu'),
            layers.BatchNormalization(),
            layers.MaxPooling2D(pool_size=(2, 2)),
            layers.BatchNormalization(),
            layers.Flatten(),
            layers.Dense(256, activation='relu'),
            layers.BatchNormalization(),
            layers.Dropout(0.5),
            layers.Dense(n_classes, activation='softmax'),
        ]

        self.model = tf.keras.models.Sequential()
        for layer in stack:
            self.model.add(layer)

        # Compile model. Labels are integer class ids, hence the sparse loss.
        self.model.compile(
            loss='sparse_categorical_crossentropy',
            optimizer=tf.keras.optimizers.RMSprop(),
            metrics=['accuracy'],
        )

    def train_model_dataset(self, train_dataset, val_dataset, epochs):
        """Fit the model and keep the returned history on ``self.history``.

        Args:
            train_dataset: Batched dataset of ``(image, label)`` pairs to fit on.
            val_dataset: Batched dataset evaluated after every epoch.
            epochs: Number of passes over ``train_dataset``.
        """
        self.history = self.model.fit(train_dataset, epochs=epochs, validation_data=val_dataset)

    def test_model_dataset(self, test_dataset, verbose=True):
        """Evaluate the model on a test dataset and summarise the results.

        Predictions and true labels are gathered in separate passes over
        ``test_dataset``, so the dataset must yield its elements in the same
        order on every iteration for the metrics to be meaningful.

        Args:
            test_dataset: Batched dataset of ``(image, label)`` pairs.
            verbose: When true, print the scores and plot the confusion matrix.

        Returns:
            A tuple ``(loss, acc, pre, re, f1, preds, cm)``: Keras loss and
            accuracy, macro-averaged precision, recall and F1, the raw
            per-class prediction scores, and the confusion matrix.
        """
        preds = self.model.predict(test_dataset)
        preds_select = np.argmax(preds, axis=1)

        loss, acc = self.model.evaluate(test_dataset, verbose=2)

        # Flatten the per-batch label arrays into one array of true labels.
        true = np.concatenate([batch[1] for batch in test_dataset.as_numpy_iterator()])

        pre, re, f1 = precision_recall_fscore_support(true, preds_select, average='macro')[:3]

        # One label per model output so the matrix follows the model's class
        # count instead of a fixed number.
        cm_labels = list(range(preds.shape[1]))
        cm = confusion_matrix(true, preds_select, labels=cm_labels)

        if verbose:
            print("\nModel Results:")
            print("Loss: %f\nAccuracy: %f\nPrecision: %f\nRecall: %f\nF1: %f"%(loss, acc, pre, re, f1))
            print("\nConfusion Matrix:")
            disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=cm_labels)
            disp.plot()

        return loss, acc, pre, re, f1, preds, cm

    def _plot_metric(self, key, label):
        """Plot the training and validation curves of one history metric.

        Args:
            key: Name of the metric in the training history (e.g. ``'loss'``);
                the validation series is read from ``'val_' + key``.
            label: Lower-case word used in the legend, title and y-axis label.
        """
        history_dict = self.history.history
        train_values = history_dict[key]
        val_values = history_dict['val_' + key]
        epochs = range(1, len(train_values)+1)

        plt.figure(figsize=(8,6))
        plt.plot(epochs, train_values, 'bo', label='Training ' + label)
        plt.plot(epochs, val_values, 'b', label='Validation ' + label)
        plt.title('Training and validation ' + label)
        plt.xlabel('Epochs')
        plt.ylabel(label.capitalize())
        plt.legend()
        plt.show()

    def plot_loss(self):
        """Plot the loss curves for training and validation."""
        self._plot_metric('loss', 'loss')

    def plot_accuracy(self):
        """Plot the accuracy curves for training and validation."""
        self._plot_metric('accuracy', 'accuracy')

## Training

In [None]:
# Build
# Create the trainer (named after DATA_TYPE) and build/compile the CNN for
# MAX_LEN x MAX_LEN inputs with N_CHANNELS channels and N_CLASSES outputs.
model = Trainer(DATA_TYPE)
model.build_model(MAX_LEN, MAX_LEN, N_CHANNELS, N_CLASSES)

In [None]:
# Train
# Fit on the training split for EPOCHS epochs, validating on the validation split.
model.train_model_dataset(train, val, EPOCHS)

In [None]:
# Plot training and validation accuracy per epoch (requires a completed training run).
model.plot_accuracy()

In [None]:
# Plot training and validation loss per epoch (requires a completed training run).
model.plot_loss()

In [None]:
# Get test scores
# The returned tuple is discarded; the scores are printed and the confusion
# matrix is plotted because verbose defaults to True.
_ = model.test_model_dataset(test)

# Bottom