In [None]:
# Colab only: mount Google Drive at /content/drive so files stored there can be read
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Colab only: extract the train and test archives from Drive into /content/data/
!unzip "drive/MyDrive/train.zip" -d "/content/data/"
!unzip "drive/MyDrive/test.zip" -d "/content/data/"

In [None]:
import os 
import imageio
from IPython.display import display, Image
from sklearn.preprocessing import OneHotEncoder
import numpy as np 
import random
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import regularizers
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

In [None]:
# Locations of the training and test image folders (relative paths)
train_path = 'data/train/mfcc/'
test_path = 'data/test/mfcc/'

# Entries of the training folder, ignoring hidden ones such as .ipynb_checkpoints
classes = [f for f in os.listdir(train_path) if not f.startswith('.')]
# Label names; get_image_names() uses a name's position in this list as its integer label
targets = ['yes','no','up','down','left','right','on','off','stop','go','_background_noise_','unknown']
# Names used when reporting predictions, index-aligned with `targets`:
# background noise is considered silence for predictions
prediction_classes = ['yes','no','up','down','left','right','on','off','stop','go','silence','unknown']

# Author's note: the source images are 128x87 with 1 channel;
# they are resized to the shape below before being passed to the model
image_shape = (299,299,3)
num_classes = len(targets)

In [None]:
def get_image_names():
  """Collect the training image paths, grouped by class folder.

  Returns:
    A list with one entry per name in `classes`. Each entry is a list of
    [file path, integer label] pairs for the files found in that folder.
    The label is the folder name's index in `targets`; folders that are not
    listed in `targets` get the index of 'unknown'.
  """
  grouped = []
  for class_name in classes:
    # folders outside the target list are all folded into the 'unknown' label
    label = targets.index(class_name) if class_name in targets else targets.index('unknown')
    folder = train_path + class_name + '/'
    grouped.append([[folder + file_name, label] for file_name in os.listdir(folder)])
  return grouped

In [None]:
def split_names(image_list, num_batches=2):
  """Randomly partition every class's samples into `num_batches` chunks.

  Args:
    image_list: list of per-class lists (e.g. the result of
      get_image_names()). The lists are not modified; shuffling is done on
      a copy.
    num_batches: number of chunks to produce; must be at least 1.

  Returns:
    A list of `num_batches` chunks. Each chunk contains one slice per class,
    in the same class order as `image_list`. For a given class the slice
    sizes differ by at most one and together hold every sample exactly once.

  Raises:
    ValueError: if `num_batches` is smaller than 1.
  """
  if num_batches < 1:
    raise ValueError("num_batches must be at least 1, got {}".format(num_batches))
  batches = [[] for _ in range(num_batches)]
  for class_items in image_list:
    shuffled = list(class_items)
    random.shuffle(shuffled)
    num = len(shuffled)
    for k in range(num_batches):
      # Proportional boundaries spread the remainder across the chunks; a fixed
      # step of num // num_batches would drop the last num % num_batches
      # samples (and the entire class when num < num_batches).
      start = k * num // num_batches
      stop = (k + 1) * num // num_batches
      batches[k].append(shuffled[start:stop])
  return batches

In [None]:
def make_training_batch(image_names):
  """Load one chunk of labelled images into arrays ready for model.fit().

  Args:
    image_names: list of per-class lists of [file path, integer label]
      pairs, i.e. one chunk as produced by split_names().

  Returns:
    (train_x, train_y): train_x stacks one array of shape `image_shape` per
    image. train_y is a float array with one row per image and exactly
    `num_classes` columns, where the column equal to the image's label is 1
    and all others are 0.
  """
  train_x, train_y = [], []
  # training images are grouped by class
  for class_group in image_names:
    for entry in class_group:
      # load the image as a np array
      x = np.array(imageio.imread(entry[0]))
      # bring it to the uniform shape the model expects; note that np.resize
      # fills the target by repeating the source values, it does not interpolate
      x = np.expand_dims(x,axis=0)
      x = np.resize(x,image_shape)
      train_x.append(x)
      train_y.append(entry[1])
  # convert outer list to a np array
  train_x = np.array(train_x)
  # One-hot encode against the full, fixed label set. An encoder fitted on
  # only the labels present in this chunk yields fewer (and shifted) columns
  # whenever a class is missing, which no longer matches the model's output.
  train_y = np.eye(num_classes)[np.array(train_y, dtype=int)]
  return (train_x, train_y)

