# Sound classification

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import librosa
import librosa.display
from tqdm import tqdm
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
import tensorflow.keras.models as models
import tensorflow.keras.layers as layers
import IPython.display as ipd

In [None]:
%matplotlib inline
%load_ext tensorboard

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

## Data Loading and preprocessing

In [None]:
CSV_FILE_PATH = "data/esc50.csv"  # path of csv file
DATA_PATH = "data/audio/44100/" # path to folder containing audio files

In [None]:
#reading the csv file
df = pd.read_csv(CSV_FILE_PATH)
df

In [None]:
# Categories kept for this experiment; rows of `df` whose "category" is not in
# this list are dropped when `df_sel` is built.
# NOTE(review): "sea_weaves" looks like a typo for the ESC-50 label
# "sea_waves" — if so it matches no rows and only 10 of these 11 entries are
# actually selected. The rest of the notebook hard-codes 10 classes (one-hot
# width, output layer, 2x5 plot grids), so confirm before changing it.
class_selection = [
    "thunderstorm",
    "rain",
    "sea_weaves",
    "dog",
    "cat",
    "chirping_birds",    
    "breathing",
    "keyboard_typing",
    "coughing",
    "drinking_sipping",
    "car_horn"
]

In [None]:
# Keep only the rows belonging to the selected categories. The explicit
# .copy() makes df_sel an independent frame, so writing columns to it later
# does not raise pandas' SettingWithCopyWarning or depend on view/copy
# semantics of the boolean-mask selection.
df_sel = df[df["category"].isin(class_selection)].copy()
# Distinct category names, in order of first appearance in df_sel.
classes = df_sel['category'].unique()

In [None]:
# Map every category name to its position in `classes` (0, 1, 2, ...).
class_dict = dict(zip(classes, range(len(classes))))
class_dict

In [None]:
# Set the 'target' column to the compact integer id that class_dict assigns
# to each row's category (presumably replacing the dataset's original target
# ids — verify against the CSV).
df_sel.loc[:, 'target'] = df_sel['category'].map(class_dict)

### Feature engineering and data visualization

In [None]:
# select a sample 
sample_df = df_sel.drop_duplicates(subset=['target'])
sample_df

In [None]:
# For one example clip per category, load the waveform and compute its
# mel spectrogram (in dB) and its MFCCs. Each dictionary is keyed by the
# category name.

signals = {}
mel_spectrograms = {}
mfccs = {}

for i, row in tqdm(sample_df.iterrows()):  # iterrows yields (index, row) pairs; row holds the columns (filename, target, category, ...)
    signal , rate = librosa.load(DATA_PATH+ row["filename"])
    signals[row["category"]] = signal    # store the waveform per category, e.g. signals["dog"] = waveform of the dog clip
    
    mel_spec = librosa.feature.melspectrogram(y=signal , sr=rate ,  n_fft=2048, hop_length=512)
    mel_spec = librosa.power_to_db(mel_spec, ref=np.max)  # plotting the raw power spectrogram gives an almost black image, so convert power to dB first
    mel_spectrograms[row["category"]] = mel_spec
    
    mfcc = librosa.feature.mfcc(y=signal , sr=rate , n_mfcc=13, dct_type=3)
    mfccs[row["category"]] = mfcc

In [None]:
sample_df.iloc[0]["filename"]

In [None]:
# Reproduce some audio samples
print(sample_df.iloc[0]["category"])
ipd.Audio(DATA_PATH + sample_df.iloc[0]["filename"])
print(sample_df.iloc[1]["category"])
ipd.Audio(DATA_PATH + sample_df.iloc[1]["filename"])
print(sample_df.iloc[2]["category"])
ipd.Audio(DATA_PATH + sample_df.iloc[2]["filename"])

In [None]:
import seaborn as sns

