In [46]:
import torch
import torch.nn as nn
import torch.optim as optim

class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, output_dim)  # output_dim=5
        self.relu = nn.ReLU()
        
    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        return self.fc3(x)


In [47]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler

# Load resampled training set
train_res = pd.read_csv("/Users/ellaforss/git/RTDS_group/data/Xy_train_resampled.csv")
X_train_res = train_res.drop('num', axis=1).values
y_train_res = train_res['num'].values.astype(int)

# Load original test set
X_test_df = pd.read_csv("/Users/ellaforss/git/RTDS_group/data/X_test.csv")
y_test = pd.read_csv("/Users/ellaforss/git/RTDS_group/data/y_test.csv").values.astype(int)

X_test = X_test_df.values



In [48]:
#Define DQN Environment
import gym
from gym import spaces

class HeartDiseaseEnv(gym.Env):
    def __init__(self, X, y):
        super(HeartDiseaseEnv, self).__init__()
        self.X = X
        self.y = y
        self.n_samples = X.shape[0]
        self.current_index = 0

        self.observation_space = spaces.Box(
            low=np.min(X), high=np.max(X), shape=(X.shape[1],), dtype=np.float32
        )
        self.action_space = spaces.Discrete(5)  # 5 classes (0-4)

    def reset(self):
        self.current_index = np.random.randint(0, self.n_samples)
        return self.X[self.current_index]

    def step(self, action):
        true_label = self.y[self.current_index]
        reward = 1 if action == true_label else -1
        done = True

        self.current_index = np.random.randint(0, self.n_samples)
        next_state = self.X[self.current_index]
        return next_state, reward, done, {}


In [49]:
# 3. Define DQN
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque

class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, output_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x


In [50]:
#4. Initialize model, optimizer, loss
input_dim = X_train_res.shape[1]
output_dim = 5  # num classes: 0-4

