In [1]:
# ============================================================
# 0. IMPORTS
# ============================================================
from kan import KAN
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import pennylane as qml
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler, RobustScaler,MinMaxScaler
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    confusion_matrix, balanced_accuracy_score, average_precision_score,
    matthews_corrcoef, cohen_kappa_score, brier_score_loss, roc_auc_score
)
from sklearn.utils.class_weight import compute_class_weight
from sklearn.model_selection import KFold
import random
import copy
# ============================================================
# 1. SEEDING (REPRODUCIBILITY)
# ============================================================
batch_size = 32
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [2]:
# ============================================================
# 2. DATA LOADING & PREPROCESSING
# ============================================================

df = pd.read_csv("IndianLiverPatientDataset(ILPD).csv")
df = df.copy()

# Encode categorical
df["Gender"] = df["Gender"].map({"Male": 0, "Female": 1})

# Remap label: {1,2} -> {1,0}
df["Sickness"] = df["Sickness"].replace(2, 0)

# Handle NaN
# df["A/G"] = df["A/G"].fillna(df["A/G"].mean())

# Separate positive / negative
pos = df[df["Sickness"] == 1]
neg = df[df["Sickness"] == 0]

# Balanced test set
n_test = min(len(pos), len(neg)) // 2
test = pd.concat([
    pos.sample(n=n_test, random_state=seed),
    neg.sample(n=n_test, random_state=seed)
]).reset_index(drop=True)

# Drop specific indices (logic from original code)
drop_idx = [3, 31, 35, 89, 104, 106, 114, 115, 116, 124, 130, 132, 135, 139, 143, 150, 151, 157, 161]
test = test.drop([i for i in drop_idx if i in test.index], errors="ignore")

train = df.drop(test.index).reset_index(drop=True)
test = test.drop_duplicates().reset_index(drop=True)
train = train.drop_duplicates().reset_index(drop=True)

# Handle NaN
ag_mean = train["A/G"].mean()
train["A/G"] = train["A/G"].fillna(ag_mean)
test["A/G"]  = test["A/G"].fillna(ag_mean)

# Split X / y
X_train_df, y_train_df = train.drop(columns=["Sickness"]), train["Sickness"]
X_test_df, y_test_df = test.drop(columns=["Sickness"]), test["Sickness"]



# Scaling
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train_df)
X_test = scaler.transform(X_test_df)

# Torch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)

X_train = X_train*((np.pi/2))
X_test  = X_test*((np.pi/2))

# Labels
y_train = torch.tensor(y_train_df.values, dtype=torch.float32).unsqueeze(1)
y_test = torch.tensor(y_test_df.values, dtype=torch.float32).unsqueeze(1)


# CHIA VALIDATION (C·∫Øt ƒëu√¥i 25% gi·ªëng Keras validation_split=0.25)
# L∆∞u √Ω: Keras c·∫Øt 25% cu·ªëi c√πng l√†m val TR∆Ø·ªöC KHI shuffle
dataset_size = len(X_train)
val_size = int(0.25 * dataset_size)
train_size = dataset_size - val_size

# Slicing th·ªß c√¥ng thay v√¨ d√πng random_split
X_train_split = X_train[:train_size]
y_train_split = y_train[:train_size]
X_val_split   = X_train[train_size:]
y_val_split   = y_train[train_size:]

# T·∫°o DataLoader
train_loader = DataLoader(TensorDataset(X_train_split, y_train_split), batch_size=batch_size, shuffle=True)
val_loader   = DataLoader(TensorDataset(X_val_split, y_val_split), batch_size=batch_size, shuffle=False)
test_loader  = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

# from sklearn.preprocessing import QuantileTransformer

# qt = QuantileTransformer(
#     n_quantiles=100,
#     output_distribution="normal",
#     random_state=42
# )
# X_train = qt.fit_transform(X_train_df)
# X_test = qt.fit_transform(X_test_df)
# X_train = torch.tanh(torch.tensor(X_train, dtype=torch.float32))
# X_test = torch.tanh(torch.tensor(X_test, dtype=torch.float32))
# X_train = torch.clamp(X_train, -3, 3)
# X_test  = torch.clamp(X_test,  -3, 3)
# üî• R·∫§T QUAN TR·ªåNG: ·ªïn ƒë·ªãnh spline cho KAN
# X_train = torch.tanh(X_train)
# X_test  = torch.tanh(X_test)

In [3]:
# # ============================================================
# # CLASS WEIGHT: EFFECTIVE NUMBER OF SAMPLES (CVPR 2019)
# # ============================================================

