## Prepare env

In [None]:
# Mount Google Drive inside the Colab VM at /content/gdrive.
# force_remount=True is passed so that re-running this cell requests a fresh mount.
# NOTE(review): no other cell visible in this notebook reads from /content/gdrive —
# confirm whether the mount is still needed.
from google.colab import drive
drive.mount("/content/gdrive", force_remount=True)

In [None]:
# Start from a clean working directory: delete any previous audio_classification
# folder under /content, recreate it empty and make it the current directory.
# WARNING: rm -rf also removes anything a previous run left in that folder.
%cd /content/
! rm -rf audio_classification
! mkdir audio_classification
%cd audio_classification

In [None]:
# Clone the repository into the working directory; base_dir in the Parameters cell
# points at the meta/ and audio/ folders inside the resulting tmp/ checkout.
# The clone is not pinned to a commit, so it always fetches the current default branch.
! git clone https://github.com/Vitaljaz/tmp.git

## Import libraries

In [None]:
import os, shutil
import numpy as np
import pandas as pd
import librosa
import librosa.display
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import IPython.display as ipd

## Parameters

In [None]:
# Audio / spectrogram settings shared by every cell below
s_rate = 8820      # sampling rate handed to librosa.load
n_fft = 1024       # FFT size used for the STFT
hop_length = 128   # step between successive STFT frames, in samples
n_mels = 128       # number of mel bands

# Dataset locations inside the cloned repository
base_dir = '/content/audio_classification/tmp/custom_plastic_gun_shot/plastic_bag_gun_shot'
meta_file = os.path.join(base_dir, 'meta/meta.csv')
audio_dir = os.path.join(base_dir, 'audio/')

# Let pandas print up to 999 rows/columns before it truncates with "..."
pd.set_option('display.max_columns', 999)
pd.set_option('display.max_rows', 999)

## Read CSV file

In [None]:
# Read the metadata table: comma-separated, column names on the first line.
# Later cells use its "file" and "class_id" columns.
meta_data = pd.read_csv(meta_file, sep=',', header=0)
print(meta_data.shape)
display(meta_data.head())

## Definitions of functions

In [None]:
def load_wave_data(audio_dir, file_name):
    """Load one audio file with librosa at the notebook-wide sampling rate.

    Parameters
    ----------
    audio_dir : str
        Directory that contains the audio file.
    file_name : str
        Name of the file inside ``audio_dir``.

    Returns
    -------
    tuple
        ``(samples, sampling_rate)`` exactly as returned by ``librosa.load``
        when called with ``sr=s_rate`` (``s_rate`` is the module-level setting).
    """
    waveform, sample_rate = librosa.load(os.path.join(audio_dir, file_name), sr=s_rate)
    return waveform, sample_rate


def calculate_melsp(x, sr, n_fft=1024, hop_length=128, n_mels=128):
    """Return the log-scaled (dB) mel power spectrogram of waveform ``x``.

    Parameters
    ----------
    x : array of audio samples
    sr : sampling rate of ``x``, forwarded to the mel filter bank
    n_fft : FFT size for the STFT
    hop_length : hop between STFT frames, in samples
    n_mels : number of mel bands

    Returns
    -------
    The mel power spectrogram converted with ``librosa.power_to_db``.
    """
    # Power spectrogram = squared magnitude of the STFT.
    power_spec = np.square(np.abs(librosa.stft(x, n_fft=n_fft, hop_length=hop_length)))
    mel_spec = librosa.feature.melspectrogram(S=power_spec, sr=sr, n_mels=n_mels)
    return librosa.power_to_db(mel_spec)


def show_wave(x):
    """Draw ``x`` as a line plot on the current matplotlib axes and show the figure."""
    axes = plt.gca()
    axes.plot(x)
    plt.show()


def show_melsp(melsp, fs):
    """Render a spectrogram as a heatmap with a colour bar.

    Parameters
    ----------
    melsp : 2-D array to draw with ``librosa.display.specshow``
    fs : sampling rate forwarded to ``specshow`` as ``sr``
    """
    mesh = librosa.display.specshow(melsp, sr=fs)
    plt.colorbar(mesh)
    plt.show()

