In [1]:
import pandas as pd
import numpy as np

In [2]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa
import librosa
from keras.utils import to_categorical

In [3]:
num_classes = 11
input_shape = (128, 384, 1)

In [4]:
train_data = pd.read_csv("train_data.csv")

In [5]:
val_data = pd.read_csv("val_data.csv")

In [46]:
class AudioDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, csv_file, batch_size=32, num_classes=11, sample_rate=22050, duration=10.0, shuffle=True, n_mels=128):
        self.csv_file = csv_file
        self.batch_size = batch_size
        self.num_classes = num_classes
        self.sample_rate = sample_rate
        self.duration = duration
        self.shuffle = shuffle
        self.n_mels  = n_mels
        
        # Read the CSV file
        self.data = pd.read_csv(csv_file)
        
        # Get the unique class labels
        self.classes = sorted(self.data['class_label'].unique())
        
        # Create a dictionary to map class labels to integers
        self.class_to_int = dict(zip(self.classes, range(len(self.classes))))
        
        # Shuffle the data if requested
        if self.shuffle:
            self.data = self.data.sample(frac=1).reset_index(drop=True)
    
    def __len__(self):
        # Return the number of batches
        return int(np.ceil(len(self.data) / float(self.batch_size)))
    
    def __getitem__(self, idx):
        # Get the batch of file paths and labels
        batch_data = self.data[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_data = batch_data.reset_index()
        
        # Initialize the arrays for the audio data and labels
        batch_x = np.zeros((len(batch_data), self.n_mels, 431,1))
        batch_y = np.zeros((len(batch_data), self.num_classes))
        
        # Load the audio files and their corresponding labels
        for i, row in batch_data.iterrows():
            file_path = row['file_path']
            class_label = row['class_label']
            
            # Load the audio file
            signal, sr = librosa.load(file_path, sr=self.sample_rate, mono=True)
            
            # Pad or truncate the signal to the desired length
            signal = librosa.util.fix_length(signal, size=self.sample_rate * self.duration)
            
            # Convert the audio file to spectrogram
            S = librosa.feature.melspectrogram(y=signal, sr=sr)
            S_dB = np.array(librosa.power_to_db(S, ref=np.max))
            S_dB = S_dB.reshape(S_dB.shape[0],S_dB.shape[1],1)
            
            # Save the audio data and label to the batch arrays
            batch_x[i, :] = S_dB
            batch_y[i, :] = to_categorical(self.class_to_int[class_label], num_classes=self.num_classes)
            
#         batch_x.reshape(self.batch_size,batch_x.shape[0],batch_x.shape[1])
        return batch_x, batch_y

In [47]:
train_generator = AudioDataGenerator('train_data.csv', batch_size=32, num_classes=11, sample_rate=22050, duration=10, shuffle=True, n_mels=128)

In [48]:
val_generator = AudioDataGenerator('val_data.csv', batch_size=32, num_classes=11, sample_rate=22050, duration=10, shuffle=True, n_mels=128)

# Model creation

In [73]:
image_size_y=128
image_size_x = 384
patch_size_y = 128  # Size of the patches to be extract from the input images
patch_size_x = 3
num_patches = (image_size_y*image_size_x) // (patch_size_y*patch_size_x)
print(num_patches)

128


In [78]:
# Hyperparameters

learning_rate = 0.001
weight_decay = 0.0
batch_size = 32
num_epochs = 20
image_size_y = 384  # We'll resize input images to this size
image_size_x = 128
patch_size_y = 128  # Size of the patches to be extract from the input images
patch_size_x = 16
num_patches = (image_size_y*image_size_x) // (patch_size_y*patch_size_x)
projection_dim = 128
num_heads = 20
transformer_units = [
    projection_dim * 2,
    projection_dim,
]  # Size of the transformer layers
transformer_layers = 8
mlp_head_units = [256, 128]  # Size of the dense layers of the final classifier

In [52]:
# Data Augmentation

data_augmentation_resize = keras.Sequential(
    [
        layers.Resizing(image_size_y, image_size_x),
    ],
    name="data_augmentation",
)

In [53]:
def mlp(x, hidden_units, dropout_rate):
    for units in hidden_units:
        x = layers.Dense(units, activation='relu')(x)
    return x

In [82]:
class Patches(layers.Layer):
    def __init__(self, patch_size_x,_patch_size_y):
        super().__init__()
        self.patch_size_x = patch_size_x
        self.patch_size_y = patch_size_y

    def call(self, images):
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size_y, self.patch_size_x, 1],
            strides=[1, self.patch_size_y, self.patch_size_x, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        patch_dims = patches.shape[-1]
        patches = tf.reshape(patches, [batch_size, -1, patch_dims])
        return patches

In [66]:
class PatchEncoder(layers.Layer):
    def __init__(self, num_patches, projection_dim):
        super().__init__()
        self.num_patches = num_patches
        self.projection = layers.Dense(units=projection_dim)
        self.position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=projection_dim
        )
        self.reshape_patches = layers.Reshape((num_patches, projection_dim))

    def call(self, patch):
        positions = tf.range(start=0, limit=self.num_patches, delta=1)
        encoded = self.reshape_patches(self.projection(patch))+ self.position_embedding(positions)
        return encoded