# def effective_num_weight(n_samples, beta=0.3):
#     return (1 - beta) / (1 - beta ** n_samples)

# # y_train ƒë√£ l√† tensor ‚Üí chuy·ªÉn sang numpy
# y_train_np = y_train.numpy().flatten()

# n0 = (y_train_np == 0).sum()
# n1 = (y_train_np == 1).sum()

# w0 = effective_num_weight(n0, beta=0.3)
# w1 = effective_num_weight(n1, beta=0.3)

# # Normalize ƒë·ªÉ mean = 1 (r·∫•t quan tr·ªçng)
# mean_w = (w0 + w1) / 2

# class_weight_dict = {
#     0: float(w0 / mean_w),
#     1: float(w1 / mean_w)
# }

# print("Effective class weights:", class_weight_dict)


In [5]:
# ============================================================
# 3. QUANTUM LAYER
# ============================================================

n_qubits = 2
n_layers = 1

dev = qml.device("default.qubit", wires=n_qubits, seed=seed)

@qml.qnode(dev, interface="torch")
def qnode(inputs, weights):
    qml.AngleEmbedding(inputs, wires=range(n_qubits))
    qml.BasicEntanglerLayers(weights, wires=range(n_qubits))
    return [qml.expval(qml.PauliZ(i)) for i in range(n_qubits)]

weight_shapes = {"weights": (n_layers, n_qubits)}

In [6]:
# @qml.qnode(dev, interface="torch")
# def qnode(inputs, weights):
#     for l in range(n_layers):
#         qml.AngleEmbedding(inputs, wires=range(n_qubits))
#         for i in range(n_qubits):
#             qml.RY(weights[l, i], wires=i)
#         for i in range(n_qubits - 1):
#             qml.CNOT(wires=[i, i + 1])
#     return [qml.expval(qml.PauliY(i)) for i in range(n_qubits)]
# weight_shapes = {"weights": (n_layers, n_qubits)}


In [None]:
import torch
import torch.nn as nn

class BinaryFocalLoss(nn.Module):
    def __init__(self, alpha=0.3, gamma=1.0, reduction="mean", eps=1e-7):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction
        self.eps = eps

    def forward(self, y_pred, y_true):
        y_true = y_true.float()
        y_pred = torch.clamp(y_pred, self.eps, 1.0 - self.eps)

        pos_loss = - self.alpha * (1 - y_pred) ** self.gamma * y_true * torch.log(y_pred)
        neg_loss = - (1 - self.alpha) * y_pred ** self.gamma * (1 - y_true) * torch.log(1 - y_pred)

        loss = pos_loss + neg_loss

        if self.reduction == "mean":
            return loss.mean()
        return loss.sum()


In [8]:
# ============================================================
# 5. MODEL ARCHITECTURE
# ============================================================

class HybridKANQuantumModel(nn.Module):
    def __init__(self, input_dim):
        super().__init__()

        # ======================
        # KAN NETWORK
        # ======================
        self.kan = KAN(
            width=[input_dim,7,5, n_qubits],  # b·∫°n c√≥ th·ªÉ ch·ªânh
            grid=5,       # s·ªë ƒëi·ªÉm spline
            k=3,          # spline b·∫≠c 3 (cubic)
            seed=seed
        )

        # ======================
        # QUANTUM LAYER
        # ======================
        self.q_layer = qml.qnn.TorchLayer(qnode, weight_shapes)

        # ======================
        # OUTPUT LAYER
        # ======================
        self.fc_out = nn.Linear(n_qubits, 1)
        nn.init.xavier_uniform_(self.fc_out.weight)
        nn.init.zeros_(self.fc_out.bias)

    def forward(self, x):
        x = self.kan(x)        # ‚¨ÖÔ∏è KAN thay MLP
        x = self.q_layer(x)    # ‚¨ÖÔ∏è Quantum layer
        x = self.fc_out(x)
        return torch.sigmoid(x)


In [9]:
# ============================================================
# 6. CLASS WEIGHTS (Balanced + Keras-style smoothing)
# ============================================================

# Chuy·ªÉn y_train v·ªÅ numpy
y_train_np = y_train.numpy().flatten()

# Balanced class weights
class_weights = compute_class_weight(
    class_weight="balanced",
    classes=np.unique(y_train_np),
    y=y_train_np
)

# Keras-style smoothing
alpha_smooth = 0.7   # 0.7‚Äì0.9 ƒë·ªÅu h·ª£p l√Ω
class_weight_dict = {
    i: float(1 + alpha_smooth * (w - 1))
    for i, w in enumerate(class_weights)
}

