# Fuzzy KNN modificado


## Datasets

In [11]:
import pandas as pd
from ipynb.fs.full.datasets import *

# Registry of the available datasets, keyed by name. Every loader is called eagerly, in the
# order written, when this cell runs (presumably the loaders come from the star import above).
datasets = {
    "mnist_numbers": mnist_numbers(),
    "mnist_with_clothes": mnist_with_clothes(),
    "chinese_mnist": chinese_mnist(),
    "wine": wine(),
    "gamma_telescope": gamma_telescope(),
    "image_segmentation": image_segmentation(),
    "digits_dataset": digits_dataset(),
    "breast_cancer": load_breast_cancer(),
}

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]\n",


## Algoritmo Fuzzy KNN modificado

In [12]:
import numpy as np
from scipy.stats import truncnorm
import json

def sample_truncnorm(mu, sigma, size):
    """
    Return `size` draws from a normal distribution with location `mu` and scale `sigma`,
    truncated to the interval [0, 1].
    """
    # truncnorm takes its clipping bounds relative to loc, measured in units of scale.
    lower_bound = (0 - mu) / sigma
    upper_bound = (1 - mu) / sigma
    return truncnorm.rvs(lower_bound, upper_bound, loc=mu, scale=sigma, size=size)


def fuzzy_knn_modificado(X, fuzzy_y, sample, k, size, m):
    """
    Fuzzy KNN classifier whose neighbor memberships are sampled instead of fixed.

    For each of the k nearest neighbors of `sample`, `size` membership values are drawn per
    class (from a normal truncated to [0, 1], or the constant mean when the spread is below
    1e-8). Every draw is normalized so that the classes sum to 1, and the neighbors are then
    combined with inverse-distance weights 1 / d**(2 / (m - 1)). A neighbor at distance 0
    gets the fixed weight 1e4.

    Parameters:
    - X:       array-like, shape (n_samples, n_features)
    - fuzzy_y: DataFrame with shape (n_samples, n_classes); row i (by position) describes
               X[i]; each cell is a JSON-encoded pair [mean, spread]
    - sample:  array-like, shape (n_features,)
    - k:       number of neighbors
    - size:    number of draws per neighbor per class (must be >= 1)
    - m:       distance exponent parameter (m == 1 makes the exponent undefined)

    Returns:
    - u_true:  array, fuzzy-membership value for each class
    - classes: list of class labels (the columns of fuzzy_y)
    - nn_info: array, shape (k, 2) of neighbor indices and distances

    Raises:
    - ValueError if size < 1
    """
    if size < 1:
        raise ValueError("size must be at least 1")

    # Accept any array-like (lists, DataFrames, ndarrays) as documented.
    X = np.asarray(X)
    sample = np.asarray(sample)

    # 1) compute all distances
    dists = np.linalg.norm(X - sample, axis=1)

    # 2) get k nearest neighbors
    nn_idx = np.argsort(dists)[:k]
    nn_dists = dists[nn_idx]

    # 3) sample fuzzy-membership for each neighbor and keep the mean over its draws
    classes = list(fuzzy_y.columns)
    mean_memberships = []
    for idx in nn_idx:
        samples_per_class = []
        for col_pos in range(len(classes)):
            # nn_idx holds row positions of X, so fuzzy_y is addressed by position as well.
            mu, var = json.loads(fuzzy_y.iloc[idx, col_pos])
            if var < 1e-8:
                # dtype=float: an integer JSON mean must not produce an integer array,
                # otherwise the in-place normalization below cannot store its result.
                v = np.full(size, mu, dtype=float)
            else:
                # NOTE(review): the second component is documented as a variance but is
                # passed as the scale (standard deviation) of the truncated normal --
                # confirm which one the stored files contain.
                v = sample_truncnorm(mu, var, size)
            samples_per_class.append(v)

        Y = np.vstack(samples_per_class)
        # normalize columns so that, within each draw, the class memberships sum to 1
        Y /= Y.sum(axis=0, keepdims=True)
        mean_memberships.append(Y.mean(axis=1))

    # 4) compute inverse-distance weights without division-by-zero
    exponent = 2.0 / (m - 1)
    inv_dist = np.full_like(nn_dists, fill_value=1e4, dtype=float)
    # only divide where distances != 0; the other entries keep the 1e4 fill value
    np.divide(
        1.0,
        nn_dists**exponent,
        out=inv_dist,
        where=(nn_dists != 0)
    )

    # 5) average over all combinations of draws.
    # The weighted sum is linear in each neighbor's draw, so averaging it over the size**k
    # combinations (one draw per neighbor) equals the weighted sum of the per-neighbor mean
    # draws. This avoids enumerating an exponential number of combinations.
    total_u = np.zeros(len(classes))
    for weight, mean_u in zip(inv_dist, mean_memberships):
        total_u += weight * mean_u
    u_true = total_u / inv_dist.sum()

    nn_info = np.vstack((nn_idx, nn_dists)).T
    return u_true, classes, nn_info

