### Reference Research Paper:
##### Attention based convolutional recurrent neural network for environmental sound classification (https://www.sciencedirect.com/science/article/pii/S0925231220313618)

# Importing Libraries

In [None]:
import librosa
import os
import numpy as np
import tensorflow as tf
from scipy.signal import stft
from keras import layers, regularizers,optimizers,callbacks
from keras.utils import to_categorical
from keras.callbacks import ModelCheckpoint
from keras.models import load_model
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

In [None]:
# Filesystem locations: dataset root directory and the file the model is saved to / loaded from
input_dir = './Datasets/ESC-10/'
model_path = './Models/CRNN.h5'

# Functions for Preprocessing

In [None]:
def load_audio_file(file_path, sr=44100):
    """
    Description: Read one audio file with librosa at the requested sampling rate.
    Arguments:
        file_path (str): Path to the audio file.
        sr (int): Sampling rate passed to librosa.load.
    Returns:
        audio (np.ndarray): The decoded samples (the sampling rate returned by
            librosa.load is discarded).
    """
    # librosa.load returns (samples, sampling_rate); only the samples are needed.
    samples = librosa.load(file_path, sr=sr)[0]
    return samples

def extract_log_gammatone_spectrogram(signal, sample_rate):
    """
    Description: Extract fixed-size log-gammatone spectrogram frames (with a
        delta channel) from an audio signal.
    Arguments:
        signal (np.ndarray): Audio signal.
        sample_rate (int): Sampling rate of the audio signal.
    Returns:
        frames (np.ndarray): Array of shape (num_frames, 128, 128, 2); channel 0
            is the log spectrogram slice, channel 1 its first difference along
            the second (column) axis, zero-padded in the last column.
        num_frames (int): Number of frames extracted (equals frames.shape[0]);
            callers use it as the number of labels to emit. It is 0 when the
            signal is too short to fill a single frame.
    """
    signal = librosa.util.normalize(signal)
    epsilon = 1e-10  # keeps log10 finite where the spectrogram is zero

    # ~23 ms analysis window with 50% overlap between consecutive windows.
    window_size = int(0.023 * sample_rate)
    overlap = window_size // 2
    _, _, stft_data = stft(signal, window='hamming', nperseg=window_size, noverlap=overlap)

    energy_spectrogram = np.abs(stft_data) ** 2

    num_filters = 128
    gammatone_spectrogram = gammatone_filter_bank(energy_spectrogram, sample_rate, num_filters)
    log_spectrogram = np.log10(gammatone_spectrogram + epsilon)

    # Slice the spectrogram into frames of 128 columns with 50% overlap.
    frame_length = 128
    frame_hop = frame_length // 2
    # Clamp at zero: for very short inputs the floor division goes negative,
    # which would make np.zeros raise instead of yielding an empty result.
    num_frames = max(0, (log_spectrogram.shape[1] - frame_length) // frame_hop + 1)

    frames = np.zeros((num_frames, num_filters, frame_length, 2))
    for i in range(num_frames):
        start = i * frame_hop
        frame = log_spectrogram[:, start:start + frame_length]
        frames[i, :, :, 0] = frame
        # First difference along the column axis; the last column stays 0
        # (same result as padding the diff with one trailing zero).
        frames[i, :, :-1, 1] = np.diff(frame, axis=1)
    return frames, num_frames
def gammatone_filter_bank(spectrogram, sample_rate, num_filters):
    """
    Description: Run a spectrogram through a bank of gammatone_filter calls and
        collect one output row per filter.
    Arguments:
        spectrogram (np.ndarray): Input spectrogram, indexed as (bin, column).
        sample_rate (int): Sampling rate of the audio signal.
        num_filters (int): Number of filters in the filter bank.
    Returns:
        gammatone_spectrogram (np.ndarray): Array of shape
            (num_filters, spectrogram.shape[1]).
    """
    # Bin frequencies run linearly from 0 to the Nyquist frequency.
    bin_frequencies = np.linspace(0, sample_rate / 2, spectrogram.shape[0])

    # Center frequencies are evenly spaced on the ERB scale between the first
    # and the last bin frequency, then mapped back to Hertz.
    erb_points = np.linspace(
        hertz_to_erb(bin_frequencies[0]),
        hertz_to_erb(bin_frequencies[-1]),
        num_filters,
    )
    center_frequencies = erb_to_hertz(erb_points)

    gammatone_spectrogram = np.zeros((num_filters, spectrogram.shape[1]))

    # NOTE(review): gammatone_filter as defined in this file does not appear to
    # use its center_frequency argument in the value it returns, so every row
    # produced here would be identical — confirm whether that is intended.
    for band, center in enumerate(center_frequencies):
        filtered = gammatone_filter(spectrogram, sample_rate, center)
        # Squared magnitude summed over the bin axis gives one value per column.
        gammatone_spectrogram[band, :] = np.sum(np.abs(filtered) ** 2, axis=0)

    return gammatone_spectrogram

def gammatone_filter(signal, sample_rate, center_frequency):
    """
    Description: Scale each column of a 2-D signal by the envelope
        t**3 * exp(-2*pi*2.1*t), where t = column_index / sample_rate.
    Arguments:
        signal (np.ndarray): Input array; columns (axis 1) are treated as time steps.
        sample_rate (int): Sampling rate used to convert column indices to seconds.
        center_frequency (float): Center frequency of the gammatone filter.
            NOTE(review): this value does not influence the result (the
            previously computed filter coefficients were never applied), so
            every center frequency yields the same output. Kept for interface
            compatibility — confirm the intended filter design.
    Returns:
        filter_output (np.ndarray): Array with the same shape and dtype as
            `signal`; the input is not modified.
    """
    num_samples = signal.shape[1]
    time = np.arange(num_samples) / sample_rate

    # One envelope value per column, computed for all columns at once.
    envelope = time**3 * np.exp(-2 * np.pi * 2.1 * time)
    # Shape the envelope so it broadcasts along axis 1 whatever the signal rank.
    envelope = envelope.reshape((1, num_samples) + (1,) * (signal.ndim - 2))

    # Allocate with the input's dtype and assign into it, as the original
    # column-by-column loop did.
    filter_output = np.zeros_like(signal)
    filter_output[...] = signal * envelope

    return filter_output

def gammatone_coefficients(sample_rate, center_frequency):
    """
    Description: Derive three numerator and three denominator coefficients from
        the sampling rate and a center frequency.
    Arguments:
        sample_rate (int): Sampling rate of the audio signal.
        center_frequency (float): Center frequency of the gammatone filter.
    Returns:
        b (list): Numerator coefficients [b0, 0, -b0].
        a (list): Denominator coefficients [a0, a1, a2]; a1 and a2 are divided
            by a0, a0 itself is returned as computed.
    """
    sample_period = 1 / sample_rate
    # Bandwidth grows linearly with the center frequency (expressed in kHz).
    erb_bandwidth = 24.7 * (4.37 * center_frequency / 1000 + 1)
    beta = 1.019 * 2 * np.pi * erb_bandwidth / sample_rate

    gain = sample_period**2 * 2 * np.pi * center_frequency / sample_rate
    lead = 1 + beta

    numerator = [gain, 0, -gain]
    denominator = [
        lead,
        -2 * np.cos(center_frequency * 2 * np.pi * sample_period) / lead,
        (1 - beta) / lead,
    ]
    return numerator, denominator

def hertz_to_erb(frequency):
    """
    Description: Map a frequency in Hertz onto the ERB scale.
    Arguments:
        frequency (float): Frequency value in Hertz (scalars and NumPy arrays
            are both handled by the NumPy log call).
    Returns:
        erb (float): Corresponding value on the ERB scale.
    """
    scaled = (frequency / 24.7) * 0.00437
    erb = 9.265 * np.log(1 + scaled)
    return erb

def erb_to_hertz(erb):
    """
    Description: Map a value on the ERB scale back to a frequency in Hertz
        (the inverse of hertz_to_erb's formula).
    Arguments:
        erb (float): Value on the ERB scale (scalars and NumPy arrays are both
            handled by the NumPy exp call).
    Returns:
        frequency (float): Corresponding frequency in Hertz.
    """
    growth = np.exp(erb / 9.265) - 1
    frequency = 24.7 * growth / 0.00437
    return frequency

def count_files(directory=None):
    """
    Description: Count the files found under a directory tree (all nesting
        levels; directories themselves are not counted).
    Arguments:
        directory (str, optional): Root directory to walk. When omitted, the
            module-level `input_dir` dataset path is used, which matches the
            previous zero-argument behaviour.
    Returns:
        count (int): Number of files found under the directory tree.
    """
    root = input_dir if directory is None else directory
    # os.walk yields (dirpath, dirnames, filenames) for every directory visited.
    return sum(len(files) for _, _, files in os.walk(root))

def load_audio_files(directory_path, sr=44100):
    """
    Description: Load every '.ogg' file found in the sub-directories of a
        dataset directory and extract log-gammatone spectrogram frames.
        Each sub-directory name is treated as one class label.
    Arguments:
        directory_path (str): Path to the directory containing one
            sub-directory of audio files per class.
        sr (int): Desired sampling rate.
    Returns:
        data (np.ndarray): Extracted frames stacked along axis 0; an empty
            array of shape (0, 128, 128, 2) when no frames were produced.
        labels (np.ndarray): Integer class id for each frame in `data`.
            Ids are assigned in sorted sub-directory order, counting only
            sub-directories that contain at least one '.ogg' file.
    """
    frame_batches = []
    labels = []
    label_map = {}

    # Sorted listings make the label ids and sample order reproducible;
    # os.listdir returns entries in arbitrary order.
    for class_name in sorted(os.listdir(directory_path)):
        class_dir = os.path.join(directory_path, class_name)
        # Skip stray files in the dataset root; os.listdir would fail on them.
        if not os.path.isdir(class_dir):
            continue
        for file_name in sorted(os.listdir(class_dir)):
            if not file_name.endswith('.ogg'):
                continue
            # A class gets the next free id the first time one of its files is seen.
            label_id = label_map.setdefault(class_name, len(label_map))
            audio = load_audio_file(os.path.join(class_dir, file_name), sr=sr)
            frames, frame_count = extract_log_gammatone_spectrogram(audio, sample_rate=sr)
            frame_batches.append(frames)
            labels.extend([label_id] * frame_count)

    # Concatenate once at the end instead of re-copying the growing array
    # for every file.
    if frame_batches:
        data = np.concatenate(frame_batches)
    else:
        data = np.empty((0, 128, 128, 2))
    return data, np.asarray(labels)

def preprocess_audio(audio_path, sr=44100):
    """
    Description: Load one audio file and run the log-gammatone spectrogram
        extraction on it.
    Arguments:
        audio_path (str): Path to the audio file.
        sr (int): Desired sampling rate.
    Returns:
        spectrogram (tuple): The value returned by
            extract_log_gammatone_spectrogram, i.e. the pair
            (frames, number_of_frames) — index 0 holds the frame array.
    """
    samples = load_audio_file(audio_path, sr=sr)
    return extract_log_gammatone_spectrogram(samples, sample_rate=sr)

# Functions for Model Creation

In [None]:

def calculate_attention_map(M):
    """
    Description: Calculate the attention map from the given feature map.
        A single-filter 3x3 convolution produces one score per position, the
        scores are averaged over the whole of axis 1, and a softmax normalises
        them along axis 2.
    Arguments:
        M (tf.Tensor): Feature map tensor (4-D, as required by Conv2D).
    Returns:
        attention_map (tf.Tensor): Attention map with axis 1 and the channel
            axis of length 1; values sum to 1 along axis 2. It broadcasts
            against M when multiplied.
    """
    conv_output = tf.keras.layers.Conv2D(filters=1, kernel_size=(3, 3), padding='same')(M)

    # The pool window spans the full extent of axis 1, collapsing it to length 1.
    avg_pool_output = tf.keras.layers.AveragePooling2D(pool_size=(conv_output.shape[1], 1))(conv_output)

    # Normalise along axis 2, the only remaining axis with more than one
    # position. A softmax along axis 1 (length 1 after pooling) would always
    # return 1.0 and turn the attention into a no-op.
    attention_map = tf.keras.activations.softmax(avg_pool_output, axis=2)

    return attention_map

def apply_attention(M, attention_map):
    """
    Description: Weight a feature map by an attention map using element-wise
        multiplication.
    Arguments:
        M (tf.Tensor): Feature map tensor.
        attention_map (tf.Tensor): Attention map tensor.
    Returns:
        M0 (tf.Tensor): Element-wise product of M and attention_map.
    """
    return tf.multiply(M, attention_map)

def calculate_attention_weights(inputs):
    """
    Description: Compute attention weights by passing the inputs through a
        256-unit tanh Dense layer and normalising the exponentiated activations
        along axis 1.
    Arguments:
        inputs (tf.Tensor): Input tensor.
    Returns:
        weights (tf.Tensor): Attention weights; they sum to 1 along axis 1.
    """
    scores = layers.Dense(units=256, activation='tanh')(inputs)

    # Hand-written softmax along axis 1: exp(scores) divided by its sum.
    numerator = tf.exp(scores)
    denominator = tf.reduce_sum(tf.exp(scores), axis=1, keepdims=True)

    return numerator / denominator

def compute_feature_vector(ht, attention_weights):
    """
    Description: Collapse axis 1 of the RNN features into a single vector by
        weighting them element-wise and summing.
    Arguments:
        ht (tf.Tensor): RNN features tensor.
        attention_weights (tf.Tensor): Attention weights tensor.
    Returns:
        feature_vector (tf.Tensor): Sum over axis 1 of ht * attention_weights.
    """
    weighted_features = tf.multiply(ht, attention_weights)
    return tf.reduce_sum(weighted_features, axis=1)

def _conv_bn(x, filters, kernel_size):
    """Apply one stride-1, same-padded LeakyReLU Conv2D (L2 1e-4) followed by BatchNormalization."""
    x = layers.Conv2D(filters, kernel_size, (1, 1), activation='leaky_relu', padding='same',
                      kernel_regularizer=regularizers.L2(0.0001))(x)
    return layers.BatchNormalization()(x)


def build_crnn_model_with_attention(input_shape, num_classes):
    """
    Description: Build a CRNN model with attention mechanism.
        Four convolutional stages (two Conv2D+BatchNorm layers and one max
        pooling each) feed an attention map, two bidirectional GRU layers with
        dropout, an attention-weighted sum, and a softmax classifier.
    Arguments:
        input_shape (tuple): Shape of the input tensor (without batch axis).
        num_classes (int): Number of output classes.
    Returns:
        model (tf.keras.Model): Built (uncompiled) CRNN model with attention
            mechanism.
    """
    inputs = layers.Input(shape=input_shape)

    # (filters, kernel_size, pooling strides) for each convolutional stage.
    conv_stages = [
        (32, (3, 5), (4, 3)),
        (64, (3, 1), (4, 1)),
        (128, (1, 5), (1, 3)),
        (256, (3, 3), (2, 2)),
    ]

    cnn = inputs
    for filters, kernel_size, pool_strides in conv_stages:
        # Both convolutions are chained; the second one consumes the output of
        # the first (previously the second layer of stage 1 was wired to
        # `inputs`, leaving the first Conv2D/BatchNorm disconnected).
        cnn = _conv_bn(cnn, filters, kernel_size)
        cnn = _conv_bn(cnn, filters, kernel_size)
        cnn = layers.MaxPooling2D(strides=pool_strides)(cnn)

    cnn_attmap = calculate_attention_map(cnn)
    cnn_att = apply_attention(cnn, cnn_attmap)

    # Flatten the spatial axes into a sequence of 256-dimensional vectors.
    rnn = layers.Reshape((-1, 256))(cnn_att)
    rnn = layers.Bidirectional(layers.GRU(256, activation="tanh", return_sequences=True,
                                          kernel_regularizer=regularizers.L2(0.0001)))(rnn)
    rnn = layers.Dropout(0.5)(rnn)
    rnn = layers.Bidirectional(layers.GRU(256, activation="tanh", return_sequences=True,
                                          kernel_regularizer=regularizers.L2(0.0001)))(rnn)
    rnn = layers.Dropout(0.5)(rnn)
    # NOTE(review): a Bidirectional GRU(256) with the default merge mode yields
    # 512 features per step, so this reshape splits every step into two
    # 256-wide steps. It keeps the shapes compatible with the 256-unit
    # attention layer below — confirm this is the intended design.
    rnn = layers.Reshape((-1, 256))(rnn)

    att_weights = calculate_attention_weights(rnn)
    attended_features = compute_feature_vector(rnn, att_weights)

    outputs = layers.Dense(num_classes, activation='softmax')(attended_features)

    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    return model


# Training and Prediction Pipeline Functions

In [None]:
def pipeline_train():
    """
    Description: Pipeline for training a CRNN model on the ESC-10 dataset.
        Loads the dataset from `input_dir`, holds out 20% of the frames for
        testing, trains with SGD and a step learning-rate schedule, saves the
        best model (by validation accuracy) to `model_path`, plots the
        accuracy curves and prints the test loss and accuracy.
    Arguments:
        None
    Returns:
        None
    """
    batch_size = 16
    epochs = 50
    init_lr = 0.01
    lr_decay = 0.1   # multiplicative factor applied at every learning-rate drop
    lr_drop = 16     # number of epochs between learning-rate drops

    X, y = load_audio_files(input_dir, sr=44100)

    # NOTE(review): the split is done per frame, and frames cut from the same
    # clip overlap by 50%, so near-duplicate frames of one clip can end up in
    # both the training and the test set — consider splitting by clip.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Count classes over the full label set so the one-hot width is correct
    # even if a class happens to be missing from the training split.
    num_classes = len(np.unique(y))
    y_train = to_categorical(y_train, num_classes)
    y_test = to_categorical(y_test, num_classes)

    input_shape = X_train[0].shape
    model = build_crnn_model_with_attention(input_shape, num_classes)

    # Re-initialise layer weights from N(0, 0.05). BatchNormalization layers
    # are skipped: overwriting their scale/offset and moving statistics with
    # random values (e.g. a negative moving variance) breaks normalisation.
    for layer in model.layers:
        if isinstance(layer, layers.BatchNormalization):
            continue
        current_weights = layer.get_weights()
        if current_weights:
            layer.set_weights([
                np.random.normal(loc=0.0, scale=0.05, size=w.shape)
                for w in current_weights
            ])

    # The learning rate is driven solely by the scheduler callback below, so
    # the optimizer does not get its own `decay` argument (which would either
    # compound with the schedule or be rejected by newer Keras optimizers).
    opt = optimizers.SGD(learning_rate=init_lr, momentum=0.9, nesterov=True, clipnorm=1.0)
    model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])

    def lr_scheduler(epoch):
        # Step schedule: multiply the initial rate by lr_decay every lr_drop epochs.
        return float(init_lr * (lr_decay ** np.floor((1 + epoch) / lr_drop)))

    lr_callback = callbacks.LearningRateScheduler(lr_scheduler)
    checkpoint = ModelCheckpoint(model_path, monitor='val_accuracy', verbose=1, save_best_only=True)

    history = model.fit(X_train, y_train, batch_size=batch_size, epochs=epochs,
                        validation_split=0.2, callbacks=[lr_callback, checkpoint])

    plt.plot(history.history['accuracy'], label='Train Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.legend()
    plt.show()

    # Evaluates the model as it stands after the final epoch (not the
    # checkpointed best model).
    loss, accuracy = model.evaluate(X_test, y_test)
    print('Test loss:', loss)
    print('Test accuracy:', accuracy)

In [None]:
def pipeline_predict(filepath):
    """
    Description: Load the saved CRNN model from `model_path`, preprocess one
        audio file and print the model's predictions for its frames.
    Arguments:
        filepath (str): Path to the audio file.
    Returns:
        None
    """
    model = load_model(model_path)
    # preprocess_audio returns a pair; element 0 is the array fed to the model.
    features = preprocess_audio(filepath)[0]
    print(model.predict(features))

In [None]:
# Time the full training pipeline: 1 loop per run, 2 runs (so training executes twice).
%timeit -n 1 -r 2 pipeline_train()

In [None]:
# Run the prediction pipeline on one sample clip from the dataset directory.
pipeline_predict('./Datasets/ESC-10/001 - Dog bark/1-30226-A.ogg')