# ANFIS

Experimented with this, but didn't perform well so didn't use it in the final model or report discussion.
 
## Prepare the Data
### Imports

In [1]:
from pathlib import Path
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, fbeta_score, confusion_matrix, classification_report


### Load Data

In [2]:
# Folder path
path = Path("../data/selected")

# Load data (already split)
X_train = pd.read_csv(path / "X_train_selected.csv")
X_test = pd.read_csv(path / "X_test_selected.csv")
y_train = pd.read_csv(path / "y_train.csv")
y_test = pd.read_csv(path / "y_test.csv")

# Drop the unwanted index-like column
if "Unnamed: 0" in X_train.columns:
    X_train = X_train.drop(columns=["Unnamed: 0"])
if "Unnamed: 0" in X_test.columns:
    X_test = X_test.drop(columns=["Unnamed: 0"])
y_train = y_train["Diagnosis"]
y_test = y_test["Diagnosis"]

print(X_train.shape)
print(y_train.shape)
print(X_train.columns)

(1934, 27)
(1934,)
Index(['num__ADL', 'num__AlcoholConsumption', 'num__CholesterolHDL',
       'num__CholesterolLDL', 'num__FunctionalAssessment', 'num__MMSE',
       'num__SleepQuality', 'cat__BehavioralProblems_0',
       'cat__BehavioralProblems_1', 'cat__CardiovascularDisease_0',
       'cat__CardiovascularDisease_1', 'cat__Depression_0',
       'cat__Depression_1', 'cat__Diabetes_0', 'cat__Diabetes_1',
       'cat__Disorientation_0', 'cat__Disorientation_1',
       'cat__EducationLevel_0', 'cat__EducationLevel_1',
       'cat__EducationLevel_2', 'cat__EducationLevel_3', 'cat__Hypertension_0',
       'cat__Hypertension_1', 'cat__MemoryComplaints_0',
       'cat__MemoryComplaints_1', 'cat__Smoking_0', 'cat__Smoking_1'],
      dtype='object')


### Select Features

In [3]:
fis_features = [
    "num__MMSE",
    "num__FunctionalAssessment",
    "num__ADL",
]

    # "num__CholesterolHDL",
    # "num__SystolicBP",

X_train_fis = X_train[fis_features].copy()
X_test_fis = X_test[fis_features].copy()

## Prepare the Model

DECISIONS:
- k = 8
- Gaussian
- First order consequent
- use k-means to develop rule base 
- HP:
    - n_epochs = 30
    - lr = 0.01
    - Full-batch

