<a href="https://colab.research.google.com/github/Rafi-ur-Rashid/Audio-News-Classification/blob/main/DL_models_audio.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Colab-only cell: attach the user's Google Drive to the runtime filesystem.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

In [None]:
import tensorflow as tf
import os
import librosa
import librosa.display
import IPython.display as ipd
import numpy as np
import sklearn
import matplotlib.pyplot as plt
import pickle
from os import path
from keras.callbacks import ModelCheckpoint

import tensorflow.keras as keras
from tensorflow.keras.utils import to_categorical

from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
# Seed the random number generators used in this notebook so runs are repeatable.
RANDOM_SEED=2245
from numpy.random import seed
seed(RANDOM_SEED)  # NumPy's global generator
# The original cell seeded NumPy only. Keras layers take their randomness
# (initial weights, dropout masks) from TensorFlow's generator, so seed it as well.
tf.random.set_seed(RANDOM_SEED)


In [None]:
# Turn on TensorFlow's "memory growth" option for every GPU it can see.
# On a machine without GPUs the list is empty and nothing happens.
gpu_devices = tf.config.experimental.list_physical_devices("GPU")
for gpu in gpu_devices:
    tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
# Local (Windows) data locations.
# The original literals combined the r"" prefix with already-escaped
# backslashes, which put TWO literal backslashes between every path component.
# The raw prefix is dropped so every separator is a single backslash, the same
# form weights_dir already used.
corpus_path = "C:\\ML\\audio_news\\corpus\\"
preprocess_path = "C:\\ML\\audio_news\\preprocessed-shuffled\\"
mfcc_path = preprocess_path + "mfccs\\"
lables_path = preprocess_path + "lables\\"  # (sic) name and folder spelling kept; other cells use them
mel_path = preprocess_path + "mel_specs\\"
weights_dir = "C:\\ML\\audio_news\\weights\\"



In [None]:
# Google Drive locations. Running this cell re-points mfcc_path and
# lables_path at the Drive folders, replacing any values set earlier.
mfcc_path, lables_path = (
    "/content/drive/My Drive/Colab Notebooks/Audio classification/mfccs/",
    "/content/drive/My Drive/Colab Notebooks/Audio classification/lables/",
)

In [None]:
def _load_pickle(file_path):
    """Unpickle and return the object stored at ``file_path``.

    The file is opened in a ``with`` block so its handle is closed as soon as
    the object has been read (the original left all six handles open).
    """
    # SECURITY: pickle.load can run arbitrary code embedded in the file. Only
    # point this at feature/label files you created yourself.
    with open(file_path, 'rb') as handle:
        return pickle.load(handle)


# Train / test / validation features and labels.
# File names suggest 13 MFCCs, n_fft=2048, hop_length=512 — TODO confirm.
X_train = _load_pickle(mfcc_path + "train_13_2048_512.pkl")
y_train = _load_pickle(lables_path + "train.pkl")
X_test = _load_pickle(mfcc_path + "test_13_2048_512.pkl")
y_test = _load_pickle(lables_path + "test.pkl")
X_validation = _load_pickle(mfcc_path + "val_13_2048_512.pkl")
y_validation = _load_pickle(lables_path + "val.pkl")

# 2D CNN

In [None]:
def _with_channel_axis(features):
    """Return ``features`` with a trailing channel axis, added only if it is still 3-D."""
    return features[..., np.newaxis] if features.ndim == 3 else features


# Conv2D needs a channel axis as the last dimension. The original appended one
# unconditionally, so running this cell a second time produced 5-D arrays; the
# helper makes the step safe to re-run.
X_train = _with_channel_axis(X_train)
X_validation = _with_channel_axis(X_validation)
X_test = _with_channel_axis(X_test)

# Per-sample input shape: the two feature axes plus one channel.
input_shape = (X_train.shape[1], X_train.shape[2], 1)

model = keras.Sequential([
    # 1st conv block: convolution -> overlapping max-pool -> batch norm
    keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=input_shape),
    keras.layers.MaxPooling2D((3, 3), strides=(2, 2), padding='same'),
    keras.layers.BatchNormalization(),

    # 2nd conv block
    keras.layers.Conv2D(32, (3, 3), activation='relu'),
    keras.layers.MaxPooling2D((3, 3), strides=(2, 2), padding='same'),
    keras.layers.BatchNormalization(),

    # 3rd conv block (smaller 2x2 kernel and pool)
    keras.layers.Conv2D(32, (2, 2), activation='relu'),
    keras.layers.MaxPooling2D((2, 2), strides=(2, 2), padding='same'),
    keras.layers.BatchNormalization(),

    # flatten the feature maps and feed them into a dense layer
    keras.layers.Flatten(),
    keras.layers.Dense(64, activation='relu'),
    keras.layers.Dropout(0.3),

    # 5-way softmax output
    keras.layers.Dense(5, activation='softmax'),
])

