<a href="https://colab.research.google.com/github/ARBasharat/AudioClassification/blob/master/AudioClassification1D_wo_Augmentation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
import random
import librosa
from tensorflow.keras import datasets, layers, models
from sklearn.model_selection import train_test_split

In [None]:
# Mount Google Drive (Colab only) so the dataset files stored there can be read.
from google.colab import drive
drive.mount("/content/drive")

# Load the audio arrays as float32 and the training-label table.
# NOTE(review): the relative "drive/My Drive/..." paths presumably rely on the
# working directory being /content (the mount point's parent) -- confirm.
train = np.load("drive/My Drive/AudioClassification/audio_train.npy").astype('float32')
test = np.load("drive/My Drive/AudioClassification/audio_test.npy").astype('float32')
train_labels_df = pd.read_csv("drive/My Drive/AudioClassification/labels_train.csv")

In [None]:
# Class labels come from the second column of the label table and are one-hot encoded.
train_labels = train_labels_df.to_numpy()[:,1]
labels_categorical = keras.utils.to_categorical(train_labels)

# Hold out 20% of the labelled data for validation; the fixed seed keeps the split reproducible.
X_train_original, X_val_original, y_train_original, y_val = train_test_split(train, 
                          labels_categorical, test_size=0.20, random_state=42)

# Report the shape of every split.
print("Training Data:", X_train_original.shape)
print("Training Labels:", y_train_original.shape)
print("Validation Data:", X_val_original.shape)
print("Validation Labels:", y_val.shape)
print("Testing Data:", test.shape)

In [None]:
## Augmentation helpers; one of them is picked at random for each training sample

def add_white_noise(data, rate = 0.05):
  """Return `data` plus standard-normal noise scaled by `rate`.

  `len(data)` noise values are drawn from NumPy's global random state.
  The input is not modified; a new array is returned.
  """
  noise = np.random.standard_normal(len(data))
  return data + rate * noise

def add_normal_distributed_noise(data, rate = 0.09, sr = 30000):
  """Add noise drawn from a normal distribution (mean 0, std 1) to `data`.

  The noise is scaled by `rate` before being added.
  Permissible noise factor value: rate > 0.004.
  `sr` is accepted but not used by this function.
  """
  gaussian = np.random.normal(loc=0, scale=1, size=len(data))
  return data + rate * gaussian

def add_time_shift(data, sr = 30000):
  """Time-shift `data` by rolling it `int(sr/10)` positions.

  Elements pushed past the end wrap around to the start (`np.roll`).
  Permissible factor value: sr/10.
  """
  offset = int(sr / 10)
  return np.roll(data, offset)

def add_pitch_shift(data, sr = 30000, steps = -7):
  """Return `data` pitch-shifted by `steps` using librosa.

  Args:
    data: 1-D audio signal.
    sr: sampling rate forwarded to librosa.
    steps: number of steps to shift by (negative lowers the pitch).
      Documented permissible range: -5 <= steps <= 5.
      NOTE(review): the default of -7 lies outside that range; it is kept
      so existing callers see no change -- confirm which is intended.

  Returns:
    The pitch-shifted signal produced by `librosa.effects.pitch_shift`.
  """
  # Pass `sr` and `n_steps` by keyword: librosa 0.10+ accepts only the signal
  # positionally, so a positional `sr` raises TypeError there. The keyword
  # form is also valid on older releases.
  return librosa.effects.pitch_shift(data, sr=sr, n_steps=steps)

In [None]:
# Build the augmented training set: every real sample plus one augmented copy (twice the size of the real data)
def get_augmented_data_two_times(X_train_original, y_train_original):
  """Return the training set doubled by one random augmentation per sample.

  Every sample is emitted unchanged, immediately followed by one augmented
  copy. The augmentation is chosen uniformly at random from white noise,
  normal-distributed noise, time shift and pitch shift. Both entries carry
  the original sample's label.

  Returns:
    (train_data, y_train) as NumPy arrays, each with twice as many entries
    as the inputs.
  """
  def augment(sample):
    # 1 = white noise, 2 = normal noise, 3 = time shift, 4 = pitch shift
    pick = random.choice([1, 2, 3, 4])
    if pick == 1:
      return add_white_noise(sample)
    if pick == 2:
      return add_normal_distributed_noise(sample)
    if pick == 3:
      return add_time_shift(sample)
    return add_pitch_shift(sample)

  samples, targets = [], []
  for idx in range(len(X_train_original)):
    original = X_train_original[idx]
    target = y_train_original[idx]
    # the untouched sample first, then its augmented twin
    samples.append(original)
    samples.append(augment(original))
    targets.extend((target, target))

  return np.array(samples), np.array(targets)

