In [1]:
import torch 
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
import os, pathlib, glob, random
import numpy as np
import matplotlib.pyplot as plt 
from sklearn.metrics import confusion_matrix
import scipy
from scipy import io

In [2]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)

cuda:0


In [3]:
batch_size = 32
output_nodes = 2
learning_rate = 0.003

In [4]:
import os
import random
import scipy.io
import numpy as np
from torch.utils.data import Dataset, DataLoader

# Paths for both datasets
train_data_paths = [
    r"/kaggle/input/jpd-df2-lfcc/LFCC_T1/train",  # Language 1
    r"/kaggle/input/jpd-df2-lfcc-t2/LFCC/train"   # Language 2
]
validation_data_paths = [
    r"/kaggle/input/jpd-df2-lfcc/LFCC_T1/val",
    r"/kaggle/input/jpd-df2-lfcc-t2/LFCC/val"
]

class MixedPtDataset(Dataset):
    def __init__(self, directories):
        """Load features from multiple directories."""
        self.files = []
        self.class_to_idx = {}

        for directory in directories:
            classes = sorted(entry.name for entry in os.scandir(directory) if entry.is_dir())
            
            # Assign class indices if not already assigned
            for c in classes:
                if c not in self.class_to_idx:
                    self.class_to_idx[c] = len(self.class_to_idx)

            for c in classes:
                c_dir = os.path.join(directory, c)
                c_files = [(os.path.join(c_dir, f), self.class_to_idx[c]) for f in os.listdir(c_dir)]
                self.files.extend(c_files)

        random.shuffle(self.files)

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        filepath, label = self.files[idx]
        try:
            mat_vals = scipy.io.loadmat(filepath)
            data = mat_vals['final'].T
            max_len = 800
            if max_len > data.shape[0]:
                pad_width = max_len - data.shape[0]
                data = np.pad(data, pad_width=((0, pad_width), (0, 0)), mode='constant')
            else:
                data = data[:max_len, :]
        except Exception as e:
            print(f"Error loading file {filepath}: {str(e)}")
            return None
        return data, label

# Combine both datasets
train_dataset = MixedPtDataset(train_data_paths)
val_dataset = MixedPtDataset(validation_data_paths)

class PtDataLoader(DataLoader):
    def __init__(self, directories, batch_size, shuffle=True):
        dataset = MixedPtDataset(directories)
        super().__init__(dataset, batch_size=batch_size, shuffle=shuffle)

# Load mixed datasets
batch_size = 32
train_dataloader = PtDataLoader(directories=train_data_paths, batch_size=batch_size)
val_dataloader = PtDataLoader(directories=validation_data_paths, batch_size=batch_size)

train_count = len(train_dataset)
val_count = len(val_dataset)

print(f"Training samples: {train_count}\nValidation samples: {val_count}")


Training samples: 195996
Validation samples: 42004


In [5]:
print(train_count)
# print(test_count)
print(val_count)

195996
42004


In [6]:
import torch
from torch import nn
from torch.nn import Parameter
import torch.nn.functional as F

In [7]:
# Define the parameters
input_size = 20
hidden_size = 256
num_layers = 2
num_classes = 2
# drop_amount = 0.25  # You can choose an appropriate dropout rate

In [8]:
import torch
import torch.nn as nn
import torch.optim as optim

In [9]:
import torch 
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
import os, pathlib, glob, random
import numpy as np
import matplotlib.pyplot as plt 
import seaborn as sns
from sklearn.metrics import confusion_matrix
from transformers import WhisperProcessor, WhisperForConditionalGeneration
from datasets import load_dataset
from transformers.models.whisper.modeling_whisper import WhisperModel, WhisperEncoder
from transformers.models.whisper.configuration_whisper import WhisperConfig
from typing import Optional, Tuple, Union
import torch
import librosa 
import matplotlib.pyplot as plt
import numpy as np
import os, glob, pickle
import scipy.io as sio
from tqdm import tqdm
import multiprocessing as mp 
import torch.optim as optim

