# Import Requirements

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchaudio
import torchvision.transforms as transforms
import os
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import torchvision.models as models
import torch.nn.functional as F
import time
import librosa
from skopt import gp_minimize
from skopt.space import Real
from skopt.utils import use_named_args

# Select the compute device: the CUDA GPU when PyTorch reports one as
# available, otherwise the CPU. The training/evaluation helpers in this file
# read this module-level name when moving batches.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
import torch
import torch.nn as nn
from collections import defaultdict


class Add(nn.Module):
    '''
    Element-wise sum of exactly two tensors, optionally followed by an
    activation.

    Args:
        activation: optional callable applied to the sum; with ``None`` the
            raw sum is returned.

    Attributes:
        digital: initialised to ``True``; ``model_summary`` reads this flag to
            decide under which name the module is counted.
    '''
    def __init__(self, activation=None):
        super().__init__()
        self.activation = activation
        self.digital = True

    def forward(self, x):
        '''
        Add the two tensors held in ``x``.

        Args:
            x: sequence of exactly two tensors. They are combined with
                ``torch.stack`` and must therefore have identical shapes.

        Returns:
            The element-wise sum, passed through ``self.activation`` when one
            was given.

        Raises:
            RuntimeError: if ``x`` does not hold exactly two tensors.
        '''
        if len(x) != 2:
            # A bare ``raise`` outside an ``except`` block only produces
            # "No active exception to reraise"; raise the same exception type
            # explicitly so the message carries the actual cause.
            raise RuntimeError(
                f'ERR: Num tensors to add {len(x)} (expected 2)'
            )
        total = torch.stack(x, dim=0).sum(dim=0)
        if self.activation is not None:
            return self.activation(total)
        return total
        
def model_summary(M, pt_191=False):
    """
    Count how often each module class occurs in ``M`` and report its size.

    Every module returned by ``M.named_modules`` (the root included) is
    counted under its class name, with two exceptions that get a ``CART_``
    prefix: ``nn.Conv2d`` modules whose parent module's class is named
    ``CART``, and ``nn.ReLU``/``Add`` modules whose ``digital`` attribute is
    falsy.

    ``pt_191`` is accepted for interface compatibility; the body never reads it.

    Returns a plain dict mapping class names to usage counts, plus a
    ``'Parameters'`` entry holding the parameter total in millions as a
    string such as ``'1.5M'``.
    """
    usage = defaultdict(int)

    for name, module in M.named_modules(remove_duplicate=True):
        label = module.__class__.__name__

        if isinstance(module, nn.Conv2d):
            # Everything before the last dot is the parent's path; a name
            # without a dot resolves to the module itself.
            parent = M.get_submodule(name.rsplit('.', 1)[0])
            if parent.__class__.__name__ == 'CART':
                label = 'CART_' + label
        elif isinstance(module, (nn.ReLU, Add)) and hasattr(module, 'digital'):
            if not module.digital:
                label = 'CART_' + label

        usage[label] += 1

    total_weights = sum(p.shape.numel() for p in M.parameters())
    usage['Parameters'] = str(total_weights / 1e6) + 'M'

    return dict(usage)

# Class AudioDataset