## Load Data

In [None]:
# Example clip: the first file listed in the metadata.
x, fs = load_wave_data(audio_dir, meta_data.loc[0, "file"])

# Fixed-length buffer of about 3 seconds, rounded up to a whole number of
# 128-sample chunks. The clip is zero-padded when shorter and truncated when
# longer; an unbounded copy would raise IndexError for clips over 3 seconds.
tmp = np.zeros(128 * int(np.ceil(3 * s_rate / n_mels)))
n_copy = min(len(x), len(tmp))
tmp[:n_copy] = x[:n_copy]
melsp_tmp = calculate_melsp(tmp, s_rate, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels)
show_wave(tmp)
show_melsp(melsp_tmp, fs)

# Spectrogram of the clip at its natural length.
melsp = calculate_melsp(x, s_rate, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels)

# Paste the spectrogram into the top-left corner of a fixed 128 x 345 zero canvas,
# cropping anything that does not fit instead of indexing out of bounds.
np_data = np.zeros((128, 345))
n_rows = min(melsp.shape[0], np_data.shape[0])
n_cols = min(melsp.shape[1], np_data.shape[1])
np_data[:n_rows, :n_cols] = melsp[:n_rows, :n_cols]
# Note: plt.plot on a 2-D array draws one line per column.
show_wave(np_data)
show_melsp(np_data, fs)

print('wave size:', x.shape)
print('melsp size:', melsp.shape)
print('sampling rate:', fs)
show_wave(x)
show_melsp(melsp, fs)

In [None]:
# Inline audio player for the example clip, played back at the rate it was loaded with.
ipd.Audio(x, rate=fs)

## Split training and test dataset

In [None]:
# File names are the inputs, class ids are the labels.
filenames = meta_data.loc[:, "file"]
targets = meta_data.loc[:, "class_id"]

# Stratified 85/15 split. random_state pins the shuffle so that the same files
# end up in train/test on every Restart & Run All.
f_train, f_test, t_train, t_test = train_test_split(
    filenames, targets, test_size=0.15, stratify=targets, random_state=42)

# Renumber each split from 0 so that later lookups such as f_test[i] address rows
# by position within the split.
f_train = f_train.reset_index(drop=True)
t_train = t_train.reset_index(drop=True)
f_test = f_test.reset_index(drop=True)
t_test = t_test.reset_index(drop=True)

print('Number of data:')
print('f_train:', f_train.shape[0])
print('t_train:', t_train.shape[0])
print('f_test:', f_test.shape[0])
print('t_test:', t_test.shape[0])

## Transform wav data to mel-stft array

In [None]:
# Target spectrogram size for every clip: `freq` mel bands by `time` STFT frames.
sec = 5          # clip length in seconds
freq = n_mels    # rows of the spectrogram
# The STFT advances hop_length samples per frame, so the number of frames covering
# `sec` seconds follows from hop_length (not from the number of mel bands; the two
# only coincide because both are currently 128).
time = int(np.ceil(sec * s_rate / hop_length))
print(sec, freq, time)