In [10]:
# BiLSTM

In [11]:
drop_amount = 0.255

class BiLSTMClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(BiLSTMClassifier, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=True)
        self.dropout = nn.Dropout(p=drop_amount)
        self.fc = nn.Linear(hidden_size*2, num_classes)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers*2, x.size(0), self.hidden_size).to(device=x.device, dtype=torch.double)
        c0 = torch.zeros(self.num_layers*2, x.size(0), self.hidden_size).to(device=x.device, dtype=torch.double)
        out, _ = self.lstm(x, (h0, c0))
        out = self.dropout(out)
        # Extract the output of the last time step from both directions
        last_hidden_state = torch.cat((out[:, -1, :self.hidden_size], out[:, 0, self.hidden_size:]), dim=1)
        output = self.fc(last_hidden_state)
        return output

In [12]:
batch_size = 32
output_nodes = 2
learning_rate = 0.003

In [13]:
model = BiLSTMClassifier(input_size, hidden_size, num_layers, num_classes)
model.to(device, dtype=torch.double)
loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
print(model)

BiLSTMClassifier(
  (lstm): LSTM(20, 256, num_layers=2, batch_first=True, bidirectional=True)
  (dropout): Dropout(p=0.255, inplace=False)
  (fc): Linear(in_features=512, out_features=2, bias=True)
)


In [14]:
drop_amount = 0.255

class CNNClassifier(nn.Module):
    def __init__(self, input_channels, num_classes):
        super(CNNClassifier, self).__init__()
        self.conv1 = nn.Conv1d(input_channels, 64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv1d(128, 256, kernel_size=3, stride=1, padding=1)
        self.dropout = nn.Dropout(p=drop_amount)
        self.fc = nn.Linear(256, num_classes)

    def forward(self, x):
        # Assuming input x shape is (batch_size, sequence_length, input_channels)
        x = x.permute(0, 2, 1)  # Change shape to (batch_size, input_channels, sequence_length)

        x = self.conv1(x)
        x = F.relu(x)
        x = F.max_pool1d(x, kernel_size=2)

        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool1d(x, kernel_size=2)

        x = self.conv3(x)
        x = F.relu(x)
        x = F.adaptive_max_pool1d(x, 1)  # Pool to a fixed size (1)

        x = x.view(x.size(0), -1)  # Flatten the tensor for fully connected layer
        x = self.dropout(x)
        output = self.fc(x)
        return output

In [15]:
batch_size = 32
output_nodes = 2
learning_rate = 0.003

input_channels = 20  # You can change this depending on your input data
num_classes = 2  # Adjust according to your classification task

In [16]:
model = CNNClassifier(input_channels, num_classes)
model.to(device, dtype=torch.double)
loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
print(model)

CNNClassifier(
  (conv1): Conv1d(20, 64, kernel_size=(3,), stride=(1,), padding=(1,))
  (conv2): Conv1d(64, 128, kernel_size=(3,), stride=(1,), padding=(1,))
  (conv3): Conv1d(128, 256, kernel_size=(3,), stride=(1,), padding=(1,))
  (dropout): Dropout(p=0.255, inplace=False)
  (fc): Linear(in_features=256, out_features=2, bias=True)
)


In [17]:
drop_amount = 0.255

class BiGRUAudioClassifier(nn.Module):
    def __init__(self,input_size, num_classes, hidden_units, num_layers):
        super(BiGRUAudioClassifier, self).__init__()
        self.input_size = input_size
        self.num_classes = num_classes
        self.hidden_units = hidden_units
        self.num_layers = num_layers

        self.bigru = nn.GRU(input_size=input_size, hidden_size=hidden_units, num_layers=num_layers, batch_first=True, bidirectional=True)
        self.dropout = nn.Dropout(p=drop_amount)
        # self.fc = nn.Linear(hidden_units, num_classes)
        self.fc = nn.Linear(hidden_units * 2, num_classes)

    def forward(self, x):
        # x: (batch_size, sequence_length, num_features)

        # Pass the input through the bi-GRU layers
        output, _ = self.bigru(x)
        output = self.dropout(output)
        # Extract the last hidden state (concatenate forward and backward hidden states)
        last_hidden_state = torch.cat((output[:, -1, :self.hidden_units], output[:, 0, self.hidden_units:]), dim=1)
        # Apply the fully connected layer for classification
        output = self.fc(last_hidden_state)

        return output

In [18]:
input_size = 20
hidden_size = 256
num_layers = 2
num_classes = 2
model = BiGRUAudioClassifier(input_size, num_classes, hidden_size, num_layers)
model.to(device, dtype=torch.double)
loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
print(model)

BiGRUAudioClassifier(
  (bigru): GRU(20, 256, num_layers=2, batch_first=True, bidirectional=True)
  (dropout): Dropout(p=0.255, inplace=False)
  (fc): Linear(in_features=512, out_features=2, bias=True)
)


In [19]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Define the Basic Block for ResNet
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != self.expansion * out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, self.expansion * out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * out_channels)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

