<a href="https://colab.research.google.com/github/Huynhhieuvan/wmpdc-classifier/blob/main/WMPDC_classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KernelDensity
from sklearn.preprocessing import RobustScaler
from google.colab import files
import pandas as pd

# Upload data file from local machine
uploaded = files.upload()
file_name = list(uploaded.keys())[0]
data = pd.read_csv(file_name)

# Automatically prepare data with RobustScaler and outlier clipping
X = data.iloc[:, :-1].values  # Features
y_str = data.iloc[:, -1].values  # Labels as strings
# Automatically create label_map and reverse_label_map from unique labels
unique_labels = np.unique(y_str)
label_map = {label: idx for idx, label in enumerate(unique_labels)}
reverse_label_map = {idx: label for label, idx in label_map.items()}
y = np.array([label_map[label] for label in y_str])  # Convert labels to numeric indices
scaler = RobustScaler()
X = scaler.fit_transform(X)
X = np.clip(X, -1.5, 1.5)

# 80/20 train-test split and data augmentation
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
noise = np.random.normal(0, 0.01, X_train.shape)
X_train = X_train + noise

# Adaptive KDE function with feature weighting
def adaptive_kde(data, eval_points, h_range, eigenvalues, q, alpha=0.9):
    eval_points = np.array(eval_points).reshape(-1)
    best_h = h_range[0]
    best_score = -np.inf
    # Feature weights with slower decay
    weights = np.exp(-np.arange(q) / (2 * q)) * (eigenvalues[:q] / np.sum(eigenvalues[:q]))
    for h in h_range:
        pilot_kde = KernelDensity(kernel='gaussian', bandwidth=h).fit(data.reshape(-1, 1))
        pilot_dens = np.exp(pilot_kde.score_samples(data.reshape(-1, 1)))
        geom_mean = np.exp(np.mean(np.log(pilot_dens + 1e-10)))
        lambda_i = (pilot_dens / geom_mean) ** (-alpha)
        h_local = h * lambda_i
        densities = np.zeros(len(eval_points))
        for i, point in enumerate(eval_points):
            kernels = np.exp(-0.5 * ((point - data) / h_local)**2) / (h_local * np.sqrt(2 * np.pi))
            densities[i] = np.mean(kernels) * weights[0]
        score = np.mean(densities)
        if score > best_score:
            best_score = score
            best_h = h
    pilot_kde = KernelDensity(kernel='gaussian', bandwidth=best_h).fit(data.reshape(-1, 1))
    pilot_dens = np.exp(pilot_kde.score_samples(data.reshape(-1, 1)))
    geom_mean = np.exp(np.mean(np.log(pilot_dens + 1e-10)))
    lambda_i = (pilot_dens / geom_mean) ** (-alpha)
    h_local = best_h * lambda_i
    densities = np.zeros(len(eval_points))
    for i, point in enumerate(eval_points):
        kernels = np.exp(-0.5 * ((point - data) / h_local)**2) / (h_local * np.sqrt(2 * np.pi))
        densities[i] = np.mean(kernels) * weights[0]
    return densities

