<a href="https://colab.research.google.com/github/Ticopy/Machine_Learning_Exploration/blob/main/Urban_Sound_Classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Import the data (.wav files) from Google Drive

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive so that the
# dataset archive and the labels CSV stored there can be read by later cells.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Print the current working directory; the relative paths used in later cells
# (the unzip source and 'Train/1.wav') are resolved against it.
!pwd

In [None]:
# Extract the training archive from Drive into the current working directory.
# NOTE(review): the archive path is relative, so this only works when the
# working directory is the folder that contains the mounted 'drive' directory.
!unzip 'drive/MyDrive/Hackathlon/001_Urban_Sound_Classification/train.zip'

Import modules

In [None]:
import pandas as pd
import numpy as np
import os
import librosa
import pickle
import glob
import IPython.display as ipd
%pylab inline

import warnings
warnings.filterwarnings('ignore')

Loading the dataset

In [None]:
# Read the training labels CSV from Drive into a DataFrame and preview its
# first rows.
df = pd.read_csv('/content/drive/MyDrive/Hackathlon/001_Urban_Sound_Classification/train_fuSp8nd.csv')
df.head()


In [None]:
# Count of datapoints (rows) for each value of the "Class" column,
# to see how balanced the classes are.
df["Class"].value_counts()

In [None]:
# Render an inline audio player for one training clip as a quick sanity check.
# The path is relative to the current working directory.
ipd.Audio('Train/1.wav')

Data Preprocessing

Preprocessing strategy,
for each file:
>1. load the file
>2. pad the signal (if necessary)
>3. extract the log spectrogram from the signal
>4. normalise the spectrogram
>5. save the normalised spectrogram

In [None]:
class Loader:
  """Loads an audio file from disk as a time-series signal (needs `librosa`)."""

  def __init__(self, sample_rate, duration, mono):
    """Remember the options forwarded to `librosa.load`.

    sample_rate: forwarded as `sr`.
    duration: forwarded as `duration`.
    mono: forwarded as `mono`.
    """
    self.sample_rate, self.duration, self.mono = sample_rate, duration, mono

  def load(self, file_path):
    """Load `file_path` and return only the signal.

    `librosa.load` returns a pair; the first element (the signal) is kept and
    the second one is discarded.
    """
    load_options = {
        "sr": self.sample_rate,
        "duration": self.duration,
        "mono": self.mono,
    }
    loaded = librosa.load(file_path, **load_options)
    return loaded[0]

class Padder:
  """Applies padding to an array (needs `numpy`)."""

  def __init__(self, mode="constant"):
    """Store the padding `mode` that is forwarded to `np.pad`."""
    self.mode = mode

  def right_pad(self, array, num_missing_item):
    """Return `array` with `num_missing_item` padded items appended at its end.

    Nothing is added before the first item; the pad values are produced by
    `np.pad` according to `self.mode`.
    """
    pad_width = (0, num_missing_item)
    return np.pad(array, pad_width, mode=self.mode)

class LogSpectrogramExtractor:
  """Turns a time-series signal into a log spectrogram in dB.

  Needs the `librosa` and `numpy` modules.
  """

  def __init__(self, frame_size, hop_length):
    """Store the STFT settings.

    frame_size: forwarded to `librosa.stft` as `n_fft`.
    hop_length: forwarded to `librosa.stft` as `hop_length`.
    """
    self.frame_size, self.hop_length = frame_size, hop_length

  def extract(self, signal):
    """Return the dB-scaled magnitude spectrogram of `signal`.

    The last row along the first axis of the STFT output is dropped before
    the magnitude is taken and converted with `librosa.amplitude_to_db`.
    """
    full_stft = librosa.stft(signal,
                             n_fft=self.frame_size,
                             hop_length=self.hop_length)
    magnitude = np.abs(full_stft[:-1])
    return librosa.amplitude_to_db(magnitude)

