<a href="https://colab.research.google.com/github/abldvd/CI-Proyects/blob/main/MusicClassifier_ConvolutionalOptimization.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pydub
!pip install keras_tuner

Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Installing collected packages: pydub
Successfully installed pydub-0.25.1
Collecting keras_tuner
  Downloading keras_tuner-1.1.0-py3-none-any.whl (98 kB)
[K     |████████████████████████████████| 98 kB 3.2 MB/s 
Collecting kt-legacy
  Downloading kt_legacy-1.0.4-py3-none-any.whl (9.6 kB)
Installing collected packages: kt-legacy, keras-tuner
Successfully installed keras-tuner-1.1.0 kt-legacy-1.0.4


In [None]:
# Mount Google Drive at /content/drive; the dataset path configured below
# lives under drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
PATH = 'drive/MyDrive/Colab Notebooks/datasets/music_files/test'
MAX_AUDIO_LEN = 60
BIN_SIZE = 2**11
!ls drive/MyDrive/'Colab Notebooks'/datasets/music_files/test

Classical  Rock  Synthwave


In [None]:
import os
import math
import librosa
import numpy as np
import random as rd
from pydub import AudioSegment 
from pydub.utils import make_chunks
from scipy.io import wavfile
from tempfile import mktemp
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.callbacks import EarlyStopping
import keras_tuner as kt

In [None]:
def mp32wav(mp3_path, len=None):
  """Decode an mp3 file into a sample array, optionally keeping a random excerpt.

  The track is decoded with pydub, written to a temporary wav file and read
  back with scipy.io.wavfile.

  Args:
    mp3_path: Path of the mp3 file to decode.
    len: Excerpt length in seconds. When falsy (None or 0) the whole track is
      returned. The name shadows the builtin but is kept so existing callers
      keep working.

  Returns:
    Tuple (audio, rate) with the samples and the sample rate, exactly as
    returned by scipy.io.wavfile.read.

  Raises:
    Exception: If the requested excerpt is longer than the track.
  """
  # Local import: the notebook's import cell only brings in `mktemp`.
  from tempfile import mkstemp

  clip_seconds = len  # alias so the shadowed builtin is not needed below
  mp3_audio = AudioSegment.from_file(mp3_path, format="mp3")  # read mp3
  if clip_seconds and clip_seconds > mp3_audio.duration_seconds:
      raise Exception('Fixed lenght greater than file lenght')

  if clip_seconds:
    half_ms = clip_seconds*1000/2
    lowest_mid = int(half_ms)
    highest_mid = int(mp3_audio.duration_seconds*1000 - half_ms)
    # randint's upper bound is exclusive, so a track exactly as long as the
    # excerpt would give an empty range; force at least one valid midpoint.
    random_midpoint = np.random.randint(lowest_mid, max(highest_mid, lowest_mid + 1))
    mp3_audio = mp3_audio[random_midpoint - half_ms:random_midpoint + half_ms]  # crop

  # mkstemp creates the file atomically (mktemp only returns a name and is
  # deprecated); close the descriptor right away, pydub reopens the path.
  fd, wname = mkstemp(suffix='.wav')
  os.close(fd)
  try:
    mp3_audio.export(wname, format="wav")
    rate, audio = wavfile.read(wname)  # read as wav file
  finally:
    os.remove(wname)  # always clean up, even if export/read failed
  return audio, rate

In [None]:
def getSpectrogram(mp3_path, bin_size, len):
  """Compute the STFT of a (randomly cropped) mp3 file.

  Args:
    mp3_path: Path of the mp3 file.
    bin_size: FFT window size, forwarded to librosa.stft as n_fft.
    len: Excerpt length in seconds, forwarded to mp32wav (falsy = whole track).

  Returns:
    The STFT as returned by librosa.stft (complex values, not magnitudes)
    without its last frame and with a trailing axis of size 1 appended.
  """
  audio, rate = mp32wav(mp3_path, len)  # decoded samples + sample rate

  if audio.ndim > 1:
    # Multi-channel track: average the channels into a single signal.
    audio = np.mean(audio, axis=1)
  else:
    # Mono tracks come back 1-D, so there is no channel axis to average
    # over; just convert the integer samples to float for librosa.
    audio = audio.astype(float)

  # hop_length == sample rate, i.e. one STFT frame per second of audio.
  spectrum = librosa.stft(audio, n_fft=bin_size, hop_length=int(rate))
  # NOTE(review): the final frame is dropped as in the original code;
  # presumably to get a round number of frames - confirm.
  return spectrum[:, :-1, np.newaxis]

In [None]:
def _fit_time_axis(spectrum, target_shape):
  """Pad with ones, or crop, along axis 1 so that spectrum has target_shape."""
  target_shape = tuple(target_shape)
  if spectrum.shape == target_shape:
    return spectrum
  if spectrum.shape[0] != target_shape[0] or spectrum.shape[2:] != target_shape[2:]:
    raise ValueError(f'Spectrogram shape {spectrum.shape} is incompatible with {target_shape}')
  fitted = np.ones(target_shape, dtype=spectrum.dtype)
  frames = min(spectrum.shape[1], target_shape[1])
  fitted[:, :frames] = spectrum[:, :frames]
  return fitted


def loadSpectrumData(path, seed=1234, file_limit=200):
  """Load the spectrograms of every class folder under `path`.

  `path` must contain one sub-folder per class, each holding the audio files
  of that class. Spectrograms are computed with getSpectrogram using the
  notebook-level BIN_SIZE and MAX_AUDIO_LEN.

  Args:
    path: Dataset root folder.
    seed: Seed for the `random` module; controls which files are kept when
      file_limit applies. Note that this reseeds the global generator.
    file_limit: Maximum number of files loaded per class. A falsy value
      (None or 0) disables the limit.

  Returns:
    Tuple (X, y): X is a complex array of shape (num_files,) + spectrogram
    shape, y is the one-hot encoded class matrix with one column per class
    folder.

  Raises:
    ValueError: If a spectrogram differs from the reference shape in anything
      other than its number of time frames.
  """
  class_folders = os.listdir(path)
  num_classes = len(class_folders)

  # Probe one file to learn the spectrogram shape every sample must have.
  first_class_files = os.listdir(f'{path}/{class_folders[0]}')
  first_file_shape = getSpectrogram(f'{path}/{class_folders[0]}/{first_class_files[0]}', BIN_SIZE, MAX_AUDIO_LEN).shape

  # Choose the files of every class up front, so the array allocation and the
  # loading loop are guaranteed to agree on the number of samples. Listings
  # are sorted because os.listdir order is arbitrary and would otherwise make
  # the seeded shuffle irreproducible.
  rd.seed(seed)
  selected = []
  for class_folder in sorted(class_folders):
    file_list = sorted(os.listdir(f'{path}/{class_folder}'))
    rd.shuffle(file_list)
    if file_limit:       # Applying memory limits after randomizing
      file_list = file_list[:file_limit]
    selected.append((class_folder, file_list))

  num_files = sum(len(file_list) for _, file_list in selected)
  X = np.zeros((num_files,) + first_file_shape, dtype=np.complex128)
  # A plain list keeps the full class names; a NumPy array created with
  # dtype=str would silently truncate every label to its first character.
  labels = []

  row = 0
  for class_folder, file_list in selected:
    for file_name in file_list:
      spectrum = getSpectrogram(f'{path}/{class_folder}/{file_name}', BIN_SIZE, MAX_AUDIO_LEN)
      # Some files come out a frame short because of rounding; pad them with
      # ones instead of dropping the sample (no second decode needed).
      X[row] = _fit_time_axis(spectrum, first_file_shape)
      labels.append(class_folder)
      row += 1

  lb = LabelEncoder() # Transforming y to categorical
  y = keras.utils.to_categorical(lb.fit_transform(labels), num_classes=num_classes)
  return X,  y

In [None]:
# DATA --------------------------------------------------------------
X, y = loadSpectrumData(PATH)

# The network layers are real-valued: fed complex STFT values they would at
# best keep only the real part. Use the magnitude instead, stored as float32
# to reduce memory use.
X = np.abs(X).astype(np.float32)

# Read the model dimensions from the loaded arrays instead of decoding one
# more file, so they always match what is actually fed to the network.
input_shape = X.shape[1:]
num_classes = y.shape[1]

# Stratify on the class index so both splits keep the class proportions.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.33, random_state=42, stratify=y.argmax(axis=1))

In [None]:
def conv_model(hp):
  """Build and compile the CNN for one keras-tuner trial.

  Reads the notebook-level globals `input_shape` and `num_classes`.

  Search space:
    n_layers (1-3) convolutional stages, each Conv2D -> Dropout ->
    AveragePooling2D with its own filter count, kernel height and dropout
    rate; then n_connections (1-3) dense layers, each with its own width.

  Args:
    hp: keras_tuner HyperParameters object for the current trial.

  Returns:
    A compiled keras.Sequential model with a softmax output of num_classes
    units.
  """
  model = keras.Sequential()
  model.add(layers.BatchNormalization(input_shape=input_shape))

  for i in range(hp.Int('n_layers', 1, 3)):
    n_filters = hp.Int(f'conv_{i}_units', min_value=32, max_value=128, step=32)
    kernel_height = hp.Choice(f'kernel_i_{i}_size', values=[3, 5])
    dropout_rate = hp.Float(f'dropout_{i}', min_value=0.2, max_value=0.5, step=0.1)

    model.add(layers.Conv2D(n_filters,
                            kernel_size=(kernel_height, 3),
                            activation='relu'))
    model.add(layers.Dropout(dropout_rate))
    model.add(layers.AveragePooling2D(pool_size=(2, 2)))

  model.add(layers.Flatten())
  for i in range(hp.Int('n_connections', 1, 3)):
    # One hyperparameter per dense layer, like the convolutional stages. A
    # single shared 'n_nodes' name would force every dense layer to the same
    # width, because the tuner returns the same value for a repeated name.
    n_nodes = hp.Choice(f'n_nodes_{i}', values=[64, 128, 256])
    model.add(layers.Dense(n_nodes, activation='relu'))

  model.add(layers.Dense(num_classes, activation='softmax'))

  model.compile(loss=tf.keras.losses.categorical_crossentropy,
                optimizer=tf.keras.optimizers.Adam(1e-3),
                metrics=['accuracy'])

  return model
  

In [None]:
# Hyperband search over the conv_model search space, ranked by validation accuracy.
tuner = kt.Hyperband(conv_model,
                     objective='val_accuracy',
                     max_epochs=50,
                     factor=3)

# Stop a trial once validation accuracy has not improved for 15 epochs and
# roll back to its best weights.
es = EarlyStopping(monitor='val_accuracy', mode='max', verbose=1, patience=15, restore_best_weights=True)

# No `epochs` argument: Hyperband assigns every trial its own epoch budget
# (the `tuner/epochs` hyperparameter, capped by max_epochs), so a value given
# here is overridden. `use_multiprocessing`/`workers` are dropped as well:
# they only affect generator / Sequence inputs, not in-memory arrays.
# NOTE(review): the split named X_test/y_test is used for model selection
# here, so no untouched hold-out set remains for a final evaluation.
tuner.search(X_train, y_train,
             batch_size=33,
             validation_data=(X_test, y_test),
             callbacks=[es],
             verbose=1)



Trial 40 Complete [00h 00m 41s]
val_accuracy: 0.7979797720909119

Best val_accuracy So Far: 0.8383838534355164
Total elapsed time: 00h 27m 47s

Search: Running Trial #41

Hyperparameter    |Value             |Best Value So Far 
n_layers          |1                 |1                 
conv_0_units      |96                |32                
kernel_i_0_size   |5                 |3                 
dropout_0         |0.4               |0.2               
n_connections     |2                 |1                 
n_nodes           |128               |64                
conv_1_units      |64                |128               
kernel_i_1_size   |5                 |3                 
dropout_1         |0.4               |0.4               
conv_2_units      |128               |64                
kernel_i_2_size   |3                 |5                 
dropout_2         |0.3               |0.5               
tuner/epochs      |6                 |6                 
tuner/initial_e...|2           

ResourceExhaustedError: ignored