In [4]:
class AnfisTS:
    """
    Takagi–Sugeno ANFIS with:
      - Gaussian membership functions
      - First-order consequents
      - K-means rule initialisation
      - Hybrid learning: LS for consequents + GD for premises
    """

    def __init__(self,
                 n_inputs,
                 n_rules=8,
                 n_epochs=30,
                 lr=0.01,
                 min_sigma=1e-2,
                 random_state=None):
        self.n_inputs = n_inputs
        self.n_rules = n_rules
        self.n_epochs = n_epochs
        self.lr = lr
        self.min_sigma = min_sigma
        self.random_state = random_state

        # Premise parameters (to be initialised in fit)
        self.centers_ = None    # shape (R, d)
        self.sigmas_ = None     # shape (R, d)

        # Consequent parameters (to be learned)
        # shape (R, d+1): [bias, w1, ..., wd] for each rule
        self.consequents_ = None

        # For monitoring
        self.loss_history_ = []

    # Internal helpers 

    def _check_X(self, X):
        X = np.asarray(X, dtype=float)
        if X.ndim != 2:
            raise ValueError("X must be a 2D array of shape (n_samples, n_features)")
        if X.shape[1] != self.n_inputs:
            raise ValueError(
                f"Expected {self.n_inputs} features, got {X.shape[1]}"
            )
        return X

    def _init_premise_params(self, X):
        """
        Initialise Gaussian MF centres via K-means,
        and sigmas via global feature std.
        """
        X = self._check_X(X)

        # K-means to get rule centres
        km = KMeans(n_clusters=self.n_rules, random_state=self.random_state)
        km.fit(X)
        centers = km.cluster_centers_  # (R, d)

        # Global std per feature; reused for all rules
        global_std = X.std(axis=0, ddof=0)
        # Avoid zero std
        global_std[global_std < self.min_sigma] = self.min_sigma
        sigmas = np.tile(global_std, (self.n_rules, 1))  # (R, d)

        self.centers_ = centers
        self.sigmas_ = sigmas

    def _gauss_membership(self, X):
        """
        Compute Gaussian membership values μ_ri(x_ni)
        for all samples, rules and features.

        Returns:
            mu: array, shape (N, R, d)
        """
        X = self._check_X(X)
        if self.centers_ is None or self.sigmas_ is None:
            raise RuntimeError("Premise parameters are not initialised.")

        eps = 1e-6
        centers = self.centers_
        sigmas = self.sigmas_

        N, d = X.shape
        R, d2 = centers.shape
        assert d == d2

        X_exp = X[:, None, :]       # (N, 1, d)
        C_exp = centers[None, :, :] # (1, R, d)
        S_exp = sigmas[None, :, :]  # (1, R, d)

        diff2 = (X_exp - C_exp) ** 2
        mu = np.exp(-diff2 / (2.0 * (S_exp ** 2 + eps)))  # (N, R, d)
        return mu

    def _compute_rule_weights(self, X):
        """
        Compute unnormalised rule firing strengths w_r(x_n)
        and their sum S_n for each sample.

        Returns:
            w: (N, R)
            S: (N,)
        """
        mu = self._gauss_membership(X)  # (N, R, d)
        w = mu.prod(axis=2)             # product over features → (N, R)
        S = w.sum(axis=1) + 1e-6        # (N,), avoid exact zero
        return w, S

    def _ls_consequents(self, X, y, w, S):
        """
        Least squares estimation of first-order TS consequents.

        For each sample n:
            y_hat_n = (Σ_r w_rn y_rn) / S_n
            where y_rn = a_r0 + Σ_i a_ri x_ni

        We rewrite this as a linear regression problem:
            z_n = S_n * y_n ≈ Σ_r w_rn y_rn
        and regress z on features [w_rn, w_rn x_n1, ..., w_rn x_nd] for all rules.
        """
        X = self._check_X(X)
        y = np.asarray(y, dtype=float).ravel()
        N, d = X.shape
        Nw, R = w.shape
        assert N == Nw

        # Design matrix
        X1 = np.concatenate([np.ones((N, 1)), X], axis=1)  # (N, d+1)
        # For each rule r, column block is w_rn * [1, x_1, ..., x_d]
        Phi_rule = w[:, :, None] * X1[:, None, :]          # (N, R, d+1)
        Phi = Phi_rule.reshape(N, R * (d + 1))             # (N, R*(d+1))

        # Target z = S * y
        z = S * y

        # Solve LS: Phi θ ≈ z
        theta, *_ = np.linalg.lstsq(Phi, z, rcond=None)
        consequents = theta.reshape(R, d + 1)              # (R, d+1)
        return consequents

    def _forward_output(self, X):
        """
        Compute ANFIS output ŷ(X) given current parameters.
        """
        X = self._check_X(X)
        if self.consequents_ is None:
            raise RuntimeError("Consequents are not estimated yet. Call fit() first.")

        w, S = self._compute_rule_weights(X)               # (N, R), (N,)
        X1 = np.concatenate([np.ones((X.shape[0], 1)), X], axis=1)  # (N, d+1)

        # y_rn = a_r0 + Σ_i a_ri x_ni
        y_rule = X1 @ self.consequents_.T                  # (N, R)

        # ŷ_n = Σ_r w_rn y_rn / S_n
        y_hat = (w * y_rule).sum(axis=1) / S               # (N,)
        return y_hat

    def _compute_premise_grads(self, X, y):
        """
        Compute MSE loss and gradients of loss w.r.t. centres and sigmas.
        """
        X = self._check_X(X)
        y = np.asarray(y, dtype=float).ravel()
        N, d = X.shape
        R, d2 = self.centers_.shape
        assert d == d2

        eps = 1e-6
        centers = self.centers_
        sigmas = self.sigmas_
        consequents = self.consequents_

        # Memberships, weights
        X_exp = X[:, None, :]       # (N, 1, d)
        C_exp = centers[None, :, :] # (1, R, d)
        S_exp = sigmas[None, :, :]  # (1, R, d)
        diff = X_exp - C_exp        # (N, R, d)
        diff2 = diff ** 2

        mu = np.exp(-diff2 / (2.0 * (S_exp ** 2 + eps)))  # (N, R, d)
        w = mu.prod(axis=2)                               # (N, R)
        S = w.sum(axis=1) + eps                           # (N,)

        # Forward TS output
        X1 = np.concatenate([np.ones((N, 1)), X], axis=1)     # (N, d+1)
        y_rule = X1 @ consequents.T                           # (N, R)
        y_hat = (w * y_rule).sum(axis=1) / S                  # (N,)

        e = y_hat - y
        loss = np.mean(e ** 2)

        # dL/dy_hat
        dL_dyhat = 2.0 * e / N                                # (N,)

        # dy_hat/dw_r = (y_r - y_hat) / S
        tmp = (y_rule - y_hat[:, None]) / S[:, None]          # (N, R)
        dL_dw = dL_dyhat[:, None] * tmp                       # (N, R)

        # dL/dmu_ri = dL/dw_r * dw_r/dmu_ri
        # w_r = Π_i mu_ri ⇒ dw_r/dmu_ri = w_r / mu_ri
        dL_dmu = dL_dw[:, :, None] * w[:, :, None] / (mu + eps)  # (N, R, d)

        # dμ/dc_ri = μ * (x_i - c_ri) / σ_ri^2
        dmu_dc = mu * (diff / (S_exp ** 2 + eps))
        # dμ/dσ_ri = μ * (x_i - c_ri)^2 / σ_ri^3
        dmu_dsigma = mu * (diff2 / (S_exp ** 3 + eps))

        # Sum over samples n → gradients per rule and feature
        dL_dc = (dL_dmu * dmu_dc).sum(axis=0)       # (R, d)
        dL_dsigma = (dL_dmu * dmu_dsigma).sum(axis=0)  # (R, d)

        return loss, dL_dc, dL_dsigma

    def fit(self, X, y, verbose=True):
        """
        Train ANFIS using hybrid learning:
          - K-means initialisation for premises
          - Per-epoch LS update for consequents
          - Per-epoch gradient descent for premise parameters
        """
        X = self._check_X(X)
        y = np.asarray(y, dtype=float).ravel()

        # 1. Initialise premise parameters by K-means on X
        if self.centers_ is None or self.sigmas_ is None:
            self._init_premise_params(X)

        for epoch in range(self.n_epochs):
            # 2. LS step for consequents
            w, S = self._compute_rule_weights(X)
            self.consequents_ = self._ls_consequents(X, y, w, S)

            # 3. Gradient step for premise parameters
            loss, grad_c, grad_sigma = self._compute_premise_grads(X, y)
            self.loss_history_.append(loss)

            # Update centres and sigmas
            self.centers_ -= self.lr * grad_c
            self.sigmas_ -= self.lr * grad_sigma
            # Keep sigmas positive and not too small
            self.sigmas_ = np.clip(self.sigmas_, self.min_sigma, None)

            # if verbose:
                # print(f"Epoch {epoch+1}/{self.n_epochs} - MSE: {loss:.6f}")

        return self

    def predict(self, X):
        """
        Predict continuous ANFIS outputs (risk scores).
        """
        X = self._check_X(X)
        return self._forward_output(X)