# Build the training set (each sample plus one augmented copy) and report the resulting shapes.
train_data, y_train = get_augmented_data_two_times(X_train_original, y_train_original)
print(train_data.shape, y_train.shape)

In [None]:
# Get training data without augmentation
def get_data_2(X_train_original, y_train_original):
  """Return the samples and labels as NumPy arrays, without augmentation.

  Entries are read by index, one per position in `X_train_original`, so the
  output order matches the input order.

  Returns:
    (train_data, y_train) as NumPy arrays.
  """
  positions = range(len(X_train_original))
  train_data = np.array([X_train_original[pos] for pos in positions])
  y_train = np.array([y_train_original[pos] for pos in positions])
  return train_data, y_train

#train_data, y_train = get_data_2(X_train_original, y_train_original)
#print(train_data.shape, y_train.shape)

In [None]:
# Scale every split by the same factor, the training-set maximum, so that a
# given raw amplitude maps to the same model input during training,
# validation and prediction. Dividing each split by its own maximum would
# give each one a different scale.
# float() keeps the divisor a plain Python scalar so the in-place divisions
# below do not depend on the dtype of train_data.
train_scale = float(train_data.max())
train_data /= train_scale
X_val_original /= train_scale
test /= train_scale

In [None]:
# Give each split a trailing axis of size 1, i.e. shape (samples, length, 1),
# matching the (30000, 1) input shape declared by the Conv1D models.
X_train = np.reshape(train_data, (train_data.shape[0], train_data.shape[1], 1))
X_val = np.reshape(X_val_original, (X_val_original.shape[0], X_val_original.shape[1], 1))
X_test = np.reshape(test, (test.shape[0], test.shape[1], 1))
print(X_train.shape, X_val.shape, X_test.shape, y_train.shape, y_val.shape)

In [None]:
## Base Model Architecture 
def getBaseModel():
  """Build the baseline 1-D CNN, print its summary and return it uncompiled.

  Architecture: three Conv1D layers (32, 64, 128 filters, kernel size 3)
  with max-pooling after the first two, then Flatten and dense layers of
  128 and 64 units feeding a 10-way softmax. Input shape is (30000, 1).
  """
  stack = [
    layers.Conv1D(32, 3, activation='relu', input_shape=(30000, 1)),
    layers.MaxPooling1D(2),
    layers.Conv1D(64, 3, activation='relu'),
    layers.MaxPooling1D(2),
    layers.Conv1D(128, 3, activation='relu'),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dense(64, activation='relu'),
    layers.Dense(10, activation='softmax'),
  ]
  model = models.Sequential(stack)
  model.summary()
  return model

def get_model_2():
  """Build the dropout-regularised 1-D CNN and return it uncompiled.

  Architecture: Conv1D(32) -> Dropout(0.25) -> Conv1D(48) -> Conv1D(120)
  -> MaxPooling1D(2) -> Dropout(0.25) -> Flatten -> Dense(128)
  -> Dropout(0.25) -> Dense(64) -> Dropout(0.4) -> 10-way softmax.
  Input shape is (30000, 1); every convolution uses kernel size 3.
  """
  stack = [
    layers.Conv1D(32, kernel_size=3, activation='relu',
                  input_shape=(30000, 1)),
    layers.Dropout(0.25),
    layers.Conv1D(48, kernel_size=3, activation='relu'),
    layers.Conv1D(120, kernel_size=3, activation='relu'),
    layers.MaxPooling1D(pool_size=2),
    layers.Dropout(0.25),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.25),
    layers.Dense(64, activation='relu'),
    layers.Dropout(0.4),
    layers.Dense(10, activation='softmax'),
  ]
  return tf.keras.models.Sequential(stack)