## Intervalos de confianza

In [13]:
def t_student(data,confidence_level):

    """
    Two-sided Student's t confidence interval for the mean of a vector of values.
    data : vector with the observed values
    confidence_level : confidence level of the interval (e.g. 0.95)

    Returns a tuple (lower bound, upper bound).
    """

    import numpy as np
    from scipy.stats import t

    n = len(data)
    center = np.mean(data)

    # Standard error of the mean; ddof=1 selects the sample standard deviation.
    standard_error = np.std(data, ddof=1) / np.sqrt(n)

    # Two-tailed critical value with n - 1 degrees of freedom.
    upper_quantile = 1 - (1 - confidence_level)/2
    half_width = t.ppf(upper_quantile, n - 1) * standard_error

    return (center - half_width, center + half_width)

## Experimentos para el análisis de la Fase 2

In [14]:
import time 
import random
import numpy as np
import pandas as pd
from datetime import datetime
from ipynb.fs.full.datasets import * 
import logging
from sklearn.metrics import cohen_kappa_score

# Names of the datasets, noise levels and base classifiers considered in this phase.
dataset_used = [
    "digits_dataset",
    "breast_cancer",
    "image_segmentation",
    "gamma_telescope",
    "wine",
]
noise_levels = [0.0, 0.15, 0.3, 0.45]
algorithms_used = [
    "SGDClassifier",
    "gaussianNB",
    "random_forest",
    "mlp_classifier",
    "logistic_regression",
]

