# Initialization. Load previous state. Load modules

In [1]:
import dill
# Restore the entire session
#dill.load_session('Finetune_ptr_uri_a_can.db')

In [2]:
#check last result on disk
#dir()

In [3]:
import json

import pandas as pd
import numpy as np
import numpy as cp
#import cupy as cp

from sklearn.metrics import accuracy_score
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import confusion_matrix

from xgboost import XGBClassifier
from tabpfn import TabPFNClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.svm import SVC

In [4]:
#PLS-DA, implemented using sklearn's PLSRegression and LinearDiscriminantAnalysis
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.cross_decomposition import PLSRegression
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted

class PLSDA_Classifier(BaseEstimator, ClassifierMixin):
    """
    PLS-DA classifier: PLS for dimensionality reduction followed by LDA
    for classification.

    The class labels are turned into an indicator (dummy) response before
    the PLS step, so labels with more than two classes are not treated as
    an ordinal numeric scale and non-numeric labels are supported.

    Parameters:
    ----------
    n_components: int
        Number of PLS components to use.

    Attributes (set by ``fit``):
    ----------
    classes_ : array
        Sorted unique labels seen during ``fit``.
    pls_ : PLSRegression
        Fitted PLS model used to project the data.
    lda_ : LinearDiscriminantAnalysis
        LDA fitted on the PLS scores of the training data.
    """

    def __init__(self, n_components=2):
        self.n_components = n_components

    def _encode_targets(self, y):
        """
        Build the numeric response handed to PLS from the labels ``y``.

        With at most two classes a single 0/1 column is returned. Since
        PLSRegression standardises the response by default, this is
        equivalent to regressing on two-valued numeric labels directly.
        With more classes an (n_samples, n_classes) indicator matrix is
        returned, one column per entry of ``classes_``.
        """
        if len(self.classes_) <= 2:
            return (y == self.classes_[-1]).astype(float)
        return (y[:, np.newaxis] == self.classes_[np.newaxis, :]).astype(float)

    def _project(self, X):
        """Validate ``X`` and map it onto the fitted PLS components."""
        check_is_fitted(self, ["pls_", "lda_"])
        X = check_array(X)
        return self.pls_.transform(X)

    def fit(self, X, y):
        """
        Fit the PLSDA classifier.

        Parameters:
        ----------
        X : array-like, shape (n_samples, n_features)
            Training data.

        y : array-like, shape (n_samples,)
            Target labels.

        Returns:
        -------
        self : PLSDA_Classifier
            The fitted estimator.
        """
        # Validate input
        X, y = check_X_y(X, y)
        self.classes_ = np.unique(y)

        # Fit the PLS for dimensionality reduction on the dummy-coded labels
        self.pls_ = PLSRegression(n_components=self.n_components)
        X_reduced = self.pls_.fit(X, self._encode_targets(y)).x_scores_

        # Fit the LDA for classification on the original labels
        self.lda_ = LinearDiscriminantAnalysis()
        self.lda_.fit(X_reduced, y)

        return self

    def predict(self, X):
        """
        Predict labels for given data.

        Parameters:
        ----------
        X : array-like, shape (n_samples, n_features)
            Input data.

        Returns:
        -------
        y_pred : array, shape (n_samples,)
            Predicted labels.
        """
        return self.lda_.predict(self._project(X))

    def predict_proba(self, X):
        """
        Predict class probabilities for X.

        Parameters:
        ----------
        X : array-like, shape (n_samples, n_features)
            Input data.

        Returns:
        -------
        proba : array, shape (n_samples, n_classes)
            Predicted probabilities.
        """
        return self.lda_.predict_proba(self._project(X))