def wmpdc_classifier(X_train, y_train, x_0, variance_threshold=0.91, gamma=0.001, max_iter=100, alpha=0.9):
    n, p = X_train.shape
    k = len(np.unique(y_train))

    # Step 1: No additional normalization required
    z_0 = x_0

    # Step 2: PCA with cumulative variance >= variance_threshold
    cov_matrix = np.cov(X_train.T)
    eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
    idx = np.argsort(eigenvalues)[::-1]
    eigenvalues = eigenvalues[idx]
    eigenvectors = eigenvectors[:, idx]
    cum_var = np.cumsum(eigenvalues) / np.sum(eigenvalues)
    q = np.argmax(cum_var >= variance_threshold) + 1
    print(f"Number of principal components retained: {q} (cumulative variance: {cum_var[q-1]:.2%})")
    P = eigenvectors[:, :q]
    Y = P.T @ X_train.T
    y_0 = P.T @ z_0

    # Prepare Y_full
    Y_full = np.hstack((y_0.reshape(-1, 1), Y))

    # Step 4: Density estimation
    densities = []
    for i in range(k):
        group_indices = np.where(y_train == i)[0]
        Y_group = Y[:, group_indices]
        if Y_group.shape[1] < 2:
            raise ValueError(f"Group {i} has too few samples.")

        f_components = []
        for feat in range(q):
            data_feat = Y_group[feat, :]
            n_group = len(data_feat)
            sigma = np.std(data_feat)
            h_base = 1.06 * sigma * n_group**(-1/5)
            h_range = [0.2 * h_base, 0.5 * h_base, h_base]
            f_components.append((data_feat, h_range))

        densities.append(f_components)

    # Step 5: Prior probabilities
    U = np.zeros((k, n+1))
    U[:, 0] = 1.0 / k
    for j in range(n):
        group_idx = y_train[j]
        U[group_idx, j+1] = 1.0

    m = 0
    WMPDC_m = np.inf
    V_prev = np.zeros((k, q))
    for i in range(k):
        mu_sq = U[i, :]**2
        sum_mu_sq = np.sum(mu_sq)
        V_prev[i, :] = np.sum(mu_sq[:, np.newaxis] * Y_full.T, axis=0) / sum_mu_sq

    while WMPDC_m >= gamma and m < max_iter:
        for j in range(n+1):
            dists = np.linalg.norm(V_prev - Y_full[:, j][np.newaxis, :], axis=1)
            for i in range(k):
                if dists[i] == 0:
                    U[:, j] = 0
                    U[i, j] = 1
                    break
                else:
                    ratios = dists[i] / dists
                    denom = np.sum(ratios**2)
                    U[i, j] = 1.0 / denom

        V = np.zeros((k, q))
        for i in range(k):
            mu_sq = U[i, :]**2
            sum_mu_sq = np.sum(mu_sq)
            V[i, :] = np.sum(mu_sq[:, np.newaxis] * Y_full.T, axis=0) / sum_mu_sq

        WMPDC_m = 0
        for j in range(n+1):
            d_prev = np.linalg.norm(V_prev - Y_full[:, j][np.newaxis, :], axis=1)
            d_curr = np.linalg.norm(V - Y_full[:, j][np.newaxis, :], axis=1)
            WMPDC_m += np.sum(U[:, j] * np.abs(d_curr - d_prev))
        WMPDC_m /= (n + 1)

        V_prev = V
        m += 1

    print(f"Number of iterations for prior probabilities computation: {m}")

    u = U[:, 0]

    # Step 6: Classify
    posteriors = []
    for i in range(k):
        f_i_y0 = 1.0
        for feat in range(q):
            data_feat, h_range = densities[i][feat]
            density = adaptive_kde(data_feat, [y_0[feat]], h_range, eigenvalues, q, alpha=alpha)[0]
            f_i_y0 *= density + 1e-10
        posteriors.append(u[i] * f_i_y0)

    predicted_group = np.argmax(posteriors)

    return predicted_group

# Predict on test set
predictions = []
for idx, x_0 in enumerate(X_test):
    predicted = wmpdc_classifier(X_train, y_train, x_0, variance_threshold=0.91, gamma=0.001, max_iter=100, alpha=0.9)
    predictions.append(predicted)

# Evaluate model performance
accuracy = np.mean(predictions == y_test)
print(f"Accuracy: {accuracy * 100:.2f}%")
print(f"Number of test samples: {len(y_test)}")
print(f"Correct predictions: {np.sum(predictions == y_test)}")
print(f"Predicted labels: {[reverse_label_map[p] for p in predictions]}")
print(f"True labels: {[reverse_label_map[t] for t in y_test]}")

Saving Wine dataset.csv to Wine dataset (1).csv
Number of principal components retained: 8 (cumulative variance: 92.83%)
Number of iterations for prior probabilities computation: 6
Number of principal components retained: 8 (cumulative variance: 92.83%)
Number of iterations for prior probabilities computation: 6
Number of principal components retained: 8 (cumulative variance: 92.83%)
Number of iterations for prior probabilities computation: 6
Number of principal components retained: 8 (cumulative variance: 92.83%)
Number of iterations for prior probabilities computation: 6
Number of principal components retained: 8 (cumulative variance: 92.83%)
Number of iterations for prior probabilities computation: 6
Number of principal components retained: 8 (cumulative variance: 92.83%)
Number of iterations for prior probabilities computation: 6
Number of principal components retained: 8 (cumulative variance: 92.83%)
Number of iterations for prior probabilities computation: 6
Number of principal c