class MinMaxNormaliser:
  """Applies min-max normalisation to an array and can undo it.

  Values are rescaled linearly so that the smallest input value maps to
  `min_val` and the largest one maps to `max_val`.
  """

  def __init__(self, min_val, max_val):
    """Store the target range [min_val, max_val] of the normalised values."""
    self.min = min_val
    self.max = max_val

  def normalise(self, array):
    """Rescale `array` into the range [self.min, self.max].

    A constant array (max == min, e.g. the spectrogram of a silent clip) has
    no spread to rescale. It is mapped entirely to `self.min` instead of
    producing NaN through a 0/0 division.

    Returns a new array; `array` itself is not modified.
    """
    lowest = array.min()
    value_range = array.max() - lowest
    # Dividing by 1 when the range is zero keeps the numerator (all zeros)
    # unchanged, so the result is `self.min` everywhere rather than NaN.
    divisor = value_range if value_range != 0 else 1
    norm_array = (array - lowest) / divisor
    norm_array = norm_array * (self.max - self.min) + self.min
    return norm_array

  def denormalise(self, norm_array, original_min, original_max):
    """Map values from [self.min, self.max] back to [original_min, original_max].

    `original_min` and `original_max` are the extrema the array had before
    `normalise` was applied.
    """
    array = (norm_array - self.min) / (self.max - self.min)
    array = array * (original_max - original_min) + original_min
    return array

class Saver:
  """Saves features as .npy files and the min/max values as a pickle file."""

  def __init__(self, feature_save_dir, min_max_values_save_dir):
    """Store the output directories.

    feature_save_dir: directory that receives one .npy file per feature.
    min_max_values_save_dir: directory that receives "min_max_value.pkl".
    """
    self.feature_save_dir = feature_save_dir
    self.min_max_values_save_dir = min_max_values_save_dir

  def save_feature(self, feature, file_path):
    """Save `feature` with `np.save` and return the path it was written to.

    The file name is the base name of `file_path` with ".npy" appended.
    The path is returned so callers can use it as a key for that feature;
    without a return value every caller would receive None.
    """
    save_path = self._generate_save_path(file_path)
    self._ensure_dir(self.feature_save_dir)
    np.save(save_path, feature)
    return save_path

  def save_min_max_values(self, min_max_values):
    """Pickle `min_max_values` into "min_max_value.pkl" in the min/max directory."""
    self._ensure_dir(self.min_max_values_save_dir)
    save_path = os.path.join(self.min_max_values_save_dir, "min_max_value.pkl")
    self._save(min_max_values, save_path)

  @staticmethod
  def _ensure_dir(directory):
    """Create `directory` (and missing parents) unless it already exists."""
    # An empty string means "current directory"; os.makedirs rejects it.
    if directory:
      os.makedirs(directory, exist_ok=True)

  #need "pickle" module
  @staticmethod
  def _save(data,save_path):
    """Pickle `data` into the file at `save_path`."""
    with open(save_path, "wb") as f:
      pickle.dump(data, f)

  def _generate_save_path(self, file_path):
    """Return feature_save_dir/<base name of file_path>.npy."""
    file_name = os.path.split(file_path)[1]
    save_path = os.path.join(self.feature_save_dir, file_name + ".npy")
    return save_path


class PreprocessingPipeline:
  """Processes every ".wav" file found under a directory.

  For each file: load it, right-pad the signal when it is shorter than
  expected, extract a feature, normalise it and save it. The min and max of
  every un-normalised feature are collected in `min_max_values` and handed to
  the saver once all files are done.

  The collaborators (`loader`, `padder`, `extractor`, `normaliser`, `saver`)
  start as None and must be assigned before `process` is called.
  """

  def __init__(self):
    self._loader = None
    self._num_expected_samples = None
    self.padder = None
    self.extractor = None
    self.normaliser = None
    self.saver = None
    self.min_max_values = {}

  @property
  def loader(self):
    """The object used to load audio files."""
    return self._loader

  @loader.setter
  def loader(self, loader):
    # The expected signal length depends on the loader settings, so it is
    # recomputed every time a loader is assigned.
    self._loader = loader
    self._num_expected_samples = int(loader.sample_rate * loader.duration)

  def process(self, audio_file_dir):
    """Walk `audio_file_dir`, process each ".wav" file, then save the min/max values."""
    for root, _dirs, names in os.walk(audio_file_dir):
      for name in names:
        file_path = os.path.join(root, name)
        if not file_path.endswith(".wav"):
          continue
        self._process_file(file_path)
        print(f"Processed file {file_path}")
    self.saver.save_min_max_values(self.min_max_values)

  def _process_file(self, file_path):
    """Run the full load -> pad -> extract -> normalise -> save chain on one file."""
    signal = self.loader.load(file_path)
    if self._is_padding_necessary(signal):
      signal = self._apply_padding(signal)
    feature = self.extractor.extract(signal)
    norm_feature = self.normaliser.normalise(feature)
    # The min/max entry is keyed by whatever the saver returns here.
    save_path = self.saver.save_feature(norm_feature, file_path)
    self.store_min_max_value(save_path, feature.min(), feature.max())

  def _is_padding_necessary(self, signal):
    """Tell whether `signal` has fewer samples than expected."""
    return len(signal) < self._num_expected_samples

  def _apply_padding(self, signal):
    """Right-pad `signal` up to the expected number of samples."""
    shortfall = self._num_expected_samples - len(signal)
    return self.padder.right_pad(signal, shortfall)

  def store_min_max_value(self, save_path, min_val, max_val):
    """Record the min and max values under the key `save_path`."""
    self.min_max_values[save_path] = {"min": min_val, "max": max_val}