def plot_signal_seaborn(signal):
    """
    Plot the waveforms stored in a ``{category: samples}`` dictionary.

    Each entry is drawn as a seaborn line plot in its own panel of a 2x5
    grid, titled with the dictionary key. Only the first 10 entries are
    drawn; when fewer than 10 are given the unused panels are hidden.

    Parameters
    ----------
    signal : dict
        Mapping from a label to a 1-D sequence of samples.
    """
    sns.set(style='whitegrid')
    fig, axes = plt.subplots(nrows=2, ncols=5, sharex=False, sharey=True, figsize=(20, 5))
    fig.suptitle('Time series', size=15)

    panels = list(axes.flat)
    # Read from the argument rather than the notebook-global `signals`, and
    # let zip stop at the shorter side so a dictionary with fewer than 10
    # entries no longer raises IndexError.
    for ax, (label, samples) in zip(panels, signal.items()):
        ax.set_title(label)
        sns.lineplot(data=samples, ax=ax)
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

    # Hide any panels that received no data.
    for ax in panels[len(signal):]:
        ax.set_visible(False)

In [None]:
plot_signal_seaborn(signals)

In [None]:
def dis_feature(mfccs, cmap=None, title='mel'):
    """
    Show the 2-D features stored in a ``{category: matrix}`` dictionary.

    Every value (an MFCC matrix or a mel spectrogram) is rendered with
    ``imshow`` in its own panel of a 2x5 grid, titled with the dictionary
    key. Only the first 10 entries are drawn; when fewer than 10 are given
    the unused panels are hidden.

    Parameters
    ----------
    mfccs : dict
        Mapping from a label to a 2-D array to display as an image.
    cmap : optional
        Colormap forwarded to ``imshow``.
    title : str, optional
        Figure title. Defaults to ``'mel'``, the previously hard-coded value.
    """
    fig, axes = plt.subplots(nrows=2, ncols=5, sharex=False, sharey=True, figsize=(40, 10))
    fig.suptitle(title)

    panels = list(axes.flat)
    # zip stops at the shorter side, so fewer than 10 entries no longer
    # raises IndexError.
    for ax, (label, feature) in zip(panels, mfccs.items()):
        ax.set_title(label)
        ax.imshow(feature, cmap=cmap, interpolation='nearest')

    # Hide any panels that received no data.
    for ax in panels[len(mfccs):]:
        ax.set_visible(False)

In [None]:
dis_feature(mel_spectrograms)

### Audio data augmentation 

In [None]:
df_sel.head()

In [None]:
df_train, df_val = train_test_split(df_sel, test_size=0.2, random_state=2023)

In [None]:
def add_noise(data, scale=0.05):
    """Return `data` plus zero-mean Gaussian noise of standard deviation `scale`.

    ``len(data)`` noise values are drawn from NumPy's global random state.
    The result is a new array; `data` itself is not modified.
    """
    n_samples = len(data)
    return data + np.random.normal(loc=0, scale=scale, size=n_samples)
    
def pitch_shifting(data, sr=16000):
    """Pitch-shift `data` by a random number of steps and return the result.

    The number of steps is drawn uniformly from [0, 4) using NumPy's global
    random state, with 12 steps per octave — so the shift is never downward.
    The signal is cast to float64 before being handed to
    ``librosa.effects.pitch_shift``.

    NOTE(review): the default `sr` is 16000; callers that loaded audio with a
    different rate should pass that rate explicitly.
    """
    half_range = 2
    n_steps = half_range * 2 * (np.random.uniform())
    return librosa.effects.pitch_shift(
        data.astype('float64'),
        sr=sr,
        n_steps=n_steps,
        bins_per_octave=12,
    )

def random_shift(data):
    """Shift `data` in time by a random offset, zero-filling the gap.

    The offset is drawn uniformly from roughly -20 % to +20 % of the signal
    length using NumPy's global random state. A positive offset delays the
    signal (zeros at the start), a negative offset advances it (zeros at the
    end). The returned array has the same length as `data`.

    Assumes `data` is a 1-D array — TODO confirm for other callers.
    """
    n_samples = data.shape[0]
    timeshift_fac = 0.2 *2*(np.random.uniform()-0.5)  # up to 20% of length
    offset = int(n_samples * timeshift_fac)
    if offset > 0:
        # Delay: pad the front, keep the first n_samples.
        return np.pad(data, (offset, 0), mode='constant')[:n_samples]
    # Advance: pad the tail and drop the first |offset| samples. Keeping the
    # first n_samples here (as before) simply cut the padding off again and
    # returned the signal unshifted.
    return np.pad(data, (0, -offset), mode='constant')[-offset:]