In [5]:
def predict_class_by_label(IDs, y, X, classifier, param_grid=None, cv=5, gpu=False, verbose=True):
    """
    Predict every sample while holding out, in turn, all samples sharing one ID.

    For each unique value in IDs the samples carrying that value form the
    test set and all remaining samples form the training set. The model is
    either tuned with GridSearchCV on the training set (cv > 1) or fitted
    once with fixed parameters (cv <= 1).

    Args:
        IDs: A pandas Series containing the group label of each sample.
        y: A pandas Series or array-like containing target labels.
        X: A pandas DataFrame or array-like containing features.
        classifier: A scikit-learn classifier class (not an instance).
        param_grid: If cv > 1, a dictionary of parameter options for
            GridSearchCV. If cv <= 1, a dictionary of keyword arguments
            passed directly to the classifier constructor. Defaults to an
            empty dictionary.
        cv: Number of folds for cross-validation. If cv <= 1 a single model
            is fitted directly, without any parameter search.
        gpu: If True, wrap the train/test features with cp.array before training.
        verbose: If True, prints progress and metrics.

    Returns:
        predictions: Array shaped like y with the held-out prediction of every sample.
        final_error: Overall classification error (1 - accuracy).
        best_params_per_label: Dictionary with the parameters used for each label.

    Raises:
        ValueError: If IDs, y and X do not have the same number of samples.
    """
    # Build a fresh dict per call: a `{}` default would be one shared object
    # that is also handed back to the caller inside best_params_per_label.
    if param_grid is None:
        param_grid = {}

    unique_labels = IDs.unique()
    best_params_per_label = {}

    # Ensure X and y are numpy arrays for compatibility
    X = np.asarray(X)
    y = np.asarray(y)
    # Positional group ids, so the boolean masks are plain numpy arrays
    group_ids = np.asarray(IDs)

    if not (len(group_ids) == len(y) == len(X)):
        raise ValueError(
            f"IDs, y and X must have the same length, got {len(group_ids)}, {len(y)} and {len(X)}"
        )

    # zeros_like keeps y's dtype and, unlike `y * 0`, also works for string labels
    predictions = np.zeros_like(y)

    for label in unique_labels:
        mask = group_ids == label

        if gpu:
            X_train, X_test = cp.array(X[~mask]), cp.array(X[mask])
        else:
            X_train, X_test = X[~mask], X[mask]
        y_train, y_test = y[~mask], y[mask]

        if cv > 1:
            # Use GridSearchCV to find the best parameters
            grid_search = GridSearchCV(
                estimator=classifier(),
                param_grid=param_grid,
                scoring='accuracy',
                cv=cv,
                verbose=verbose
            )
            grid_search.fit(X_train, y_train)
            best_model = grid_search.best_estimator_

            # Store the best parameters
            best_params_per_label[label] = grid_search.best_params_

        else:
            # No search: param_grid holds the constructor keyword arguments
            best_model = classifier(**param_grid)
            best_model.fit(X_train, y_train)
            # Store a copy so the entries do not alias the caller's dict
            best_params_per_label[label] = dict(param_grid)

        # Predict on the test set
        predictions[mask] = best_model.predict(X_test)

        if verbose:
            print(f"Label: {label}, Best Params: {best_params_per_label[label]}")
            print(f"Test Set error for label {label}: {round(1.0-accuracy_score(y_test, predictions[mask]), 4)}")

    # Final metrics
    final_error = 1.0 - accuracy_score(y, predictions)

    if verbose:
        print(f"\nOverall Classification Error: {round(final_error, 4)}")

    return predictions, final_error, best_params_per_label


# Load datasets from JSON file

In [6]:
# Load data from JSON into `loaded_datasets`.
# The encoding is given explicitly so the result does not depend on the platform default.
with open('data_classif.json', 'r', encoding='utf-8') as json_file:
    loaded_datasets = json.load(json_file)


In [7]:
# Load data from JSON into `loaded_datasets`, replacing any previously loaded data.
# The encoding is given explicitly so the result does not depend on the platform default.
with open('data_gc_classif.json', 'r', encoding='utf-8') as json_file:
    loaded_datasets = json.load(json_file)


In [6]:
# Load data from JSON into `loaded_datasets`, replacing any previously loaded data.
# The encoding is given explicitly so the result does not depend on the platform default.
with open('data_classif_breath.json', 'r', encoding='utf-8') as json_file:
    loaded_datasets = json.load(json_file)


# Train methods

## All datasets

In [7]:
# Candidate dataset lists; uncomment the one matching the JSON file loaded above.
#PTRMS
#datasets = ["Tea", "Gum2", "Gum3", "Cafe", "Ham", "Pesce", "Spinaci", "Peperoncini", "Funghi13", "Funghi20", "Funghi21","Urine","Smoker","CancerTypes","Lab"]
#GC
#datasets = ["Funghi", "Peperoncini", "Spinaci", "Fragole","Nocciole"]
#Breath
#datasets = ["Mentine","Forest","Etnia"]

datasets = ["Etnia"]

# Dictionary to store results:
# results[dataset][model_name] = (predictions, error, best_params_per_label)
results = {dataset: {} for dataset in datasets}

# Define model parameters
# With "cv": 0 the param_grid is a plain dict of constructor keyword arguments
# (single fit); with "cv" > 1 it is a grid of value lists searched by GridSearchCV.