dqn = DQN(input_dim=input_dim, output_dim=output_dim)
optimizer = optim.Adam(dqn.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()


In [51]:
#5. Training loop (supervised classification-style)
# -------------------------------
NUM_EPOCHS = 50
BATCH_SIZE = 64

for epoch in range(NUM_EPOCHS):
    # Shuffle training data
    perm = np.random.permutation(len(X_train_res))
    X_train_shuffled = X_train_res[perm]
    y_train_shuffled = y_train_res[perm]

    for i in range(0, len(X_train_res), BATCH_SIZE):
        X_batch = torch.FloatTensor(X_train_shuffled[i:i+BATCH_SIZE])
        y_batch = torch.LongTensor(y_train_shuffled[i:i+BATCH_SIZE])

        # Forward pass
        outputs = dqn(X_batch)

        # Compute loss
        loss = criterion(outputs, y_batch)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    # Optional: print progress
    if (epoch+1) % 5 == 0:
        print(f"Epoch {epoch+1}/{NUM_EPOCHS}, Loss: {loss.item():.4f}")



Epoch 5/50, Loss: 1.0061
Epoch 10/50, Loss: 1.0414
Epoch 15/50, Loss: 0.6572
Epoch 20/50, Loss: 0.8643
Epoch 25/50, Loss: 0.6482
Epoch 30/50, Loss: 0.8275
Epoch 35/50, Loss: 0.3592
Epoch 40/50, Loss: 0.6334
Epoch 45/50, Loss: 0.3598
Epoch 50/50, Loss: 0.3865


In [52]:
#Save Model
torch.save(dqn.state_dict(), "dqn_heart_classification.pth")


In [53]:
#7. Evaluate on Test Set
dqn.eval()
y_pred = []

with torch.no_grad():
    for i in range(X_test_scaled.shape[0]):
        state = torch.FloatTensor(X_test_scaled[i]).unsqueeze(0)
        q_values = dqn(state)
        action = q_values.argmax().item()
        y_pred.append(action)

from sklearn.metrics import accuracy_score, f1_score
print("Test Accuracy:", accuracy_score(y_test, y_pred))
print("Macro F1 Score:", f1_score(y_test, y_pred, average="macro"))


Test Accuracy: 0.5489130434782609
Macro F1 Score: 0.2914646464646465


In [73]:
# EVALUATE BIAS AND FAIRNESS
# 1. Imports
import pandas as pd
import numpy as np
import torch
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score

# -------------------------------
# 2. Load saved model and data
# Load DQN model definition (must match training architecture)
class DQN(torch.nn.Module):
    def __init__(self, input_dim, output_dim):
        super().__init__()
        self.fc1 = torch.nn.Linear(input_dim, 64)
        self.fc2 = torch.nn.Linear(64, 32)
        self.fc3 = torch.nn.Linear(32, output_dim)
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# Paths to data
resampled_path = "/Users/ellaforss/git/RTDS_group/data/Xy_train_resampled.csv"

# Load resampled training set (DQN was trained on this)
train_res = pd.read_csv(resampled_path)
X_train_res = train_res.drop('num', axis=1)
y_train_res = train_res['num'].values.astype(int)

# Sensitive features
sex = X_train_res['sex_Male'].values   # 0 or 1
age = X_train_res['age'].values

# Drop target from features for input
X_train_res = X_train_res.values

# Load saved scaler if available, or fit new one
scaler = StandardScaler()
X_train_res_scaled = scaler.fit_transform(X_train_res)  # should match training scaling

# -------------------------------
# 3. Load DQN model
input_dim = X_train_res_scaled.shape[1]
output_dim = 5   # 5 classes: 0-4
dqn = DQN(input_dim=input_dim, output_dim=output_dim)
dqn.load_state_dict(torch.load("dqn_heart_classification.pth"))
dqn.eval()

# -------------------------------
# 4. Predict
with torch.no_grad():
    inputs = torch.tensor(X_train_res_scaled, dtype=torch.float32)
    outputs = dqn(inputs)
    y_pred = torch.argmax(outputs, axis=1).numpy()

# -------------------------------
# 5. Define fairness evaluation functions
def evaluate_fairness(y_true, y_pred, sensitive_feature, feature_name):
    groups = np.unique(sensitive_feature)
    print(f"\nFairness metrics by {feature_name}:")
    for group in groups:
        idx = (sensitive_feature == group)
        # Skip group if only one class present
        if len(np.unique(y_true[idx])) < 2:
            print(f"  Group {group}: Not enough classes to compute F1. Skipping.")
            continue
        acc = accuracy_score(y_true[idx], y_pred[idx])
        f1 = f1_score(y_true[idx], y_pred[idx], average="macro")
        print(f"  Group {group}: Accuracy={acc:.3f}, Macro F1={f1:.3f}")

# -------------------------------
# 6. Evaluate bias/fairness
evaluate_fairness(y_train_res, y_pred, sex, "Sex")
evaluate_fairness(y_train_res, y_pred, age, "Age")



Fairness metrics by Sex:
  Group -1.9344942598098125: Accuracy=0.857, Macro F1=0.818
  Group -1.9211352444724512: Not enough classes to compute F1. Skipping.
  Group -1.8969763235653012: Not enough classes to compute F1. Skipping.
  Group -1.8842658526655005: Not enough classes to compute F1. Skipping.
  Group -1.8392938119197837: Not enough classes to compute F1. Skipping.
  Group -1.8384319098157296: Not enough classes to compute F1. Skipping.
  Group -1.8276028734850072: Not enough classes to compute F1. Skipping.
  Group -1.8025942045667431: Not enough classes to compute F1. Skipping.
  Group -1.788691489614054: Accuracy=1.000, Macro F1=1.000
  Group -1.7877358700489292: Not enough classes to compute F1. Skipping.
  Group -1.7651721266384162: Not enough classes to compute F1. Skipping.
  Group -1.73624022879166: Not enough classes to compute F1. Skipping.
  Group -1.621472435132025: Not enough classes to compute F1. Skipping.
  Group -1.59957728057179: Not enough classes to comput