In [1]:
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [6]:
!pip install -q torchmetrics

In [34]:
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sklearn.model_selection import train_test_split
from torchmetrics.classification import BinaryAccuracy, F1Score, Precision, Recall
from torchmetrics import MeanMetric

In [22]:
import os
print("Current working directory:", os.getcwd())

Current working directory: /content


In [23]:
data = pd.read_csv('/content/drive/MyDrive/embeddings.csv')

In [41]:
data.reset_index(drop=True, inplace=True)

In [25]:
train_data, test_data = train_test_split(data, test_size=0.2, random_state=42)

In [39]:
# Defining a custom dataset for Siamese network pairs
class DysphoniaPairsDataset(Dataset):
    def __init__(self, data):
        self.data = data
        self.embeddings = data['embedding'].apply(lambda x: torch.tensor(eval(x), dtype=torch.float32))  # Assuming embeddings are stored as strings
        self.labels = data['category'].map({'healthy': 0, 'patient': 1})

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Get two random samples, ensure one is similar or dissimilar based on idx parity
        row1 = self.data.iloc[idx]
        label1 = self.labels.iloc[idx]

        # Create positive or negative pairs
        if idx % 2 == 0:  # Positive pair
            row2 = self.data[self.labels == label1].sample(1).iloc[0]
            label = 1
        else:  # Negative pair
            row2 = self.data[self.labels != label1].sample(1).iloc[0]
            label = 0

        # Convert to tensors
        embedding1 = torch.tensor(eval(row1['embedding']), dtype=torch.float32)
        embedding2 = torch.tensor(eval(row2['embedding']), dtype=torch.float32)

        return embedding1, embedding2, torch.tensor(label, dtype=torch.float32)



In [27]:
# Create DataLoaders for training and testing
train_dataset = DysphoniaPairsDataset(train_data)
test_dataset = DysphoniaPairsDataset(test_data)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

SAIMESE NETWORK

In [28]:
# Define the sub-network for embedding generation
class EmbeddingNet(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(EmbeddingNet, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, output_dim)
        )

    def forward(self, x):
        return self.fc(x)

class SiameseNetwork(nn.Module):
    def __init__(self, input_dim, embedding_dim):
        super(SiameseNetwork, self).__init__()
        self.embedding_net = EmbeddingNet(input_dim, embedding_dim)

    def forward(self, input1, input2):
        # Get embeddings for both inputs
        output1 = self.embedding_net(input1)
        output2 = self.embedding_net(input2)
        # Calculate cosine similarity
        similarity = F.cosine_similarity(output1, output2, dim=1)
        return similarity

Updating the loss function

In [29]:
class CosineContrastiveLoss(nn.Module):
    def __init__(self, margin=0.5):
        super(CosineContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, similarity, label):
        # For similar pairs (label == 1), maximize similarity
        # For dissimilar pairs (label == 0), ensure similarity is below margin
        loss = label * (1 - similarity) + (1 - label) * F.relu(similarity - self.margin)
        return loss.mean()

In [30]:
# Set parameters
input_dim = len(eval(data['embedding'].iloc[0]))  # Dimension of embeddings in the CSV
embedding_dim = 32  # Dimension of output embedding
margin = 0.5
epochs = 10
learning_rate = 0.001

# Initialize model, loss, and optimizer
model = SiameseNetwork(input_dim, embedding_dim)
criterion = CosineContrastiveLoss(margin=margin)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


In [40]:
# Initialize metrics
train_loss_metric = MeanMetric()  # For averaging training loss
test_loss_metric = MeanMetric()   # For averaging test loss
accuracy_metric = BinaryAccuracy()  # For calculating accuracy (binary task)
f1_score_metric = F1Score(task='binary', num_classes=2, average='macro')  # F1 score metric
precision_metric = Precision(task='binary', num_classes=2, average='macro')  # Precision metric
recall_metric = Recall(task='binary', num_classes=2, average='macro')

for epoch in range(epochs):
  #training loop
    model.train()
    train_loss_metric.reset()

    for embedding1, embedding2, label in train_loader:
        optimizer.zero_grad()
        # Forward pass
        similarity = model(embedding1, embedding2)
        loss = criterion(similarity, label)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        train_loss_metric.update(loss.item())

    # testing loop
    model.eval()
    test_loss_metric.reset()
    accuracy_metric.reset()
    f1_score_metric.reset()
    precision_metric.reset()
    recall_metric.reset()

    with torch.no_grad():
        for embedding1, embedding2, label in test_loader:
            similarity = model(embedding1, embedding2)
            loss = criterion(similarity, label)
            test_loss_metric.update(loss.item())

            predictions = (similarity > 0.5).float()
            accuracy_metric.update(predictions, label)
            f1_score_metric.update(predictions, label)
            precision_metric.update(predictions, label)
            recall_metric.update(predictions, label)

    # Print metrics
    train_loss = train_loss_metric.compute()
    test_loss = test_loss_metric.compute()
    accuracy = accuracy_metric.compute()
    f1_score = f1_score_metric.compute()
    precision = precision_metric.compute()
    recall = recall_metric.compute()

    print(f"Epoch [{epoch+1}/{epochs}], Training Loss: {train_loss/len(train_loader):.4f}, Test Loss: {test_loss/len(test_loader):.4f}, Accuracy: {accuracy:.4f}")

final_accuracy = accuracy_metric.compute()
final_f1_score = f1_score_metric.compute()
final_precision = precision_metric.compute()
final_recall = recall_metric.compute()
print("Training complete.")
print(f"Accuracy: {final_accuracy:.4f}, Precision: {final_precision:.4f}, Recall: {final_recall:.4f}, F1 Score: {final_f1_score:.4f}")


Epoch [1/10], Training Loss: 0.0011, Test Loss: 0.0045, Accuracy: 0.5003
Epoch [2/10], Training Loss: 0.0011, Test Loss: 0.0044, Accuracy: 0.5003
Epoch [3/10], Training Loss: 0.0010, Test Loss: 0.0037, Accuracy: 0.5488
Epoch [4/10], Training Loss: 0.0009, Test Loss: 0.0036, Accuracy: 0.5623
Epoch [5/10], Training Loss: 0.0009, Test Loss: 0.0037, Accuracy: 0.5725
Epoch [6/10], Training Loss: 0.0009, Test Loss: 0.0036, Accuracy: 0.5945
Epoch [7/10], Training Loss: 0.0009, Test Loss: 0.0036, Accuracy: 0.5725
Epoch [8/10], Training Loss: 0.0008, Test Loss: 0.0036, Accuracy: 0.5759
Epoch [9/10], Training Loss: 0.0009, Test Loss: 0.0035, Accuracy: 0.5860
Epoch [10/10], Training Loss: 0.0009, Test Loss: 0.0036, Accuracy: 0.5341
Training complete.
Accuracy: 0.5341, Precision: 0.5185, Recall: 0.9651, F1 Score: 0.6745