In [None]:
class AudioDataset(Dataset):
    """Folder-per-class audio dataset that yields normalised mel spectrograms.

    ``directory`` must contain one sub-directory per class; every file inside
    a class directory is treated as an audio clip of that class. A label is
    the index of the class name in the sorted list ``self.classes``.

    Args:
        directory: root folder holding the class sub-directories.
        desired_duration: clip length in seconds. Shorter clips are
            zero-padded at the end, longer clips are truncated.
        sample_rate: rate in Hz every clip is resampled to before the
            spectrogram is computed.
    """

    def __init__(self, directory, desired_duration, sample_rate=44100):
        self.directory = directory
        self.desired_duration = desired_duration
        self.sample_rate = sample_rate

        # Only sub-directories are classes. Stray files in the root (README,
        # .DS_Store, ...) would otherwise be listed as classes and make the
        # per-class os.listdir below fail.
        self.classes = sorted(
            entry for entry in os.listdir(directory)
            if os.path.isdir(os.path.join(directory, entry))
        )

        self.audio_files = []
        for label, class_name in enumerate(self.classes):
            class_path = os.path.join(directory, class_name)
            # Sorted so the sample order does not depend on the file system.
            for audio_file in sorted(os.listdir(class_path)):
                self.audio_files.append((os.path.join(class_path, audio_file), label))

    def __len__(self):
        """Return the number of audio clips found under ``directory``."""
        return len(self.audio_files)

    def __getitem__(self, idx):
        """Load clip ``idx`` and return ``(spectrogram, label)``.

        The clip is loaded at its native rate (``sr=None``); resampling to
        ``self.sample_rate`` happens in ``_compute_spectrogram``.
        """
        audio_file, label = self.audio_files[idx]
        waveform, sample_rate = librosa.load(audio_file, sr=None)
        spectrogram = self._compute_spectrogram(waveform, sample_rate)
        return spectrogram, label

    def _target_length(self):
        """Return the fixed clip length in samples."""
        return int(round(self.desired_duration * self.sample_rate))

    def _fit_length(self, waveform):
        """Zero-pad (at the end) or truncate ``waveform`` to the target length."""
        target = self._target_length()
        if len(waveform) < target:
            return np.pad(waveform, (0, target - len(waveform)))
        return waveform[:target]

    @staticmethod
    def _normalize(spectrogram):
        """Min-max scale ``spectrogram`` to [0, 1]; a constant input maps to zeros."""
        low = np.min(spectrogram)
        span = np.max(spectrogram) - low
        if span == 0:
            # A constant spectrogram (e.g. digital silence) would otherwise
            # produce 0/0 = NaN for every bin.
            return np.zeros_like(spectrogram)
        return (spectrogram - low) / span

    def _compute_spectrogram(self, waveform, sample_rate):
        """Turn a raw waveform into a normalised dB-scaled mel spectrogram.

        Args:
            waveform: 1-D array of audio samples.
            sample_rate: rate in Hz at which ``waveform`` was sampled.

        Returns:
            The mel spectrogram in decibels (relative to its maximum), scaled
            to the range [0, 1].
        """
        # Resample whenever the rates differ. The source rate is the rate the
        # file was loaded with, not the number of samples in the clip.
        if sample_rate != self.sample_rate:
            waveform = librosa.resample(
                waveform, orig_sr=sample_rate, target_sr=self.sample_rate
            )

        waveform = self._fit_length(waveform)

        # After resampling the signal is at self.sample_rate, so the mel
        # filter bank must be built for that rate.
        spectrogram = librosa.feature.melspectrogram(y=waveform, sr=self.sample_rate)
        # Convert to decibel scale
        spectrogram = librosa.power_to_db(spectrogram, ref=np.max)
        # Normalize spectrogram values
        return self._normalize(spectrogram)

## Define data directories

In [None]:
# Root folders of the three data splits, relative to the working directory.
# Each one is handed to AudioDataset, which treats the entries it lists in
# the folder as class names.
train_dir = 'data/train'
validation_dir = 'data/validate'
test_dir = 'data/test'

## Load datasets

In [None]:
# Clip length (seconds) and sampling rate shared by all three splits.
desired_duration = 6
target_sample_rate = 16000

# Training split: batches of 32, reshuffled on every pass.
train_dataset = AudioDataset(
    train_dir,
    desired_duration=desired_duration,
    sample_rate=target_sample_rate,
)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Validation split: batches of 8 in fixed order.
validation_dataset = AudioDataset(
    validation_dir,
    desired_duration=desired_duration,
    sample_rate=target_sample_rate,
)
validation_loader = DataLoader(validation_dataset, batch_size=8, shuffle=False)