In [None]:
# Settings for the preprocessing classes defined above.
# FRAME_SIZE and HOP_LENGTH are given to LogSpectrogramExtractor, which
# forwards them to librosa.stft as n_fft and hop_length.
FRAME_SIZE = 2048
HOP_LENGTH = 1024
# DURATION, SAMPLE_RATE and MONO are given to Loader, which forwards them to
# librosa.load. SAMPLE_RATE * DURATION is also the number of samples every
# signal is padded up to.
DURATION = 4
SAMPLE_RATE = 22050
MONO = True

# Output folder for the normalised spectrograms (.npy files).
SPECTOGRAMS_SAVE_DIR = "/content/drive/MyDrive/Hackathlon/001_Urban_Sound_Classification/datasets/usc/spectrograms/"
# Output folder for the pickled min/max values.
MIN_MAX_VALUES_SAVE_DIR = "/content/drive/MyDrive/Hackathlon/001_Urban_Sound_Classification/datasets/usc/"
# Input folder that is scanned for .wav files.
FILES_DIR = "/content/Train/"

In [None]:
# Instantiate every pipeline component from the configuration constants.
loader = Loader(SAMPLE_RATE,DURATION,MONO)
padder = Padder()
log_spectrogram_extractor = LogSpectrogramExtractor(FRAME_SIZE, HOP_LENGTH)
# Normalised spectrogram values are rescaled into the range [0, 1].
min_max_normaliser = MinMaxNormaliser(0, 1)
saver = Saver(SPECTOGRAMS_SAVE_DIR, MIN_MAX_VALUES_SAVE_DIR)

# Wire the components into the pipeline. Assigning the loader also makes the
# pipeline compute the expected number of samples per signal.
preprocessing_pipeline = PreprocessingPipeline()
preprocessing_pipeline.loader = loader
preprocessing_pipeline.padder = padder
preprocessing_pipeline.extractor = log_spectrogram_extractor
preprocessing_pipeline.normaliser = min_max_normaliser
preprocessing_pipeline.saver = saver


In [None]:
# Run the pipeline on every .wav file under FILES_DIR; this saves one
# spectrogram per file and finally the pickled min/max values.
preprocessing_pipeline.process(FILES_DIR)

Preparing the data set

In [None]:
# Split the saved spectrograms into one sub-folder per class label.
import pandas as pd
import os
import shutil

DATASET_TRAIN_FILE_R = r"/content/drive/MyDrive/Hackathlon/001_Urban_Sound_Classification/train_fuSp8nd.csv"

labels = pd.read_csv(DATASET_TRAIN_FILE_R)

# Source folder (flat list of spectrograms) and destination root
# (one sub-folder per class is created below).
DATASET_DIR_PATH_R = r"/content/drive/MyDrive/Hackathlon/001_Urban_Sound_Classification/datasets/usc/spectrograms/"
SEP_DIR_PATH_R = r"/content/drive/MyDrive/Hackathlon/001_Urban_Sound_Classification/datasets/usc/labeled_spectro/"

# makedirs(exist_ok=True) also creates missing parent folders and makes the
# cell safe to re-run, unlike a separate "exists" check followed by mkdir.
os.makedirs(SEP_DIR_PATH_R, exist_ok=True)

# Each CSV row is unpacked as (filename, class_name), so the file must have
# exactly these two columns, in this order.
for filename, class_name in labels.values:
  # Create the sub-directory named after the class if it is missing.
  class_dir = os.path.join(SEP_DIR_PATH_R, str(class_name))
  os.makedirs(class_dir, exist_ok=True)

  spectro_name = str(filename) + '.wav.npy'
  src_path = os.path.join(DATASET_DIR_PATH_R, spectro_name)
  dst_path = os.path.join(class_dir, spectro_name)

  print('Copy file {} to {}'.format(src_path,dst_path))
  shutil.copy(src_path, dst_path)