def experimentos_no_realizados(results_dir=None):

    """
    Run, one after another, every experiment registered in "analisis_fase_2.xlsx" whose
    "FECHA" cell is still empty.

    For each pending row the function loads the fuzzy labels from
    results_dir + "normal_<dataset>_<noise>_<METODO>.csv", repeatedly takes one random
    sample out of the dataset, classifies it with `fuzzy_knn_modificado` using the remaining
    samples, and writes in the working directory:
    - "<dataset>_<noise>_<METODO>.csv": class memberships, predicted and real class per trial
    - "<dataset>_<noise>_<METODO>.json": one Cohen's kappa per outer iteration ("kappas") and
      their 95 % Student-t confidence interval ("IC")
    The row's "FECHA" and "NOMBRE ARCHIVO RESULTADOS" cells are then filled in and the
    workbook is saved again. Progress is logged to "analisis_fase_2.log".

    Cell formats read from the workbook:
    - DATASETS:    "all" or comma-separated dataset names (only the first name is used)
    - PARAMETROS:  "noise,k,size,m"
    - ITERACIONES: "outer iterations,trials per outer iteration"

    results_dir : folder holding the fuzzy-label CSV files; it is concatenated directly with
                  the file name, so it must end with a path separator. Defaults to the
                  original hard-coded location.

    Returns 1 once every pending experiment has finished.
    """

    if results_dir is None:
        # NOTE(review): machine-specific default, kept so existing calls behave as before.
        results_dir = 'C:\\Users\\Galo\\Documents\\Matematicas\\4 Mates\\TFG\\Results fase 2\\'

    experimentos = pd.read_excel("analisis_fase_2.xlsx")
    experimentos_no_hechos = experimentos[experimentos["FECHA"].isna()]

    # Logger writing every message to a file
    logger = logging.getLogger("my_logger")
    logger.setLevel(logging.DEBUG)

    file_handler = logging.FileHandler("analisis_fase_2.log")
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    logger.addHandler(file_handler)

    try:
        for index in experimentos_no_hechos.index:

            # Datasets chosen for the experiment
            datasets_cell = experimentos_no_hechos.at[index, "DATASETS"]
            if datasets_cell == "all":
                datasets_exp = list(datasets.keys())
            else:
                datasets_exp = datasets_cell.split(",")

            # Parameters and iteration counts; str() because a cell holding a single
            # number is not read back as text.
            parametros_cell = experimentos_no_hechos.at[index, "PARAMETROS"]
            parametros = [float(num) for num in str(parametros_cell).split(",")]
            iteraciones = [int(num) for num in str(experimentos_no_hechos.at[index, "ITERACIONES"]).split(",")]

            # Method used during the experiment
            metodo = experimentos_no_hechos.at[index, "METODO"]

            # Announce in the log what is about to be run (formatted, so a numeric
            # PARAMETROS cell cannot break the message)
            logger.critical(f"Empezamos el analisis de la fase 2 con el método {metodo} y parametros {parametros_cell}")

            # Date stored in the workbook for this experiment
            date_time = datetime.now().strftime("%m/%d/%Y")

            noise = parametros[0]
            k = int(parametros[1])
            size = int(parametros[2])
            m = int(parametros[3])  # any fractional part of m is discarded here

            repes = iteraciones[0]
            repes2 = iteraciones[1]

            fuzzy_y = pd.read_csv(results_dir + "normal_" + datasets_exp[0] + "_" + str(noise) + "_" + metodo + ".csv")
            df = datasets[datasets_exp[0]]
            X_columns = [c for c in df.columns if c != "label"]
            y = df["label"].to_numpy()           # labels
            X_values = df[X_columns].to_numpy()  # attributes
            posiciones = range(len(X_values))

            results = {}
            kappas_results = {"kappas": []}
            for j in range(repes):
                logger.info(f"Iteracion {j}")
                # First trial of this outer iteration inside the accumulated lists
                inicio = len(results.get("predicha", []))
                for i in range(repes2):
                    logger.debug(f"{i}")
                    # Work with row positions throughout so the attributes, labels and
                    # fuzzy labels stay aligned whatever index the dataset carries.
                    pos = random.choice(posiciones)
                    sample = X_values[pos]
                    X_mod = np.delete(X_values, pos, axis=0)
                    fuzzy_y_mod = fuzzy_y.drop(index=fuzzy_y.index[pos]).reset_index(drop=True)

                    u, clases, _k_vectors = fuzzy_knn_modificado(X_mod, fuzzy_y_mod, sample, k, size, m)
                    index_max = np.argmax(u)

                    clase_predicha = int(clases[index_max])
                    clase_real = int(y[pos])

                    for clase, pertenencia in zip(clases, u):
                        results.setdefault(clase, []).append(pertenencia)
                    results.setdefault("predicha", []).append(clase_predicha)
                    results.setdefault("real", []).append(clase_real)

                # Kappa over exactly the trials of this outer iteration
                kappa = cohen_kappa_score(results["predicha"][inicio:], results["real"][inicio:])
                kappas_results["kappas"].append(kappa)

            df_results = pd.DataFrame(results)
            filename = datasets_exp[0] + "_" + str(noise) + "_" + metodo
            df_results.to_csv(filename + ".csv", index=False)

            # t_student takes the confidence LEVEL, so 0.95 yields the 95 % interval
            kappas_results["IC"] = t_student(kappas_results["kappas"], 0.95)
            with open(filename + ".json", "w") as json_file:
                json.dump(kappas_results, json_file, indent=4)

            # Record the finished experiment and save the workbook straight away
            experimentos.loc[index, "FECHA"] = date_time
            experimentos.loc[index, "NOMBRE ARCHIVO RESULTADOS"] = filename
            experimentos.to_excel("analisis_fase_2.xlsx", index=False)
    finally:
        # Always release the log file, also when an experiment raises
        logger.removeHandler(file_handler)
        file_handler.close()

    print("Terminados todos los experimentos")

    return 1

In [15]:
# Run every pending experiment; the cell output shows the function's return value (1).
experimentos_no_realizados()

Terminados todos los experimentos


1

## Función para cerrar los handlers cuando surja un error en el código (no forma parte del programa si no hay fallos)

In [6]:
import logging

def close_all_handlers():
    """Close and detach every handler of each logger named in logging.root.manager.loggerDict."""
    for name in logging.root.manager.loggerDict.keys():
        registered = logging.getLogger(name)
        if not hasattr(registered, "handlers"):
            continue
        # Walk a snapshot, because removeHandler mutates registered.handlers.
        for attached in list(registered.handlers):
            attached.close()
            registered.removeHandler(attached)

# Example usage: run this cell after an execution was interrupted by an error, so that the
# handlers still attached to the loggers are closed before the experiments are run again.
close_all_handlers()