models = {
    "RF": {
        "classifier": RandomForestClassifier,
        "param_grid": {
            'n_estimators': 1000,
            'max_features': 'sqrt'
        },
        "cv": 0,
        "gpu": False
    },
    "PDA": {
        "classifier": LinearDiscriminantAnalysis,
        "param_grid": {
            'solver':'lsqr', 
            'shrinkage':'auto'
        },
        "cv": 0,
        "gpu": False
    },
    "XGB": {
        "classifier": XGBClassifier,
        "param_grid": {
            'device': ['cpu'],
            'n_estimators': [1000], 
            'eta': [0.25,0.10,0.05], 
            'max_depth': [1,3,5], 
            'subsample': [1,0.8,0.6], 
            'colsample_bytree': [1,0.75,0.5]
        },
        "cv": 3,
        "gpu": False
    },
    "SVM": {
        "classifier": SVC,
        "param_grid": {
            'kernel':['linear'],
            'C':[0.01,0.1,1.0,10,100,1000,10000,100000],
            'degree':[3],
            'gamma':['scale']
        },
        "cv": 3,
        "gpu": False
    },
    "PLS-DA": {
        "classifier": PLSDA_Classifier,
        "param_grid": {
            'n_components':[2,3,4,5,6,7,8]
        },
        "cv": 3,
        "gpu": False
    },
    "tabPFN": {
        "classifier": TabPFNClassifier,
        "param_grid": {
            'device': 'cuda',
            'ignore_pretraining_limits':True
        },
        "cv": 0,
        "gpu": False
    }
}

from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()

# Loop through datasets
for dataset in datasets:
    this_data = loaded_datasets[dataset]
    
    IDs = pd.Series(this_data["IDs"])
    y = np.array(this_data["y"])
    X = pd.DataFrame(this_data["X"])
    # NOTE(review): the scaler is fitted on all samples before the per-ID
    # train/test split, so held-out samples contribute to the scaling
    # statistics. Confirm this is intended.
    X = scaler.fit_transform(X)
    
    for model_name, model_info in models.items():
        # predict is the tuple (predictions, error, best_params_per_label)
        predict = predict_class_by_label(
            cv=model_info["cv"], 
            gpu=model_info["gpu"], 
            IDs=IDs, 
            y=y, 
            X=X, 
            classifier=model_info["classifier"], 
            param_grid=model_info["param_grid"], 
            verbose=False
        )
        results[dataset][model_name] = predict
        print(f"{dataset}-{model_name}: {round(predict[1],4)}")

Etnia-RF: 0.3247
Etnia-PDA: 0.4416
Etnia-XGB: 0.3117
Etnia-SVM: 0.3636
Etnia-PLS-DA: 0.3896
Etnia-tabPFN: 0.2987


In [8]:
# Save the current interpreter session to disk with dill
dill.dump_session('Comp_class_breath_2.db')

In [9]:
# Print header: a 12-wide dataset column followed by one 15-wide column per model
header = "Dataset".ljust(12) + " | " + " | ".join(model_name.ljust(15) for model_name in models)
print(header)

# Print separator line, exactly as wide as the header
# (the previous fixed formula overshot the header by three characters)
print("-" * len(header))

# Print results: element [1] of each stored result is the classification error
for dataset in datasets:
    result_line = f"{dataset.ljust(12)} | " + " | ".join(f"{round(results[dataset][model_name][1],4):<15}" for model_name in models)
    print(result_line)


Dataset      | RF              | PDA             | XGB             | SVM             | PLS-DA          | tabPFN         
---------------------------------------------------------------------------------------------------------------------------
Etnia        | 0.3247          | 0.4416          | 0.3117          | 0.3636          | 0.3896          | 0.2987         


In [10]:
print("---------------------------")
print("Confusion matrices")
print("---------------------------")

for dataset in datasets:
    this_data = loaded_datasets[dataset]    
    y = np.array(this_data["y"])
    for model_name in models:
        # Element [0] of each stored result holds the predictions.
        # The table gets its own name so it does not overwrite the imported
        # sklearn.metrics.confusion_matrix function.
        confusion_table = pd.crosstab(y, results[dataset][model_name][0], rownames=['Actual'], colnames=['Predicted'])
        # Print the confusion matrix with a title
        print(f"Confusion Matrix for {dataset}-{model_name}:")
        print(confusion_table)
        print("\n")  # Add a newline for better readability


---------------------------
Confusion matrices
---------------------------
Confusion Matrix for Etnia-RF:
Predicted   0   1
Actual           
0          26  13
1          12  26


Confusion Matrix for Etnia-PDA:
Predicted   0   1
Actual           
0          22  17
1          17  21


Confusion Matrix for Etnia-XGB:
Predicted   0   1
Actual           
0          26  13
1          11  27


Confusion Matrix for Etnia-SVM:
Predicted   0   1
Actual           
0          23  16
1          12  26


Confusion Matrix for Etnia-PLS-DA:
Predicted   0   1
Actual           
0          25  14
1          16  22


Confusion Matrix for Etnia-tabPFN:
Predicted   0   1
Actual           
0          27  12
1          11  27