In [None]:
# Split the raw .wav files into one sub-folder per class label.
import pandas as pd
import os
import shutil

DATASET_TRAIN_FILE_R = r"/content/drive/MyDrive/Hackathlon/001_Urban_Sound_Classification/train_fuSp8nd.csv"

labels = pd.read_csv(DATASET_TRAIN_FILE_R)

# Source folder (flat list of .wav files) and destination root
# (one sub-folder per class is created below).
DATASET_DIR_PATH_R = r"/content/Train/"
SEP_DIR_PATH_R = r"/content/drive/MyDrive/Hackathlon/001_Urban_Sound_Classification/datasets/usc/labeled_wav/"

# makedirs(exist_ok=True) also creates missing parent folders and makes the
# cell safe to re-run, unlike a separate "exists" check followed by mkdir.
os.makedirs(SEP_DIR_PATH_R, exist_ok=True)

# Each CSV row is unpacked as (filename, class_name), so the file must have
# exactly these two columns, in this order.
for filename, class_name in labels.values:
  # Create the sub-directory named after the class if it is missing.
  class_dir = os.path.join(SEP_DIR_PATH_R, str(class_name))
  os.makedirs(class_dir, exist_ok=True)

  wav_name = str(filename) + '.wav'
  src_path = os.path.join(DATASET_DIR_PATH_R, wav_name)
  dst_path = os.path.join(class_dir, wav_name)

  print('Copy file {} to {}'.format(src_path,dst_path))
  shutil.copy(src_path, dst_path)


In [None]:
#Saving data into a JSON file

import json
import os
import math
import librosa

# Input: root folder whose sub-folders (one per class) hold the .wav files.
DATASET_PATH = "/content/drive/MyDrive/Hackathlon/001_Urban_Sound_Classification/datasets/usc/labeled_wav/"
# Output: JSON file that receives the mapping, labels and spectrograms.
JSON_PATH = "/content/drive/MyDrive/Hackathlon/001_Urban_Sound_Classification/datasets/usc/data_usc_spectro.json"
# Sampling rate passed to librosa.load inside save_spectro.
SAMPLE_RATE = 22050
TRACK_DURATION = 4 # measured in seconds
# Number of samples in one full-length track; save_spectro divides it by
# num_segments to get the segment length.
SAMPLES_PER_TRACK = SAMPLE_RATE * TRACK_DURATION


def save_spectro(dataset_path, json_path, n_fft=2048, hop_length=512, num_segments=1):
    """Extract mel spectrograms from a labelled sound dataset and save them to a JSON file.

    The dataset root must contain one sub-folder per class; the sub-folder
    name is used as the class name. The JSON file holds three entries:
    "mapping" (class names, indexed by label), "labels" (one integer per
    stored segment) and "spectro" (one spectrogram per stored segment, as
    nested lists with one row per frame).

    Segments that do not yield exactly the expected number of frames (for
    example because the clip is shorter than the track duration) are skipped.

    Relies on the module-level constants SAMPLE_RATE and SAMPLES_PER_TRACK.

        :param dataset_path (str): Path to dataset
        :param json_path (str): Path to json file used to save the spectrograms
        :param n_fft (int): Interval we consider to apply FFT. Measured in # of samples
        :param hop_length (int): Sliding window for FFT. Measured in # of samples
        :param num_segments (int): Number of segments we want to divide sample tracks into
        :return: None
        """

    # dictionary to store mapping, labels, and spectrograms
    data = {
        "mapping": [],
        "labels": [],
        "spectro": []
    }

    samples_per_segment = int(SAMPLES_PER_TRACK / num_segments)
    num_spectro_vectors_per_segment = math.ceil(samples_per_segment / hop_length)

    # os.walk yields its root as a plain string; normalise the argument the
    # same way so the root check below compares like with like.
    root_path = os.fspath(dataset_path)

    # loop through all class sub-folders
    for dirpath, dirnames, filenames in os.walk(root_path):

        # Sorting in place makes os.walk visit the sub-folders in a fixed
        # order, so the label <-> class mapping is the same on every run.
        dirnames.sort()

        # The root folder itself is not a class. Compare by value: identity
        # ("is") only tells whether two names point to the same string object.
        if dirpath == root_path:
            continue

        # save class label (i.e., sub-folder name) in the mapping
        semantic_label = os.path.basename(dirpath)
        data["mapping"].append(semantic_label)
        # The label is the position of the class in the mapping, which stays
        # correct whatever order or nesting os.walk encounters.
        label = len(data["mapping"]) - 1
        print("\nProcessing: {}".format(semantic_label))

        # process all audio files in the class sub-folder
        for f in sorted(filenames):

            # load audio file
            file_path = os.path.join(dirpath, f)
            signal, sample_rate = librosa.load(file_path, sr=SAMPLE_RATE)

            # process all segments of audio file
            for d in range(num_segments):

                # calculate start and finish sample for current segment
                start = samples_per_segment * d
                finish = start + samples_per_segment
                segment = signal[start:finish]

                # A segment starting past the end of a short clip is empty
                # and could never reach the expected frame count.
                if len(segment) == 0:
                    continue

                # extract the mel spectrogram; keyword arguments are required
                # by recent librosa releases
                spectro = librosa.feature.melspectrogram(y=segment, sr=sample_rate, n_fft=n_fft, hop_length=hop_length)
                spectro = spectro.T

                # store only spectrograms with the expected number of vectors
                if len(spectro) == num_spectro_vectors_per_segment:
                    data["spectro"].append(spectro.tolist())
                    data["labels"].append(label)
                    print("{}, segment:{}".format(file_path, d+1))

    # save spectrograms to json file
    with open(json_path, "w") as fp:
        json.dump(data, fp, indent=4)
        
        