In [None]:
def save_np_data(filename, x, y):
    """Convert audio files to fixed-size log-mel spectrograms and save them as .npz.

    Every file named in ``x`` is loaded with ``load_wave_data`` from the global
    ``audio_dir``, zero-padded or truncated to a fixed number of samples, turned
    into a log-mel spectrogram with ``calculate_melsp`` and cropped to
    ``(freq, time)``. The stacked spectrograms and labels are written with
    ``np.savez`` under the keys ``x`` and ``y``.

    Parameters
    ----------
    filename : str
        Destination passed to ``np.savez``.
    x : sequence of str
        Audio file names, relative to ``audio_dir``.
    y : sequence
        Class labels, in the same order as ``x``.

    Raises
    ------
    ValueError
        If ``x`` and ``y`` do not have the same length.

    Notes
    -----
    Reads the module-level settings ``audio_dir``, ``freq``, ``time``, ``s_rate``,
    ``n_fft``, ``hop_length`` and ``n_mels``.
    """
    # Iterate by position so the function does not depend on the inputs carrying
    # a 0..n-1 pandas index.
    file_names = list(x)
    labels = list(y)
    if len(file_names) != len(labels):
        raise ValueError(
            'x and y must have the same length, got {} and {}'.format(
                len(file_names), len(labels)))

    np_data = np.zeros((len(file_names), freq, time))
    np_targets = np.zeros(len(labels))

    # Samples needed for `time` STFT frames at the configured hop. With librosa's
    # default centred STFT this yields time + 1 frames, which are cropped to `time`.
    n_samples = time * hop_length

    for i, (file_name, label) in enumerate(zip(file_names, labels)):
        wave, _ = load_wave_data(audio_dir, file_name)

        # Zero-pad short clips / truncate long ones with a single slice copy.
        padded = np.zeros(n_samples)
        n_copy = min(len(wave), n_samples)
        padded[:n_copy] = wave[:n_copy]

        spec = calculate_melsp(padded, s_rate, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels)
        np_data[i] = spec[0:freq, 0:time]
        np_targets[i] = label
    np.savez(filename, x=np_data, y=np_targets)

In [None]:
n_classes = 2
# Write the held-out split; the archive name encodes class count and sampling rate.
test_npz = f'custom{n_classes}_melsp_test_sr{s_rate}.npz'
save_np_data(test_npz, f_test, t_test)

In [None]:
# Write the training split; the archive name encodes class count and sampling rate.
train_npz = f'custom{n_classes}_melsp_train_sr{s_rate}.npz'
save_np_data(train_npz, f_train, t_train)

## Audio classification with CNN

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.models import load_model
from tensorflow.keras.utils import to_categorical

## Parameters for CNN

In [None]:
# Number of clips in each split, used below to reshape the spectrogram arrays.
train_num = len(t_train)
test_num = len(t_test)

## Load dataset

In [None]:
# Load the training spectrograms and labels.
# np.load on an .npz returns a file-backed archive; the context manager closes
# the underlying file once both arrays have been read into memory.
with np.load(train_npz) as data:
    x_train = data["x"]
    y_train = data["y"]

In [None]:
# Load the test spectrograms and labels.
# np.load on an .npz returns a file-backed archive; the context manager closes
# the underlying file once both arrays have been read into memory.
with np.load(test_npz) as test_data:
    x_test = test_data["x"]
    y_test = test_data["y"]

In [None]:
# One-hot encode the class ids.
# The ndim guard keeps the cell idempotent: re-running it would otherwise feed the
# already one-hot labels through to_categorical a second time.
if y_train.ndim == 1:
    y_train = to_categorical(y_train, n_classes)
if y_test.ndim == 1:
    y_test = to_categorical(y_test, n_classes)

In [None]:
# Add a trailing channel axis of size 1: (clips, freq, time) -> (clips, freq, time, 1).
x_train = np.reshape(x_train, (train_num, freq, time, 1))
x_test = np.reshape(x_test, (test_num, freq, time, 1))

In [None]:
# Sanity check: report the shape of every array that goes into training/evaluation.
for label, array in (('x_train', x_train), ('y_train', y_train),
                     ('x_test', x_test), ('y_test', y_test)):
    print(f'{label}:', array.shape)

## Define a CNN

In [None]:
# Imported from tensorflow.keras, the same namespace every other Keras import in
# this notebook (callbacks, to_categorical, ...) comes from.
from tensorflow.keras import layers, models

# (filters, kernel_size, strides, pool_size) for the four convolution stages.
# Inputs are laid out as (freq, time, 1), so (1, 8) kernels slide along the time
# axis and (8, 1) kernels along the frequency axis; the stages alternate between them.
conv_stages = [
    (32, (1, 8), (1, 2), (1, 4)),
    (32, (8, 1), (2, 1), (4, 1)),
    (64, (1, 8), (1, 2), (1, 4)),
    (64, (8, 1), (2, 1), (4, 1)),
]