def volume_scaling(data):
    """Return `data` multiplied by a random gain drawn uniformly from [1.5, 2.5).

    The gain comes from NumPy's global random state; a new array is returned.
    """
    gain = np.random.uniform(low=1.5, high=2.5)
    return data * gain
    
def time_stretching(data, rate=1.5):
    """Time-stretch `data` by `rate` and return it at the original length.

    A copy of the signal is passed to ``librosa.effects.time_stretch``; the
    result is then cut at the end, or zero-padded at the end, so that it has
    exactly ``len(data)`` samples.
    """
    target_len = len(data)
    stretched = librosa.effects.time_stretch(data.copy(), rate=rate)

    surplus = len(stretched) - target_len
    if surplus > 0:
        return stretched[:target_len]
    # surplus <= 0 here, so -surplus is the number of zeros still needed.
    return np.pad(stretched, (0, -surplus), "constant")

def audio_augmentation(file, aug, sr=16000):
    """Write an augmented signal to ``ESC-50-augmented-data/<file>``.

    Parameters
    ----------
    file : str
        File name (relative to the output directory) to write.
    aug : array-like
        Audio samples; converted to float32 and written as a single channel.
    sr : int, optional
        Sample rate stored in the file. Defaults to 16000, the previously
        hard-coded value. NOTE(review): other cells load audio with
        ``librosa.load`` defaults, so pass the rate the samples really have.
    """
    # Imported locally because neither name is imported at the top of the
    # notebook, which made the original body fail with NameError. `sf` is
    # assumed to be the `soundfile` package (its write(file, data,
    # samplerate, subtype) signature matches the call below) — TODO confirm.
    import os
    import soundfile as sf

    directory = 'ESC-50-augmented-data/'
    os.makedirs(directory, exist_ok=True)
    samples = np.array(aug, dtype='float32').reshape(-1, 1)
    # os.path.join avoids the doubled separator of directory + '/' + file.
    sf.write(os.path.join(directory, file), samples, sr, 'PCM_24')

In [None]:
import IPython.display as ipd

# Listening test: load one clip and play it back together with each
# augmentation applied to it. `signal` and `sr` stay defined as notebook
# globals after this cell runs.
row = df_sel.iloc[10]

file_name = row["filename"]
print(row["category"])
signal , sr = librosa.load(DATA_PATH+file_name)
print("original")
ipd.Audio(signal, rate=sr)

noised = add_noise(signal, 0.005)
print("noised")
ipd.Audio(noised, rate=sr) 

# NOTE(review): pitch_shifting is called with its default sr (16000), not the
# `sr` returned by librosa.load above — confirm this is intended.
shifted = pitch_shifting(signal)

print("pitch shifted")
ipd.Audio(shifted, rate=sr)

print("random shifted")
r_shifted = random_shift(signal)
ipd.Audio(r_shifted, rate=sr)

print("volume scaled")
vol_scaled = volume_scaling(signal)
ipd.Audio(vol_scaled, rate=sr)

print("time stretching")
time_stretched = time_stretching(signal)
# Displays the stretched signal's length so it can be compared with the original.
len(time_stretched)
ipd.Audio(time_stretched, rate=sr)