# Test split: one clip per batch in fixed order.
test_dataset = AudioDataset(
    test_dir,
    desired_duration=desired_duration,
    sample_rate=target_sample_rate,
)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# YAMNET

In [None]:
import torch.nn.init as init

def init_weights_he(module):
    """Apply He (Kaiming) normal initialisation to one module, in place.

    Intended for use with ``nn.Module.apply``. Only ``nn.Conv1d`` and
    ``nn.Linear`` modules are touched: their weights are redrawn with
    ``mode='fan_out'`` and the ReLU gain, and an existing bias is set to
    zero. Any other module is left unchanged.
    """
    if not isinstance(module, (nn.Conv1d, nn.Linear)):
        return

    init.kaiming_normal_(module.weight.data, mode='fan_out', nonlinearity='relu')

    bias = module.bias
    if bias is not None:
        init.constant_(bias.data, 0)


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class YAMNet(nn.Module):
    """1-D convolutional classifier over 128-channel input sequences.

    The network is a strided stem convolution followed by eleven blocks, each
    made of a grouped 3-tap convolution (``groups`` equal to the block's input
    channels), a 1x1 convolution, batch normalisation and LeakyReLU. The
    result is average-pooled over the length axis and fed to a linear
    classifier.

    Args:
        num_classes: number of outputs of the final linear layer.
        num_samples: stored on the instance only; no layer and no step of
            ``forward`` reads it.
    """

    @staticmethod
    def _separable_block(in_channels, out_channels, stride, bn_eps=1e-5):
        """Build grouped 3-tap conv -> 1x1 conv -> BatchNorm -> LeakyReLU."""
        return nn.Sequential(
            nn.Conv1d(in_channels, out_channels, kernel_size=3, stride=stride,
                      padding=1, groups=in_channels),
            nn.Conv1d(out_channels, out_channels, kernel_size=1),
            nn.BatchNorm1d(out_channels, eps=bn_eps),
            nn.LeakyReLU(inplace=True),
        )

    def __init__(self, num_classes, num_samples):
        super().__init__()
        self.num_classes = num_classes
        self.num_samples = num_samples

        # Stem: 128 input channels -> 32, halving the length.
        self.conv1 = nn.Sequential(
            nn.Conv1d(128, 32, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm1d(32, eps=1e-4),
            nn.LeakyReLU(inplace=True),
        )

        block = self._separable_block
        # Blocks 1-5 use eps=1e-4 in their batch norm; blocks 6-11 keep the
        # BatchNorm1d default.
        self.separable_conv1 = block(32, 64, stride=1, bn_eps=1e-4)
        self.separable_conv2 = block(64, 128, stride=2, bn_eps=1e-4)
        self.separable_conv3 = block(128, 128, stride=1, bn_eps=1e-4)
        self.separable_conv4 = block(128, 256, stride=2, bn_eps=1e-4)
        self.separable_conv5 = block(256, 256, stride=1, bn_eps=1e-4)
        self.separable_conv6 = block(256, 512, stride=2)
        self.separable_conv7 = block(512, 512, stride=1)
        self.separable_conv8 = block(512, 512, stride=1)
        self.separable_conv9 = block(512, 512, stride=1)
        self.separable_conv10 = block(512, 1024, stride=2)
        self.separable_conv11 = block(1024, 1024, stride=2)

        self.global_pool = nn.AdaptiveAvgPool1d(1)
        self.classifier = nn.Linear(1024, num_classes)

        # Re-initialise every Conv1d/Linear created above.
        self.apply(init_weights_he)

    def forward(self, x):
        """Return the classifier outputs for a batch ``x``.

        ``x`` is fed straight into a ``Conv1d`` with 128 input channels, and
        the pooled features are flattened per batch element before the
        linear layer.
        """
        # The stem already ends in LeakyReLU; the extra ReLU zeroes whatever
        # negative values that leaves.
        x = F.relu(self.conv1(x))

        for index in range(1, 12):
            x = getattr(self, f'separable_conv{index}')(x)

        # Average over the length axis, then flatten to (batch, features).
        pooled = self.global_pool(x)
        flat = pooled.view(pooled.size(0), -1)

        return self.classifier(flat)


In [None]:
def count_parameters(model):
    """Return the number of trainable scalar parameters in ``model``.

    Only parameters whose ``requires_grad`` flag is set are counted.
    """
    trainable = 0
    for param in model.parameters():
        if param.requires_grad:
            trainable += param.numel()
    return trainable

# Train Function

In [None]:
def _run_epoch(model, loader, criterion, optimizer=None):
    """Run one pass over ``loader`` and return ``(mean_loss, accuracy)``.

    With an ``optimizer`` the model is put in training mode and updated after
    every batch; without one it is put in eval mode and gradients are
    disabled. Both values are averaged over the samples actually seen, and
    are 0.0 when the loader yields nothing.
    """
    training = optimizer is not None
    model.train(training)

    loss_sum = 0.0
    correct = 0
    seen = 0

    with torch.set_grad_enabled(training):
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            if training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            batch_size = inputs.size(0)
            # The loss is treated as a per-batch mean, so it is weighted by
            # the batch size before averaging over the whole pass.
            loss_sum += loss.item() * batch_size
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            seen += labels.size(0)

    if seen == 0:
        return 0.0, 0.0
    return loss_sum / seen, correct / seen


def _plot_history(train_loss, val_loss, train_acc, val_acc):
    """Plot loss and accuracy curves side by side, one point per epoch."""
    epochs = range(1, len(train_loss) + 1)

    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.plot(epochs, train_loss, label='Train Loss')
    plt.plot(epochs, val_loss, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('Training and Validation Loss')

    plt.subplot(1, 2, 2)
    plt.plot(epochs, train_acc, label='Train Accuracy')
    plt.plot(epochs, val_acc, label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.title('Training and Validation Accuracy')

    plt.show()


def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10,
                save_prefix='YAMNETRawAudio_spec'):
    """Train ``model``, validate after every epoch, plot the curves and save it.

    Batches are moved to the module-level ``device`` before use.

    Args:
        model: network to train; updated in place.
        train_loader: loader yielding ``(inputs, labels)`` training batches.
        val_loader: loader yielding ``(inputs, labels)`` validation batches.
        criterion: loss called as ``criterion(outputs, labels)``.
        optimizer: optimizer already bound to the model's parameters.
        num_epochs: number of passes over ``train_loader``.
        save_prefix: path prefix of the two checkpoint files written at the
            end: ``<prefix>.pth`` (state dict) and ``<prefix>.pt`` (whole
            model object).

    Returns:
        The trained model, left in eval mode after the last validation pass.
    """
    train_loss_history = []
    train_acc_history = []
    val_loss_history = []
    val_acc_history = []

    start_time = time.time()

    for epoch in range(num_epochs):
        print(f"Epoch {epoch+1}/{num_epochs}")

        epoch_train_loss, epoch_train_acc = _run_epoch(
            model, train_loader, criterion, optimizer)
        train_loss_history.append(epoch_train_loss)
        train_acc_history.append(epoch_train_acc)
        print(f"Train Loss: {epoch_train_loss:.4f}, Train Accuracy: {epoch_train_acc:.4f}")

        epoch_val_loss, epoch_val_acc = _run_epoch(model, val_loader, criterion)
        val_loss_history.append(epoch_val_loss)
        val_acc_history.append(epoch_val_acc)
        print(f"Validation Loss: {epoch_val_loss:.4f}, Validation Accuracy: {epoch_val_acc:.4f}")

    training_time = time.time() - start_time
    print(f"Training Time: {training_time:.2f} seconds")

    _plot_history(train_loss_history, val_loss_history,
                  train_acc_history, val_acc_history)

    # Size in memory = elements * bytes per element; the element count alone
    # is not a size in megabytes.
    param_bytes = sum(p.numel() * p.element_size() for p in model.parameters())
    model_size_mb = param_bytes / (1024 * 1024)
    num_parameters = count_parameters(model)
    print(f"Model Size: {model_size_mb:.2f} MB")
    print(f"Number of Parameters: {num_parameters}")

    # Save both the weights alone and the whole model object.
    torch.save(model.state_dict(), f'{save_prefix}.pth')
    torch.save(model, f'{save_prefix}.pt')

    return model


In [None]:

def evaluate_model(model, test_loader):
    """Evaluate ``model`` on ``test_loader`` and report the results.

    Prints the wall-clock inference time, the accuracy, the confusion matrix
    and the classification report, then shows the confusion matrix and the
    report as heatmaps. Batches are moved to the module-level ``device``.

    Args:
        model: network to evaluate; switched to eval mode.
        test_loader: loader yielding ``(inputs, labels)`` batches. When its
            ``dataset`` has a ``classes`` attribute, those names label the
            report and the plots; otherwise the numeric labels are used.

    Returns:
        None. If the loader yields no samples, only the timing and a notice
        are printed.
    """
    model.eval()
    correct = 0
    total = 0
    y_true = []
    y_pred = []
    inference_start_time = time.time()

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())

    inference_time = time.time() - inference_start_time
    print(f"Inference Time: {inference_time:.4f} seconds")

    if total == 0:
        # Nothing to score; avoid dividing by zero below.
        print('Test Accuracy: n/a (the loader produced no samples)')
        return

    test_accuracy = correct / total
    print('Test Accuracy:', test_accuracy)

    # Take the class names from the loader that was actually evaluated rather
    # than from a module-level dataset, and pin the label set so a class that
    # is absent from both truth and predictions still gets its row/column.
    class_names = getattr(test_loader.dataset, 'classes', None)
    label_ids = list(range(len(class_names))) if class_names is not None else None
    tick_labels = class_names if class_names is not None else 'auto'

    # Confusion matrix
    conf_matrix = confusion_matrix(y_true, y_pred, labels=label_ids)
    print("Confusion Matrix:")
    print(conf_matrix)

    # Classification report
    print("Classification Report:")
    print(classification_report(y_true, y_pred, labels=label_ids, target_names=class_names))

    # Plot confusion matrix
    plt.figure(figsize=(8, 6))
    sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',
                xticklabels=tick_labels, yticklabels=tick_labels)
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.title('Confusion Matrix (Test)')
    plt.show()

    # Plot classification report. The report dict is keyed by class, so the
    # frame is transposed to put classes on the rows and metrics on the
    # columns, matching the axis labels below.
    report = classification_report(y_true, y_pred, labels=label_ids,
                                   target_names=class_names, output_dict=True)
    plt.figure(figsize=(8, 6))
    sns.heatmap(pd.DataFrame.from_dict(report).T, annot=True, cmap='Blues')
    plt.xlabel('Metrics')
    plt.ylabel('Classes')
    plt.title('Classification Report (Test)')
    plt.show()

