In [2]:
import io
import random
from csv import reader
from math import sqrt
from random import randrange
from random import seed
from time import time

import numpy as np
import pandas as pd
import requests
from matplotlib import pyplot as plt
from sklearn import neighbors
from sklearn.metrics import adjusted_rand_score
from sklearn.metrics import f1_score
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MinMaxScaler
from tqdm import tqdm

#from ucimlrepo import fetch_ucirepo, list_available_datasets

plt.rcParams['figure.figsize'] = [16, 10]
    
# Downloading the csv file from your GitHub account

url = "https://raw.githubusercontent.com/Zuluke/Projetos-AM/main/Projeto_Chico/Dataset/mfeat-fac" # Make sure the url is the raw version of the file on GitHub
download = requests.get(url).content
fac = pd.read_csv(io.StringIO(download.decode('utf-8')),sep="\s+")

url = "https://raw.githubusercontent.com/Zuluke/Projetos-AM/main/Projeto_Chico/Dataset/mfeat-fou" # Make sure the url is the raw version of the file on GitHub
download = requests.get(url).content
fou = pd.read_csv(io.StringIO(download.decode('utf-8')),sep="\s+")

url = "https://raw.githubusercontent.com/Zuluke/Projetos-AM/main/Projeto_Chico/Dataset/mfeat-zer" # Make sure the url is the raw version of the file on GitHub
download = requests.get(url).content
zer = pd.read_csv(io.StringIO(download.decode('utf-8')),sep="\s+")

In [5]:
# Show the first rows of every feature table, each followed by a blank line.
for _table in (fac, fou, zer):
    print(_table.head(), '\n')

    98  236  531  673  607  647   2   9  3  6  ...  474  536  628.1  632  \
0  121  193  607  611  585  665   7   9  2  4  ...  520  458    570  634   
1  115  141  590  605  557  627  12   6  3  3  ...  535  498    572  656   
2   90  122  627  692  607  642   0   6  4  5  ...  576  549    628  621   
3  157  167  681  666  587  666   8   6  1  4  ...  594  525    568  653   
4  128  224  799  690  653  620  16  22  8  9  ...  692  529    562  573   

   18.3  36  8.6  15.3  12.4  13.6  
0    15  32   11    13    15    11  
1    20  35   16    14    13     6  
2    16  35    7    12    15     9  
3    16  35   10    15    13    13  
4    24  17    7    10    14     6  

[5 rows x 216 columns] 

   0.06588172  0.19731169  0.10382563  0.27036171  0.61607757  0.03585575  \
0    0.049142    0.175971    0.105515    0.227095    0.599280    0.041217   
1    0.034172    0.227649    0.108766    0.127697    0.612494    0.056554   
2    0.062336    0.217979    0.080243    0.289592    0.546316   