print("Class Weights Dict:", class_weight_dict)


Class Weights Dict: {0: 1.4775193798449613, 1: 0.7980327868852459}


In [10]:
# ============================================================
# 7. TRAINING SETUP
# ============================================================

model = HybridKANQuantumModel(X_train.shape[1])

optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = BinaryFocalLoss(alpha=0.3, gamma= 1.0)

# Early stopping
patience = 1000
best_loss = np.inf
counter = 0
best_state = None


checkpoint directory created: ./model
saving model version 0.0


In [11]:

# ============================================================
# 8. TRAINING LOOP
# ============================================================

epochs = 200
for epoch in range(epochs):
    # --- TRAINING PHASE ---
    model.train()
    train_loss = 0

    for x, y in train_loader:
        optimizer.zero_grad()
        out = model(x)
        
        # 1. T√≠nh raw loss (d·∫°ng vector, v√¨ reduction='none' trong criterion)
        raw_loss = criterion(out, y) 
        
        # 2. L·∫•y tr·ªçng s·ªë cho t·ª´ng m·∫´u trong batch t·ª´ class_weight_dict
        batch_weights = torch.tensor([class_weight_dict[int(yi.item())] for yi in y], dtype=torch.float32)
        
        # 3. Nh√¢n tr·ªçng s·ªë v√† t√≠nh trung b√¨nh ƒë·ªÉ ra m·ªôt s·ªë th·ª±c (Scalar)
        # unsqueeze(1) gi√∫p batch_weights c√≥ k√≠ch th∆∞·ªõc [32, 1] kh·ªõp v·ªõi raw_loss
        loss = (raw_loss * batch_weights.unsqueeze(1)).mean()
        
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * x.size(0)

    train_loss /= len(train_loader.dataset)

    # --- VALIDATION PHASE (ƒê√É S·ª¨A L·ªñI) ---
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for x, y in val_loader:
            out = model(x)
            
            # T√≠nh raw loss (vector)
            raw_loss = criterion(out, y)
            
            # L·∫•y tr·ªçng s·ªë t∆∞∆°ng t·ª± nh∆∞ training ƒë·ªÉ t√≠nh loss c√¥ng b·∫±ng
            batch_weights = torch.tensor([class_weight_dict[int(yi.item())] for yi in y], dtype=torch.float32)
            
            # Nh√¢n tr·ªçng s·ªë v√† t√≠nh trung b√¨nh (Scalar)
            loss = (raw_loss * batch_weights.unsqueeze(1)).mean()
            
            # C·ªông d·ªìn loss (l√∫c n√†y loss.item() ho·∫°t ƒë·ªông b√¨nh th∆∞·ªùng v√¨ loss l√† scalar)
            val_loss += loss.item() * x.size(0)

    val_loss /= len(val_loader.dataset)
    print(f"Epoch {epoch+1}: train_loss={train_loss:.4f}, val_loss={val_loss:.4f}")

    # # --- EARLY STOPPING ---
    # if val_loss < best_loss:
    #     best_loss = val_loss
    #     best_state = model.state_dict()
    #     counter = 0
    # else:
    #     counter += 1
    #     if counter >= patience:
    #         print("Early stopping")
    #         break

# Load l·∫°i tr·ªçng s·ªë t·ªët nh·∫•t sau khi training xong
# model.load_state_dict(best_state)

Epoch 1: train_loss=0.1811, val_loss=0.1777
Epoch 2: train_loss=0.1741, val_loss=0.1692
Epoch 3: train_loss=0.1680, val_loss=0.1613
Epoch 4: train_loss=0.1632, val_loss=0.1542
Epoch 5: train_loss=0.1585, val_loss=0.1480
Epoch 6: train_loss=0.1546, val_loss=0.1426
Epoch 7: train_loss=0.1518, val_loss=0.1377
Epoch 8: train_loss=0.1492, val_loss=0.1337
Epoch 9: train_loss=0.1477, val_loss=0.1303
Epoch 10: train_loss=0.1460, val_loss=0.1272
Epoch 11: train_loss=0.1447, val_loss=0.1245
Epoch 12: train_loss=0.1436, val_loss=0.1217
Epoch 13: train_loss=0.1417, val_loss=0.1192
Epoch 14: train_loss=0.1402, val_loss=0.1169
Epoch 15: train_loss=0.1387, val_loss=0.1148
Epoch 16: train_loss=0.1374, val_loss=0.1126
Epoch 17: train_loss=0.1357, val_loss=0.1104
Epoch 18: train_loss=0.1336, val_loss=0.1090
Epoch 19: train_loss=0.1323, val_loss=0.1073
Epoch 20: train_loss=0.1306, val_loss=0.1062
Epoch 21: train_loss=0.1293, val_loss=0.1056
Epoch 22: train_loss=0.1285, val_loss=0.1043
Epoch 23: train_los