# Build the JSON dataset with one segment per track (i.e. whole clips).
# The guard is true when this code runs as the main module.
if __name__ == "__main__":
    save_spectro(DATASET_PATH, JSON_PATH, num_segments=1)

13/03/2022
Implementing a neural network for sound classification into multiple categories

In [None]:
#Load data
#Split the data into train and test sets
#Build the network architecture
#Compile network
#Train network

import json
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow.keras as keras

# Path of the JSON dataset to train on (same file as JSON_PATH in the export cell).
# NOTE: this rebinds DATASET_PATH, which the export cell used for the
# labelled .wav folder; re-run that cell's constants before exporting again.
DATASET_PATH = "/content/drive/MyDrive/Hackathlon/001_Urban_Sound_Classification/datasets/usc/data_usc_spectro.json"

def load_data(dataset_path):
  """Load the spectrogram dataset from a JSON file.

  The file must contain the keys "spectro" and "labels".

  :param dataset_path (str): Path to the JSON file
  :return: tuple (inputs, targets) of numpy arrays built from the "spectro"
      and "labels" entries respectively
  """
  with open(dataset_path, "r") as fp:
    data = json.load(fp)

  # convert lists into numpy arrays
  inputs = np.array(data["spectro"])
  targets = np.array(data["labels"])

  return inputs, targets

# "==" compares; a single "=" here is a syntax error that stops the cell.
if __name__ == "__main__":

  #Load data
  inputs, targets = load_data(DATASET_PATH)

  #Split the data into train and test sets
  #A fixed random_state makes the split identical on every run.
  inputs_train, inputs_test, targets_train, targets_test = train_test_split(inputs, targets, test_size=0.2, random_state=42)

  #Build the network architecture
  model = keras.Sequential([
                            # input layer: flattens each 2-D spectrogram into a vector
                            keras.layers.Flatten(input_shape=(inputs.shape[1], inputs.shape[2])),

                            # 1st hidden layer using Rectified Linear Unit (ReLU)
                            keras.layers.Dense(512, activation="relu"),

                            # 2nd hidden layer using Rectified Linear Unit (ReLU)
                            keras.layers.Dense(256, activation="relu"),

                            # 3rd hidden layer using Rectified Linear Unit (ReLU)
                            keras.layers.Dense(64, activation="relu"),

                            # output layer: one unit per class
                            # assumes the labels are integers in 0..9 - TODO confirm against the dataset
                            keras.layers.Dense(10,activation="softmax")
                        ])
  #Compile network
  #The sparse loss takes integer class labels directly (no one-hot encoding).
  optimizer = keras.optimizers.Adam(learning_rate=0.0001)
  model.compile(optimizer=optimizer,
                loss="sparse_categorical_crossentropy",
                metrics=["accuracy"])

  model.summary()

  #Train network
  #Type of batching chosen: mini-batch
  #The held-out test split is used as validation data during training.
  model.fit(inputs_train, targets_train,
            validation_data=(inputs_test, targets_test),
            epochs=50,
            batch_size=32)
  