In [None]:
class KFCM_K_W_1:
    """Kernel fuzzy c-means with a Gaussian kernel and per-cluster feature widths.

    Every cluster ``k`` owns a prototype ``g_k`` and one width weight ``s_kj``
    per feature.  The kernel between sample ``i`` and cluster ``k`` is
    ``exp(-1/2 * sum_j s_kj * (x_ij - g_kj) ** 2)`` and the distance used by the
    membership and objective formulas is ``2 - 2 * kernel``.  ``fit`` alternates
    width, prototype and membership updates until the objective changes by less
    than ``tol`` or ``epochs`` iterations have run.

    The "Step" and "Equation" numbers in the comments are kept from the
    original implementation and refer to the algorithm description it follows.

    Internal array shapes (``n`` samples, ``p`` features, ``c`` clusters):
    ``_x`` (n, p, 1), ``_g`` (c, p, 1), ``_s`` (c, p, 1), ``_kernel`` (n, c, 1),
    ``_u`` (n, c, 1).

    Parameters
    ----------
    c : int
        Number of clusters.
    m : float, default 1.6
        Fuzzifier; memberships are raised to this power.  Must differ from 1
        because ``1 / (m - 1)`` is used as an exponent.
    epochs : int, default 100
        Maximum number of update iterations.
    tol : float, default 1e-6
        Stop when the absolute change of the objective falls below this value.
    seed : int, default 0
        Seed of the random generator used to pick the initial prototypes.
    """

    def __init__(self, c, m=1.6, epochs=100, tol=1e-6, seed=0):
        # Hyperparameters
        self.m = m
        self.epochs = epochs
        self.tol = tol

        # c: number of clusters
        self.c = c

        # Lower bound that keeps distances and dispersions strictly positive.
        self._zero = 1e-20
        self._rng = np.random.default_rng(seed)
        self._epoch = 0

        # State filled in by fit().
        self._scaler = None
        self._x = None
        self._y = None
        self._n = None
        self._p = None
        self._s = None
        self._u = None
        self._u_m = None
        self._u_m_kernel = None
        self._kernel = None
        self._denominator_j = None
        self._g = None
        self._j_new = None
        self._j_old = None

    def fit(self, X, y=None):
        """Cluster ``X`` and keep the memberships, prototypes and widths.

        Parameters
        ----------
        X : array-like of shape (n, p)
            Samples; they are rescaled with ``MinMaxScaler`` before clustering.
        y : array-like of shape (n,), optional
            Reference labels.  They are only stored, for the ``"accuracy"`` and
            ``"rand"`` metrics of ``evaluate``; the clustering never reads them.

        Returns
        -------
        None
            Progress and the elapsed time are printed.
        """
        start_time = time()
        # Normalization; the scaler is kept so predict() can reuse it.
        self._scaler = MinMaxScaler()
        self._x = self._scaler.fit_transform(X)[:, :, np.newaxis]
        # Labels are optional: indexing None would raise.
        self._y = None if y is None else np.asarray(y)[:, np.newaxis]

        # n: number of instances
        # p: number of features
        self._n, self._p, _ = self._x.shape

        # Arrays initializations
        # s: array of width parameters 1/s^2, from Step 7
        self._s = np.ones((self.c, self._p, 1))
        self._u = np.zeros((self._n, self.c, 1))
        # Reset so a second fit() does not log the epoch of the previous run.
        self._epoch = 0

        # Prototype selection, Step 8.
        # Draw distinct samples: two clusters that start on the same prototype
        # receive identical updates and can never separate.  Sampling with
        # replacement is only used when there are fewer samples than clusters.
        g_idx = self._rng.choice(self._n, size=self.c, replace=self.c > self._n)
        self._g = self._x[g_idx]

        self._update_kernel()

        # Compute the membership degree, Step 10
        self._update_u()

        # Compute the objective function, Step 11
        self._update_j()

        for epoch in range(1, self.epochs + 1):
            self._epoch = epoch
            # Step 13
            self._j_old = self._j_new

            # Step 16
            self._update_s()

            # Step 19
            self._update_g()

            # Step 22
            self._update_u()

            # Step 23
            self._update_j()
            if abs(self._j_new - self._j_old) < self.tol:
                break
        end_time = time()
        print(f"Execution time: {round((end_time - start_time) / 60, 2)} minutes")

        return

    def predict(self, X):
        """Assign each row of ``X`` to the cluster with the largest membership.

        ``X`` is rescaled with the scaler fitted in ``fit`` and memberships are
        computed against the fitted prototypes and widths.

        Returns
        -------
        numpy.ndarray of shape (n,)
            Index of the winning cluster for every row.

        Raises
        ------
        ValueError
            If the model has not been fitted yet.
        """
        if self._scaler is None or self._g is None or self._s is None:
            raise ValueError("fit must be called before predict")
        x = self._scaler.transform(X)[:, :, np.newaxis]
        u, _ = self._memberships_for(self._kernel_for(x))
        return np.argmax(u, axis=1)[:, 0]

    def evaluate(self, metric):
        """Compute a quality metric of the fitted partition.

        Parameters
        ----------
        metric : str
            ``"accuracy"``, ``"MPC"`` or ``"rand"``.

        Raises
        ------
        ValueError
            For any other metric name, or when a label-based metric is
            requested but ``fit`` was called without ``y``.
        """
        metrics = {
            "accuracy": self._evaluate_accuracy,
            "MPC": self._evaluate_modified_partition_coefficient,
            "rand": self._evaluate_adjusted_rand_score,
            "error": self._evaluate_error,
        }
        # The fallback must be a callable: falling back to the string "error"
        # made unknown names fail with "TypeError: 'str' object is not callable".
        metric_function = metrics.get(metric, self._evaluate_error)
        return metric_function()

    def _kernel_for(self, x):
        """Return the (n, c, 1) kernel between ``x`` (n, p, 1) and all prototypes."""
        # Squared per-feature differences, shape (n, p, c).
        squared_distances = (x - self._g.T) ** 2
        # Weight every feature by the width of the corresponding cluster.
        weighted_distances = squared_distances * self._s.T
        # Sum across features, shape (n, c).
        summed_distances = weighted_distances.sum(axis=1)
        return np.exp(summed_distances * (-1 / 2))[:, :, np.newaxis]

    def _memberships_for(self, kernel):
        """Return ``(u, distances)``, both (n, c, 1), for a (n, c, 1) kernel."""
        # Kernel-induced distance, clamped so the ratios below never divide by 0.
        distances = np.maximum(2 - 2 * kernel, self._zero)
        # ratios[i, k, h] = d_ih / d_ik; summing over k yields the denominator
        # of the membership of sample i in cluster h.
        ratios = (np.swapaxes(distances, 1, 2) / distances) ** (1 / (self.m - 1))
        u = (ratios.sum(axis=1) ** -1)[:, :, np.newaxis]
        return u, distances

    def _update_kernel(self):
        """Equation 12."""
        self._kernel = self._kernel_for(self._x)
        return

    def _update_u(self):
        """Equation 16b."""
        self._u, self._denominator_j = self._memberships_for(self._kernel)

        # Cached products reused by the width, prototype and objective updates.
        self._u_m = self._u**self.m
        self._u_m_kernel = self._u_m * self._kernel
        return

    def _update_j(self):
        """Equation 13."""
        self._j_new = (self._u_m * self._denominator_j).sum()
        print(f"Epoch: {self._epoch:03d} | Objective function J: {self._j_new:.8f}")
        return

    def _update_s(self):
        """Equation 14b."""
        # (n, p, c) -> (n, c, p) so it lines up with the (n, c, 1) weights.
        squared_distances = np.swapaxes((self._x - self._g.T) ** 2, 1, 2)

        # Weighted dispersion of every feature inside every cluster, (c, p, 1).
        dispersion = (self._u_m_kernel * squared_distances).sum(axis=0)[
            :, :, np.newaxis
        ]

        # A feature with zero dispersion would give log(0) = -inf and then
        # (-inf) - (-inf) = nan widths; clamp before taking the logarithm.
        log_dispersion = np.log(np.maximum(dispersion, self._zero))

        # Mean of the logs over the features of each cluster, shape (c, 1, 1).
        mean_log_dispersion = log_dispersion.sum(axis=1)[:, :, np.newaxis] / self._p

        # exp(mean log - log) = geometric mean / dispersion, so the widths of
        # one cluster always multiply to 1.
        self._s = np.exp(mean_log_dispersion - log_dispersion)

        self._update_kernel()
        return

    def _update_g(self):
        """Equation 15b."""
        # (n, p, 1) -> (n, 1, p) so it broadcasts against the (n, c, 1) weights.
        x = np.swapaxes(self._x, 1, 2)

        numerator = (self._u_m_kernel * x).sum(axis=0)

        denominator = self._u_m_kernel.sum(axis=0)

        # Weighted mean of the samples, one prototype per cluster: (c, p, 1).
        self._g = (numerator / denominator)[:, :, np.newaxis]

        self._update_kernel()
        return

    def _require_labels(self, metric):
        """Raise ``ValueError`` when a label-based metric has no labels to use."""
        if self._y is None:
            raise ValueError(
                f"Metric '{metric}' needs the labels y to be passed to fit"
            )

    def _evaluate_accuracy(self):
        """Share of samples whose label is the majority label of their cluster."""
        self._require_labels("accuracy")
        pred = np.argmax(self._u, axis=1)
        # Column 0 holds the reference label, column 1 the predicted cluster.
        y_with_pred = pd.DataFrame(np.concatenate((self._y, pred), axis=1))
        y_with_pred["value"] = 1
        # Contingency table: rows are clusters, columns are reference labels.
        pivot_table = pd.pivot_table(
            y_with_pred, columns=[0], index=[1], values="value", aggfunc="sum"
        )
        pivot_table = pivot_table.fillna(0).values

        n_i = pivot_table.sum(axis=1)[:, np.newaxis]  # cluster sizes
        p_ij = pivot_table / n_i  # label shares inside each cluster
        p_i = p_ij.max(axis=1)[:, np.newaxis]  # share of the majority label
        acc = (n_i * p_i).sum(axis=0) / self._n
        return acc[0]

    def _evaluate_modified_partition_coefficient(self):
        """Partition coefficient rescaled with ``1 - c / (c - 1) * (1 - pc)``."""
        pc = np.sum(self._u**2) / self._n
        mpc = 1 - (self.c / (self.c - 1)) * (1 - pc)
        return mpc

    def _evaluate_adjusted_rand_score(self):
        """Adjusted Rand score between the labels and the hardened partition."""
        self._require_labels("rand")
        y_pred = np.argmax(self._u, axis=1)
        return adjusted_rand_score(self._y[:, 0], y_pred[:, 0])

    def _evaluate_error(self):
        """Fallback for metric names that are not implemented."""
        raise ValueError("Metric not implemented")