# File name the checkpoint callback writes this model to.
saved_model="2DCNN_mfcc_13_2048_512.weights.hdf5"

# LSTM

In [None]:

def _without_channel_axis(features):
    """Drop a trailing size-1 axis from a 4-D array; return any other array unchanged."""
    if features.ndim == 4 and features.shape[-1] == 1:
        return features[..., 0]
    return features


# The LSTM is built for 3-D input (input_shape below has two entries per
# sample). If the 2D CNN cell ran earlier in the session the arrays carry an
# extra trailing axis, and fitting this model on them would fail, so remove it.
X_train = _without_channel_axis(X_train)
X_validation = _without_channel_axis(X_validation)
X_test = _without_channel_axis(X_test)

input_shape = (X_train.shape[1], X_train.shape[2])

# build network topology
model = keras.Sequential()

# 2 LSTM layers; the first returns the full sequence so the second can consume it
model.add(keras.layers.LSTM(64, input_shape=input_shape, return_sequences=True))
model.add(keras.layers.LSTM(32))

# dense layer
model.add(keras.layers.Dense(32, activation='relu'))
model.add(keras.layers.Dropout(0.3))

# output layer: 5-way softmax
model.add(keras.layers.Dense(5, activation='softmax'))

# File name the checkpoint callback writes this model to.
saved_model="LSTM_mfcc_13_2048_512.weights.hdf5"

In [None]:
# Compile whichever model the previous cell built, using Adam with a small learning rate.
optimiser = keras.optimizers.Adam(learning_rate=0.0001)
model.compile(optimizer=optimiser,
              loss='categorical_crossentropy',
              metrics=['accuracy'])