In [12]:

# ============================================================
# 9. EVALUATION
# ============================================================

model.eval()
y_probs, y_true = [], []

with torch.no_grad():
    for x, y in test_loader:
        out = model(x)
        y_probs.extend(out.numpy())
        y_true.extend(y.numpy())

y_probs = np.array(y_probs)
y_true = np.array(y_true)
y_pred = (y_probs >= 0.5).astype(int)

tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()

print("\n=== METRICS ===")
print(f"Accuracy: {accuracy_score(y_true, y_pred):.4f}")
print(f"Precision: {precision_score(y_true, y_pred):.4f}")
print(f"Recall: {recall_score(y_true, y_pred):.4f}")
print(f"Specificity: {tn / (tn + fp):.4f}")
print(f"F1-score: {f1_score(y_true, y_pred):.4f}")
print(f"Balanced Accuracy: {balanced_accuracy_score(y_true, y_pred):.4f}")
print(f"ROC AUC: {roc_auc_score(y_true, y_probs):.4f}")
print(f"PR AUC: {average_precision_score(y_true, y_probs):.4f}")
print(f"MCC: {matthews_corrcoef(y_true, y_pred):.4f}")
print(f"Cohen Kappa: {cohen_kappa_score(y_true, y_pred):.4f}")
print(f"Brier Score: {brier_score_loss(y_true, y_probs):.4f}")


=== METRICS ===
Accuracy: 0.9041
Precision: 0.8511
Recall: 1.0000
Specificity: 0.7879
F1-score: 0.9195
Balanced Accuracy: 0.8939
ROC AUC: 0.9422
PR AUC: 0.9350
MCC: 0.8189
Cohen Kappa: 0.8028
Brier Score: 0.0860


In [13]:
# # ======================
# # KAN VISUALIZATION
# # ======================
# model.kan.plot()


In [14]:


# def plot_learned_function(model, X, feature_idx, n_points=200):
#     model.eval()

#     x_min = X[:, feature_idx].min()
#     x_max = X[:, feature_idx].max()
#     x_vals = torch.linspace(x_min, x_max, n_points)

#     X_base = X.mean(dim=0).repeat(n_points, 1)
#     X_base[:, feature_idx] = x_vals

#     with torch.no_grad():
#         y_vals = model(X_base).squeeze()

#     return x_vals.cpu().numpy(), y_vals.cpu().numpy()


In [15]:
# x, y = plot_learned_function(model, X_train, feature_idx=2)

# plt.plot(x, y)
# plt.xlabel("Feature 2 (scaled)")
# plt.ylabel("Model output")
# plt.title("Learned function for feature 2")
# plt.show()


### Importance sensitivity 

In [16]:
# def kan_feature_importance_sensitivity(model, X, eps=1e-2):
#     model.eval()
#     base_output = model(X).detach()

#     input_dim = X.shape[1]
#     importance = torch.zeros(input_dim)

#     for i in range(input_dim):
#         X_perturbed = X.clone()
#         X_perturbed[:, i] += eps

#         pert_output = model(X_perturbed).detach()
#         importance[i] = torch.mean(torch.abs(pert_output - base_output))

#     importance = importance / importance.sum()
#     return importance
# importance = kan_feature_importance_sensitivity(
#     model,
#     X_train
# )

# for name, score in zip(X_train_df.columns, importance):
#     print(f"{name}: {score:.3f}")


In [17]:
# # ============================
# # SIMPLIFY KAN (PRUNING)
# # ============================
# pruned_model = HybridKANQuantumModel(X_train.shape[1])
# pruned_model.load_state_dict(best_state)
# pruned_model.eval()

# with torch.no_grad():
#     _ = pruned_model(X_train)
# pruned_model.kan.prune()
# pruned_model.kan.plot()

# plt.savefig(
#     "kan_structure.png",
#     dpi=1000,                 # chu·∫©n thesis / paper
#     bbox_inches="tight"
# )
# plt.close()

In [18]:
# x, y = plot_learned_function(model, X_train, feature_idx=2)

# plt.plot(x, y)
# plt.xlabel("Feature 2 (scaled)")
# plt.ylabel("Model output")
# plt.title("Learned function for feature 2")
# plt.show()
