In [1]:
%config IPCompleter.greedy=True

import os
import tifffile
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix,accuracy_score
from tensorflow.python.keras.models import Sequential
from tensorflow.python.keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPooling2D, BatchNormalization
from tensorflow.python.keras import utils
import json
import tensorflow as tf
from tensorflow.keras import layers
from keras.models import Model
from keras.layers import Input
from keras.layers import Conv2D
from keras.layers import MaxPooling2D
from keras.layers.merge import concatenate
from keras.utils import plot_model

with open("../Genre_Track_Id_Dict.json",'r') as j:
    id_genre_dict = json.load(j)
numerical_labels = dict(zip(list(id_genre_dict.keys()),np.arange(0,8)))


In [2]:
data = np.zeros((56000,128,647))
labels = np.zeros((56000,len(numerical_labels)))

ct=-1
for genre in os.listdir("../mp3_files"):
    genre_dir = os.path.join("../mp3_files",genre)
    for fname in [f for f in os.listdir(genre_dir) if ".tiff" in f]:
        mel_db_path = os.path.join(genre_dir,fname)
        try:

            spect = tifffile.imread(mel_db_path)
            if spect.shape[1] == 646:
                spect = np.hstack((spect,np.zeros((128,1))))
            if spect.shape[1] == 647:
                ct+=1
                data[ct,:,:] = spect
                genre_encode = numerical_labels[genre] 
                labels[ct][genre_encode] = 1
        except:
            idk = 2
data = data[0:ct,:,:]
labels = labels[0:ct,:]

data = data/-80

In [3]:
X_train, X_test, y_train, y_test = train_test_split(data, labels, test_size=0.30, shuffle=True)
X_train = X_train.reshape(X_train.shape[0], 128, 647, 1)
X_test = X_test.reshape(X_test.shape[0], 128, 647, 1)

X_test, X_val, y_test, y_val = train_test_split(X_test, y_test, test_size=0.20, shuffle=True)

print("Training Shape: {} ... {}".format(X_train.shape,y_train.shape))
print("Testing Shape: {} ... {}".format(X_test.shape,y_test.shape))
print("Validation Shape: {} ... {}".format(X_val.shape,y_val.shape))

Training Shape: (39169, 128, 647, 1) ... (39169, 8)
Testing Shape: (13430, 128, 647, 1) ... (13430, 8)
Validation Shape: (3358, 128, 647, 1) ... (3358, 8)


In [176]:
paddings = tf.constant([[0, 0],[1,1],[1,1],[0,0]])
visible = Input(shape=(128,647,1))

#CNN Block
padded_input = tf.pad(visible,paddings,"CONSTANT")

cnn_conv_1 = BatchNormalization(axis=-1)(Conv2D(16,(3,3),activation='relu',input_shape=(130,649,1))(padded_input))
cnn_mp_1 = layers.MaxPooling2D(pool_size=(2,2),strides=(2,2))(cnn_conv_1)

cnn_padded_2 = tf.pad(cnn_mp_1,paddings,"CONSTANT")
cnn_conv_2 = BatchNormalization(axis=-1)(Conv2D(32,(3,3),activation='relu')(cnn_padded_2))
cnn_mp_2 = layers.MaxPooling2D(pool_size=(2,2),strides=(2,2))(cnn_conv_2)

cnn_padded_3 = tf.pad(cnn_mp_2,paddings,"CONSTANT")
cnn_conv_3 = BatchNormalization(axis=-1)(Conv2D(64,(3,3),activation='relu')(cnn_padded_3))
cnn_mp_3 = layers.MaxPooling2D(pool_size=(2,2),strides=(2,2))(cnn_conv_3)

cnn_padded_4 = tf.pad(cnn_mp_3,paddings,"CONSTANT")
cnn_conv_4 = BatchNormalization(axis=-1)(Conv2D(128,(3,3),activation='relu')(cnn_padded_4))
cnn_mp_4 = layers.MaxPooling2D(pool_size=(4,4),strides=(4,4))(cnn_conv_4)

cnn_padded_5 = tf.pad(cnn_mp_4,paddings,"CONSTANT")
cnn_conv_5 = BatchNormalization(axis=-1)(Conv2D(64,(3,3),activation='relu')(cnn_padded_5))
cnn_mp_5 = layers.MaxPooling2D(pool_size=(4,4),strides=(4,4))(cnn_conv_5)

cnn_out = layers.Flatten()(cnn_mp_5)

# #Rnn Block 
rnn_mp = layers.MaxPooling2D(pool_size=(1,2),strides=(1,2))(visible)
lstm_unit = layers.LSTM(1,return_sequences=False,return_state=False)
rnn_lstm = layers.Bidirectional(lstm_unit)(tf.reshape(rnn_mp,rnn_mp.shape[1:]))
rnn_out = layers.Flatten()(rnn_x)

# # rnn_x = layers.Embedding(input_dim = 27520, output_dim = 16384)(rnn_x)
# #missing embedding
# rnn_x = tf.reshape(rnn_x,rnn_x.shape[1:])
# rnn_x = layers.Bidirectional(gru)(rnn_x)
# rnn_out = tf.reshape(rnn_x,(256,))#layers.Flatten()(rnn_x)

# # combined_tensor = tf.concat([rnn_out,cnn_out],axis=0)
# # combined_tensor = tf.reshape(combined_tensor,(512,1))

out = layers.Dense(8, activation="softmax")(cnn_out)

model = Model(inputs=visible, outputs=out)

model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])


In [177]:
# ##### Think about this ConvLSTM2D


# BATCH_SIZE = 32


# visible = Input(shape=(128,647,1))
# rnn_mp = layers.MaxPooling2D(pool_size=(1,2),strides=(1,2))(visible)
# lstm_unit = layers.LSTM(128, return_sequences=False,return_state=False)
# rnn_mp_reshape = tf.reshape(rnn_mp,BATCH_SIZE+rnn_mp.shape[1:-1])
# rnn_lstm = lstm_unit(rnn_mp_reshape)


# out = layers.Dense(8, activation="softmax")(rnn_lstm)
# model = Model(inputs=visible, outputs=out)

# model.compile(loss='categorical_crossentropy',
#                   optimizer='adam',
#                   metrics=['accuracy'])

# model.summary()


In [None]:
checkpoint_filepath = "Network_Results/Rnn_Cnn_Parallel_Checkpoint"

model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=False,
    monitor='accuracy',
    save_best_only=False)

history = model.fit(X_train,
                        y_train, 
                        batch_size=32,
                        validation_data=(X_test, y_test),
                        epochs=30,
                        callbacks = model_checkpoint_callback)

Epoch 1/30
INFO:tensorflow:Assets written to: Network_Results/Rnn_Cnn_Parallel_Checkpoint/assets
Epoch 2/30
INFO:tensorflow:Assets written to: Network_Results/Rnn_Cnn_Parallel_Checkpoint/assets
Epoch 3/30
INFO:tensorflow:Assets written to: Network_Results/Rnn_Cnn_Parallel_Checkpoint/assets
Epoch 4/30
INFO:tensorflow:Assets written to: Network_Results/Rnn_Cnn_Parallel_Checkpoint/assets
Epoch 5/30
INFO:tensorflow:Assets written to: Network_Results/Rnn_Cnn_Parallel_Checkpoint/assets
Epoch 6/30