# With metrics=['accuracy'] tf.keras logs the validation metric under the key
# 'val_accuracy'. The original monitored 'val_acc', a key that is never logged
# here, so the callback skipped saving on every epoch and no checkpoint was
# ever written.
checkpoint = ModelCheckpoint(weights_dir+saved_model, monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')
callbacks_list = [checkpoint]
model.summary()

In [None]:

# Train for 30 epochs, validating on the separate validation arrays;
# callbacks_list is passed so the checkpoint callback runs during training.
history = model.fit(
    X_train,
    y_train,
    validation_data=(X_validation, y_validation),
    batch_size=32,
    epochs=30,
    callbacks=callbacks_list,
    verbose=1,
)

In [None]:
# Alternative training run: 60 epochs, with Keras holding out 10% of the
# training arrays for validation instead of using X_validation/y_validation.
split_fit_options = {
    "validation_split": 0.1,
    "epochs": 60,
    "verbose": 1,
    "callbacks": [checkpoint],
}
history = model.fit(X_train, y_train, **split_fit_options)

In [None]:
# Restore a previously saved model from the weights directory.
saved_cnn_path = weights_dir+"cnn_3_class_stft.weights.hdf5"
model_loaded = tf.keras.models.load_model(saved_cnn_path)
# Evaluation on the test split is left disabled:
#model_loaded.evaluate(X_test,y_test)

In [None]:
# Write the current model to disk as a single HDF5 file.
# NOTE(review): model_dir is not assigned in any cell visible in this notebook;
# it must be defined before this cell can run.
export_path = model_dir+"cnn2D_stft.h5"
model.save(export_path)

In [None]:
# Give the features an explicit single-channel axis for Conv2D. The target
# shape is passed positionally because np.reshape's `newshape` keyword is
# deprecated in recent NumPy releases.
X_train_reshaped = np.reshape(X_train, (X_train.shape[0], X_train.shape[1], X_train.shape[2], 1))

# The original cell used Sequential, Conv2D, Dropout, MaxPooling2D and Dense as
# bare names, but the notebook never imports them (NameError on a fresh
# kernel). They are reached through the already-imported `keras` alias instead.
model = keras.Sequential()

# Block 1: 128 filters, heavy dropout, 2x2 pooling
model.add(keras.layers.Conv2D(filters = 128, kernel_size = (3,3), activation ='relu',input_shape=(X_train_reshaped.shape[1],X_train_reshaped.shape[2],X_train_reshaped.shape[3])))
model.add(keras.layers.Dropout(0.4))
model.add(keras.layers.MaxPooling2D(pool_size=(2,2)))

# Block 2: 64 filters
model.add(keras.layers.Conv2D(filters = 64, kernel_size = (3,3), activation ='relu'))
model.add(keras.layers.Dropout(0.3))
model.add(keras.layers.MaxPooling2D(pool_size=(2,2)))

# Block 3: 32 filters; a 1x1 pool leaves the feature-map size unchanged
model.add(keras.layers.Conv2D(filters = 32, kernel_size = (3,3), activation ='relu'))
model.add(keras.layers.Dropout(0.2))
model.add(keras.layers.MaxPooling2D(pool_size=(1,1)))

# Average each feature map down to a single value before the dense head
model.add(tf.keras.layers.GlobalAveragePooling2D(name='GlobalPool'))

model.add(keras.layers.Dense(16, activation = "relu")) #Fully connected layer
model.add(keras.layers.Dense(5, activation = "softmax")) #Classification layer or output layer

# The metric is registered as 'acc', so the logged keys are 'acc' / 'val_acc',
# which is what the checkpoint below monitors.
model.compile(optimizer="adam", loss='categorical_crossentropy', metrics=['acc'])

model.summary()

checkpoint = ModelCheckpoint(weights_dir+"cnn2D_5_class_stft.weights.hdf5", monitor='val_acc', verbose=1, save_best_only=True, mode='max')

In [None]:
# Train on the channel-expanded features for 60 epochs, holding out 10% of
# them for validation and checkpointing through the `checkpoint` callback.
history = model.fit(
    X_train_reshaped,
    y_train,
    validation_split=0.1,
    epochs=60,
    verbose=1,
    callbacks=[checkpoint],
)

In [None]:
# Channel-expanded copy of the test features (shape passed positionally:
# np.reshape's `newshape` keyword is deprecated in recent NumPy releases).
X_test_reshaped = np.reshape(X_test, (X_test.shape[0], X_test.shape[1], X_test.shape[2], 1))

# NOTE(review): X_test_reshaped is not used below — the un-reshaped X_test is
# what gets pickled. Confirm which array was meant to be saved.
# NOTE(review): test_dir is not assigned in any cell visible in this notebook.
# `with` closes each file once the dump completes; the original leaked both handles.
with open(test_dir+"X_test_stffs_cnn2D.pkl",'wb') as features_file:
    pickle.dump(X_test, features_file)
with open(test_dir+"y_test_stffs_cnn2D.pkl",'wb') as labels_file:
    pickle.dump(y_test, labels_file)


In [None]:
# Training vs. validation accuracy per epoch.
# Keras names the history series after the metric string given to compile():
# 'acc' for metrics=['acc'] and 'accuracy' for metrics=['accuracy']. Both
# spellings are used in this notebook, so pick whichever is present instead of
# hard-coding 'acc' (which raised KeyError for the 'accuracy' models).
accuracy_key = 'acc' if 'acc' in history.history else 'accuracy'
plt.plot(history.history[accuracy_key])
plt.plot(history.history['val_' + accuracy_key])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.show()

In [None]:
# Training vs. validation loss per epoch, drawn in that order so the legend
# entries line up with the curves.
for loss_series in ('loss', 'val_loss'):
    plt.plot(history.history[loss_series])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.show()

In [None]:
"""
model.fit_generator(generator=my_training_batch_generator,
                   steps_per_epoch = int(len(X_train_filenames) / batch_size),
                   epochs = 15,
                   verbose = 1,
                   validation_data = my_validation_batch_generator,
                   validation_steps = int(len(X_val_filenames) / batch_size))
"""

In [None]:
# Length every clip is padded up to.
# Presumably 150 s at 22,050 Hz (150 * 22050 = 3307500) — TODO confirm.
TARGET_NUM_SAMPLES = 3307500


def pad_pickled_clips(directory, prefix, count=500, target_len=TARGET_NUM_SAMPLES):
    """Zero-pad the pickled clips ``<directory><prefix><i>.pkl`` for i = 1..count.

    Each file that exists is unpickled; if it holds fewer than ``target_len``
    items, zeros are appended up to ``target_len`` and the file is overwritten
    with the padded array. Clips already long enough are left untouched.

    Args:
        directory: folder prefix, concatenated directly with the file name
            (so it must end with a path separator).
        prefix: file-name prefix, e.g. ``"economics_"``.
        count: highest clip number to try.
        target_len: minimum length every clip should have after padding.
    """
    for clip_number in range(1, count + 1):
        clip_path = directory + prefix + str(clip_number) + ".pkl"
        try:
            # SECURITY: pickle.load can execute code from the file; use only
            # on clips produced by this project.
            with open(clip_path, 'rb') as clip_file:
                x = pickle.load(clip_file)
        except FileNotFoundError:
            # Clips that do not exist are skipped silently, as the original did.
            continue
        except (OSError, pickle.UnpicklingError, EOFError) as err:
            # Unreadable or corrupt clip: report it rather than hiding it, then move on.
            print("skipping " + clip_path + ": " + str(err))
            continue

        if len(x) < target_len:
            x = np.append(x, np.zeros(target_len - len(x)))
            print(str(clip_number) + " " + str(x.shape))
            with open(clip_path, 'wb') as clip_file:
                pickle.dump(x, clip_file)


# The original wrapped the whole loop body in a bare `except:`, which also
# swallowed programming errors: sampled_dir is not defined in any visible
# cell, so the loop silently did nothing. That NameError now surfaces here.
pad_pickled_clips(sampled_dir, "economics_")
