# **0.0 Loading Dataset**

In [1]:
!git clone https://github.com/edwinkmusaasizi/Machine-Learning.git

Cloning into 'Machine-Learning'...
remote: Enumerating objects: 37, done.[K
remote: Counting objects: 100% (37/37), done.[K
remote: Compressing objects: 100% (33/33), done.[K
remote: Total 37 (delta 8), reused 7 (delta 0), pack-reused 0 (from 0)[K
Receiving objects: 100% (37/37), 321.68 KiB | 6.07 MiB/s, done.
Resolving deltas: 100% (8/8), done.


In [2]:
%cd Machine-Learning
%cd data
%cd interim
!ls

/content/Machine-Learning
/content/Machine-Learning/data
/content/Machine-Learning/data/interim
cleaned_mental_health_data.csv


# **2.0 Teacher Model(DNN)**

3.1 Data Processing

In [10]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder

# Load data
df = pd.read_csv("cleaned_mental_health_data.csv")

# Define adherence labels based on questionnaire responses
non_adherence_columns = [
    "Do you ever forget to take your medication?",
    "Are you careless at times about taking your medication?",
    "When you feel better, do you sometimes stop taking your medication?",
    "Sometimes if you feel worse when you take the medication, do you stop taking it?",
    "I take my medication only when I am sick"
]

df["adherence"] = np.where(df[non_adherence_columns].eq("Yes").any(axis=1), 0, 1)

# Drop redundant columns
df = df.drop(columns=non_adherence_columns + ["If you have any further comments about medication or this questionnaire, please write them below"])

# Identify all categorical columns
categorical_cols = df.select_dtypes(include=['object', 'category']).columns
print("Categorical columns to encode:", categorical_cols)

# Encode all categorical features
for col in categorical_cols:
    le = LabelEncoder()
    df[col] = le.fit_transform(df[col].astype(str))

# Split features and labels
X = df.drop(columns="adherence").values
y = df["adherence"].values

# Split data into train, validation, test (70-15-15)
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, stratify=y, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, stratify=y_temp, random_state=42)

# Normalize numerical features
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_val = scaler.transform(X_val)
X_test = scaler.transform(X_test)

# Convert to PyTorch tensors
train_dataset = TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train))
val_dataset = TensorDataset(torch.FloatTensor(X_val), torch.FloatTensor(y_val))
test_dataset = TensorDataset(torch.FloatTensor(X_test), torch.FloatTensor(y_test))

# Create DataLoaders
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

Categorical columns to encode: Index(['sex', 'Religion', 'marital status', 'education status', 'residence',
       'substance use', 'comorbidity',
       'It is unnatural for my mind and body to be controlled by medication?',
       'My thoughts are clearer on medication',
       'By staying on medication, I can prevent getting sick',
       'I feel weird, like a ‘zombie’ on medication',
       'Medication makes me feel tired and sluggish',
       'Some of your symptoms are made by your mind.', 'You are mentally well',
       'You do not need medication', 'Your stay in the hospital is necessary',
       'The doctor is right in prescribing medication for you.',
       'You do not need to be seen by a doctor or psychiatrist',
       'If someone said you have a nervous or mental illness, they would be right',
       'None of the unusual things you are experiencing are due to an illness.',
       '. Loss of energy or drive', 'Feeling unmotivated or numb',
       'Daytime sedation or drowsi

3.2  Define the Teacher Model Architecture



In [11]:
import torch.nn as nn

class TeacherModel(nn.Module):
    def __init__(self, input_dim):
        super(TeacherModel, self).__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.layers(x)

# Initialize model
input_dim = X_train.shape[1]
teacher = TeacherModel(input_dim)

3.3 Training Loop

In [12]:
import torch.optim as optim
from sklearn.metrics import accuracy_score, roc_auc_score

# Loss and optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(teacher.parameters(), lr=0.001)

# Early stopping parameters
best_val_loss = float('inf')
patience = 5
counter = 0

# Training loop
num_epochs = 100
for epoch in range(num_epochs):
    # Training
    teacher.train()
    train_loss = 0.0
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = teacher(inputs).squeeze()
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * inputs.size(0)

    # Validation
    teacher.eval()
    val_loss = 0.0
    val_preds, val_true = [], []
    with torch.no_grad():
        for inputs, labels in val_loader:
            outputs = teacher(inputs).squeeze()
            loss = criterion(outputs, labels)
            val_loss += loss.item() * inputs.size(0)
            val_preds.extend(outputs.numpy())
            val_true.extend(labels.numpy())

    # Calculate metrics
    train_loss = train_loss / len(train_loader.dataset)
    val_loss = val_loss / len(val_loader.dataset)
    val_auc = roc_auc_score(val_true, val_preds)
    val_preds_binary = (np.array(val_preds) > 0.5).astype(int)
    val_acc = accuracy_score(val_true, val_preds_binary)

    print(f"Epoch {epoch+1}/{num_epochs}")
    print(f"Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | Val AUC: {val_auc:.4f} | Val Acc: {val_acc:.4f}")

    # Early stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        counter = 0
        torch.save(teacher.state_dict(), "best_teacher.pth")
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping!")
            break

Epoch 1/100
Train Loss: 0.7060 | Val Loss: 0.6836 | Val AUC: 0.6389 | Val Acc: 0.5000
Epoch 2/100
Train Loss: 0.6753 | Val Loss: 0.6621 | Val AUC: 0.6806 | Val Acc: 0.6667
Epoch 3/100
Train Loss: 0.6529 | Val Loss: 0.6412 | Val AUC: 0.7083 | Val Acc: 0.6667
Epoch 4/100
Train Loss: 0.6379 | Val Loss: 0.6200 | Val AUC: 0.7222 | Val Acc: 0.6667
Epoch 5/100
Train Loss: 0.6201 | Val Loss: 0.6027 | Val AUC: 0.7222 | Val Acc: 0.6667
Epoch 6/100
Train Loss: 0.5891 | Val Loss: 0.5912 | Val AUC: 0.7361 | Val Acc: 0.6667
Epoch 7/100
Train Loss: 0.5650 | Val Loss: 0.5845 | Val AUC: 0.7361 | Val Acc: 0.6667
Epoch 8/100
Train Loss: 0.5460 | Val Loss: 0.5821 | Val AUC: 0.7222 | Val Acc: 0.6667
Epoch 9/100
Train Loss: 0.5540 | Val Loss: 0.5811 | Val AUC: 0.7222 | Val Acc: 0.6667
Epoch 10/100
Train Loss: 0.5286 | Val Loss: 0.5822 | Val AUC: 0.7361 | Val Acc: 0.6667
Epoch 11/100
Train Loss: 0.4947 | Val Loss: 0.5765 | Val AUC: 0.7361 | Val Acc: 0.6667
Epoch 12/100
Train Loss: 0.4821 | Val Loss: 0.5740 |

 3.4 Evaluation

In [15]:
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix

def calculate_specificity(y_true, y_pred):
    """Calculate specificity (true negative rate)."""
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    specificity = tn / (tn + fp)
    return specificity

# Training loop
num_epochs = 100
for epoch in range(num_epochs):
    # Training
    teacher.train()
    train_loss = 0.0
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = teacher(inputs).squeeze()
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * inputs.size(0)

    # Validation
    teacher.eval()
    val_loss = 0.0
    val_preds, val_true = [], []
    with torch.no_grad():
        for inputs, labels in val_loader:
            outputs = teacher(inputs).squeeze()
            loss = criterion(outputs, labels)
            val_loss += loss.item() * inputs.size(0)
            val_preds.extend(outputs.numpy())
            val_true.extend(labels.numpy())

    # Calculate metrics
    train_loss = train_loss / len(train_loader.dataset)
    val_loss = val_loss / len(val_loader.dataset)

    # Convert predictions to binary (0 or 1)
    val_preds_binary = (np.array(val_preds) > 0.5).astype(int)

    # Calculate precision, recall, F1-score, specificity, and AUC
    val_precision = precision_score(val_true, val_preds_binary)
    val_recall = recall_score(val_true, val_preds_binary)
    val_f1 = f1_score(val_true, val_preds_binary)
    val_specificity = calculate_specificity(val_true, val_preds_binary)
    val_auc = roc_auc_score(val_true, val_preds)

    print(f"Epoch {epoch+1}/{num_epochs}")
    print(f"Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")
    print(f"Val Precision: {val_precision:.4f} | Val Recall: {val_recall:.4f}")
    print(f"Val F1-Score: {val_f1:.4f} | Val Specificity: {val_specificity:.4f}")
    print(f"Val AUC: {val_auc:.4f}")

    # Early stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        counter = 0
        torch.save(teacher.state_dict(), "best_teacher.pth")
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping!")
            break

print(f"Learning Rate: {optimizer.param_groups[0]['lr']}")

Epoch 1/100
Train Loss: 0.4075 | Val Loss: 0.6106
Val Precision: 0.0000 | Val Recall: 0.0000
Val F1-Score: 0.0000 | Val Specificity: 1.0000
Val AUC: 0.7500
Early stopping!
Learning Rate: 0.001


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


Test Evaluation

In [16]:
# Load the best saved model
teacher.load_state_dict(torch.load("best_teacher.pth"))

# Test evaluation
teacher.eval()
test_preds, test_true = [], []
with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = teacher(inputs).squeeze()
        test_preds.extend(outputs.numpy())
        test_true.extend(labels.numpy())

# Convert predictions to binary (0 or 1)
test_preds_binary = (np.array(test_preds) > 0.5).astype(int)

# Calculate metrics
test_precision = precision_score(test_true, test_preds_binary)
test_recall = recall_score(test_true, test_preds_binary)
test_f1 = f1_score(test_true, test_preds_binary)
test_specificity = calculate_specificity(test_true, test_preds_binary)
test_auc = roc_auc_score(test_true, test_preds)

print("Test Metrics:")
print(f"Test Precision: {test_precision:.4f} | Test Recall: {test_recall:.4f}")
print(f"Test F1-Score: {test_f1:.4f} | Test Specificity: {test_specificity:.4f}")
print(f"Test AUC: {test_auc:.4f}")

Test Metrics:
Test Precision: 0.0000 | Test Recall: 0.0000
Test F1-Score: 0.0000 | Test Specificity: 1.0000
Test AUC: 0.4444


  teacher.load_state_dict(torch.load("best_teacher.pth"))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


# **3.0 Contrastive Learning**

---



3.1 Data Preparation for Contrastive Learning

We need to create pairs of samples for contrastive learning. Each pair consists of:

Two samples from the same class (positive pair).

Two samples from different classes (negative pair).

In [17]:
import torch
from torch.utils.data import Dataset, DataLoader

class ContrastiveDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        x1 = self.X[idx]
        y1 = self.y[idx]

        # Randomly select a positive or negative pair
        if torch.rand(1) > 0.5:
            # Positive pair: same class
            idx2 = torch.randint(0, len(self.X), (1,)).item()
            while self.y[idx2] != y1:
                idx2 = torch.randint(0, len(self.X), (1,)).item()
            label = 1  # Positive pair label
        else:
            # Negative pair: different class
            idx2 = torch.randint(0, len(self.X), (1,)).item()
            while self.y[idx2] == y1:
                idx2 = torch.randint(0, len(self.X), (1,)).item()
            label = 0  # Negative pair label

        x2 = self.X[idx2]
        return x1, x2, label

# Create contrastive datasets
train_contrastive_dataset = ContrastiveDataset(X_train, y_train)
val_contrastive_dataset = ContrastiveDataset(X_val, y_val)

# Create DataLoaders
batch_size = 32
train_contrastive_loader = DataLoader(train_contrastive_dataset, batch_size=batch_size, shuffle=True)
val_contrastive_loader = DataLoader(val_contrastive_dataset, batch_size=batch_size)

3.2 Define the Contrastive Loss

We’ll use the NT-Xent (Normalized Temperature-Scaled Cross Entropy) loss, which is commonly used in contrastive learning.

In [18]:
import torch.nn.functional as F

class ContrastiveLoss(nn.Module):
    def __init__(self, temperature=0.5):
        super(ContrastiveLoss, self).__init__()
        self.temperature = temperature

    def forward(self, z1, z2, labels):
        # Normalize the embeddings
        z1 = F.normalize(z1, dim=1)
        z2 = F.normalize(z2, dim=1)

        # Compute similarity matrix
        sim_matrix = torch.matmul(z1, z2.T) / self.temperature

        # Positive pairs are on the diagonal
        pos_pairs = torch.diag(sim_matrix)

        # Negative pairs are off-diagonal
        neg_pairs = sim_matrix[~torch.eye(sim_matrix.size(0), dtype=bool)]

        # Compute contrastive loss
        pos_loss = -torch.log(torch.exp(pos_pairs) / torch.exp(sim_matrix).sum(dim=1))
        neg_loss = -torch.log(1 - torch.exp(neg_pairs) / torch.exp(sim_matrix).sum(dim=1))

        # Combine losses
        loss = (pos_loss.mean() + neg_loss.mean()) / 2
        return loss

3.3  Define the Encoder Model

The encoder model will learn meaningful representations of the input data. We’ll use a simple feedforward neural network.

In [19]:
class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim=128, output_dim=64):
        super(Encoder, self).__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, x):
        return self.layers(x)

# Initialize encoder
input_dim = X_train.shape[1]
encoder = Encoder(input_dim)

3.4 Training Loop with Contrastive Learning

In [23]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

# Define ContrastiveDataset
class ContrastiveDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.FloatTensor(X)  # Convert to FloatTensor
        self.y = torch.FloatTensor(y)  # Convert to FloatTensor

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        x1 = self.X[idx]
        y1 = self.y[idx]

        # Randomly select a positive or negative pair
        if torch.rand(1) > 0.5:
            # Positive pair: same class
            idx2 = torch.randint(0, len(self.X), (1,)).item()
            while self.y[idx2] != y1:
                idx2 = torch.randint(0, len(self.X), (1,)).item()
            label = 1  # Positive pair label
        else:
            # Negative pair: different class
            idx2 = torch.randint(0, len(self.X), (1,)).item()
            while self.y[idx2] == y1:
                idx2 = torch.randint(0, len(self.X), (1,)).item()
            label = 0  # Negative pair label

        x2 = self.X[idx2]
        return x1, x2, label

# Define ContrastiveLoss
class ContrastiveLoss(nn.Module):
    def __init__(self, temperature=0.5):
        super(ContrastiveLoss, self).__init__()
        self.temperature = temperature

    def forward(self, z1, z2, labels):
        # Normalize the embeddings
        z1 = F.normalize(z1, dim=1)
        z2 = F.normalize(z2, dim=1)

        # Compute similarity matrix
        sim_matrix = torch.matmul(z1, z2.T) / self.temperature

        # Positive pairs are on the diagonal
        pos_pairs = torch.diag(sim_matrix)

        # Negative pairs are off-diagonal
        mask = ~torch.eye(sim_matrix.size(0), dtype=bool, device=sim_matrix.device)
        neg_pairs = sim_matrix[mask].reshape(sim_matrix.size(0), -1)

        # Compute contrastive loss
        pos_loss = -torch.log(torch.exp(pos_pairs) / torch.exp(sim_matrix).sum(dim=1))
        neg_loss = -torch.log(1 - torch.exp(neg_pairs) / torch.exp(sim_matrix).sum(dim=1).unsqueeze(1))

        # Combine losses
        loss = (pos_loss.mean() + neg_loss.mean()) / 2
        return loss

# Define Encoder
class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim=128, output_dim=64):
        super(Encoder, self).__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, x):
        return self.layers(x)

