In [1]:
import torch.nn as nn
import torch
import torch.nn.functional as F
from tqdm import tqdm

class MaqamCNN1(nn.Module):
    """One convolutional stage followed by a dense head with 8 softmax outputs.

    ``forward`` only uses ``conv1``/``bn1``/``pool1``/``dropout1`` and
    ``fc1``/``fc3``/``fc4``. The second convolutional stage (``conv2``, ``bn2``,
    ``pool2``, ``dropout2``), ``fc2``/``dropout4`` and ``relu`` are still
    constructed, so they remain registered submodules, but ``forward`` never
    calls them.
    """

    def __init__(self):
        super().__init__()

        # Convolutional stage 1 (used). The pool has kernel_size=1 and stride=2.
        self.conv1 = nn.Conv2d(in_channels=30, out_channels=36, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(36)
        self.pool1 = nn.MaxPool2d(kernel_size=1, stride=2)
        self.dropout1 = nn.Dropout(p=0.2)

        # Convolutional stage 2 (constructed but not called in forward).
        self.conv2 = nn.Conv2d(in_channels=36, out_channels=20, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(20)
        self.pool2 = nn.MaxPool2d(kernel_size=1, stride=2)
        self.dropout2 = nn.Dropout(p=0.1)

        # Dense head. fc1 expects exactly 8460 flattened features.
        self.fc1 = nn.Linear(8460, 256)
        self.dropout3 = nn.Dropout(p=0.1)

        # Not called in forward.
        self.fc2 = nn.Linear(512, 265)
        self.dropout4 = nn.Dropout(p=0.1)

        self.fc3 = nn.Linear(256, 128)
        self.dropout5 = nn.Dropout(p=0.1)

        self.fc4 = nn.Linear(128, 8)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return per-class probabilities of shape ``(batch, 8)``.

        A trailing singleton axis is appended to ``x`` before the 2-D
        convolution.
        # assumes x is (batch, 30, frames) -- TODO confirm against the feature extractor
        """
        feature_map = self.conv1(x.unsqueeze(-1))
        feature_map = self.bn1(F.relu(feature_map))
        feature_map = self.dropout1(self.pool1(feature_map))

        flat = feature_map.view(feature_map.size(0), -1)

        hidden = self.dropout3(F.relu(self.fc1(flat)))
        # fc3 is followed by dropout only; there is no activation between fc3 and fc4.
        hidden = self.dropout5(self.fc3(hidden))

        return F.softmax(self.fc4(hidden), dim=1)


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class CNN_LSTM(nn.Module):
    """Two convolutional stages feeding a 2-layer LSTM and a dense head (8 outputs).

    ``dropout4``, ``dropout5`` and ``dropout6`` are constructed but never
    called in ``forward``.
    """

    def __init__(self):
        super().__init__()

        # Stage 1: 3x3 "same" convolution; the (1, 1) pool leaves the spatial size unchanged.
        self.conv1 = nn.Conv2d(in_channels=30, out_channels=64, kernel_size=(3,3), padding="same")
        self.bn1 = nn.BatchNorm2d(64)
        self.pool1 = nn.MaxPool2d(kernel_size=(1,1))
        self.dropout1 = nn.Dropout(p=0.1)

        # Stage 2: 1x1 convolution that mixes channels only.
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=(1,1), padding="valid")
        self.bn2 = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(kernel_size=(1,1))
        self.dropout2 = nn.Dropout(p=0.2)

        # Recurrent stage over the flattened spatial positions.
        self.lstm = nn.LSTM(input_size=64, hidden_size=128, num_layers=2, batch_first=True)
        self.dropout6 = nn.Dropout(p=0.25)

        # Dense head.
        self.fc1 = nn.Linear(128, 64)
        self.dropout3 = nn.Dropout(p=0.2)

        self.fc2 = nn.Linear(64, 32)
        self.dropout4 = nn.Dropout(p=0.2)

        self.fc3 = nn.Linear(32, 8)
        self.dropout5 = nn.Dropout(p=0.2)

    def forward(self, x):
        """Return per-class probabilities of shape ``(batch, 8)``.

        # assumes x is (batch, 30, frames) -- TODO confirm against the feature extractor
        """
        maps = x.unsqueeze(-1)
        maps = self.dropout1(self.pool1(self.bn1(F.relu(self.conv1(maps)))))
        maps = self.dropout2(self.pool2(self.bn2(F.relu(self.conv2(maps)))))

        # Collapse the two spatial axes into one sequence axis and put it second:
        # (batch, channels, H, W) -> (batch, H * W, channels).
        n_batch, n_channels, n_rows, n_cols = maps.size()
        sequence = maps.view(n_batch, n_channels, n_rows * n_cols).permute(0, 2, 1)

        lstm_out, _ = self.lstm(sequence)
        last_step = lstm_out[:, -1, :]

        # fc1 has no activation; dropout3 is applied after fc2.
        hidden = self.fc1(last_step)
        hidden = self.dropout3(F.relu(self.fc2(hidden)))

        # NOTE(review): the scores pass through ReLU before softmax, so negative
        # scores are clamped to 0 -- confirm this is intended.
        scores = F.relu(self.fc3(hidden))
        return F.softmax(scores, dim=1)


In [3]:
import torch.nn as nn
import torch.nn.functional as F

class MFCC_LSTM(nn.Module):
    """Three stacked single-layer LSTMs (1024 -> 512 -> 256) plus a dense head.

    The head is three Linear/ReLU/Dropout stages followed by an 8-way softmax.
    """

    def __init__(self):
        super().__init__()

        # Recurrent stack; each LSTM is followed by its own dropout.
        self.lstm1 = nn.LSTM(input_size=20, hidden_size=1024, num_layers=1, batch_first=True)
        self.dropout1 = nn.Dropout(p=0.3)
        self.lstm2 = nn.LSTM(input_size=1024, hidden_size=512, num_layers=1, batch_first=True)
        self.dropout2 = nn.Dropout(p=0.3)
        self.lstm3 = nn.LSTM(input_size=512, hidden_size=256, num_layers=1, batch_first=True)
        self.dropout3 = nn.Dropout(p=0.2)

        # Dense head.
        self.fc1 = nn.Linear(256, 512)
        self.dropout4 = nn.Dropout(p=0.2)

        self.fc2 = nn.Linear(512, 256)
        self.dropout5 = nn.Dropout(p=0.2)

        self.fc3 = nn.Linear(256, 64)
        self.dropout6 = nn.Dropout(p=0.2)

        self.fc4 = nn.Linear(64, 8)

    def forward(self, x):
        """Return per-class probabilities of shape ``(batch, 8)``.

        Axes 1 and 2 of ``x`` are swapped before the first (batch-first) LSTM,
        which has ``input_size=20``.
        # assumes x is (batch, 20, frames) -- TODO confirm against the feature extractor
        """
        sequence = torch.transpose(x, 1, 2)

        recurrent_stages = (
            (self.lstm1, self.dropout1),
            (self.lstm2, self.dropout2),
            (self.lstm3, self.dropout3),
        )
        for recurrent, drop in recurrent_stages:
            sequence, _ = recurrent(sequence)
            sequence = drop(sequence)

        # Only the output at the last timestep feeds the dense head.
        hidden = sequence[:, -1, :]

        dense_stages = (
            (self.fc1, self.dropout4),
            (self.fc2, self.dropout5),
            (self.fc3, self.dropout6),
        )
        for dense, drop in dense_stages:
            hidden = drop(F.relu(dense(hidden)))

        return F.softmax(self.fc4(hidden), dim=1)


In [4]:
import torch.nn as nn
import torch.nn.functional as F

class MFCC_LSTM2(nn.Module):
    """Single LSTM (hidden size 1024) followed by one hidden dense layer and an 8-way softmax."""

    def __init__(self):
        super(MFCC_LSTM2, self).__init__()

        self.lstm1 = nn.LSTM(input_size=20, hidden_size=1024, num_layers=1, batch_first=True)
        self.dropout1 = nn.Dropout(p=0.2)

        self.fc3 = nn.Linear(1024, 256)
        self.dropout6 = nn.Dropout(p=0.2)

        self.fc4 = nn.Linear(256, 8)

    def forward(self, x):
        """Return per-class probabilities of shape ``(batch, 8)``.

        Axes 1 and 2 of ``x`` are swapped before the batch-first LSTM, which
        has ``input_size=20``.
        # assumes x is (batch, 20, frames) -- TODO confirm against the feature extractor
        """
        x = torch.transpose(x, 1, 2)
        x, _ = self.lstm1(x)

        # Only the last timestep reaches the classifier, so select it *before*
        # the dropout and dense layers. Dropout, Linear and ReLU all act on each
        # timestep independently, so the result is unchanged in eval mode, while
        # the 1024x256 matmul now runs once per sample instead of once per frame.
        x = x[:, -1, :]
        x = self.dropout1(x)

        x = self.fc3(x)
        x = F.relu(x)
        x = self.dropout6(x)

        x = self.fc4(x)
        x = F.softmax(x, dim=1)

        return x


In [5]:
class ANNModel(nn.Module):
    """Fully connected classifier: 12920 -> 1024 -> 64 -> 8 with softmax output.

    ``forward`` only uses ``fc1``/``bn1``/``dropout1``, ``fc2``/``bn2``/``dropout2``
    and ``output_layer``. The ``fc3`` .. ``fc7`` groups are constructed (and so
    remain in the ``state_dict``) but are never called.
    """

    def __init__(self):
        super().__init__()

        # Stages used by forward().
        self.fc1 = nn.Linear(12920, 1024)
        self.bn1 = nn.BatchNorm1d(1024)
        self.dropout1 = nn.Dropout(p=0.2)

        self.fc2 = nn.Linear(1024, 64)
        self.bn2 = nn.BatchNorm1d(64)
        self.dropout2 = nn.Dropout(p=0.2)

        # Stages below are registered but not called in forward().
        self.fc3 = nn.Linear(256, 64)
        self.bn3 = nn.BatchNorm1d(64)
        self.dropout3 = nn.Dropout(p=0.1)

        self.fc4 = nn.Linear(1024, 256)
        self.bn4 = nn.BatchNorm1d(256)
        self.dropout4 = nn.Dropout(p=0.2)

        self.fc5 = nn.Linear(256, 64)
        self.bn5 = nn.BatchNorm1d(64)
        self.dropout5 = nn.Dropout(p=0.1)

        self.fc6 = nn.Linear(512, 256)
        self.bn6 = nn.BatchNorm1d(256)
        self.dropout6 = nn.Dropout(p=0.0)

        self.fc7 = nn.Linear(256, 32)
        self.bn7 = nn.BatchNorm1d(32)
        self.dropout7 = nn.Dropout(p=0.0)

        self.output_layer = nn.Linear(64, 8)

    def forward(self, x):
        """Return per-class probabilities of shape ``(batch, 8)``.

        Every axis after the batch axis is flattened; the flattened size must be
        12920 to match ``fc1``.
        """
        hidden = x.view(x.size(0), -1)

        for dense, norm, drop in (
            (self.fc1, self.bn1, self.dropout1),
            (self.fc2, self.bn2, self.dropout2),
        ):
            hidden = drop(F.relu(norm(dense(hidden))))

        return F.softmax(self.output_layer(hidden), dim=1)

In [6]:
class ANNModel2(nn.Module):
    """Fully connected classifier: 20 -> 1024 -> 512 -> 256 -> 128 -> 64 -> 8.

    Each hidden stage is Linear -> BatchNorm1d -> ReLU -> Dropout; the output is
    an 8-way softmax.
    """

    def __init__(self):
        super().__init__()

        self.fc1 = nn.Linear(20, 1024)
        self.bn1 = nn.BatchNorm1d(1024)
        self.dropout1 = nn.Dropout(p=0.2)

        self.fc2 = nn.Linear(1024, 512)
        self.bn2 = nn.BatchNorm1d(512)
        self.dropout2 = nn.Dropout(p=0.2)

        self.fc3 = nn.Linear(512, 256)
        self.bn3 = nn.BatchNorm1d(256)
        self.dropout3 = nn.Dropout(p=0.1)

        self.fc4 = nn.Linear(256, 128)
        self.bn4 = nn.BatchNorm1d(128)
        self.dropout4 = nn.Dropout(p=0)

        self.fc5 = nn.Linear(128, 64)
        self.bn5 = nn.BatchNorm1d(64)
        self.dropout5 = nn.Dropout(p=0)

        self.output_layer = nn.Linear(64, 8)

    def forward(self, x):
        """Return per-class probabilities of shape ``(batch, 8)``.

        ``x`` is cast to float and fed to ``fc1`` without flattening, so its
        last axis must have 20 entries.
        """
        hidden = x.float()

        hidden_stages = (
            (self.fc1, self.bn1, self.dropout1),
            (self.fc2, self.bn2, self.dropout2),
            (self.fc3, self.bn3, self.dropout3),
            (self.fc4, self.bn4, self.dropout4),
            (self.fc5, self.bn5, self.dropout5),
        )
        for dense, norm, drop in hidden_stages:
            hidden = drop(F.relu(norm(dense(hidden))))

        return F.softmax(self.output_layer(hidden), dim=1)

In [7]:
import os
import torchaudio
import torch
from torch.utils.data import Dataset
import torch.nn as nn
import torch.nn.functional as F
import librosa
import numpy as np
import pickle
from sklearn.model_selection import train_test_split
from tensorboardX import SummaryWriter
from features_extractor import*

class MaqamDataset(Dataset):
    """Dataset of maqam recordings served as precomputed per-file features.

    ``data_dir`` must contain one sub-directory per entry of ``self.maqams``;
    every ``.wav`` file inside is one sample whose label is the index of its
    directory in ``self.maqams``. The file list is split 70 / 15 / 15 into
    train / val / test with stratified, seeded (``random_state=42``) splits, so
    the three modes always see disjoint, reproducible subsets.

    Args:
        mode: ``'train'``, ``'val'`` or ``'test'``.
        transform: stored on the instance. It is not applied to anything,
            because items are served from the precomputed feature list.
        cache_file: pickle file used by ``_load_data_from_cache_or_compute``.
        data_dir: root directory of the dataset (defaults to the path that was
            previously hard-coded).

    Raises:
        ValueError: if ``mode`` is not one of the three accepted values.
    """

    # Accepted values for ``mode``.
    _MODES = ('train', 'val', 'test')

    def __init__(self, mode='train', transform=None, cache_file='1.pkl',
                 data_dir="/home/faisal/Desktop/MAQAMAT/Maqam478/Dataset2"):
        # Fail early: an unknown mode used to surface only later as an
        # AttributeError on ``self.features``.
        if mode not in self._MODES:
            raise ValueError(f"mode must be one of {self._MODES}, got {mode!r}")

        # Other dataset roots that were used with this class:
        # "/home/faisal/Desktop/datasetfull"
        # "/home/faisal/Desktop/MAQAMAT/mp3_maqamat/Readers/Saed_ghamdi"
        # "/home/faisal/Desktop/MAQAMAT/Fullynewdataset"
        # "/home/faisal/Desktop/MAQAMAT/fullqualitydataset"
        self.data_dir = data_dir
        self.mode = mode
        self.transform = transform
        test_val_size = 0.3
        self.maqams = ['Ajam', 'Bayat', 'Hijaz', 'Kurd', 'Nahawand', 'Rast', 'Saba', 'Seka']

        full_list = self._load_audio_list()
        # First split: 70% train, 30% hold-out (stratified by label).
        train_list, holdout = train_test_split(
            full_list, test_size=test_val_size, random_state=42,
            stratify=[label for (_, label) in full_list])
        if mode == 'train':
            self.audio_list = train_list
        else:
            # Second split: the hold-out is halved into validation and test.
            val_set, test_set = train_test_split(
                holdout, test_size=0.5, random_state=42,
                stratify=[label for (_, label) in holdout])
            self.audio_list = val_set if mode == 'val' else test_set

        # compute_features is not defined in this cell (presumably it comes from
        # the `features_extractor` star import). It is indexed by position in
        # __getitem__, so it is assumed to return one entry per element of
        # audio_list, in the same order -- TODO confirm.
        self.features = compute_features(self.audio_list, cache_file=f"{mode}_features.pkl")

        self.cache_file = cache_file
        self.data = self._load_data_from_cache_or_compute()

    def _load_audio_list(self):
        """Return ``[(wav_path, label_index), ...]`` for every ``.wav`` under ``data_dir``."""
        audio_list = []
        for i, maqam in enumerate(self.maqams):
            label_dir = os.path.join(self.data_dir, maqam)
            audio_list += [(os.path.join(label_dir, audio_name), i) for audio_name in os.listdir(label_dir) if audio_name.endswith('.wav')]
        return audio_list

    def __len__(self):
        """Number of samples in the split selected by ``mode``."""
        return len(self.audio_list)

    def __getitem__(self, idx):
        """Return ``(features, label_index)`` for sample ``idx``.

        The features were precomputed in ``__init__``. The audio file is no
        longer decoded here: the waveform was loaded (and transformed) on every
        access and then discarded, which only added disk I/O.
        """
        _, label_idx = self.audio_list[idx]
        return self.features[idx], label_idx

    def pad_to_max_length(self, max_length):
        """Right-pad the first element of every ``self.data`` entry with zeros, in place.

        The padding is applied along the last axis, and the current length is
        taken with ``len()``.
        # assumes self.data[i][0] is a 1-D tensor no longer than max_length -- TODO confirm
        """
        for i in range(len(self)):
            padded_data = F.pad(self.data[i][0], (0, max_length - len(self.data[i][0])), 'constant', 0)
            self.data[i] = (padded_data, self.data[i][1])

    def compute_mfcc(self, waveform):
        """Return the MFCCs of ``waveform`` as a transposed ``float32`` array.

        ``librosa.feature.mfcc`` is called with its default settings and a fixed
        sample rate of 48000. The transpose swaps the two axes librosa returns.
        # NOTE(review): the sample rate is hard-coded rather than taken from the file -- confirm
        """
        sr = 48000
        waveform = waveform.numpy()  # librosa works on NumPy arrays
        mfcc = librosa.feature.mfcc(y=waveform, sr=sr)
        mfcc = np.transpose(mfcc)
        mfcc = mfcc.astype(np.float32)
        return mfcc

    def _load_data_from_cache_or_compute(self):
        """Load ``self.cache_file`` if it exists, otherwise build it from all items.

        The cache holds the list ``[self[i] for i in range(len(self))]``.
        """
        if os.path.isfile(self.cache_file):
            print(f'Loading data from cache file: {self.cache_file}')
            # SECURITY: pickle.load executes arbitrary code embedded in the file;
            # only point cache_file at files this notebook wrote itself.
            with open(self.cache_file, 'rb') as f:
                return pickle.load(f)
        else:
            print(f'Cache file not found. Computing data from scratch and saving to cache file: {self.cache_file}')
            data = [self[i] for i in range(len(self))]
            with open(self.cache_file, 'wb') as f:
                pickle.dump(data, f)
            return data


In [8]:
import torch
from torch.utils.data import DataLoader
import librosa
import matplotlib.pyplot as plt
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence

# Length limit kept as a module-level constant; none of the visible cells read it.
# NOTE(review): presumably 30 s of audio at 48 kHz (30 * 48000 samples) -- confirm.
max_length = 1440000

def MFCC_plot(mfcc):
    """Show an MFCC tensor as a heat map with a time x-axis.

    The tensor is averaged over axis 2 and the result is transposed before it
    is drawn with ``librosa.display.specshow``.

    Args:
        mfcc: torch tensor with at least three axes.
            # assumes (batch, frames, coefficients)-like layout -- TODO confirm
    """
    # Older librosa releases do not expose the `display` submodule from a bare
    # `import librosa`; importing it explicitly works on every version.
    import librosa.display

    plt.figure(figsize=(10, 4))
    # .cpu() lets tensors that live on the GPU be plotted as well.
    mfcc = mfcc.detach().cpu().numpy()
    mfcc = mfcc.mean(axis=2).T
    librosa.display.specshow(mfcc, x_axis='time')
    plt.colorbar()
    plt.title('MFCC')
    plt.tight_layout()
    plt.show()

def custom_collate(batch):
    """Collate ``(features, label)`` pairs whose features differ in length along axis 1.

    Every feature array is zero-padded at the end of axis 1 up to the longest
    one in the batch, then the batch is stacked.

    Args:
        batch: sequence of ``(features, label)`` pairs; each ``features`` is a
            2-D array.

    Returns:
        ``(padded, labels)``: a float tensor of shape
        ``(batch, rows, longest_axis1)`` and a tensor of the labels.
    """
    inputs, labels = zip(*batch)
    widest = max(sample.shape[1] for sample in inputs)

    stacked = np.array([
        np.pad(sample, pad_width=((0, 0), (0, widest - sample.shape[1])), mode='constant')
        for sample in inputs
    ])

    return torch.from_numpy(stacked).float(), torch.tensor(labels)



In [9]:
# Mini-batch size passed to every DataLoader in the training and evaluation cells.
# (The datasets and loaders themselves are built inside those cells, not here.)
batch_size = 64

In [10]:
# torch.cuda.init()
# torch.cuda.empty_cache()
# Selects which training cell below runs; each one is guarded by `if option == N`
# (1 trains MaqamCNN1, 2 trains MFCC_LSTM, 3 trains ANNModel2).
option = 3 #1-CNN 2-LSTM 3-ANN

In [11]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from tqdm import tqdm
if option == 1:
    # Train, validate and test MaqamCNN1, then save the weights and the
    # loss / accuracy curves.

    # Local import: deepcopy is needed to take a real snapshot of the best
    # weights (see the early-stopping check below).
    import copy

    train_dataset = MaqamDataset(mode='train', cache_file = '2.pkl')
    val_dataset = MaqamDataset(mode='val', cache_file = 'validation.pkl')
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=custom_collate)
    # Validation metrics do not depend on sample order, so no shuffling.
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=custom_collate)
    l = 0.001
    # Per-epoch history used for the plots at the end.
    train_losses = []
    train_accuracies = []
    val_losses = []
    val_accuracies = []
    print("_________________________________________________________")
    print("Learning rate = ", l)

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    model = MaqamCNN1().to(device)
    # The model's forward already ends in softmax, so `outputs` are
    # probabilities. CrossEntropyLoss expects raw scores and would apply
    # log-softmax a second time, flattening the gradients. NLLLoss on the log
    # of the probabilities is the intended cross-entropy.
    criterion = torch.nn.NLLLoss()
    log_eps = 1e-12  # keeps log() finite if a probability underflows to 0
    optimizer = torch.optim.Adam(model.parameters(), lr=l)

    num_epochs = 40
    patience = 40  # epochs without validation improvement before stopping
    best_val_loss = float('inf')
    best_model_state_dict = None
    no_improvement_epochs = 0

    print("Starting training")
    for epoch in range(num_epochs):
        # ---- training pass ----
        model.train()
        running_loss = 0.0
        correct_predictions = 0
        total_samples = 0

        for i, data in enumerate(tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}', leave=False)):
            inputs, targets = data  # features and labels
            targets = targets.to(device)
            inputs = inputs.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(torch.log(outputs.clamp_min(log_eps)), targets)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted_labels = torch.max(outputs, 1)
            correct_predictions += (predicted_labels == targets).sum().item()
            total_samples += len(targets)

        # Training loss is the mean of the per-batch means.
        avg_loss = running_loss / len(train_loader)
        avg_accuracy = 100 * correct_predictions / total_samples
        print(f'Epoch {epoch + 1}/{num_epochs}: Train Loss={avg_loss:.5f}, Train Accuracy={avg_accuracy:.5f}%')

        # ---- validation pass ----
        model.eval()
        val_loss = 0.0
        total_correct = 0
        total_samples = 0

        with torch.no_grad():
            for data in tqdm(val_loader, desc='Validation', leave=False):
                inputs, targets = data  # features and labels
                targets = targets.to(device)
                inputs = inputs.to(device)
                outputs = model(inputs)
                # Weight each batch mean by its size so the final value is a per-sample mean.
                val_loss += criterion(torch.log(outputs.clamp_min(log_eps)), targets).item() * len(targets)

                _, predicted_labels = torch.max(outputs, 1)
                total_correct += (predicted_labels == targets).sum().item()
                total_samples += len(targets)

        val_loss /= len(val_dataset)
        val_acc = 100 * total_correct / total_samples

        train_losses.append(avg_loss)
        train_accuracies.append(avg_accuracy)
        val_losses.append(val_loss)
        val_accuracies.append(val_acc)

        print(f'Epoch {epoch + 1}/{num_epochs} validation: val_loss={val_loss:.5f}, val_acc={val_acc:.5f}%')

        # ---- early stopping ----
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # state_dict() returns references to the live parameter tensors, so
            # without a deep copy the "best" snapshot would keep changing as
            # training continues and restoring it would be a no-op.
            best_model_state_dict = copy.deepcopy(model.state_dict())
            no_improvement_epochs = 0
        else:
            no_improvement_epochs += 1

        if no_improvement_epochs >= patience:
            print("Early stopping. No improvement in validation loss for {} epochs.".format(patience))
            break

    # Restore the weights from the epoch with the lowest validation loss.
    if best_model_state_dict is not None:
        model.load_state_dict(best_model_state_dict)

    # ---- test pass ----
    test_dataset = MaqamDataset(mode='test', cache_file='test.pkl')
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=custom_collate)

    model.eval()
    total_correct = 0
    total_samples = 0

    with torch.no_grad():
        for data in tqdm(test_loader, desc='Testing', leave=False):
            inputs, targets = data
            targets = targets.to(device)
            inputs = inputs.to(device)
            outputs = model(inputs)

            _, predicted_labels = torch.max(outputs, 1)
            total_correct += (predicted_labels == targets).sum().item()
            total_samples += len(targets)

    test_acc = 100 * total_correct / total_samples
    print(f'Test Accuracy: {test_acc:.5f}%')

    # Save the trained model
    torch.save(model.state_dict(), 'CNN1.pth')

    # ---- learning curves ----
    plt.figure()
    plt.plot(range(1, len(train_losses) + 1), train_losses, label='Train Loss')
    plt.plot(range(1, len(val_losses) + 1), val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title(f'Learning Rate: {l}')
    plt.savefig('CNN loss_plot.png')
    plt.close()

    plt.figure()
    plt.plot(range(1, len(train_accuracies) + 1), train_accuracies, label='Train Accuracy')
    plt.plot(range(1, len(val_accuracies) + 1), val_accuracies, label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.title(f'Learning Rate: {l}')
    plt.savefig('CNN accuracy_plot.png')
    plt.close()

In [12]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from tqdm import tqdm
if option == 2:
    # Train, validate and test MFCC_LSTM, then save the weights and the
    # loss / accuracy curves.

    # Local import: deepcopy is needed to take a real snapshot of the best
    # weights (see the early-stopping check below).
    import copy

    l = 0.001
    # Per-epoch history used for the plots at the end.
    train_losses = []
    train_accuracies = []
    val_losses = []
    val_accuracies = []
    train_dataset = MaqamDataset(mode='train', cache_file = 'train.pkl')
    val_dataset = MaqamDataset(mode='val', cache_file = 'val.pkl')
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=custom_collate)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=custom_collate)
    print("_________________________________________________________")
    print("Learning rate = ", l)

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    model = MFCC_LSTM().to(device)
    # The model's forward already ends in softmax, so `outputs` are
    # probabilities. CrossEntropyLoss expects raw scores and would apply
    # log-softmax a second time, flattening the gradients. NLLLoss on the log
    # of the probabilities is the intended cross-entropy.
    criterion = torch.nn.NLLLoss()
    log_eps = 1e-12  # keeps log() finite if a probability underflows to 0
    optimizer = torch.optim.Adam(model.parameters(), lr=l)

    num_epochs = 40
    patience = 5  # epochs without validation improvement before stopping
    best_val_loss = float('inf')
    best_model_state_dict = None
    no_improvement_epochs = 0

    print("Starting training")
    for epoch in range(num_epochs):
        # ---- training pass ----
        model.train()
        running_loss = 0.0
        correct_predictions = 0
        total_samples = 0

        for i, data in enumerate(tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}', leave=False)):
            inputs, targets = data  # features and labels
            targets = targets.to(device)
            inputs = inputs.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(torch.log(outputs.clamp_min(log_eps)), targets)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted_labels = torch.max(outputs, 1)
            correct_predictions += (predicted_labels == targets).sum().item()
            total_samples += len(targets)

        # Training loss is the mean of the per-batch means.
        avg_loss = running_loss / len(train_loader)
        avg_accuracy = 100 * correct_predictions / total_samples
        print(f'Epoch {epoch + 1}/{num_epochs}: Train Loss={avg_loss:.5f}, Train Accuracy={avg_accuracy:.5f}%')

        # ---- validation pass ----
        model.eval()
        val_loss = 0.0
        total_correct = 0
        total_samples = 0

        with torch.no_grad():
            for data in tqdm(val_loader, desc='Validation', leave=False):
                inputs, targets = data  # features and labels
                targets = targets.to(device)
                inputs = inputs.to(device)
                outputs = model(inputs)
                # Weight each batch mean by its size so the final value is a per-sample mean.
                val_loss += criterion(torch.log(outputs.clamp_min(log_eps)), targets).item() * len(targets)

                _, predicted_labels = torch.max(outputs, 1)
                total_correct += (predicted_labels == targets).sum().item()
                total_samples += len(targets)

        val_loss /= len(val_dataset)
        val_acc = 100 * total_correct / total_samples

        train_losses.append(avg_loss)
        train_accuracies.append(avg_accuracy)
        val_losses.append(val_loss)
        val_accuracies.append(val_acc)

        print(f'Epoch {epoch + 1}/{num_epochs} validation: val_loss={val_loss:.5f}, val_acc={val_acc:.5f}%')

        # ---- early stopping ----
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # state_dict() returns references to the live parameter tensors, so
            # without a deep copy the "best" snapshot would keep changing as
            # training continues and restoring it would be a no-op.
            best_model_state_dict = copy.deepcopy(model.state_dict())
            no_improvement_epochs = 0
        else:
            no_improvement_epochs += 1

        if no_improvement_epochs >= patience:
            print("Early stopping. No improvement in validation loss for {} epochs.".format(patience))
            break

    # Restore the weights from the epoch with the lowest validation loss.
    if best_model_state_dict is not None:
        model.load_state_dict(best_model_state_dict)

    # ---- test pass ----
    test_dataset = MaqamDataset(mode='test', cache_file='1.pkl')
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=custom_collate)

    model.eval()
    total_correct = 0
    total_samples = 0

    with torch.no_grad():
        for data in tqdm(test_loader, desc='Testing', leave=False):
            inputs, targets = data
            targets = targets.to(device)
            inputs = inputs.to(device)
            outputs = model(inputs)

            _, predicted_labels = torch.max(outputs, 1)
            total_correct += (predicted_labels == targets).sum().item()
            total_samples += len(targets)

    test_acc = 100 * total_correct / total_samples
    print(f'Test Accuracy: {test_acc:.5f}%')

    # Save the trained model
    torch.save(model.state_dict(), 'lstm.pth')

    # ---- learning curves ----
    plt.figure()
    plt.plot(range(1, len(train_losses) + 1), train_losses, label='Train Loss')
    plt.plot(range(1, len(val_losses) + 1), val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title(f'Learning Rate: {l}')
    plt.savefig(f'LSTM loss_plot_lr_{l}.png')
    plt.close()

    plt.figure()
    plt.plot(range(1, len(train_accuracies) + 1), train_accuracies, label='Train Accuracy')
    plt.plot(range(1, len(val_accuracies) + 1), val_accuracies, label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.title(f'Learning Rate: {l}')
    plt.savefig(f'LSTM accuracy_plot_lr_{l}.png')
    plt.close()


In [13]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from tqdm import tqdm

# TensorBoard writer; created whether or not option 3 is selected, and closed
# at the end of the option-3 branch.
writer = SummaryWriter('logs/')
if option == 3:
    # Train, validate and test ANNModel2 with TensorBoard logging, then save
    # the weights and the loss / accuracy curves.

    # Local import: deepcopy is needed to take a real snapshot of the best
    # weights (see the early-stopping check below).
    import copy

    l = 0.0001
    # Per-epoch history used for the plots at the end.
    train_losses = []
    train_accuracies = []
    val_losses = []
    val_accuracies = []

    train_dataset = MaqamDataset(mode='train', cache_file = 'train.pkl')
    val_dataset = MaqamDataset(mode='val', cache_file = 'val.pkl')
    # No custom collate here: the default collate stacks the items as they are.
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # Validation metrics do not depend on sample order, so no shuffling.
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    print("_________________________________________________________")
    print("Learning rate = ", l)
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model = ANNModel2().to(device)
    # The model's forward already ends in softmax, so `outputs` are
    # probabilities. CrossEntropyLoss expects raw scores and would apply
    # log-softmax a second time, flattening the gradients. NLLLoss on the log
    # of the probabilities is the intended cross-entropy.
    criterion = torch.nn.NLLLoss()
    log_eps = 1e-12  # keeps log() finite if a probability underflows to 0
    optimizer = torch.optim.Adam(model.parameters(), lr=l)

    num_epochs = 45
    patience = 45  # epochs without validation improvement before stopping
    best_val_loss = float('inf')
    best_model_state_dict = None
    no_improvement_epochs = 0

    print("Starting training")
    for epoch in range(num_epochs):
        # ---- training pass ----
        model.train()
        running_loss = 0.0
        correct_predictions = 0
        total_samples = 0
        writer.add_scalar('Learning Rate', optimizer.param_groups[0]['lr'], epoch)
        for i, data in enumerate(tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}', leave=False)):
            inputs, targets = data  # features and labels
            targets = targets.to(device)
            inputs = inputs.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(torch.log(outputs.clamp_min(log_eps)), targets)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted_labels = torch.max(outputs, 1)
            correct_predictions += (predicted_labels == targets).sum().item()
            total_samples += len(targets)

        # Training loss is the mean of the per-batch means.
        avg_loss = running_loss / len(train_loader)
        avg_accuracy = 100 * correct_predictions / total_samples
        train_losses.append(avg_loss)
        train_accuracies.append(avg_accuracy)

        writer.add_scalar('Train Loss', avg_loss, epoch)
        writer.add_scalar('Train Accuracy', avg_accuracy, epoch)

        print(f'Epoch {epoch + 1}/{num_epochs}: Train Loss={avg_loss:.5f}, Train Accuracy={avg_accuracy:.5f}%')

        # ---- validation pass ----
        model.eval()
        val_loss = 0.0
        total_correct = 0
        total_samples = 0

        with torch.no_grad():
            for data in tqdm(val_loader, desc='Validation', leave=False):
                inputs, targets = data  # features and labels
                targets = targets.to(device)
                inputs = inputs.to(device)
                outputs = model(inputs)
                # Weight each batch mean by its size so the final value is a per-sample mean.
                val_loss += criterion(torch.log(outputs.clamp_min(log_eps)), targets).item() * len(targets)

                _, predicted_labels = torch.max(outputs, 1)
                total_correct += (predicted_labels == targets).sum().item()
                total_samples += len(targets)

        val_loss /= len(val_dataset)
        val_acc = 100 * total_correct / total_samples
        val_losses.append(val_loss)
        val_accuracies.append(val_acc)
        print(f'Epoch {epoch + 1}/{num_epochs} validation: val_loss={val_loss:.5f}, val_acc={val_acc:.5f}%')
        writer.add_scalar('Validation Loss', val_loss, epoch)
        writer.add_scalar('Validation Accuracy', val_acc, epoch)

        # Log parameter histograms before the early-stopping check so the
        # epoch that triggers the stop is logged too.
        for name, param in model.named_parameters():
            writer.add_histogram(name, param, epoch)

        # ---- early stopping ----
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # state_dict() returns references to the live parameter tensors, so
            # without a deep copy the "best" snapshot would keep changing as
            # training continues and restoring it would be a no-op.
            best_model_state_dict = copy.deepcopy(model.state_dict())
            no_improvement_epochs = 0
        else:
            no_improvement_epochs += 1

        if no_improvement_epochs >= patience:
            print("Early stopping. No improvement in validation loss for {} epochs.".format(patience))
            break

    # Restore the weights from the epoch with the lowest validation loss.
    if best_model_state_dict is not None:
        model.load_state_dict(best_model_state_dict)

    # ---- test pass ----
    test_dataset = MaqamDataset(mode='test', cache_file='test.pkl')
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    model.eval()
    total_correct = 0
    total_samples = 0

    with torch.no_grad():
        for data in tqdm(test_loader, desc='Testing', leave=False):
            inputs, targets = data
            targets = targets.to(device)
            inputs = inputs.to(device)
            outputs = model(inputs)

            _, predicted_labels = torch.max(outputs, 1)
            total_correct += (predicted_labels == targets).sum().item()
            total_samples += len(targets)

    test_acc = 100 * total_correct / total_samples
    print(f'Test Accuracy: {test_acc:.5f}%')
    writer.close()
    # Save the trained model
    torch.save(model.state_dict(), 'ANN.pth')

    # ---- learning curves ----
    plt.figure()
    plt.plot(range(1, len(train_losses) + 1), train_losses, label='Train Loss')
    plt.plot(range(1, len(val_losses) + 1), val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title(f'Learning Rate: {l}')
    plt.savefig(f'ANN loss_plot_lr_{l}.png')
    plt.close()

    plt.figure()
    plt.plot(range(1, len(train_accuracies) + 1), train_accuracies, label='Train Accuracy')
    plt.plot(range(1, len(val_accuracies) + 1), val_accuracies, label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.title(f'Learning Rate: {l}')
    plt.savefig(f'ANN accuracy_plot_lr_{l}.png')
    plt.close()


Cache file not found. Computing features from scratch and saving to cache file: train_features.pkl


Cache file not found. Computing data from scratch and saving to cache file: train.pkl
Cache file not found. Computing features from scratch and saving to cache file: val_features.pkl
Cache file not found. Computing data from scratch and saving to cache file: val.pkl


  return torch._C._cuda_getDeviceCount() > 0


_________________________________________________________
Learning rate =  0.0001
Starting training


                                                 

RuntimeError: mat1 and mat2 shapes cannot be multiplied (64x20 and 37x1024)

In [None]:
# Evaluate a saved ANNModel checkpoint on the test split: overall accuracy,
# per-maqam accuracy, and the flat lists `t` (targets) / `p` (predictions)
# that the confusion-matrix cell consumes.
path = "/home/faisal/Desktop/best/ANN.pth"
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model1 = ANNModel().to(device)
# map_location lets a checkpoint that was saved on a GPU load on a CPU-only machine.
model1.load_state_dict(torch.load(path, map_location=device))
# Test the model on the test dataset
test_dataset = MaqamDataset(mode='test', cache_file='test2.pkl')
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=custom_collate)

# Evaluate the checkpoint that was just loaded. This cell previously called
# `model`, i.e. whatever network the last training cell left in the kernel.
model1.eval()
total_correct = 0
total_samples = 0
acc_by_maqam_idx = [0, 1, 2, 3, 4, 5, 6, 7]
acc_by_maqam_ss = [0, 0, 0, 0, 0, 0, 0, 0]  # samples seen per class
acc_by_maqam_cc = [0, 0, 0, 0, 0, 0, 0, 0]  # correct predictions per class
p = []
t = []
with torch.no_grad():
    for data in tqdm(test_loader, desc='Testing', leave=False):
        inputs, targets = data
        targets = targets.to(device)
        inputs = inputs.to(device)
        outputs = model1(inputs)

        _, predicted_labels = torch.max(outputs, 1)
        # Extending a list with a 1-D tensor appends its 0-d element tensors,
        # which is what torch.stack() in the confusion-matrix cell expects.
        t.extend(targets)
        p.extend(predicted_labels)
        total_correct += (predicted_labels == targets).sum().item()
        total_samples += len(targets)
        for i in acc_by_maqam_idx:
            is_class = targets == i
            acc_by_maqam_ss[i] += int(is_class.sum().item())
            acc_by_maqam_cc[i] += int((is_class & (predicted_labels == i)).sum().item())


test_acc = 100 * total_correct / total_samples
print(f'Test Accuracy: {test_acc:.5f}%')

classes = ['Ajam', 'Bayat', 'Hijaz', 'Kurd', 'Nahawand', 'Rast', 'Saba', 'Seka']
print(f'Accuracy by maqam:')
for i in range(len(classes)):
    seen = acc_by_maqam_ss[i]
    # A class with no test samples has no defined accuracy; report NaN instead
    # of raising ZeroDivisionError.
    class_accuracy = acc_by_maqam_cc[i] / seen if seen else float('nan')
    print(classes[i] + " accurracy = ", class_accuracy)

In [None]:
import numpy as np
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Plot two confusion matrices from the targets `t` and predictions `p`
# collected by the evaluation cell: raw counts first, then each cell as a
# fraction of all test samples. Both plots are always produced.
class_names = ['Ajam', 'Bayat', 'Hijaz',
               'Kurd', 'Nahawand', 'Rast', 'Saba', 'Seka']

# torch.stack turns each list of 0-d tensors into one 1-D tensor; move to CPU
# so scikit-learn can read it.
true = torch.stack(t).cpu()
predicted = torch.stack(p).cpu()

# Pin the label set to every class index. Without `labels`, scikit-learn only
# includes labels that actually occur, so a class missing from both targets and
# predictions would shrink the matrix and break the 8-name DataFrame below.
cm = confusion_matrix(true, predicted, labels=list(range(len(class_names))))

# --- Raw counts -------------------------------------------------------------
cm_df = pd.DataFrame(cm, index=class_names, columns=class_names)

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(cm_df, annot=True, fmt='d', cmap='Blues', ax=ax)
ax.set_xlabel('Predicted Class')
ax.set_ylabel('True Class')
ax.set_title('Confusion Matrix')

# Save the count plot as a JPEG image
fig.savefig('confusion_matrix.jpeg', format='jpeg')
plt.show()

# Save the confusion matrix to a CSV file
cm_df.to_csv('confusion_matrix.csv')

# --- Fractions of all samples -------------------------------------------------
# Dividing by the grand total makes the whole matrix sum to 1; rows are NOT
# normalised individually, so the diagonal is not per-class recall.
cm_probability = cm / cm.sum()
cm_df_probability = pd.DataFrame(
    cm_probability, index=class_names, columns=class_names)

fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(cm_df_probability, annot=True, fmt='.2f', cmap='Blues', ax=ax)
ax.set_xlabel('Predicted Class')
ax.set_ylabel('True Class')
ax.set_title('Confusion Matrix (Probability)')

# Save the probability plot as a PNG image
fig.savefig('confusion_matrix_probability.png', format='png')
plt.show()
print("Option2")

In [None]:
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader

# Weighted-average ensemble of three trained models, evaluated on the test
# split. Reports each model's own accuracy and the accuracy of the combination.
# NOTE(review): relies on `device` and on the model / dataset classes defined
# in earlier cells.


def _count_correct(predictions, labels):
    """Return how many positions have predictions[i] == labels[i]."""
    return sum(1 for pred, label in zip(predictions, labels) if pred == label)


def _percent(correct, total):
    """Return correct / total as a percentage, or nan when total is 0."""
    return correct / total * 100 if total else float('nan')


batch_size = 32

# Load the test dataset
test_dataset = MaqamDataset(mode='test', cache_file='test.pkl')
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=custom_collate)

# Instantiate the three ensemble members
model1 = MaqamCNN1().to(device)
model2 = MFCC_LSTM2().to(device)
model3 = ANNModel().to(device)

model1_path = "cnn78.pth"
model2_path = "lstm8359.pth"
model3_path = "ann1.pth"

# Load each checkpoint and switch the model to evaluation mode.
# map_location lets GPU-saved weights load when `device` is the CPU.
for ensemble_member, checkpoint_path in (
    (model1, model1_path),
    (model2, model2_path),
    (model3, model3_path),
):
    ensemble_member.load_state_dict(torch.load(checkpoint_path, map_location=device))
    ensemble_member.eval()

# Weights used when combining the three model outputs (they sum to 1)
weight1 = 0.33
weight2 = 0.33
weight3 = 0.34

# Final ensemble predictions and the matching ground-truth labels
all_predictions = []
all_labels = []

# Per-model predicted labels, for the individual accuracies
model1_predictions = []
model2_predictions = []
model3_predictions = []

# Loop through the test data
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Every model receives the same input batch
        predictions1 = model1(inputs)
        predictions2 = model2(inputs)
        predictions3 = model3(inputs)

        # Record each model's own predicted class
        model1_predictions.extend(predictions1.argmax(dim=1).cpu().numpy())
        model2_predictions.extend(predictions2.argmax(dim=1).cpu().numpy())
        model3_predictions.extend(predictions3.argmax(dim=1).cpu().numpy())

        # Weighted sum of the three model outputs
        combined_predictions = weight1 * predictions1 + weight2 * predictions2 + weight3 * predictions3

        # Renormalise the weighted sum; softmax keeps the ordering within each row.
        # NOTE(review): MaqamCNN1.forward already ends in a softmax. Confirm the
        # other two models emit the same kind of score (probabilities vs logits),
        # otherwise the weights above do not compare like with like.
        probabilities = F.softmax(combined_predictions, dim=1)

        # Ensemble prediction = class with the highest combined score
        _, predicted_labels = torch.max(probabilities, 1)

        all_predictions.extend(predicted_labels.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Calculate the accuracy of each model independently
correct_model1 = _count_correct(model1_predictions, all_labels)
correct_model2 = _count_correct(model2_predictions, all_labels)
correct_model3 = _count_correct(model3_predictions, all_labels)

total_samples = len(all_labels)
acc_model1 = _percent(correct_model1, total_samples)
acc_model2 = _percent(correct_model2, total_samples)
acc_model3 = _percent(correct_model3, total_samples)

print(f'Model 1 Accuracy: {acc_model1:.5f}%')
print(f'Model 2 Accuracy: {acc_model2:.5f}%')
print(f'Model 3 Accuracy: {acc_model3:.5f}%')

# Calculate the combined accuracy
correct_combined = _count_correct(all_predictions, all_labels)
acc_combined = _percent(correct_combined, total_samples)
print(f'Combined Accuracy: {acc_combined:.5f}%')