# Tuning

In [1]:
import os
import random
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import preprocess_dataset as ut
from tfkan import DenseKAN

In [2]:
# Valore del seme causale
seed_value = 0

# Impostazione dei semi casuali per os, random, numpy e tensorflow
os.environ['PYTHONHASHSEED'] = str(seed_value)
random.seed(seed_value)
np.random.seed(seed_value)
tf.random.set_seed(seed_value)

Prepariamo il dataset per l'addestramento e selezioniamo un record di esempio

In [3]:
x_train = pd.read_csv("datasets/x_train.csv")   # Caricamento del dataset
y_train = pd.read_csv("datasets/y_train.csv")
x_test = pd.read_csv("datasets/x_test.csv")  
y_test = pd.read_csv("datasets/y_test.csv")

ut.standardize(x_train)                         # Standardizzazione
x_train = x_train.to_numpy()[:, 1:-1]
y_train = y_train.to_numpy()[:, -1]
ut.standardize(x_test)                   
x_test = x_test.to_numpy()[:, 1:-1]
y_test = y_test.to_numpy()[:, -1]

x_train = tf.convert_to_tensor(x_train, dtype=tf.float32)   # Conversione a tensore
y_train = tf.convert_to_tensor(y_train, dtype=tf.float32)
x_test = tf.convert_to_tensor(x_test, dtype=tf.float32)  
y_test = tf.convert_to_tensor(y_test, dtype=tf.float32)


In [None]:
class HyperKAN(HyperModel):
    def __init__(self, input_shape):
        self.input_shape = input_shape

    def build(self, hp):
        model = keras.Sequential()
        model.add(tf.keras.layers.Input(shape=self.input_shape))

        num_layers = hp.Int('num_layers', 1, 3)
        for i in range(num_layers):
            grid_range = hp.Float(f'grid_range_min_{i}', 1.0, 4.0)
            model.add(DenseKAN(
                units=hp.Int(f'units_{i}', 1, 16),
                grid_size=hp.Int(f'grid_size_{i}', 8, 32),
                grid_range=[
                    -grid_range,
                    grid_range
                ]
            ))
        
        model.add(DenseKAN(1))

        learning_rate = hp.Float('learning_rate', 1e-4, 1e-1, sampling='log')
        optimizer = hp.Choice('optimizer', ['adam', 'rmsprop', 'adadelta', 'adagrad'])
        
        opt = tf.keras.optimizers.get(optimizer)
        opt.learning_rate = learning_rate

        model.compile(
            optimizer=opt,
            loss='mean_absolute_error',
            metrics=[tf.keras.metrics.MeanAbsoluteError(name='mae')]
        )
        return model


def run_tuner(x_train, y_train, x_test, y_test, input_shape):
    hypermodel = HyperKAN(input_shape=input_shape)

    tuner = Hyperband(
        hypermodel,
        objective='val_loss',
        max_epochs=50,
        directory="./modelli_salvati",
        project_name='retina_kan'
    )

    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=10,
        restore_best_weights=True
    )

    kfold = KFold(n_splits=5, shuffle=True, random_state=0)

    x_train = tf.convert_to_tensor(x_train)
    y_train = tf.convert_to_tensor(y_train)

    for fold, (train_indices, val_indices) in enumerate(kfold.split(x_train.numpy())):
        print(f"Fold {fold + 1}")
        
        x_train_fold = tf.gather(x_train, train_indices)
        x_val_fold = tf.gather(x_train, val_indices)
        y_train_fold = tf.gather(y_train, train_indices)
        y_val_fold = tf.gather(y_train, val_indices)

        tuner.search(
            x_train_fold, y_train_fold,
            epochs=100,
            validation_data=(x_val_fold, y_val_fold),
            callbacks=[early_stopping]
        )

    tuner.results_summary()
    best_model = tuner.get_best_models(num_models=1)[0]

    test_loss, test_mae = best_model.evaluate(x_test, y_test)
    print(f"Test MAE: {test_mae}")

    best_model.save('best_retina_kan_model.h5')

    return best_model