# Initialize encoder
input_dim = X_train.shape[1]
encoder = Encoder(input_dim).float()  # Ensure model uses Float

# Create contrastive datasets
train_contrastive_dataset = ContrastiveDataset(X_train, y_train)
val_contrastive_dataset = ContrastiveDataset(X_val, y_val)

# Create DataLoaders
batch_size = 32
train_contrastive_loader = DataLoader(train_contrastive_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
val_contrastive_loader = DataLoader(val_contrastive_dataset, batch_size=batch_size, drop_last=True)

# Loss and optimizer
contrastive_loss = ContrastiveLoss(temperature=0.5)
optimizer = optim.Adam(encoder.parameters(), lr=0.001)

# Training loop
num_epochs = 50
for epoch in range(num_epochs):
    encoder.train()
    train_loss = 0.0
    for x1, x2, labels in train_contrastive_loader:
        optimizer.zero_grad()

        # Forward pass
        z1 = encoder(x1)
        z2 = encoder(x2)

        # Compute contrastive loss
        loss = contrastive_loss(z1, z2, labels)

        # Backward pass
        loss.backward()
        optimizer.step()

        train_loss += loss.item() * x1.size(0)

    # Validation
    encoder.eval()
    val_loss = 0.0
    with torch.no_grad():
        for x1, x2, labels in val_contrastive_loader:
            z1 = encoder(x1)
            z2 = encoder(x2)
            loss = contrastive_loss(z1, z2, labels)
            val_loss += loss.item() * x1.size(0)

    # Calculate average losses
    train_loss = train_loss / len(train_contrastive_loader.dataset)
    val_loss = val_loss / len(val_contrastive_loader.dataset)

    print(f"Epoch {epoch+1}/{num_epochs}")
    print(f"Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")

Epoch 1/50
Train Loss: 1.4085 | Val Loss: 0.0000
Epoch 2/50
Train Loss: 1.3898 | Val Loss: 0.0000
Epoch 3/50
Train Loss: 1.4124 | Val Loss: 0.0000
Epoch 4/50
Train Loss: 1.3933 | Val Loss: 0.0000
Epoch 5/50
Train Loss: 1.3942 | Val Loss: 0.0000
Epoch 6/50
Train Loss: 1.3854 | Val Loss: 0.0000
Epoch 7/50
Train Loss: 1.3801 | Val Loss: 0.0000
Epoch 8/50
Train Loss: 1.3835 | Val Loss: 0.0000
Epoch 9/50
Train Loss: 1.3995 | Val Loss: 0.0000
Epoch 10/50
Train Loss: 1.3998 | Val Loss: 0.0000
Epoch 11/50
Train Loss: 1.3936 | Val Loss: 0.0000
Epoch 12/50
Train Loss: 1.3963 | Val Loss: 0.0000
Epoch 13/50
Train Loss: 1.3924 | Val Loss: 0.0000
Epoch 14/50
Train Loss: 1.3865 | Val Loss: 0.0000
Epoch 15/50
Train Loss: 1.3857 | Val Loss: 0.0000
Epoch 16/50
Train Loss: 1.3817 | Val Loss: 0.0000
Epoch 17/50
Train Loss: 1.3973 | Val Loss: 0.0000
Epoch 18/50
Train Loss: 1.3879 | Val Loss: 0.0000
Epoch 19/50
Train Loss: 1.3927 | Val Loss: 0.0000
Epoch 20/50
Train Loss: 1.3829 | Val Loss: 0.0000
Epoch 21/

3.5 Use Learned Representations


In [24]:
# Extract learned representations
encoder.eval()
with torch.no_grad():
    z_train = encoder(torch.FloatTensor(X_train))
    z_val = encoder(torch.FloatTensor(X_val))
    z_test = encoder(torch.FloatTensor(X_test))