input_layer = layers.Input(shape=x_train.shape[1:])
out = input_layer
for n_filters, kernel_size, strides, pool_size in conv_stages:
    # Each stage: strided convolution -> max pooling -> dropout.
    out = layers.Conv2D(n_filters, kernel_size, strides=strides,
                        activation='relu', padding='same')(out)
    out = layers.MaxPooling2D(pool_size=pool_size)(out)
    out = layers.Dropout(0.25)(out)

# Classifier head: flatten, one hidden dense layer, softmax over the classes.
out = layers.Flatten()(out)
out = layers.Dense(64, activation='relu')(out)
out = layers.Dropout(0.25)(out)
out = layers.Dense(n_classes, activation='softmax')(out)

model = models.Model(inputs=input_layer, outputs=out)
model.summary()

## Optimization and callbacks

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint

# Softmax output + one-hot labels -> categorical cross-entropy; accuracy is tracked
# so the checkpoint callback below can monitor its validation counterpart.
model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['accuracy'])

# Keep only the weights of the epoch with the highest validation accuracy.
checkpoint_options = dict(
    monitor='val_accuracy',
    mode='max',
    save_best_only=True,
    save_weights_only=True,
    verbose=1,
)
model_checkpoint_callback = ModelCheckpoint(
    filepath="/content/audio_classification/model-best.h5",
    **checkpoint_options)


## Exec training

In [None]:
%%time
# train model
batch_size = 16
#epochs = 1000
n_epochs = 100
val_split = 0.2

fit_log = model.fit(x_train, y_train,
                    validation_split=val_split,
                    epochs=n_epochs, batch_size=batch_size,
                    verbose=1, callbacks=[model_checkpoint_callback])

model.load_weights("/content/audio_classification/model-best.h5")

## Model evaluation

In [None]:
# Evaluate on the held-out test split; the model was compiled with one metric
# (accuracy), so the result holds the loss followed by the accuracy.
score = model.evaluate(x_test, y_test, verbose=0)
test_loss, test_accuracy = score[0], score[1]
print('loss=', test_loss)
print('accuracy=', test_accuracy)

In [None]:
# Class names indexed by class id.
# NOTE(review): assumes class_id 0 is plastic_bag_pop and 1 is gun_shot — confirm
# against meta.csv.
classes = ["plastic_bag_pop", "gun_shot"]
n_rounds = 40

# Per-file score: +1 for every round the file is classified correctly, -1 for
# every round it is misclassified.
total_results = {}
for _ in range(n_rounds):
    test_pred = model.predict(x_test)

    # Map the arg-max of predictions and one-hot labels back to class names.
    pred = np.array([classes[k] for k in test_pred.argmax(axis=1)])
    true = np.array([classes[k] for k in y_test.argmax(axis=1)])
    display(pd.crosstab(true, pred))

    total = 0
    true_predict = 0
    false_predict = 0

    for idx in range(y_test.shape[0]):
        file_name = f_test[idx]
        if pred[idx] != true[idx]:
            false_predict += 1
            # A file's first miss must count as -1 as well (it used to be stored as +1).
            total_results[file_name] = total_results.get(file_name, 0) - 1
        else:
            true_predict += 1
            total_results[file_name] = total_results.get(file_name, 0) + 1
        total += 1

    print(f"Total: {total}\nCorrect: {true_predict}\nWrong: {false_predict}")

# Report the accumulated score per file, ordered by file name.
sorted_results = sorted(total_results)
for file_name in sorted_results:
    print(f"{file_name} - {total_results[file_name]}")
print(len(sorted_results))

## Save model and convert to ONNX

In [None]:
# Export the trained model twice: the full model to the "model" path (the path the
# tf2onnx and zip cells below read from) and the weights alone to model.h5.
model.save("/content/audio_classification/model")
model.save_weights("/content/audio_classification/model.h5")

In [None]:
# Install (or upgrade to) the newest tf2onnx release; the version is not pinned.
! pip install -U tf2onnx

In [None]:
# Convert the directory written by model.save into model.onnx with the tf2onnx CLI.
! python -m tf2onnx.convert --saved-model /content/audio_classification/model/ --output /content/audio_classification/model.onnx

In [None]:
# Bundle the saved-model directory (recursively) into model.zip.
! zip -r /content/audio_classification/model.zip /content/audio_classification/model/