In [None]:
# Select the compute device: CUDA when available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Bayesian optimization

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class YAMNet(nn.Module):
    """1-D convolutional classifier with a configurable stem convolution.

    The network is a stem convolution followed by eleven blocks, each made of
    a grouped 3-tap convolution (``groups`` equal to the block's input
    channels), a 1x1 convolution, batch normalisation and LeakyReLU. The
    result is average-pooled over the length axis and fed to a linear
    classifier.

    Args:
        num_classes: number of outputs of the final linear layer.
        num_samples: stored on the instance only; no layer and no step of
            ``forward`` reads it.
        kernel_size: kernel size of the stem convolution.
        stride: stride of the stem convolution.
        padding: padding of the stem convolution.
        in_channels: number of input channels of the stem convolution. The
            default of 1 matches 2-D ``(batch, length)`` input, to which
            ``forward`` adds the channel axis itself.
    """

    @staticmethod
    def _separable_block(in_channels, out_channels, stride, bn_eps=1e-5):
        """Build grouped 3-tap conv -> 1x1 conv -> BatchNorm -> LeakyReLU."""
        return nn.Sequential(
            nn.Conv1d(in_channels, out_channels, kernel_size=3, stride=stride,
                      padding=1, groups=in_channels),
            nn.Conv1d(out_channels, out_channels, kernel_size=1),
            nn.BatchNorm1d(out_channels, eps=bn_eps),
            nn.LeakyReLU(inplace=True),
        )

    def __init__(self, num_classes, num_samples, kernel_size=3, stride=2, padding=1,
                 in_channels=1):
        super().__init__()
        self.num_classes = num_classes
        self.num_samples = num_samples

        # Stem. The hyperparameters are cast to plain ints because a search
        # library may hand over NumPy integer scalars.
        self.conv1 = nn.Sequential(
            nn.Conv1d(int(in_channels), 32, kernel_size=int(kernel_size),
                      stride=int(stride), padding=int(padding)),
            nn.BatchNorm1d(32, eps=1e-4),
            nn.LeakyReLU(inplace=True),
        )

        block = self._separable_block
        # Blocks 1-5 use eps=1e-4 in their batch norm; blocks 6-11 keep the
        # BatchNorm1d default.
        self.separable_conv1 = block(32, 64, stride=1, bn_eps=1e-4)
        self.separable_conv2 = block(64, 128, stride=2, bn_eps=1e-4)
        self.separable_conv3 = block(128, 128, stride=1, bn_eps=1e-4)
        self.separable_conv4 = block(128, 256, stride=2, bn_eps=1e-4)
        self.separable_conv5 = block(256, 256, stride=1, bn_eps=1e-4)
        self.separable_conv6 = block(256, 512, stride=2)
        self.separable_conv7 = block(512, 512, stride=1)
        self.separable_conv8 = block(512, 512, stride=1)
        self.separable_conv9 = block(512, 512, stride=1)
        self.separable_conv10 = block(512, 1024, stride=2)
        self.separable_conv11 = block(1024, 1024, stride=2)

        self.global_pool = nn.AdaptiveAvgPool1d(1)
        self.classifier = nn.Linear(1024, num_classes)

        # Re-initialise every Conv1d/Linear created above.
        self.apply(init_weights_he)

    def forward(self, x):
        """Return the classifier outputs for a batch ``x``.

        A 2-D ``(batch, length)`` input gets a singleton channel axis
        inserted; input that already has a channel axis is passed to the stem
        unchanged and must have ``in_channels`` channels.
        """
        if x.dim() == 2:
            x = x.unsqueeze(1)

        # The stem already ends in LeakyReLU; the extra ReLU zeroes whatever
        # negative values that leaves.
        x = F.relu(self.conv1(x))

        for index in range(1, 12):
            x = getattr(self, f'separable_conv{index}')(x)

        # Global average pooling
        x = self.global_pool(x)

        # Flatten the output
        x = x.view(x.size(0), -1)

        # Classifier
        x = self.classifier(x)
        return x