In [None]:
def augment_df(df):
    """
    Expand every row of `df` into one row per augmentation variant.

    For each input row the audio file named in its "filename" column is
    loaded from DATA_PATH and six signals are produced: the original, plus
    noise, pitch shift, random time shift, volume scaling and time stretch.
    The row's columns are repeated once per variant and two columns are
    added: "signal" (the samples) and "type" (the variant name).

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a "filename" column.

    Returns
    -------
    pandas.DataFrame
        All per-file frames concatenated. Each per-file frame is indexed
        0..5, so index values repeat across files. An empty input yields an
        empty frame that carries the expected columns.
    """
    totals = []

    for _, row in df.iterrows():
        signal, sr = librosa.load(DATA_PATH + row["filename"])
        aug_signals = {
            "original": signal,
            "noised": add_noise(signal, 0.005),
            # Pass the rate the clip was actually loaded with instead of
            # relying on pitch_shifting's unrelated default.
            "pitch_shift": pitch_shifting(signal, sr=sr),
            "random_shifted": random_shift(signal),
            "vol_scaled": volume_scaling(signal),
            "time_stretched": time_stretching(signal),
        }

        # Repeat the row's columns once per variant using the public
        # constructor; the private DataFrame._append is not a supported API.
        df_temp = pd.DataFrame([row.to_dict()] * len(aug_signals))
        df_temp["signal"] = list(aug_signals.values())
        df_temp["type"] = list(aug_signals.keys())

        totals.append(df_temp)

    if not totals:
        # pd.concat raises ValueError on an empty list.
        extra = [name for name in ("signal", "type") if name not in df.columns]
        return pd.DataFrame(columns=list(df.columns) + extra)

    return pd.concat(totals)

def load_signals(df):
    """Return a copy of `df` with a "signal" column holding each file's samples.

    The file named in each row's "filename" column is loaded from DATA_PATH
    with ``librosa.load``; only the samples (first element of the returned
    pair) are stored. Working on a copy leaves the caller's frame untouched
    and avoids pandas' SettingWithCopyWarning when `df` is a slice of another
    frame (e.g. the output of ``train_test_split``).
    """
    loaded = df.copy()
    loaded["signal"] = loaded["filename"].apply(lambda name: librosa.load(DATA_PATH + name)[0])
    return loaded

In [None]:
df_train_aug = augment_df(df_train)

In [None]:
df_val = load_signals(df_val)

In [None]:
df_train_aug.groupby("fold").count()

In [None]:
# # For each audio sample, create three additional random samples to augment the data.
# # Each audio sample is typically 5 seconds long.

# # TODO: experiment with different lengths of the random samples or no random samples at all and see the performance. 

# X , y = [] , []
# for i, data in tqdm(df_sel.iloc[0:2].iterrows()):
#   print(data["filename"])
#   sig , sr = librosa.load(DATA_PATH+data["filename"])
#   for i in range(3):
#     n = np.random.randint(0, len(sig)-(sr*2)) # chose a random number between 0 and about 3/5 of the signal length or 3 seconds as signals are 5 seconds long.
#     sig_ = sig[n : int(n+(sr*2))] # take a 2 seconds long chunk of the signal starting from the n random position. 
#     mfcc_ = librosa.feature.mfcc(y=sig_ , sr=sr, n_mfcc=13)
#     X.append(mfcc_)
#     y.append(data["target"])

# # convert list to numpy array
# X = np.array(X) 
# y = np.array(y)

# #one-hot encoding the target
# y = tf.keras.utils.to_categorical(y , num_classes=10)

# # our tensorflow model takes input as (no_of_sample , height , width , channel).
# # here X has dimension (no_of_sample , height , width).
# # So, the below code will reshape it to (no_of_sample , height , width , 1).
# X = X.reshape(X.shape[0], X.shape[1], X.shape[2], 1)