In [None]:
def training_loop(num_times=1, num_batches=2, model=None):
  """Train a model on the training set, one in-memory chunk at a time.

  Args:
    num_times: number of passes over the full training set.
    num_batches: number of chunks each pass is split into (see split_names).
    model: compiled Keras model to train. When omitted, a new one is created
      with build_model().

  Returns:
    The trained model: the object passed in as `model`, or the newly built
    one when `model` was None (so it is not lost to the caller).
  """
  # build the model if needed
  if model is None:
    model = build_model()

  # callbacks used in training
  early_stop = EarlyStopping(monitor='val_accuracy',min_delta=.001, patience=5, restore_best_weights=False)
  checkpoint = ModelCheckpoint('best_model.h5',monitor='val_accuracy', save_best_only=True,mode='max')

  # get filenames of training images, labeled with their classes
  name_list = get_image_names()
  for e in range(num_times):
    print("Loop {}".format(e))
    # divide training data into smaller chunks
    # prevents RAM issues with large models and may help with overfitting
    name_batches = split_names(name_list, num_batches)

    for k in range(num_batches):
      print("Batch {}".format(k))
      train_x, train_y = make_training_batch(name_batches[k])
      # The chunk arrives grouped by class and validation_split takes the
      # trailing samples, so shuffle first to get a mixed validation set.
      shuffler = np.random.permutation(len(train_x))
      train_x = train_x[shuffler]
      train_y = train_y[shuffler]
      model.fit(train_x,train_y,batch_size=50,epochs=20, callbacks=[early_stop,checkpoint], validation_split = 0.15)
      # drop the references so this chunk's memory can be reclaimed before
      # the next chunk is loaded
      del train_x, train_y

  return model

In [None]:
def build_model():
  """Create and compile the classifier.

  Returns:
    A keras InceptionResNetV2 created with weights=None and `num_classes`
    output classes, compiled with an Adam optimizer (learning rate 0.01),
    categorical cross-entropy loss and the accuracy metric.
  """
  network = keras.applications.InceptionResNetV2(classes=num_classes, weights=None)
  adam = keras.optimizers.Adam(learning_rate=0.01)
  network.compile(
    loss="categorical_crossentropy",
    metrics=['accuracy'],
    optimizer=adam,
  )
  return network

In [None]:
# Start from a freshly built model ...
model = build_model()
# ... or resume from a previously saved one instead:
#model = keras.models.load_model("drive/MyDrive/best_spectrogram.h5")

# one pass over the training data, loaded in three chunks
training_loop(num_times=1, num_batches=3, model=model)

In [None]:
def read_test_images(num=1000,image_names=[]):
  """Load test images for the leading entries of `image_names`.

  Only the first min(num, len(image_names)) names are considered. A name is
  skipped when it starts with "." or when no such file exists under
  `test_path`.

  Returns:
    (names, images): the list of names that were loaded, and a np array
    stacking the corresponding images, each resized to `image_shape`.
  """
  kept_names, loaded = [], []
  limit = min(num, len(image_names))
  for idx in range(limit):
    fname = image_names[idx]
    # ignore hidden entries and names that are not present on disk
    if fname[0] == "." or not os.path.exists(test_path + fname):
      continue
    kept_names.append(fname)
    pixels = np.array(imageio.imread(test_path + fname))
    pixels = np.resize(np.expand_dims(pixels, axis=0), image_shape)
    loaded.append(pixels)
  return (kept_names, np.array(loaded))

In [None]:
def predict_labels(model, test_x):
  """Turn model outputs into class names.

  Args:
    model: object whose predict(test_x) yields one score vector per sample.
    test_x: batch of inputs, handed to model.predict unchanged.

  Returns:
    A list holding, for every sample, the entry of `prediction_classes` at
    the position of that sample's highest score.
  """
  scores = model.predict(test_x)
  return [prediction_classes[np.argmax(row)] for row in scores]

In [None]:
def evaluate_model(model, batch_size=2000, output_path="predictions.csv"):
  """Predict a label for every test image and write the results as CSV.

  The file starts with the header "fname,label" followed by one row per
  image that read_test_images() loaded. In each row the image name's
  trailing ".png" is replaced by ".wav".

  Args:
    model: trained model, forwarded to predict_labels().
    batch_size: number of directory entries handled per prediction round;
      must be at least 1.
    output_path: where the CSV file is written.

  Raises:
    ValueError: if `batch_size` is smaller than 1.
  """
  if batch_size < 1:
    raise ValueError("batch_size must be at least 1, got {}".format(batch_size))
  test_images = os.listdir(test_path)
  # the context manager closes the file even when prediction fails midway
  with open(output_path,"w") as pred_file:
    pred_file.write("fname,label\n")
    for n in range(0,len(test_images),batch_size):
      # Advance by position in the directory listing. Advancing by the number
      # of images actually loaded would re-read names whenever
      # read_test_images() skips an entry, producing duplicate rows.
      chunk = test_images[n:n + batch_size]
      image_names, image_batch = read_test_images(len(chunk),chunk)
      print(n)
      if len(image_names) == 0:
        # every entry in this chunk was skipped; nothing to predict
        continue
      label_batch = predict_labels(model,image_batch)
      for name, label in zip(image_names, label_batch):
        # swap only the extension; a plain replace("png","wav") would also
        # rewrite "png" occurring elsewhere in the name
        if name.endswith(".png"):
          name = name[:-len(".png")] + ".wav"
        pred_file.write("{},{}\n".format(name,label))

In [None]:
# Run the trained model over the test folder and write predictions.csv
evaluate_model(model)