N_FEATURES = x_train.shape[1]
best_model = run_tuner(x_train, y_train, x_test, y_test, input_shape=(N_FEATURES,))

In [None]:
import tensorflow as tf
from tensorflow import keras
from keras import layers, regularizers
from keras_tuner import RandomSearch, HyperModel
from sklearn.model_selection import KFold

class MyHyperModel(HyperModel):
    def __init__(self, input_shape):
        self.input_shape = input_shape

    def build(self, hp):
        model = keras.Sequential()
        model.add(tf.keras.layers.Input(shape=self.input_shape))

        num_layers = hp.Int('num_layers', 1, 5)
        for i in range(num_layers):
            model.add(layers.Dense(
                units=hp.Int(f'units_{i}', 4, 256, step=4),
                activation=hp.Choice(f'activation_{i}', ['relu', 'elu', 'selu']),
                kernel_regularizer=regularizers.l2(hp.Float(f'l2_{i}', 1e-4, 1e-2, sampling='log'))
            ))

            use_dropout = hp.Boolean(f'use_dropout_{i}')
            if use_dropout:
                model.add(layers.Dropout(rate=0.5))

        
        model.add(layers.Dense(1))

        learning_rate = hp.Float('learning_rate', 1e-4, 1e-1, sampling='log')
        optimizer = hp.Choice('optimizer', ['adam', 'rmsprop', 'adadelta', 'adagrad'])

        opt = tf.keras.optimizers.get(optimizer)
        opt.learning_rate = learning_rate

        model.compile(
            optimizer=opt,
            loss='mean_absolute_error',
            metrics=[tf.keras.metrics.MeanAbsoluteError(name='mae')]
        )
        return model


def run_tuner(x_train, y_train, x_test, y_test, input_shape):
    hypermodel = MyHyperModel(input_shape=input_shape)

    tuner = Hyperband(
        hypermodel,
        objective='val_loss',
        max_epochs=50,
        directory="./modelli_salvati",
        project_name='retina_mlp',

    )

    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=10,
        restore_best_weights=True
    )

    kfold = KFold(n_splits=5, shuffle=True, random_state=0)

    x_train = tf.convert_to_tensor(x_train)
    y_train = tf.convert_to_tensor(y_train)

    for fold, (train_indices, val_indices) in enumerate(kfold.split(x_train.numpy())):
        print(f"Fold {fold + 1}")
        
        x_train_fold = tf.gather(x_train, train_indices)
        x_val_fold = tf.gather(x_train, val_indices)
        y_train_fold = tf.gather(y_train, train_indices)
        y_val_fold = tf.gather(y_train, val_indices)

        tuner.search(
            x_train_fold, y_train_fold,
            epochs=50,
            validation_data=(x_val_fold, y_val_fold),
            callbacks=[early_stopping]
        )

    tuner.results_summary()
    best_model = tuner.get_best_models(num_models=1)[0]

    test_loss, test_mae = best_model.evaluate(x_test, y_test)
    print(f"Test MAE: {test_mae}")

    best_model.save('best_retina_kan_model.h5')

    return best_model

# Assicurati che x_train, y_train, x_test, y_test siano definiti correttamente
N_FEATURES = x_train.shape[1]
best_model = run_tuner(x_train, y_train, x_test, y_test, input_shape=(N_FEATURES,))


In [8]:
import tensorflow as tf
from skopt import BayesSearchCV
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor, VotingRegressor
from sklearn.svm import SVR
from sklearn.linear_model import BayesianRidge
from skopt.space import Real, Categorical, Integer
from sklearn.model_selection import StratifiedKFold
from skopt.callbacks import DeadlineStopper
from sklearn.preprocessing import RobustScaler
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin


class TensorToNumpyTransformer(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        return X.numpy() if isinstance(X, tf.Tensor) else X

def create_preprocessor():
    return Pipeline([
        ('to_numpy', TensorToNumpyTransformer()),
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', RobustScaler())
    ])

def configure_search(estimator, param_space, n_iter=100, cv=3, n_jobs=-1):
    return BayesSearchCV(
        estimator=estimator,
        search_spaces=param_space,
        n_iter=n_iter,
        cv=StratifiedKFold(n_splits=cv, shuffle=True, random_state=42),
        n_jobs=n_jobs,
        return_train_score=True,
        scoring=['neg_mean_squared_error', 'r2'],
        refit='neg_mean_squared_error',
        optimizer_kwargs={'base_estimator': 'GP'},
        random_state=42
    )

# Define parameter spaces
tree_param_space = {
    'regressor__max_depth': Integer(2, 30),
    'regressor__ccp_alpha': Real(0.0, 1.0),
    'regressor__min_samples_split': Integer(2, 20),
    'regressor__min_samples_leaf': Integer(1, 10),
}

forest_param_space = {
    'regressor__n_estimators': Integer(50, 500),
    'regressor__max_depth': Integer(2, 30),
    'regressor__min_samples_split': Integer(2, 20),
    'regressor__min_samples_leaf': Integer(1, 10),
    'regressor__max_features': Categorical(['sqrt', 'log2', None]),
    'regressor__bootstrap': Categorical([True, False]),
}

voting_param_space = {
    'estimators__tree__max_depth': Integer(2, 30),
    'estimators__tree__ccp_alpha': Real(0.0, 1.0),
    'estimators__tree__min_samples_split': Integer(2, 20),
    'estimators__svr__C': Real(0.1, 100, prior='log-uniform'),
    'estimators__svr__epsilon': Real(0.01, 1.0, prior='log-uniform'),
    'estimators__svr__kernel': Categorical(['linear', 'rbf', 'poly']),
    'estimators__svr__gamma': Real(1e-4, 1, prior='log-uniform'),
    'weights': Real(0, 1, prior='uniform'),
}

def create_pipeline(estimator):
    return Pipeline([
        ('preprocessor', create_preprocessor()),
        ('regressor', estimator)
    ])

# Assume x_train and y_train are your TensorFlow tensors
x_train_np = x_train.numpy() if isinstance(x_train, tf.Tensor) else x_train
y_train_np = y_train.numpy() if isinstance(y_train, tf.Tensor) else y_train

# Configure searches
tree_pipeline = create_pipeline(DecisionTreeRegressor(random_state=42))
forest_pipeline = create_pipeline(RandomForestRegressor(random_state=42))
voting_regressor = VotingRegressor(
    estimators=[
        ('tree', DecisionTreeRegressor(random_state=42)),
        ('svr', SVR()),
        ('bayes', BayesianRidge())
    ],
    weights=[1, 1, 1]  # Initialize with equal weights
)

tree_search = configure_search(tree_pipeline, tree_param_space)
forest_search = configure_search(forest_pipeline, forest_param_space)
voting_search = configure_search(voting_regressor, voting_param_space)

# Run searches
searches = [tree_search, forest_search, voting_search]
names = ['DecisionTree', 'RandomForest', 'VotingRegressor']

for name, search in zip(names, searches):
    print(f"Tuning {name}...")
    search.fit(
        x_train_np, 
        y_train_np
    )
    print(f"Best parameters for {name}: {search.best_params_}")
    print(f"Best negative mean squared error: {search.best_score_['neg_mean_squared_error']:.4f}")
    print(f"Best R-squared score: {search.best_score_['r2']:.4f}\n")


Tuning DecisionTree...




Best parameters for DecisionTree: OrderedDict([('regressor__ccp_alpha', 0.0), ('regressor__max_depth', 30), ('regressor__min_samples_leaf', 3), ('regressor__min_samples_split', 20)])


IndexError: invalid index to scalar variable.