# Define the ResNet architecture
class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=1000):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0], stride=1)
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride):
        layers = []
        layers.append(block(self.in_channels, out_channels, stride))
        self.in_channels = out_channels * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

# Function to create ResNet-50 model
def resnet50(num_classes=2):
    return ResNet(BasicBlock, [3, 4, 6, 3], num_classes)

In [20]:
model = resnet50()
num_epochs=10
model.to(device)

ResNet(
  (conv1): Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (shortcut): Sequential()
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), p

In [21]:
from tqdm import tqdm
import torch
from torch.autograd import Variable

# Initialize lists and variables
train_accuracy_list = []
train_loss_list = []
valid_accuracy_list = []

num_epochs = 10
max_acc = 0
pred_labels = []
act_labels = []

for epoch in range(num_epochs):
    
    # 🛠️ Training Phase
    model.train()
    train_accuracy = 0.0
    train_loss = 0.0
    
    print(f"\nEpoch {epoch + 1}/{num_epochs} - Training...")

    for batch_idx, (images, labels) in enumerate(tqdm(train_dataloader, desc="Training Batches")):
        if torch.cuda.is_available():
            images = images.cuda()
            labels = labels.cuda()
        
        optimizer.zero_grad()

        # Reshape and convert tensors to float
        images = images.unsqueeze(1).float()

        # Forward pass
        outputs = model(images)

        # Loss calculation
        loss = loss_function(outputs, labels)
        
        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # Accumulate loss and accuracy
        train_loss += loss.cpu().data * images.size(0)
        _, prediction = torch.max(outputs.data, 1)
        train_accuracy += int(torch.sum(prediction == labels.data))
        
    # Average loss and accuracy over the training set
    train_accuracy /= train_count
    train_loss /= train_count
    
    train_accuracy_list.append(train_accuracy)
    train_loss_list.append(train_loss)

    # 🛠️ Validation Phase
    model.eval()
    valid_accuracy = 0.0
    pred = []
    lab = []

    print(f"Epoch {epoch + 1}/{num_epochs} - Validation...")

    # Use torch.no_grad() for faster inference during validation
    with torch.no_grad():
        for i, (images, labels) in enumerate(tqdm(val_dataloader, desc="Validation Batches")):
            if torch.cuda.is_available():
                images = images.cuda()
                labels = labels.cuda()

            # Reshape and convert tensors to float
            images = images.unsqueeze(1).float()

            # Forward pass
            outputs = model(images)
            
            _, prediction = torch.max(outputs.data, 1)
            
            valid_accuracy += int(torch.sum(prediction == labels.data))

            pred.extend(prediction.tolist())
            lab.extend(labels.tolist())

    # Average validation accuracy
    valid_accuracy /= val_count
    valid_accuracy_list.append(valid_accuracy)

    # Save the best model
    if valid_accuracy > max_acc:
        pred_labels = pred
        act_labels = lab
        max_acc = valid_accuracy
        torch.save(model, 'best_model.pth')
    
    # Display epoch summary
    print(f"Epoch: {epoch + 1}/{num_epochs}   "
          f"Train Loss: {train_loss:.4f}   "
          f"Train Accuracy: {train_accuracy:.4f}   "
          f"Validation Accuracy: {valid_accuracy:.4f}")