In [67]:
def create_vit_classifier():
    inputs = layers.Input(shape=input_shape)
    # Augment data.
    augmented = data_augmentation_resize(inputs)
    # Create patches.
    patches = Patches(patch_size_x,patch_size_y)(augmented)
    # Encode patches.
    encoded_patches = PatchEncoder(num_patches, projection_dim)(patches)

    # Create multiple layers of the Transformer block.
    for _ in range(transformer_layers):
        # Layer normalization 1.
        x1 = layers.LayerNormalization(epsilon=1e-6)(encoded_patches)
        
        # Create a multi-head attention layer.
        attention_output = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=projection_dim, dropout=0.1
        )(encoded_patches, encoded_patches)
        # Skip connection 1.
        x2 = layers.Add()([attention_output, encoded_patches])
        # Layer normalization 2.
        x3 = layers.LayerNormalization(epsilon=1e-6)(x2)
        # MLP.
        x3 = mlp(x2, hidden_units=transformer_units, dropout_rate=0.1)
        # Skip connection 2.
        encoded_patches = layers.Add()([x3, x2])

    # Create a [batch_size, projection_dim] tensor.
    representation = layers.LayerNormalization(epsilon=1e-6)(encoded_patches)
    representation = layers.Flatten()(representation)
    representation = layers.Dropout(0.5)(representation)
    # Add MLP.
    features = mlp(representation, hidden_units=mlp_head_units, dropout_rate=0.2)
    # Classify outputs.
    logits = layers.Dense(num_classes)(features)
    # Create the Keras model.
    model = keras.Model(inputs=inputs, outputs=logits)
    return model

In [68]:
# Defining metrics

from keras import backend as K

def recall_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

def precision_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

def f1_m(y_true, y_pred):
    precision = precision_m(y_true, y_pred)
    recall = recall_m(y_true, y_pred)
    return 2*((precision*recall)/(precision+recall+K.epsilon()))

In [69]:
def run_experiment(model):
    optimizer = tfa.optimizers.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay
    )

    model.compile(
        optimizer=optimizer,
        loss='categorical_crossentropy',
        metrics=[
            'accuracy',
            f1_m,
            precision_m, 
            recall_m
        ],
    )


    history = model.fit(
        train_generator,
        batch_size=batch_size,
        epochs=num_epochs
    )

    loss, accuracy, f1_score, precision, recall = model.evaluate(val_generator)
    print('Validation accuracy:', accuracy)
    print('Validation loss:', loss)
    print('Validation f1:', f1_score)
    print('Validation precision:', precision)
    print('Validation recall:', recall)


    return history

In [None]:
import os
os.environ["CUDA_VISIBLE_DEVICES"]="-1" 

In [83]:
vit_classifier = create_vit_classifier()

In [84]:
vit_classifier.summary()

Model: "model_7"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_10 (InputLayer)          [(None, 384, 128, 1  0           []                               
                                )]                                                                
                                                                                                  
 data_augmentation (Sequential)  (None, 384, 128, 1)  0          ['input_10[0][0]']               
                                                                                                  
 patches_9 (Patches)            (None, None, 2048)   0           ['data_augmentation[5][0]']      
                                                                                                  
 patch_encoder_7 (PatchEncoder)  (None, 24, 128)     262272      ['patches_9[0][0]']        

 dense_151 (Dense)              (None, 24, 256)      33024       ['add_122[0][0]']                
                                                                                                  
 dense_152 (Dense)              (None, 24, 128)      32896       ['dense_151[0][0]']              
                                                                                                  
 add_123 (Add)                  (None, 24, 128)      0           ['dense_152[0][0]',              
                                                                  'add_122[0][0]']                
                                                                                                  
 multi_head_attention_62 (Multi  (None, 24, 128)     1318528     ['add_123[0][0]',                
 HeadAttention)                                                   'add_123[0][0]']                
                                                                                                  
 add_124 (

In [85]:
history = run_experiment(vit_classifier)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20

KeyboardInterrupt: 