In [None]:
def validate_model(model, val_loader):
    """Return the classification accuracy of ``model`` on ``val_loader``.

    The model is switched to eval mode and run without gradients; batches are
    moved to the module-level ``device``. The predicted class is the index of
    the largest output along dimension 1.

    Args:
        model: network to evaluate.
        val_loader: iterable yielding ``(inputs, labels)`` batches.

    Returns:
        Fraction of correctly classified samples, or 0.0 when the loader
        yields no samples (instead of dividing by zero).
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    if total == 0:
        return 0.0
    return correct / total

In [None]:
from skopt.space import Real, Integer
from skopt import gp_minimize
from functools import partial

# Hyperparameter space for the Bayesian optimization. Both dimensions are
# integers with inclusive bounds; their names are the keyword arguments the
# objective function receives.
search_space = [
                Integer(1, 4, name='kernel_size'),
                Integer(1, 2, name='stride'),
                ]

# Use the rate the datasets are built with, so that
# desired_duration * sample_rate is the real clip length in samples.
sample_rate = target_sample_rate
# Text file that receives one appended line per evaluated configuration.
results_file = "yam_optim.txt"

# Objective function evaluated by the Bayesian optimizer
@use_named_args(search_space)
def optimize_model(kernel_size, stride):
    """Train one YAMNet configuration and return its negated validation accuracy.

    A fresh model is built for the given ``kernel_size`` and ``stride``
    (padding fixed to 1), trained for 50 epochs with Adam at a learning rate
    of 0.0003, and scored on ``validation_loader``. The result is printed and
    appended as one line to ``results_file``.

    NOTE(review): this requires the YAMNet definition in scope to accept
    ``kernel_size``, ``stride`` and ``padding`` keyword arguments - confirm.
    """
    net = YAMNet(
        num_classes=len(train_dataset.classes),
        num_samples=desired_duration * sample_rate,
        kernel_size=kernel_size,
        stride=stride,
        padding=1,
    ).to(device)
    loss_fn = torch.nn.CrossEntropyLoss()
    adam = torch.optim.Adam(net.parameters(), lr=0.0003)

    # Fit, then score on the validation split
    fitted = train_model(net, train_loader, validation_loader, loss_fn, adam, num_epochs=50)
    val_accuracy = validate_model(fitted, validation_loader)

    print(f"Learning Rate: 0.0003,kernel_size:{kernel_size},stride:{stride},padding:1, Validation Accuracy: {val_accuracy}")

    with open(results_file, 'a') as log:
        log.write(f"Learning Rate: 0.0003,kernel_size: {kernel_size}, stride:{stride},padding:1,Validation Accuracy: {val_accuracy}\n")

    # The optimizer minimizes its objective, so the accuracy is negated
    return -val_accuracy

# Number of objective evaluations the optimizer may spend
n_calls = 20

# Launch the search over search_space
res_gp = gp_minimize(
    optimize_model,
    search_space,
    n_calls=n_calls,
    random_state=42,
)

# Pair every search dimension with the value of the best point found
best_params = {dim.name: value for dim, value in zip(search_space, res_gp.x)}
print("Best hyperparameters:", best_params)

# The objective returns negated accuracy, so flip the sign back
best_accuracy = -res_gp.fun
print("Best accuracy:", best_accuracy)


In [None]:
# Select the compute device: CUDA when available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Create the model

In [None]:
# Build the classifier with one output per class found in the training data
# and the clip length implied by the dataset settings, instead of repeating
# those numbers as literals.
# NOTE(review): YAMNet is defined twice in this file (a 128-channel variant
# and a single-channel variant); confirm the definition in scope here matches
# the batches produced by the loaders.
model = YAMNet(num_classes=len(train_dataset.classes),
               num_samples=desired_duration * target_sample_rate).to(device)
print(model)

In [None]:
# Count the module classes used by the model and its parameter total (in millions).
model_summary(model)

# Initialize model, criterion, and optimizer

In [None]:
# Show the first entry of the best point found by gp_minimize, i.e. the value
# of the first search dimension (kernel_size).
res_gp.x[0]

In [None]:
# Cross-entropy loss applied to the model outputs, and Adam over all model
# parameters with learning rate 2e-4 and weight decay 1e-4.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0002, weight_decay = 0.0001)


# Train the model

In [None]:
# Train for 100 epochs; train_model also plots the loss/accuracy curves and
# writes the checkpoint files to disk.
train_model(model, train_loader, validation_loader, criterion, optimizer, num_epochs=100)

# Evaluate the model

In [None]:
# Print and plot accuracy, confusion matrix and classification report for the test loader.
evaluate_model(model, test_loader)