In [None]:
def df_to_tf(df, sr=22050, num_classes=10):
    """
    Convert a frame of signals and targets into model-ready arrays.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a "signal" column (audio samples) and a "target" column
        (integer class id in ``range(num_classes)``).
    sr : int, optional
        Sample rate passed to the MFCC computation. Defaults to 22050, the
        previously hard-coded value.
    num_classes : int, optional
        Width of the one-hot encoding. Defaults to 10, the previously
        hard-coded value.

    Returns
    -------
    X : numpy.ndarray
        MFCCs (13 coefficients) with a trailing channel axis, i.e. shape
        (no_of_sample, height, width, 1).
    y : numpy.ndarray
        One-hot encoded targets.

    Raises
    ------
    ValueError
        If the per-row MFCCs do not stack into a 3-D array (for example when
        `df` is empty or the signals have different lengths).
    """
    features, labels = [], []
    # total= lets tqdm show a real progress bar; iterrows() has no length.
    for _, data in tqdm(df.iterrows(), total=len(df)):
        features.append(librosa.feature.mfcc(y=data["signal"], sr=sr, n_mfcc=13))
        labels.append(data["target"])

    # convert list to numpy array
    X = np.array(features)
    if X.ndim != 3:
        raise ValueError(
            "expected equally sized MFCC matrices for at least one row; "
            f"got an array of shape {X.shape}"
        )

    # one-hot encoding the target
    y = tf.keras.utils.to_categorical(np.array(labels), num_classes=num_classes)

    # The model takes input as (no_of_sample, height, width, channel), so
    # append a channel axis of size 1.
    X = X[..., np.newaxis]
    return X, y

In [None]:

X_train, y_train = df_to_tf(df_train_aug)
X_val, y_val = df_to_tf(df_val)

In [None]:
X_train.shape
y_train.shape

## Modelling

In [None]:
# Modeling
# Shape of one model input. 13 matches the n_mfcc=13 used in df_to_tf and the
# trailing 1 is the channel axis added there; 216 is presumably the number of
# MFCC frames of one clip at librosa's default settings — TODO confirm.
INPUTSHAPE = (13,216,1)

In [None]:
LOGDIR = "logs"
CPKT = "cpkt/"

In [None]:
# Early stopping to limit overfitting: monitors val_loss with min_delta=0.01
# and patience=60. restore_best_weights=False, so the model keeps the weights
# of the last epoch that ran, not those of the best epoch.
callback_1 = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss', min_delta=0.01, patience=60, verbose=0, mode='auto',
    baseline=None, restore_best_weights=False
)

# Checkpoint evaluated at the end of every epoch; because save_best_only=True
# the weights (weights only, not the full model) are written to CPKT only
# when val_loss improves.
callback_2 = tf.keras.callbacks.ModelCheckpoint(
    CPKT, monitor='val_loss', verbose=0, save_best_only=True,
    save_weights_only=True, mode='auto', save_freq='epoch', options=None
)

# Writes training logs for TensorBoard into LOGDIR.
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=LOGDIR)

