### import libraries

In [None]:
import os
import librosa
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm
import soundfile as sf
import random
from sklearn.preprocessing import LabelEncoder
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Embedding ,  Permute,Dropout, AvgPool2D , BatchNormalization
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import SimpleRNN , Reshape ,GlobalAvgPool1D , GlobalMaxPooling1D
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense , Bidirectional , LSTM
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing import sequence
from tensorflow.keras.utils import to_categorical
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score
import tensorflow_model_optimization as tfmot
from tensorflow.keras.regularizers import l2
# Module-level scaler shared by apply_standard_scalar, which re-fits it on every call.
scaler = StandardScaler()

### Functions

In [None]:
def plotLearningCurve(history,epochs):
    """
    Draw the accuracy curve and then the loss curve of a training run.
        history : object whose ``history`` dict holds the per-epoch metric lists
        epochs : number of epochs; the x-axis runs from 1 to ``epochs``
    Return : None (two figures are shown, one after the other)
    """
    x_axis = range(1, epochs + 1)
    # (training metric key, validation metric key, figure title, y-axis label)
    charts = (
        ('categorical_accuracy', 'val_categorical_accuracy', 'Model Accuracy', 'Accuracy'),
        ('loss', 'val_loss', 'Model Loss', 'Loss'),
    )
    for train_key, val_key, title, y_label in charts:
        plt.plot(x_axis, history.history[train_key])
        plt.plot(x_axis, history.history[val_key])
        plt.title(title)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.legend(['Train', 'Validation'], loc='best')
        plt.show()

In [None]:
def get_test_accuracy(predictions, y_test):
    """
    Compute and print the share of test samples that were classified correctly.
        predictions : per-class scores, one row per sample
        y_test : one-hot encoded true classes, one row per sample
    Return : accuracy in the range 0..1 as a float
    """
    # The winning column of each row is the class index on both sides.
    hits = np.argmax(predictions, axis=1) == np.argmax(y_test, axis=1)
    accuracy = np.mean(hits)
    print(f"Test Accuracy: {accuracy * 100:.2f}%")
    return accuracy


In [None]:
def add_background_noise(audio, output_file_path, noise_level=0.2,
                         noise_dir=".", sample_rate=20000):
    """
    Add background noise to actual data for data augmentation.
        audio : actual audio data (1-D samples)
        output_file_path : base file path used to store the augmented data for
            manual testing; variant ``i`` is written to ``<stem>_<i><ext>`` so
            that several noise files no longer overwrite each other
        noise_level : suppression factor applied to the added noise
        noise_dir : folder scanned for ``.wav`` noise clips (default: the
            current working directory)
        sample_rate : rate every noise clip is resampled to when loaded

        formula : audio + background_noise * noise_level

    return : list with one augmented signal per usable noise clip, ordered by
        noise file name; empty when ``noise_dir`` holds no ``.wav`` file
    """
    # Sort so the variant order (and therefore the variant index) is stable.
    noise_files = sorted(
        name for name in os.listdir(noise_dir)
        if name != '.DS_Store' and name.endswith('.wav')
    )
    if not noise_files:
        return []

    # sf.write does not create missing folders.
    out_dir = os.path.dirname(output_file_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    out_root, out_ext = os.path.splitext(output_file_path)

    augmented = []
    for noise_name in noise_files:
        y, sr = librosa.load(os.path.join(noise_dir, noise_name), sr=sample_rate)
        noise = y.astype(np.float32)
        if len(noise) == 0:
            # np.pad(mode='wrap') cannot extend an empty clip.
            continue

        # Loop or trim noise to match audio length
        if len(noise) < len(audio):
            noise = np.pad(noise, (0, len(audio) - len(noise)), 'wrap')
        else:
            noise = noise[:len(audio)]

        # Combine audio and noise
        augmented_audio = audio + noise_level * noise

        # Export augmented audio; the suffix is the variant's index in the result
        variant_path = f"{out_root}_{len(augmented)}{out_ext}"
        sf.write(variant_path, augmented_audio, sr)
        augmented.append(augmented_audio)
    return augmented

In [None]:
def sliding_window(arr, window_size, step_size):
    """
    Lazily walk overlapping windows over ``arr`` and yield their elements.

    Windows start at 0, step_size, 2*step_size, ... and only full windows are
    visited. The elements of consecutive windows are yielded one by one as a
    single flat stream (not as separate window objects).
    """
    last_start = len(arr) - window_size
    for start in range(0, last_start + 1, step_size):
        yield from arr[start:start + window_size]

In [None]:
def sliding_window_average(signal, window_size):
    """
    Smooth ``signal`` with a moving average of ``window_size`` samples.

    Signals that are not longer than the window are returned unchanged.
    Otherwise the result has the same length as the input (``mode='same'``).
    """
    if len(signal) <= window_size:
        return signal
    # Uniform kernel whose weights sum to one.
    kernel = np.ones(window_size) / window_size
    return np.convolve(signal, kernel, mode='same')

In [None]:
def amplitude_shift(audio):
    """
    Scale ``audio`` by a random gain drawn uniformly from [0.9, 1.1).

    return : the rescaled audio
    """
    gain = np.random.uniform(0.9, 1.1)
    return audio * gain

In [None]:
def apply_hamming_window(windows):
    """
    Placeholder for the Hamming-window step: returns ``windows`` unchanged.

    The multiplication by the Hamming taper is disabled in this pipeline, so
    no taper is built any more (previously an array of ``len(windows)``
    coefficients was created and reshaped on every call and then discarded).

        windows : audio samples of one variant

    return : the very same object that was passed in
    """
    return windows

In [None]:
def apply_standard_scalar(audio_data):
    """
    Standardise the data with the module-level ``scaler``.
        audio_data : 2-D array of augmented/preprocessed audio data

    The scaler is re-fitted on every call. Input without any columns is
    handed back untouched.

    return : scaled data, or the input itself when it has no columns
    """
    has_columns = audio_data.shape[1] > 0
    return scaler.fit_transform(audio_data) if has_columns else audio_data

In [None]:
def process_audio_files(main_folder, output_root='Output'):
    """
    Load every ``.wav`` file below ``main_folder`` and build its augmented variants.

    ``main_folder`` is expected to contain one sub-folder per class label.
    For each ``.wav`` file the original signal is kept and one extra variant
    per background-noise clip is created from an amplitude-shifted copy.

        main_folder : root folder with one sub-folder per label
        output_root : folder that receives the augmented audio, mirrored as
            ``<output_root>/<label>/augmented_<file name>``

    return : dict mapping each label to a list of
        ``{'label', 'variant', 'data'}`` records
    """
    # Keep only real sub-folders: listing a stray file as a folder would raise.
    folders = sorted(
        name for name in os.listdir(main_folder)
        if name != '.DS_Store' and os.path.isdir(os.path.join(main_folder, name))
    )
    print(folders)

    # List each folder once; the same listing drives the progress total and the loop.
    folder_files = {
        folder: sorted(os.listdir(os.path.join(main_folder, folder)))
        for folder in folders
    }
    total_files = sum(len(names) for names in folder_files.values())

    processed_data = {}
    with tqdm(total=total_files, desc='Processing Audio Files', unit='file') as pbar:
        for folder, file_names in folder_files.items():
            folder_path = os.path.join(main_folder, folder)
            # The augmented audio is written here; the folder may not exist yet.
            os.makedirs(os.path.join(output_root, folder), exist_ok=True)
            records = processed_data[folder] = []

            for file_name in file_names:
                if file_name.endswith('.wav'):
                    y, sr = librosa.load(os.path.join(folder_path, file_name), sr=20000)
                    original_data = y.astype(np.float32)

                    # Apply augmentations: random gain, then one variant per noise clip
                    shifted_data = amplitude_shift(original_data)
                    noisy_data_list = add_background_noise(
                        shifted_data,
                        os.path.join(output_root, folder, 'augmented_' + file_name),
                    )

                    variants = [('original', original_data)]
                    variants.extend(
                        (f'augmented_{i}', noisy)
                        for i, noisy in enumerate(noisy_data_list)
                    )
                    for variant_name, variant_data in variants:
                        records.append({
                            'label': folder,
                            'variant': variant_name,
                            'data': apply_hamming_window(list(variant_data)),
                        })
                # Every directory entry counts towards the progress total.
                pbar.update(1)

    return processed_data


In [None]:
# Define the main folder path
main_folder = 'Original'

# Process all audio files: each .wav yields the original signal plus its augmented
# (amplitude-shifted + background-noise) variants.
# NOTE: the sliding-window, Hamming and scaling steps are currently disabled inside the pipeline.
final_data = process_audio_files(main_folder)
final_data

In [None]:
# Flatten {label: [variant records]} into one row per variant.
# The frame is built once from all records: growing it with pd.concat inside the
# loop copied the whole frame on every iteration (quadratic in the row count).
records = [
    {'label': label, 'variant': variant['variant'], 'data': variant['data']}
    for label, variants in final_data.items()
    for variant in variants
]
df = pd.DataFrame(records, columns=['label', 'variant', 'data'])

df
# df['data'] = df['data'].apply(lambda x: [item for sublist in x for item in sublist])

In [None]:
# df.to_csv('final_data.csv', index=False) # Save in local for manual inspection

### Modelling

In [None]:
# Encode catogerical field 
le = LabelEncoder()
le.fit(df['label'])
df['label'] = le.transform(df['label'])

In [None]:
# Set dependent and independent field
#     x : independent (sample sequences)
#     y : dependent (encoded label)
x, y = df['data'], df['label']

In [None]:
# Pad (or truncate) every sample at its end to the same length so the data can be
# stacked into one tensor, and standardize the encoded labels to an integer dtype.
x = sequence.pad_sequences(
    x,
    maxlen=30225,
    dtype='float32',
    padding='post',
    truncating='post',
)
y = y.astype(np.int32)

In [None]:
# Split data into train (80%) and test (20%); the fixed seed makes the split,
# and therefore every accuracy reported below, reproducible between runs.
# NOTE(review): rows are split at random, so augmented variants of the same
# recording can end up on both sides of the split - consider splitting by source file.
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)

In [None]:
# Convert the data to tensor format:
# inputs become (batch_size, height, width, channels), labels become one-hot rows.
x_train = x_train.reshape((len(x_train), 1, 30225, 1))
x_test = x_test.reshape((len(x_test), 1, 30225, 1))

y_train = to_categorical(y_train, num_classes=8)
y_test = to_categorical(y_test, num_classes=8)

### Model 1 CNN

In [None]:
# Model 1: CNN that works directly on the padded raw signal.
# The "->" comments give the per-sample output shape (height, width, channels) of the
# layer(s) above, derived from the layer arguments (Conv2D padding defaults to 'valid',
# pooling strides default to the pool size).
model_cnn = Sequential([
    # Embedding(input_dim=50000 , output_dim=128, input_length=500),
    #SFEB: height-1 kernels, i.e. strided convolutions along the sample axis only
    Conv2D(8, (1,9), strides = (1,2) , activation='relu', input_shape= (1 , 30225 , 1)),  # -> (1, 15109, 8)
    BatchNormalization(),
    Conv2D(64, (1,5), strides = (1,2),  activation='relu'),  # -> (1, 7553, 64)
    BatchNormalization(),
    MaxPooling2D(pool_size = (1, 50), strides = (1, 50)),  # -> (1, 151, 64)
    Permute((3, 2, 1)), #SwapAxes: channels become the height axis -> (64, 151, 1)
    #TFEB: 3x3 convolutions over the (64, 151) map, halved by each 2x2 pooling
    Conv2D(32 ,(3,3) , padding = 'same' , activation = 'relu'),
    BatchNormalization(),
    MaxPooling2D((2,2)),  # -> (32, 75, 32)
    Conv2D(64 ,(3,3) , padding = 'same' , activation = 'relu'),
    Conv2D(64 ,(3,3) , padding = 'same' , activation = 'relu'),
    BatchNormalization(),
    MaxPooling2D((2,2)),  # -> (16, 37, 64)
    Conv2D(128 ,(3,3) , padding = 'same' , activation = 'relu'),
    Conv2D(128 ,(3,3) , padding = 'same' , activation = 'relu'),
    BatchNormalization(),
    MaxPooling2D((2,2)),  # -> (8, 18, 128)
    Conv2D(256 ,(3,3) , padding = 'same' , activation = 'relu'),
    Conv2D(256 ,(3,3) , padding = 'same' , activation = 'relu'),
    BatchNormalization(),
    MaxPooling2D((2,2)),  # -> (4, 9, 256)
    Conv2D(512 ,(3,3) , padding = 'same' , activation = 'relu'),
    BatchNormalization(),
    Conv2D(512 , (3,3) , padding = 'same' , activation = 'relu'),
    BatchNormalization(),
    MaxPooling2D((2,2)),  # -> (2, 4, 512)
    Dropout(0.2),
    Conv2D(8 , (1,1) , activation = 'relu'),  # 1x1 conv reduces the channels -> (2, 4, 8)
    BatchNormalization(),
    AvgPool2D((1,4)),  # -> (2, 1, 8)
    Flatten(),  # -> 16 features
    Dense(8 , activation = 'relu'),
    Dense(8 , activation = 'softmax')#Output: probabilities for the 8 classes
])

# The softmax output is paired with the probability-based categorical cross-entropy loss.
model_cnn.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
model_cnn.summary()

### With Sliding Window

In [None]:
# NOTE(review): this passes the Sequential model itself, not the History object returned
# by fit(), and assumes a 10-epoch run; the only fit() call in this notebook comes later
# and uses 50 epochs - presumably a leftover from an earlier experiment, TODO confirm.
plotLearningCurve(model_cnn,10)

### without sliding window

In [None]:
# Train the CNN for 50 epochs. `model` receives the return value of fit() (the training
# history that is plotted below); the network itself stays in `model_cnn`.
# The test split doubles as validation data here.
model = model_cnn.fit(x_train , y_train , epochs = 50 , validation_data = (x_test , y_test))

In [None]:
# Accuracy and loss curves of the 50-epoch CNN run
plotLearningCurve(model,50)

In [None]:
# Persist the trained, non-quantized CNN in HDF5 format
model_cnn.save("model_cnn_without_quantization.h5")

### Real time accuracy

##### Estimating real-time accuracy by passing the held-out test data through the trained model.
##### Ideally, newly collected data would be used for real-time testing, but due to a lack of resources and data the test split is used instead.

In [None]:
# Report the tensor shapes the Keras model consumes and produces
input_details, output_details = model_cnn.input_shape, model_cnn.output_shape
print(input_details, output_details, sep='\n')

In [None]:
# Score the in-memory CNN on the held-out split
predictions = model_cnn.predict(x_test)

# Predictions are per-class scores and the labels are one-hot rows,
# so the arg-max of each row is the class index on both sides.
predicted_classes = predictions.argmax(axis=1)
true_classes = y_test.argmax(axis=1)

# Calculate accuracy
accuracy = accuracy_score(true_classes, predicted_classes)
print(f"Test Accuracy: {accuracy:.2f}")

### Quantization

##### What is Quantization?
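##### Quantization stores a model's weights and activations in a lower-precision type (e.g. int8 instead of float32), which shrinks the model and speeds up inference at a small cost in accuracy.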

In [None]:
# Load the quantized TFLite model from disk for inference.
# NOTE: "quantized_model.tflite" is written by the "Save in tflite" cell further down,
# so that cell has to be run before this one.
interpreter = tf.lite.Interpreter(model_path="quantized_model.tflite")

In [None]:
# Look up the interpreter's input/output tensor descriptions, then reserve its buffers
input_details, output_details = (
    interpreter.get_input_details(),
    interpreter.get_output_details(),
)
interpreter.allocate_tensors()
print(input_details, output_details, sep='\n')

In [None]:
# Run the TFLite interpreter one test sample at a time and collect its class scores
input_index = input_details[0]['index']
output_index = output_details[0]['index']

predictions = []
for img in x_test:
    # Wrap the sample in a list to add the batch dimension the model expects
    interpreter.set_tensor(input_index, [img.astype('float32')])
    interpreter.invoke()
    predictions.append(interpreter.get_tensor(output_index)[0])

predictions = np.array(predictions)
get_test_accuracy(predictions, y_test)


### Save in tflite

In [None]:
def representative_data_gen(num_samples=100):
    """
    Yield calibration inputs for post-training quantization.

    The converter runs these samples through the model to estimate the value
    range of every activation. Each item is a one-element list holding a
    single float32 training sample with a leading batch dimension.
    """
    for sample in x_train[:num_samples]:
        yield [sample[np.newaxis, ...].astype(np.float32)]


converter = tf.lite.TFLiteConverter.from_keras_model(model_cnn)
# Enable full integer quantization of the model's operations.
# The input/output types are left at their defaults, so the converted model is
# still fed float32 samples (as the TFLite inference cell does).
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_data_gen
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]

# Convert the model and write the flatbuffer that the interpreter cell loads
quantized_model = converter.convert()
with open("quantized_model.tflite", "wb") as f:
    f.write(quantized_model)

### Model 2 CNN_LSTM

In [None]:
# Model 2: the same convolutional front end as Model 1, followed by an LSTM head.
# The "->" comments give the per-sample output shape of the layer(s) above, derived from
# the layer arguments (Conv2D padding defaults to 'valid', pooling strides default to the
# pool size).
model_cnn_rnn = Sequential([
    # Embedding(input_dim=50000 , output_dim=128, input_length=500),
    #SFEB: height-1 kernels, i.e. strided convolutions along the sample axis only
    Conv2D(8, (1,9), strides = (1,2) , activation='relu', input_shape= (1 , 30225,1)),  # -> (1, 15109, 8)
    BatchNormalization(),
    Conv2D(64, (1,5), strides = (1,2),  activation='relu'),  # -> (1, 7553, 64)
    BatchNormalization(),
    MaxPooling2D((1, 50)),  # -> (1, 151, 64)
    Permute((3, 2, 1)), #SwapAxes: channels become the height axis -> (64, 151, 1)
    #TFEB: 3x3 convolutions over the (64, 151) map, halved by each 2x2 pooling
    Conv2D(32 ,(3,3) , padding = 'same' , activation = 'relu'),
    BatchNormalization(),
    MaxPooling2D((2,2)),  # -> (32, 75, 32)
    Conv2D(64 ,(3,3) , padding = 'same' , activation = 'relu'),
    Conv2D(64 ,(3,3) , padding = 'same' , activation = 'relu'),
    BatchNormalization(),
    MaxPooling2D((2,2)),  # -> (16, 37, 64)
    Conv2D(128 ,(3,3) , padding = 'same' , activation = 'relu'),
    Conv2D(128 ,(3,3) , padding = 'same' , activation = 'relu'),
    BatchNormalization(),
    MaxPooling2D((2,2)),  # -> (8, 18, 128)
    Conv2D(256 ,(3,3) , padding = 'same' , activation = 'relu'),
    Conv2D(256 ,(3,3) , padding = 'same' , activation = 'relu'),
    BatchNormalization(),
    MaxPooling2D((2,2)),  # -> (4, 9, 256)
    Conv2D(512 ,(3,3) , padding = 'same' , activation = 'relu'),
    BatchNormalization(),
    Conv2D(512 , (3,3) , padding = 'same' , activation = 'relu'),
    BatchNormalization(),
    MaxPooling2D((2,2)),  # -> (2, 4, 512)
    Dropout(0.2),
    Conv2D(32 , (1,1) , activation = 'relu'),  # 1x1 conv reduces the channels -> (2, 4, 32)
    BatchNormalization(),
    AvgPool2D((1,4)),  # -> (2, 1, 32)
    Reshape((2, 32)),  # drop the singleton axis: a sequence of 2 steps with 32 features each
    LSTM(8 , activation='relu', return_sequences=True), # LSTM layer, one 8-value output per step -> (2, 8)
    BatchNormalization(),
    GlobalAvgPool1D(),  # average over the 2 steps -> 8 features
    # SimpleRNN(8 , activation='relu', return_sequences=True),
    # BatchNormalization(),
    # GlobalAvgPool1D(),
    Dense(8 , activation = 'relu'),
    Dense(8 , activation = 'softmax')#Output: probabilities for the 8 classes
])

# The softmax output is paired with the probability-based categorical cross-entropy loss.
model_cnn_rnn .compile(optimizer='adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
model_cnn_rnn .summary()

In [None]:
# Train the CNN+LSTM model for 50 epochs; `model_2` receives the return value of fit()
# (the training history that is plotted below). The test split doubles as validation data.
model_2 = model_cnn_rnn.fit(x_train , y_train , epochs = 50 , validation_data = (x_test , y_test))

In [None]:
# Accuracy and loss curves of the 50-epoch CNN+LSTM run
plotLearningCurve(model_2,50)

In [None]:
# Persist the trained, non-quantized CNN+LSTM model in HDF5 format
model_cnn_rnn.save("model_cnn_rnn_without_quantization.h5")

In [None]:
# Score the CNN+LSTM model on the held-out split
predictions = model_cnn_rnn.predict(x_test)

# Predictions are per-class scores and the labels are one-hot rows,
# so the arg-max of each row is the class index on both sides.
predicted_classes = predictions.argmax(axis=1)
true_classes = y_test.argmax(axis=1)

# Calculate accuracy
accuracy = accuracy_score(true_classes, predicted_classes)
print(f"Test Accuracy: {accuracy:.2f}")

### Pruned

In [None]:
# Slimmed-down variant of Model 1: fewer filters per layer plus L2 weight regularization.
# The "->" comments give the per-sample output shape (height, width, channels) of the
# layer(s) above, derived from the layer arguments (Conv2D padding defaults to 'valid').
pruned_model_cnn = Sequential([
    # Embedding(input_dim=50000 , output_dim=128, input_length=500),
    #SFEB: height-1 kernels, i.e. strided convolutions along the sample axis only
    Conv2D(7, (1,9), strides = (1,2) , activation='relu', input_shape= (1 , 30225,1), kernel_regularizer=l2(0.01)),  # -> (1, 15109, 7)
    BatchNormalization(),
    Conv2D(20, (1,5), strides = (1,2),  activation='relu', kernel_regularizer=l2(0.015)),  # -> (1, 7553, 20)
    BatchNormalization(),
    MaxPooling2D((1, 50), strides = (1,50)),  # -> (1, 151, 20)
    Permute((3,2,1)), #SwapAxes: channels become the height axis -> (20, 151, 1)
    #TFEB: 3x3 convolutions over the (20, 151) map, halved by each 2x2 pooling
    Conv2D(10 ,(3,3), strides = (1,1) , padding = 'same' , activation = 'relu', kernel_regularizer=l2(0.01)),
    BatchNormalization(),
    MaxPooling2D((2,2) , strides = (2,2)),  # -> (10, 75, 10)
    Conv2D(14 ,(3,3), strides = (1,1) , padding = 'same' , activation = 'relu', kernel_regularizer=l2(0.01)),
    Conv2D(22 ,(3,3), strides = (1,1) , padding = 'same' , activation = 'relu', kernel_regularizer=l2(0.01)),
    BatchNormalization(),
    MaxPooling2D((2,2), strides = (2,2)),  # -> (5, 37, 22)
    Conv2D(31 ,(3,3), strides = (1,1) , padding = 'same' , activation = 'relu', kernel_regularizer=l2(0.01)),
    Conv2D(35 ,(3,3), strides = (1,1) , padding = 'same' , activation = 'relu', kernel_regularizer=l2(0.01)),
    BatchNormalization(),
    MaxPooling2D((2,2), strides = (2,2)),  # -> (2, 18, 35)
    Conv2D(41 ,(3,3), strides = (1,1) , padding = 'same' , activation = 'relu', kernel_regularizer=l2(0.01)),
    Conv2D(69 ,(3,3), strides = (1,1) , padding = 'same' , activation = 'relu', kernel_regularizer=l2(0.01)),
    BatchNormalization(),
    MaxPooling2D((2,2), strides = (2,2)),  # -> (1, 9, 69)
    BatchNormalization(),
    Dropout(0.2),
    Conv2D(8 , (1,1), strides = (1,1) , activation = 'relu', kernel_regularizer=l2(0.01)),  # -> (1, 9, 8)
    BatchNormalization(),
    AvgPool2D((1,4), strides = (1,4)),  # -> (1, 2, 8)
    Flatten(),  # -> 16 features
    # Reshape((-1, 8)),
    # LSTM(8 , activation='relu', return_sequences=True),
    # BatchNormalization(),
    # GlobalAvgPool1D(),
    Dense(8 , activation = 'relu'),
    Dense(8 , activation = 'softmax')#Output: probabilities for the 8 classes
])

# The last layer already applies softmax, so the loss must treat the outputs as
# probabilities. With from_logits=True the loss applied softmax a second time,
# which flattens the gradients and distorts the reported loss.
pruned_model_cnn.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
pruned_model_cnn.summary()

In [None]:
# Train the slimmed-down CNN for 50 epochs; `model_pruned` receives the return value of
# fit() (the training history that is plotted below). The test split doubles as validation data.
model_pruned = pruned_model_cnn.fit(x_train , y_train , epochs = 50 , validation_data = (x_test , y_test))

In [None]:
# Accuracy and loss curves of the 50-epoch run of the slimmed-down CNN
plotLearningCurve(model_pruned,50)

In [None]:
# Score the slimmed-down CNN on the held-out split
predictions = pruned_model_cnn.predict(x_test)

# Predictions are per-class scores and the labels are one-hot rows,
# so the arg-max of each row is the class index on both sides.
predicted_classes = predictions.argmax(axis=1)
true_classes = y_test.argmax(axis=1)

# Calculate accuracy
accuracy = accuracy_score(true_classes, predicted_classes)
print(f"Test Accuracy: {accuracy:.2f}")