# ✅ Final results
print("\nFinished Training")
print(f"Maximum Validation Accuracy: {max_acc:.4f}")



Epoch 1/10 - Training...


Training Batches: 100%|██████████| 6125/6125 [38:13<00:00,  2.67it/s]


Epoch 1/10 - Validation...


Validation Batches: 100%|██████████| 1313/1313 [07:25<00:00,  2.95it/s]


Epoch: 1/10   Train Loss: 0.6946   Train Accuracy: 0.5022   Validation Accuracy: 0.4991

Epoch 2/10 - Training...


Training Batches: 100%|██████████| 6125/6125 [15:16<00:00,  6.68it/s]


Epoch 2/10 - Validation...


Validation Batches: 100%|██████████| 1313/1313 [02:16<00:00,  9.61it/s]


Epoch: 2/10   Train Loss: 0.6947   Train Accuracy: 0.5008   Validation Accuracy: 0.4999

Epoch 3/10 - Training...


Training Batches: 100%|██████████| 6125/6125 [14:18<00:00,  7.13it/s]


Epoch 3/10 - Validation...


Validation Batches: 100%|██████████| 1313/1313 [02:03<00:00, 10.66it/s]


Epoch: 3/10   Train Loss: 0.6947   Train Accuracy: 0.5005   Validation Accuracy: 0.4999

Epoch 4/10 - Training...


Training Batches: 100%|██████████| 6125/6125 [14:11<00:00,  7.19it/s]


Epoch 4/10 - Validation...


Validation Batches: 100%|██████████| 1313/1313 [02:06<00:00, 10.34it/s]


Epoch: 4/10   Train Loss: 0.6946   Train Accuracy: 0.5013   Validation Accuracy: 0.5028

Epoch 5/10 - Training...


Training Batches: 100%|██████████| 6125/6125 [14:22<00:00,  7.10it/s]


Epoch 5/10 - Validation...


Validation Batches: 100%|██████████| 1313/1313 [02:08<00:00, 10.20it/s]


Epoch: 5/10   Train Loss: 0.6946   Train Accuracy: 0.5020   Validation Accuracy: 0.4995

Epoch 6/10 - Training...


Training Batches: 100%|██████████| 6125/6125 [14:50<00:00,  6.88it/s]


Epoch 6/10 - Validation...


Validation Batches: 100%|██████████| 1313/1313 [02:14<00:00,  9.80it/s]


Epoch: 6/10   Train Loss: 0.6947   Train Accuracy: 0.5012   Validation Accuracy: 0.4954

Epoch 7/10 - Training...


Training Batches: 100%|██████████| 6125/6125 [14:59<00:00,  6.81it/s]


Epoch 7/10 - Validation...


Validation Batches: 100%|██████████| 1313/1313 [02:15<00:00,  9.70it/s]


Epoch: 7/10   Train Loss: 0.6946   Train Accuracy: 0.5013   Validation Accuracy: 0.5036

Epoch 8/10 - Training...


Training Batches: 100%|██████████| 6125/6125 [15:03<00:00,  6.78it/s]


Epoch 8/10 - Validation...


Validation Batches: 100%|██████████| 1313/1313 [02:11<00:00,  9.97it/s]


Epoch: 8/10   Train Loss: 0.6946   Train Accuracy: 0.5011   Validation Accuracy: 0.5029