In [None]:
## Compile and Train Model with Early stopping
def compileModel(model, optimizer = 'sgd', epochs = 10):
  """Compile `model` and fit it on the notebook-level training arrays.

  Reads the globals X_train, y_train, X_val and y_val.

  Args:
    model: Keras model to compile and train (it is modified in place).
    optimizer: optimizer name or instance forwarded to `model.compile`.
    epochs: maximum number of epochs; early stopping may end training sooner.

  Returns:
    The history object returned by `model.fit`.
  """
  # Stop once the monitored loss has gone 3 epochs without improving.
  # NOTE(review): this monitors the training loss, not 'val_loss', even though
  # validation data is supplied -- confirm that is intended.
  earlyStopping = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=3)

  model.compile(optimizer=optimizer,
          loss=tf.keras.losses.categorical_crossentropy,
          metrics=['accuracy'])

  # `callbacks` is documented as a list of Callback instances, so the single
  # callback is wrapped in a list instead of being passed bare.
  history = model.fit(X_train, y_train, epochs=epochs, verbose=2, batch_size=64,
          validation_data=(X_val, y_val), callbacks=[earlyStopping])

  return history

## Plot Training History
def plotTrainingAccuracy(history):
  """Plot training and validation accuracy per epoch from `history.history`.

  Opens a new figure, shows it, and closes it afterwards.
  """
  plt.figure()
  series = (('accuracy', 'training_accuracy'),
            ('val_accuracy', 'validation_accuracy'))
  for key, legend_name in series:
    plt.plot(history.history[key], label=legend_name)
  plt.xlabel('Epoch')
  plt.ylabel('Accuracy')
  plt.legend(loc='upper left')
  plt.show()
  plt.close()

def plotTrainingLoss(history):
  """Plot training and validation loss per epoch from `history.history`.

  Opens a new figure, shows it, and closes it afterwards.
  """
  plt.figure()
  series = (('loss', 'training_loss'),
            ('val_loss', 'validation_loss'))
  for key, legend_name in series:
    plt.plot(history.history[key], label=legend_name)
  plt.xlabel('Epoch')
  plt.ylabel('Loss')
  plt.legend(loc='upper left')
  plt.show()
  plt.close()

## Predict with the model
def predictModel(model, test):
  """Return the result of `model.predict` on `test`, called with verbose=0."""
  predictions = model.predict(test, verbose=0)
  return predictions
  

## Evaluate the model
def evaluateModel(model, X_test, y_test):
  """Evaluate `model` on (X_test, y_test) and print the accuracy and loss.

  `model.evaluate` is called with verbose=0 and must return a
  (loss, accuracy) pair. Nothing is returned.
  """
  metrics = model.evaluate(X_test, y_test, verbose=0)
  test_loss, test_acc = metrics
  print("\n ### Performance of Test Data ###")
  print("Test Accuracy: ", test_acc)
  print("Test Loss: ", test_loss, "\n")

In [None]:
# Build the dropout-regularised CNN and train it with Adam for at most 100 epochs
# (the early-stopping callback inside compileModel may end training sooner).
model = get_model_2()
history = compileModel(model, optimizer = 'adam', epochs = 100)

In [None]:
# Plot the learning curves, then report loss and accuracy on the validation split
# (no test labels are loaded in this notebook, so X_val / y_val are used here).
plotTrainingAccuracy(history)
plotTrainingLoss(history)
evaluateModel(model, X_val, y_val)

In [None]:
# Predict on X_test, the test inputs reshaped to (samples, length, 1);
# no variable named `test_data` is defined in this notebook.
predictions = predictModel(model, X_test)
# Keep the index of the highest-scoring class for each test sample.
preds = [np.argmax(scores) for scores in predictions]


In [None]:
# Save the predicted class indices to submission.csv.
pd.DataFrame(preds).to_csv("submission.csv")