## Train the Model

In [5]:
n_inputs = X_train.shape[1]

anfis = AnfisTS(
    n_inputs=n_inputs,
    n_rules=8,
    n_epochs=30,
    lr=0.01,
    random_state=42,
)

anfis.fit(X_train, y_train, verbose=True)

# Example: get scores
y_train_score = anfis.predict(X_train)
y_test_score = anfis.predict(X_test)


## Select Threshold

In [6]:
# 1. 80/20 split from your training data
X_train_inner, X_val, y_train_inner, y_val = train_test_split(
    X_train,
    y_train,
    test_size=0.2,
    stratify=y_train,
    random_state=42,
)

print("Train inner shape:", X_train_inner.shape)
print("Validation shape:", X_val.shape)

# 2. Train ANFIS on the *inner* training set
n_inputs = X_train_inner.shape[1]

anfis = AnfisTS(
    n_inputs=n_inputs,
    n_rules=8,
    n_epochs=30,
    lr=0.01,
    random_state=42,
)

anfis.fit(X_train_inner, y_train_inner, verbose=True)

# 3. Validation and test scores
y_val_score = anfis.predict(X_val)
y_test_score = anfis.predict(X_test)

# 4. Threshold search on validation scores
thresholds = np.arange(0.0, 1.01, 0.01)

best_f2 = -np.inf
best_thresh = None

for t in thresholds:
    y_val_pred = (y_val_score >= t).astype(int)
    f2 = fbeta_score(y_val, y_val_pred, beta=2)
    if f2 > best_f2:
        best_f2 = f2
        best_thresh = t

print(f"\nBest threshold on validation: {best_thresh:.2f}")
print(f"Validation F2 at best threshold: {best_f2:.4f}")

# 5. Evaluate on test data with chosen threshold
test_auc = roc_auc_score(y_test, y_test_score)
print(f"\nTest ROC AUC (threshold-independent): {test_auc:.4f}")

y_test_pred = (y_test_score >= best_thresh).astype(int)

test_f2 = fbeta_score(y_test, y_test_pred, beta=2)
print(f"Test F2 at threshold {best_thresh:.6f}: {test_f2:.4f}")

print("\nConfusion matrix:")
print(confusion_matrix(y_test, y_test_pred))

print("\nClassification report:")
print(classification_report(y_test, y_test_pred, digits=3))


Train inner shape: (1547, 27)
Validation shape: (387, 27)

Best threshold on validation: 0.00
Validation F2 at best threshold: 0.7072

Test ROC AUC (threshold-independent): 0.6416
Test F2 at threshold 0.000000: 0.7238

Confusion matrix:
[[59 80]
 [11 65]]

Classification report:
              precision    recall  f1-score   support

           0      0.843     0.424     0.565       139
           1      0.448     0.855     0.588        76

    accuracy                          0.577       215
   macro avg      0.646     0.640     0.576       215
weighted avg      0.703     0.577     0.573       215