Epoch 9/10 - Training...


Training Batches: 100%|██████████| 6125/6125 [14:37<00:00,  6.98it/s]


Epoch 9/10 - Validation...


Validation Batches: 100%|██████████| 1313/1313 [02:14<00:00,  9.79it/s]


Epoch: 9/10   Train Loss: 0.6947   Train Accuracy: 0.5017   Validation Accuracy: 0.4993

Epoch 10/10 - Training...


Training Batches: 100%|██████████| 6125/6125 [15:08<00:00,  6.74it/s]


Epoch 10/10 - Validation...


Validation Batches: 100%|██████████| 1313/1313 [02:13<00:00,  9.84it/s]

Epoch: 10/10   Train Loss: 0.6945   Train Accuracy: 0.5019   Validation Accuracy: 0.4993

Finished Training
Maximum Validation Accuracy: 0.5036





In [22]:
# # Load the best model
# best_model = torch.load('model.pth')

# # Put the best_model in evaluation mode
# best_model.eval()

# # Initialize variables to store results
# test_accuracy = 0.0
# pred_labels = []
# act_labels = []

# # Pass validation data through the best model
# for i, (images, labels) in enumerate(test_dataloader):
#     if torch.cuda.is_available():
#         images = Variable(images.cuda())
#         labels = Variable(labels.cuda())
   
#     images = images.unsqueeze(1)
#     images = images.float()
#     outputs = best_model(images)
#     _, prediction = torch.max(outputs.data, 1)
   
#     test_accuracy += int(torch.sum(prediction == labels.data))
   
#     pred_labels.extend(prediction.tolist())
#     act_labels.extend(labels.tolist())

# # Calculate testing accuracy
# test_accuracy = test_accuracy / test_count

# # Print the testing accuracy
# print("testing Accuracy:", test_accuracy)

In [23]:
# # Calculate the confusion matrix
# import seaborn as sns
# conf_mat = confusion_matrix(act_labels, pred_labels)
# # Plot confusion matrix heat map
# sns.heatmap(conf_mat, cmap="flare",annot=True, fmt = "g", 
#             cbar_kws={"label":"color bar"},
#             xticklabels=train_dataset.classes,
#             yticklabels=train_dataset.classes)
# plt.xlabel("Predicted")
# plt.ylabel("Actual")
# plt.title("Confusion Matrix")
# plt.savefig("ConfusionMatrix_BiLSTM.png")
# plt.show()
# from sklearn.metrics import f1_score
# f1_score = f1_score(pred_labels, act_labels, average='macro')
# print('F1 Score : ', f1_score)


In [24]:
# import numpy as np
# import sklearn.metrics

# """
# Python compute equal error rate (eer)
# ONLY tested on binary classification

# :param label: ground-truth label, should be a 1-d list or np.array, each element represents the ground-truth label of one sample
# :param pred: model prediction, should be a 1-d list or np.array, each element represents the model prediction of one sample
# :param positive_label: the class that is viewed as positive class when computing EER
# :return: equal error rate (EER)
# """
# def compute_eer(label, pred):
#     # all fpr, tpr, fnr, fnr, threshold are lists (in the format of np.array)
#     fpr, tpr, threshold = sklearn.metrics.roc_curve(label, pred)
#     fnr = 1 - tpr

#     # the threshold of fnr == fpr
#     eer_threshold = threshold[np.nanargmin(np.absolute((fnr - fpr)))]

#     # theoretically eer from fpr and eer from fnr should be identical but they can be slightly differ in reality
#     eer_1 = fpr[np.nanargmin(np.absolute((fnr - fpr)))]
#     eer_2 = fnr[np.nanargmin(np.absolute((fnr - fpr)))]

#     # return the mean of eer from fpr and from fnr
#     eer = (eer_1 + eer_2) / 2
#     return eer

# eer = compute_eer(act_labels, pred_labels)
# print('The equal error rate is {:.3f}'.format(eer))