In [None]:
from keras.layers import Dropout, BatchNormalization
def create_model():
    """
    Build and compile the CNN classifier.

    Architecture: three 3x3 'valid' convolutions (16, 64 and 32 filters),
    each followed by batch normalization, then global average pooling, a
    32-unit dense layer with dropout (0.5) before and after it, and a
    10-way softmax output. The input shape is taken from the module-level
    INPUTSHAPE.

    Returns
    -------
    A Sequential model compiled with categorical cross-entropy, the Adam
    optimizer and accuracy reported under the name 'acc'.
    """
    # All layers come from the same `tensorflow.keras.layers` namespace;
    # mixing in layer classes imported from the standalone `keras` package
    # can fail when the two packages are different Keras versions.
    model = models.Sequential([
        layers.Conv2D(16, (3, 3), activation='relu', padding='valid', input_shape=INPUTSHAPE),
        layers.BatchNormalization(),
        layers.Conv2D(64, (3, 3), activation='relu', padding='valid'),
        layers.BatchNormalization(),
        layers.Conv2D(32, (3, 3), activation='relu', padding='valid'),
        layers.BatchNormalization(),
        layers.GlobalAveragePooling2D(),
        layers.Dropout(0.5),
        layers.Dense(32, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(10, activation='softmax'),
    ])
    # `metrics` is documented as a list; a bare string is rejected by newer
    # Keras releases.
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc'])
    return model

### Cross validation

In [None]:
df_augmented = augment_df(df_sel)

In [None]:
# evaluate single fold
# Train on every row whose fold is not 1 (augmented variants included);
# validate on fold 1.
df_train_fold = df_augmented[df_augmented["fold"] !=1] 
df_val_fold = df_augmented[(df_augmented["fold"] == 1) & (df_augmented["type"] == "original")] # Select only the original data for the validation fold.

# Overwrites the notebook-global X_train / y_train / X_val / y_val.
X_train, y_train =  df_to_tf(df_train_fold)
X_val, y_val = df_to_tf(df_val_fold)

model = create_model()
model.fit(X_train,y_train,
        validation_data=(X_val,y_val),
        epochs=150,
        callbacks = [callback_1 , callback_2 , tensorboard_callback])

# evaluate returns (loss, accuracy) for this model; the loss is discarded.
_, accuracy = model.evaluate(X_val, y_val, verbose=0)

In [None]:
# Reverse lookup: integer class id -> category name.
inverted_dict = {value: key for key, value in class_dict.items()}
# Draw one random row from the validation fold to run through the model.
row = df_val_fold.sample(1)

# NOTE(review): `sr` is not defined in this cell; it is the notebook global
# left behind by an earlier cell that called librosa.load — confirm that cell
# has been run and that its rate matches this signal.
ipd.Audio(row["signal"].iloc[0], rate=sr)
print(row["target"])
xp, yp = df_to_tf(row)
prediction = model.predict(xp)  # Get the model's prediction

# The prediction is an array of probabilities for each class.
# argmax gives the index of the class with the highest probability.
predicted_class = np.argmax(prediction)

print(f"The predicted class is {inverted_dict[predicted_class]}")

In [None]:
import numpy as np
import tensorflow as tf

# df_sel_signals = load_signals(df_sel)
# df_sel_signals["type"] = "original"
df_augmented = augment_df(df_sel)

# Cross-validation over the "fold" column: folds 1..n_folds each serve once as the validation fold (n_folds is 2 here, so only folds 1 and 2 are evaluated).

n_folds = 2
fold_accuracy = []

# NOTE(review): the same callback instances are reused for every fold; check
# whether ModelCheckpoint's best-so-far value carries over between fit calls.
for i in range(1, n_folds + 1):

    # split folds
    df_train_fold = df_augmented[df_augmented["fold"] !=i]
    df_val_fold = df_augmented[(df_augmented["fold"] == i) & (df_augmented["type"] == "original")] # Select only the original data for the validation fold.

    # convert to tensors
    X_train, y_train =  df_to_tf(df_train_fold)
    X_val, y_val = df_to_tf(df_val_fold)

    # A fresh model is built for every fold so no weights are shared between folds.
    model = create_model()
    model.fit(X_train,y_train,
            validation_data=(X_val,y_val),
            epochs=120,
            callbacks = [callback_1 , callback_2 , tensorboard_callback])
    
    # Keep only the accuracy of this fold; the loss is discarded.
    _, accuracy = model.evaluate(X_val, y_val, verbose=0)
    fold_accuracy.append(accuracy)

# Mean validation accuracy over the evaluated folds, as a percentage.
print(f"Average Accuracy: {np.mean(fold_accuracy) * 100}")


### Training and validation

In [None]:
# Having fine-tuned the model, now split the data into training and validation sets

model = create_model()
model.summary()

In [None]:
# split data
# A fixed random_state (the same seed as the earlier exploratory split) makes
# the final train/validation partition, and therefore the reported metrics,
# reproducible across notebook runs.
df_train, df_val = train_test_split(df_sel, random_state=2023)

# for validation use only original data, exclude augmented. 
df_train = augment_df(df_train)
df_val = load_signals(df_val)

print("training size", df_train.shape)
print("validation size", df_val.shape)

# convert to tensors
X_train, y_train =  df_to_tf(df_train)
X_val, y_val = df_to_tf(df_val)

model = create_model()
model.fit(X_train,y_train,
        validation_data=(X_val,y_val),
        epochs=90,
        callbacks = [callback_1 , callback_2 , tensorboard_callback])