In [None]:
# Step 1 – Import Libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score
%matplotlib inline

In [None]:
# Step 2 – Load Dataset
# The CSV is read with ';' as the field separator.
df = pd.read_csv("secondary_data.csv", sep=";")
# Quick sanity checks: (rows, columns) and how many rows fall in each 'class' value.
print(df.shape)
print(df['class'].value_counts())

In [None]:
# Step 3 – Node Definition
class Node:
    """A single node of a binary decision tree.

    A node is either a leaf that carries ``prediction`` or an internal node
    that carries ``test_function``. For an internal node, a sample for which
    ``test_function(x)`` is truthy is routed to ``left``; every other sample
    is routed to ``right``.

    Parameters
    ----------
    is_leaf : bool
        True when the node terminates a path and returns ``prediction``.
    prediction : object
        Value returned by a leaf.
    test_function : callable or None
        Predicate applied to a sample at an internal node.
    feature_index : object, optional
        Identifier of the feature inspected by ``test_function``. It is only
        stored; ``predict`` never reads it.
    left, right : Node or None, optional
        Children of an internal node. They may also be assigned after
        construction through the attributes of the same name.
    """

    def __init__(self, is_leaf=False, prediction=None, test_function=None,
                 feature_index=None, left=None, right=None):
        self.is_leaf = is_leaf
        self.prediction = prediction
        self.test_function = test_function
        self.feature_index = feature_index
        self.left = left
        self.right = right

    def predict(self, x):
        """Return the prediction of the leaf reached by routing ``x`` from this node.

        Raises
        ------
        Exception
            If the node is not a leaf and has no test function.
        """
        if self.is_leaf:
            return self.prediction
        if self.test_function is None:
            raise Exception("Invalid node: no test function and not a leaf.")
        # Truthy test -> left subtree, otherwise right subtree.
        branch = self.left if self.test_function(x) else self.right
        return branch.predict(x)

In [None]:
class TreePredictor:
    """Binary decision-tree classifier grown greedily by impurity minimisation.

    Parameters
    ----------
    impurity : str
        Splitting criterion: ``"gini"``, ``"entropy"``, or
        ``"misclassification"`` / ``"error"``. The matching impurity function
        is looked up by name when the instance is created, so it must already
        be defined at that point.
    max_depth : int
        A node at this depth (the root is depth 0) always becomes a leaf.
    min_samples_split : int
        A node holding fewer samples than this always becomes a leaf.
    max_splits : int
        Stored on the instance; none of the methods of this class read it.

    Attributes
    ----------
    impurity_fn : callable
        Impurity function selected by ``impurity``.
    num_splits : int
        Number of internal nodes created by ``_build_tree`` since the last
        call to ``fit``.
    root : Node or None
        Root of the fitted tree, ``None`` before ``fit``.

    Raises
    ------
    ValueError
        If ``impurity`` is not one of the supported names.
    """

    def __init__(self, impurity, max_depth=11, min_samples_split=2, max_splits=100):
        self.impurity = impurity
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.max_splits = max_splits
        self.num_splits = 0
        self.root = None

        if impurity == "gini":
            self.impurity_fn = gini_impurity
        elif impurity == "entropy":
            self.impurity_fn = entropy
        elif impurity in ["misclassification", "error"]:
            self.impurity_fn = misclassification_error
        else:
            raise ValueError(f"Unknown impurity: {impurity}")

    def fit(self, X, y):
        """Grow the tree on feature frame ``X`` and label Series ``y`` (row-aligned)."""
        self.num_splits = 0
        self.root = self._build_tree(X, y, depth=0)

    def _build_tree(self, X, y, depth, available_features=None):
        """Recursively build and return the subtree for the samples in ``X`` / ``y``.

        A leaf predicting the most frequent label is returned when a stopping
        rule fires (too few samples, depth limit, pure node) or when no
        candidate split separates the samples into two non-empty groups.
        """
        if available_features is None:
            available_features = X.columns.tolist()

        print(f"📐 Depth {depth} | Samples: {len(y)}")

        # Nothing to learn from: mode() would be empty, so answer with None.
        if y.empty:
            return Node(is_leaf=True, prediction=None)

        majority = y.mode().iloc[0]
        if len(y) < self.min_samples_split or depth >= self.max_depth or y.nunique() == 1:
            return Node(is_leaf=True, prediction=majority)

        best_feature, best_criterion, _, left_pos, right_pos = self._choose_split(X, y, available_features)
        if best_feature is None or len(left_pos) == 0 or len(right_pos) == 0:
            return Node(is_leaf=True, prediction=majority)

        self.num_splits += 1

        # Children and feature are attached as attributes so that this works
        # with a Node constructor that only accepts the three basic arguments.
        node = Node(is_leaf=False, test_function=best_criterion)
        node.feature_index = best_feature
        node.left = self._build_tree(X.iloc[left_pos], y.iloc[left_pos], depth + 1, available_features)
        node.right = self._build_tree(X.iloc[right_pos], y.iloc[right_pos], depth + 1, available_features)
        return node

    def _choose_split(self, X, y, features):
        """Pick the lowest-impurity candidate split that really separates the samples.

        Candidates come from ``get_all_best_splits_numerical`` (rows with a
        ``best_threshold``) and ``get_all_best_splits_categorical`` (rows with
        a ``best_value``). They are tried in ascending impurity order.

        Returns
        -------
        tuple
            ``(feature, test_function, impurity, left_positions, right_positions)``.
            The positions are 0-based row offsets into ``X`` (for ``.iloc``);
            samples for which ``test_function`` is true are the left ones.
            All five entries are ``None`` when no usable split exists.
        """
        numerical_features, categorical_features = [], []
        for f in features:
            is_number = pd.api.types.is_numeric_dtype(X[f]) and not pd.api.types.is_bool_dtype(X[f])
            (numerical_features if is_number else categorical_features).append(f)

        frames = [
            frame
            for frame in (
                get_all_best_splits_numerical(X, y, numerical_features, self.impurity_fn),
                get_all_best_splits_categorical(X, y, categorical_features, self.impurity_fn),
            )
            if not frame.empty
        ]
        no_split = (None, None, None, None, None)
        if not frames:
            return no_split

        # Stable sort keeps the helpers' ordering among equal impurities.
        candidates = pd.concat(frames, ignore_index=True).sort_values(by="impurity", kind="stable")

        for _, row in candidates.iterrows():
            impurity = row["impurity"]
            if pd.isna(impurity) or not np.isfinite(float(impurity)):
                continue  # the helper found no split for this feature

            feature = row["feature"]
            threshold = row.get("best_threshold", np.nan)
            value = row.get("best_value", np.nan)

            if not pd.isna(threshold):
                # Same partition the numerical helper scored: {> t} versus {<= t}.
                test_function = lambda x, f=feature, t=threshold: x[f] > t
                left_mask = X[feature] > threshold
            elif not pd.isna(value):
                test_function = lambda x, f=feature, v=value: x[f] == v
                left_mask = X[feature] == value
            else:
                continue

            mask = np.asarray(left_mask.fillna(False), dtype=bool)
            left_positions = np.flatnonzero(mask).tolist()
            right_positions = np.flatnonzero(~mask).tolist()
            if left_positions and right_positions:
                return feature, test_function, impurity, left_positions, right_positions

        return no_split

    def predict_single(self, x):
        """Route one sample ``x`` (indexable by feature name) to a leaf and return its prediction."""
        node = self.root
        while not node.is_leaf:
            if node.test_function(x):
                node = node.left
            else:
                node = node.right
        return node.prediction

    def evaluate(self, X, y):
        """Return the 0-1 loss (fraction of mispredicted rows) of the tree on ``X`` / ``y``."""
        predictions = X.apply(self.predict_single, axis=1)
        return (predictions != y).mean()

    def predict(self, X):
        """Return a NumPy array with one prediction per row of ``X``."""
        return np.array([self.predict_single(x) for _, x in X.iterrows()])


In [None]:
# Step 5 – Impurity Functions (Corrected)

def gini_impurity(labels):
    """Gini impurity ``1 - sum(p_k ** 2)`` of a pandas Series of class labels.

    ``p_k`` is the relative frequency of each distinct label. An empty
    Series yields 0.
    """
    if not len(labels):
        return 0
    proportions = labels.value_counts(normalize=True)
    squared = proportions ** 2
    return 1 - sum(squared)

def entropy(labels):
    """Shannon entropy (base 2) of a pandas Series of class labels.

    Computed as ``-sum(p_k * log2(p_k))`` over the relative frequencies of
    the distinct labels. An empty Series yields 0.
    """
    if not len(labels):
        return 0
    proportions = labels.value_counts(normalize=True)
    terms = (p * np.log2(p) for p in proportions if p > 0)
    return -sum(terms)
   
def misclassification_error(labels):
    """Misclassification error of a pandas Series of class labels.

    This is one minus the relative frequency of the most common label.
    An empty Series yields 0.
    """
    if not len(labels):
        return 0
    proportions = labels.value_counts(normalize=True)
    majority_share = proportions.max()
    return 1 - majority_share


In [None]:
# Step 6 – Best Split for Numerical Feature
def get_best_split_numerical(X, y, feature, impurity_fn, threshold_step=4, verbose=True):
    """Find the threshold on a numerical feature that minimises weighted impurity.

    Rows where the feature or the label is missing are ignored. Candidate
    thresholds are every ``threshold_step``-th distinct value of the feature
    in ascending order. Each candidate ``t`` partitions the samples into
    ``value <= t`` and ``value > t`` and is scored with the size-weighted
    average of ``impurity_fn`` on the two label groups. A candidate that
    leaves one of the two groups empty is not a split and is skipped.

    Parameters
    ----------
    X : pandas.DataFrame
        Feature frame containing the column ``feature``.
    y : pandas.Series
        Labels, index-aligned with ``X``.
    feature : str
        Name of the column to split on.
    impurity_fn : callable
        Function mapping a pandas Series of labels to an impurity value.
    threshold_step : int, default 4
        Stride used to subsample the distinct values; 1 tries all of them.
    verbose : bool, default True
        Print a progress line naming the feature being evaluated.

    Returns
    -------
    tuple
        ``(best_threshold, best_impurity, test_function)`` where
        ``test_function(x)`` is ``x[feature] > best_threshold``. When no
        candidate separates the samples the result is
        ``(None, float('inf'), None)``.

    Raises
    ------
    ValueError
        If ``threshold_step`` is smaller than 1.
    """
    if threshold_step < 1:
        raise ValueError(f"threshold_step must be >= 1, got {threshold_step}")
    if verbose:
        print(f"🔍 Evaluating feature: {feature}")

    data = pd.DataFrame({feature: X[feature], 'label': y}).dropna()
    data = data.sort_values(by=feature)
    values = data[feature].values
    labels = data['label'].values
    n_total = len(labels)

    thresholds = np.unique(values)[::threshold_step]

    best_impurity = float('inf')
    best_threshold = None
    best_test_function = None

    for threshold in thresholds:
        left_mask = values <= threshold
        right_mask = values > threshold

        y_left = labels[left_mask]
        y_right = labels[right_mask]

        # E.g. the largest value, or a constant column: everything lands on
        # one side, so the "split" would only reproduce the parent node.
        if len(y_left) == 0 or len(y_right) == 0:
            continue

        impurity_left = impurity_fn(pd.Series(y_left))
        impurity_right = impurity_fn(pd.Series(y_right))

        weighted_impurity = (
            len(y_left) / n_total * impurity_left +
            len(y_right) / n_total * impurity_right
        )

        if weighted_impurity < best_impurity:
            best_impurity = weighted_impurity
            best_threshold = threshold
            # Bind the threshold now; the loop variable keeps changing.
            best_test_function = lambda x, t=threshold: x[feature] > t

    return best_threshold, best_impurity, best_test_function

In [None]:
# Step 7 – All Best Splits for Numerical Features
def get_all_best_splits_numerical(X, y, numerical_features, impurity_fn):
    """Tabulate the best threshold split of every numerical feature.

    Calls ``get_best_split_numerical`` once per feature and returns a
    DataFrame with columns ``feature``, ``best_threshold`` and ``impurity``,
    sorted by ascending impurity with a fresh 0..n-1 index. An empty frame
    with those columns is returned when there is nothing to evaluate.
    """
    columns = ['feature', 'best_threshold', 'impurity']
    if not numerical_features:
        return pd.DataFrame(columns=columns)

    records = []
    for name in numerical_features:
        cut, score, _ = get_best_split_numerical(X, y, name, impurity_fn)
        records.append({"feature": name, "best_threshold": cut, "impurity": score})

    if not records:
        return pd.DataFrame(columns=columns)

    ranked = pd.DataFrame(records).sort_values(by="impurity")
    return ranked.reset_index(drop=True)

In [None]:
# Step 8 – Best Split for Categorical Feature
def get_best_split_categorical(X, y, feature, impurity_fn):
    """Find the one-vs-rest split on a categorical feature with minimal weighted impurity.

    Rows where the feature or the label is missing are ignored. Every
    distinct value ``v`` of the feature is tried as the partition
    ``value == v`` versus ``value != v`` and scored with the size-weighted
    average of ``impurity_fn`` on the two label groups.

    Parameters
    ----------
    X : pandas.DataFrame
        Feature frame containing the column ``feature``.
    y : pandas.Series
        Labels, index-aligned with ``X``.
    feature : str
        Name of the column to split on.
    impurity_fn : callable
        Function mapping a pandas Series of labels to an impurity value.

    Returns
    -------
    tuple
        ``(best_value, best_impurity, test_function)`` where
        ``test_function(x)`` is ``x[feature] == best_value``. When fewer than
        two distinct values are observed there is nothing to separate and the
        result is ``(None, float('inf'), None)``.
    """
    data = pd.DataFrame({feature: X[feature], 'label': y}).dropna()
    unique_values = data[feature].unique()

    best_impurity = float('inf')
    best_value = None
    best_test_function = None

    # With a single category the "rest" side is always empty, so scoring it
    # would only report the parent impurity for a split that does nothing.
    if len(unique_values) < 2:
        return best_value, best_impurity, best_test_function

    n_total = len(data)
    for val in unique_values:
        left_mask = data[feature] == val
        right_mask = ~left_mask

        y_left = data['label'][left_mask]
        y_right = data['label'][right_mask]

        impurity_left = impurity_fn(y_left)
        impurity_right = impurity_fn(y_right)

        weighted_impurity = (
            len(y_left) / n_total * impurity_left +
            len(y_right) / n_total * impurity_right
        )

        if weighted_impurity < best_impurity:
            best_impurity = weighted_impurity
            best_value = val
            # Bind the value now; the loop variable keeps changing.
            best_test_function = lambda x, v=val: x[feature] == v

    return best_value, best_impurity, best_test_function

In [None]:
# Step 9 – All Best Splits for Categorical Features
def get_all_best_splits_categorical(X, y, categorical_features, impurity_fn):
    """Tabulate the best one-vs-rest split of every categorical feature.

    Calls ``get_best_split_categorical`` once per feature and returns a
    DataFrame with columns ``feature``, ``best_value`` and ``impurity``,
    sorted by ascending impurity with a fresh 0..n-1 index. An empty frame
    with those columns is returned when there is nothing to evaluate.
    """
    columns = ['feature', 'best_value', 'impurity']
    if not categorical_features:
        return pd.DataFrame(columns=columns)

    records = []
    for name in categorical_features:
        category, score, _ = get_best_split_categorical(X, y, name, impurity_fn)
        records.append({"feature": name, "best_value": category, "impurity": score})

    if not records:
        return pd.DataFrame(columns=columns)

    ranked = pd.DataFrame(records).sort_values(by="impurity")
    return ranked.reset_index(drop=True)

In [None]:
# Step 10 – _build_tree method
def _build_tree(self, X, y, depth, available_features=None):
    """Recursively build and return the subtree for the samples in ``X`` / ``y``.

    Written as a module-level function that takes ``self`` so it can be
    installed as a method of ``TreePredictor``.

    A leaf predicting the most frequent label is returned when the node has
    fewer than ``self.min_samples_split`` samples, ``depth`` has reached
    ``self.max_depth``, all labels are equal, or no candidate split sends
    samples to both sides. Otherwise the node is split on the candidate with
    the lowest weighted impurity; that feature is then removed from the list
    handed to both children, so a feature is used at most once along any
    root-to-leaf path.

    Parameters
    ----------
    X : pandas.DataFrame
        Feature rows reaching this node.
    y : pandas.Series
        Labels, index-aligned with ``X``.
    depth : int
        Depth of this node (the root is 0).
    available_features : list or None
        Columns that may still be split on; defaults to all columns of ``X``.

    Returns
    -------
    Node
        Root of the constructed subtree.
    """
    if available_features is None:
        available_features = X.columns.tolist()
    print(f"📐 Depth {depth} | Samples: {len(y)}")

    # No samples: mode() would be empty, so the leaf has nothing to predict.
    if y.empty:
        return Node(is_leaf=True, prediction=None)

    majority = y.mode().iloc[0]
    if len(y) < self.min_samples_split or depth >= self.max_depth or y.nunique() == 1:
        return Node(is_leaf=True, prediction=majority)

    # "number" covers every integer/float width, not only the 64-bit ones.
    numerical_features = [f for f in X.select_dtypes(include="number").columns if f in available_features]
    categorical_features = [f for f in X.select_dtypes(include=["object", "category"]).columns if f in available_features]

    numeric_results = get_all_best_splits_numerical(X, y, numerical_features, self.impurity_fn)
    numeric_results["split_type"] = "numerical"

    categorical_results = get_all_best_splits_categorical(X, y, categorical_features, self.impurity_fn)
    categorical_results["split_type"] = "categorical"

    frames = [frame for frame in (numeric_results, categorical_results) if not frame.empty]
    if not frames:
        # Every feature has been used up (or none is splittable).
        return Node(is_leaf=True, prediction=majority)
    all_results = pd.concat(frames, ignore_index=True)

    # Walk the candidates from lowest impurity upwards and take the first one
    # that puts samples on both sides. The stable sort keeps the original
    # order among ties, so the first minimum is still tried first.
    best = None
    for _, candidate in all_results.sort_values(by="impurity", kind="stable").iterrows():
        impurity = candidate["impurity"]
        if pd.isna(impurity) or not np.isfinite(float(impurity)):
            continue  # the helper found no split for this feature

        feature = candidate["feature"]
        if candidate["split_type"] == "numerical":
            threshold = candidate["best_threshold"]
            if pd.isna(threshold):
                continue
            candidate_fn = lambda x, f=feature, t=threshold: x[f] > t
            candidate_mask = X[feature] > threshold
        else:
            value = candidate["best_value"]
            if pd.isna(value):
                continue
            candidate_fn = lambda x, f=feature, v=value: x[f] == v
            candidate_mask = X[feature] == value

        if candidate_mask.any() and not candidate_mask.all():
            best, test_fn, left_mask = candidate, candidate_fn, candidate_mask
            break

    if best is None:
        return Node(is_leaf=True, prediction=majority)

    best_feature = best["feature"]
    split_type = best["split_type"]
    print(f"   ➤ Best split on '{best_feature}' ({split_type}) with impurity = {best['impurity']:.4f}")

    right_mask = ~left_mask
    X_left, y_left = X[left_mask], y[left_mask]
    X_right, y_right = X[right_mask], y[right_mask]

    self.num_splits += 1
    remaining_features = [f for f in available_features if f != best_feature]

    node = Node(is_leaf=False, test_function=test_fn)
    node.left = self._build_tree(X_left, y_left, depth + 1, remaining_features)
    node.right = self._build_tree(X_right, y_right, depth + 1, remaining_features)
    return node

# Install the module-level _build_tree defined above as the TreePredictor method,
# replacing the _build_tree that was written inside the class body.
TreePredictor._build_tree = _build_tree

In [None]:
# Step 11 – Training Loss (0-1) for Each Impurity Function

def compute_training_error(X, y, impurity_list, max_depth=12, min_samples_split=100):
    """Fit one tree per impurity criterion on ``X`` / ``y`` and collect its 0-1 training loss.

    Returns a dict mapping each entry of ``impurity_list`` to the loss that
    the corresponding ``TreePredictor`` reports on the data it was fitted on.
    """
    losses = {}
    for criterion in impurity_list:
        print(f"🔍 Training with impurity: {criterion}")
        tree = TreePredictor(
            impurity=criterion,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
        )
        tree.fit(X, y)
        training_loss = tree.evaluate(X, y)
        print(f"  ➤ 0-1 Loss: {training_loss:.4f}")
        losses[criterion] = training_loss
    return losses

def plot_training_errors(loss_dict):
    """Show a bar chart of the 0-1 training loss recorded for each impurity function.

    ``loss_dict`` maps an impurity name to its loss; one bar is drawn per
    entry, in the dict's order, on a y axis fixed to [0, 1].
    """
    names, values = list(loss_dict.keys()), list(loss_dict.values())
    bar_colors = ["#4CAF50", "#2196F3", "#FF9800"]

    plt.figure(figsize=(8, 5))
    plt.bar(names, values, color=bar_colors)
    plt.title("0-1 Training Loss for Each Impurity Function")
    plt.xlabel("Impurity Function")
    plt.ylabel("0-1 Loss")
    plt.ylim(0, 1)
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.show()


In [None]:
# Step 12 – Select Best Impurity and Nested Cross-Validation

from sklearn.model_selection import KFold
import numpy as np

def select_best_impurity(loss_dict):
    """Return the key of ``loss_dict`` with the smallest loss and print the choice.

    When several keys share the smallest loss, the first one in the dict's
    order is returned.
    """
    winner, lowest_loss = min(loss_dict.items(), key=lambda item: item[1])
    print(f"✅ Best impurity based on training loss: {winner} (loss = {lowest_loss:.4f})")
    return winner

from sklearn.model_selection import KFold
import numpy as np
from collections import Counter

def nested_cross_validation(X, y, impurity, inner_folds_list, max_depth_grid, min_split_grid, outer_folds=5):
    """Estimate tree accuracy with nested K-fold cross-validation.

    For every outer fold, each ``(max_depth, min_samples_split)`` pair from
    the two grids is scored by its mean accuracy over an inner K-fold split
    of the outer training part (once per entry of ``inner_folds_list``). The
    pair with the highest mean is refitted on the whole outer training part
    and scored on the held-out outer test part.

    Returns
    -------
    dict
        ``mean_accuracy`` / ``std_accuracy`` of the outer scores, the list
        ``outer_scores``, and ``best_depth`` / ``best_min_split``: the pair
        selected most often across the outer folds.
    """
    print(f"🔁 Starting nested cross-validation for impurity: {impurity}")
    outer_splitter = KFold(n_splits=outer_folds, shuffle=True, random_state=42)
    fold_accuracies = []
    selected_params = []

    for fold_id, (train_idx, test_idx) in enumerate(outer_splitter.split(X), 1):
        print(f"🔸 Outer Fold {fold_id}/{outer_folds}")
        X_train, y_train = X.iloc[train_idx], y.iloc[train_idx]
        X_test, y_test = X.iloc[test_idx], y.iloc[test_idx]

        top_score, top_params = -np.inf, None

        for inner_k in inner_folds_list:
            # The inner shuffle is seeded with the outer fold number.
            inner_splitter = KFold(n_splits=inner_k, shuffle=True, random_state=fold_id)
            for depth in max_depth_grid:
                for min_split in min_split_grid:
                    validation_scores = []

                    for fit_idx, val_idx in inner_splitter.split(X_train):
                        candidate = TreePredictor(impurity=impurity, max_depth=depth, min_samples_split=min_split)
                        candidate.fit(X_train.iloc[fit_idx], y_train.iloc[fit_idx])
                        validation_loss = candidate.evaluate(X_train.iloc[val_idx], y_train.iloc[val_idx])
                        validation_scores.append(1 - validation_loss)

                    average = np.mean(validation_scores)
                    # Strict comparison: on ties the earliest pair is kept.
                    if average > top_score:
                        top_score, top_params = average, (depth, min_split)

        chosen_depth, chosen_split = top_params
        print(f"  ➤ Best params: depth={chosen_depth}, split={chosen_split}")
        selected_params.append(top_params)

        refit = TreePredictor(impurity=impurity, max_depth=chosen_depth, min_samples_split=chosen_split)
        refit.fit(X_train, y_train)
        fold_accuracy = 1 - refit.evaluate(X_test, y_test)
        fold_accuracies.append(fold_accuracy)
        print(f"  ➤ Outer Fold Accuracy: {fold_accuracy:.4f}")

    (overall_depth, overall_split), times_chosen = Counter(selected_params).most_common(1)[0]
    print(f"🏆 Parametri migliori globali: depth={overall_depth}, split={overall_split} (scelti {times_chosen} volte)")

    return {
        "mean_accuracy": np.mean(fold_accuracies),
        "std_accuracy": np.std(fold_accuracies),
        "outer_scores": fold_accuracies,
        "best_depth": overall_depth,
        "best_min_split": overall_split
    }


In [None]:
# Step 13 – Save Nested CV Results
def save_nested_cv_outputs(result_dict, impurity, filename_prefix="final_nested_cv"):
    """Write the nested-CV summary to ``<prefix>.csv`` and a bar chart to ``<prefix>.png``.

    ``result_dict`` must provide ``mean_accuracy`` and ``std_accuracy``; the
    CSV holds a single row with those two values and ``impurity``, and the
    chart shows the mean as one bar with the standard deviation as its error
    bar.
    """
    import pandas as pd

    mean_accuracy = result_dict["mean_accuracy"]
    std_accuracy = result_dict["std_accuracy"]

    # One-row summary table.
    csv_path = f"{filename_prefix}.csv"
    summary = pd.DataFrame({
        "impurity": [impurity],
        "mean_accuracy": [mean_accuracy],
        "std_accuracy": [std_accuracy]
    })
    summary.to_csv(csv_path, index=False)
    print(f"💾 Results saved to {csv_path}")

    # Single bar with an error bar, saved next to the CSV.
    plot_path = f"{filename_prefix}.png"
    plt.figure(figsize=(6, 4))
    plt.bar([impurity], [mean_accuracy], yerr=[std_accuracy],
            capsize=8, color="#4CAF50")
    plt.title("Final Nested CV Accuracy")
    plt.ylim(0, 1)
    plt.ylabel("Accuracy")
    plt.tight_layout()
    plt.savefig(plot_path, dpi=300)
    print(f"🖼️ Plot saved to {plot_path}")


In [None]:
# ✅ FINAL EXECUTION – Full pipeline: compare impurities, pick one, run nested CV, save results

# Step A – Compute the 0-1 training loss of one tree per impurity function.
# Features are every column except 'class'; 'class' is the target.
impurities = ["gini", "entropy", "error"]
X = df.drop(columns=["class"])
y = df["class"]
training_losses = compute_training_error(X, y, impurities, max_depth=12, min_samples_split=500)

# Step B – Plot the losses and keep the impurity with the lowest training loss
plot_training_errors(training_losses)
best_impurity = select_best_impurity(training_losses)

# Step C – Run nested CV with that impurity:
# 5 outer folds, 2 inner folds, and a 2 x 2 grid over max_depth and min_samples_split.
nested_results = nested_cross_validation(
    X, y,
    impurity=best_impurity,
    inner_folds_list=[2],
    min_split_grid=[100, 500],
    max_depth_grid = [11,12],
    outer_folds=5
)

# Step D – Save the nested-CV summary (CSV + PNG with the default file prefix)
save_nested_cv_outputs(nested_results, impurity=best_impurity)

In [None]:
# Fit a single tree on the full dataset with fixed hyperparameters and
# report its 0-1 loss on that same data (a training error, not a test error).
final_model = TreePredictor(
    impurity="gini",  # or use the best_impurity variable
    max_depth=12,
    min_samples_split=100
)
final_model.fit(X, y)

final_error = final_model.evaluate(X, y)
print(f"📉 Final training error (0-1 loss): {final_error:.4f}")


In [None]:
from sklearn.model_selection import KFold
import numpy as np

# Hyperparameters taken as the best ones from the nested CV (hard-coded here,
# not read from nested_results). Note that best_impurity is overwritten.
best_depth = 12
best_min_split = 100
best_impurity = "gini"  # or use the variable you had before

# K-fold cross-validation (5 shuffled folds, fixed seed)
kf = KFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = []

print("🔁 Cross-validating final model with best hyperparameters...")

for fold, (train_idx, val_idx) in enumerate(kf.split(X), 1):
    X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
    y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

    # A fresh tree per fold, fitted on the training part only.
    model = TreePredictor(
        impurity=best_impurity,
        max_depth=best_depth,
        min_samples_split=best_min_split
    )
    model.fit(X_train, y_train)
    # evaluate() returns the 0-1 loss, so accuracy is its complement.
    acc = 1 - model.evaluate(X_val, y_val)
    cv_scores.append(acc)
    print(f"  ✅ Fold {fold} accuracy: {acc:.4f}")

# Final statistics
mean_cv_acc = np.mean(cv_scores)
std_cv_acc = np.std(cv_scores)

print(f"\n📊 Final CV Accuracy: {mean_cv_acc:.4f} ± {std_cv_acc:.4f}")


In [None]:
import numpy as np
import pandas as pd
from collections import Counter
from sklearn.utils import resample

class RandomForestPredictor:
    """Ensemble of ``TreePredictor`` trees combined by majority vote.

    Each tree is fitted either on a bootstrap resample of the training data
    (``bootstrap=True``) or on the data as given. Every tree sees all
    feature columns.

    Parameters
    ----------
    n_estimators : int
        Number of trees to fit.
    impurity, max_depth, min_samples_split
        Forwarded unchanged to every ``TreePredictor``.
    bootstrap : bool
        Whether each tree gets its own resample of the training data.
    random_state : int
        Base seed; tree ``i`` is resampled with seed ``random_state + i``.
    """

    def __init__(self, n_estimators=10, impurity="gini", max_depth=5, min_samples_split=10, bootstrap=True, random_state=42):
        self.n_estimators = n_estimators
        self.impurity = impurity
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.bootstrap = bootstrap
        self.random_state = random_state
        self.trees = []

    def fit(self, X, y):
        """Fit ``n_estimators`` trees, discarding any previously fitted ones."""
        np.random.seed(self.random_state)
        self.trees = []

        for tree_no in range(self.n_estimators):
            X_sample, y_sample = X, y
            if self.bootstrap:
                X_sample, y_sample = resample(X, y, random_state=self.random_state + tree_no)

            member = TreePredictor(
                impurity=self.impurity,
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split
            )
            member.fit(X_sample, y_sample)
            self.trees.append(member)
            print(f"🌲 Trained tree {tree_no + 1}/{self.n_estimators}")

    def predict(self, X):
        """Return, for every row of ``X``, the label predicted by most trees."""
        # Collect the predictions of every tree, then transpose so that each
        # row holds the votes of all trees for one sample.
        votes_per_sample = np.array([member.predict(X) for member in self.trees]).T

        # Majority vote per sample.
        return np.array([Counter(votes).most_common(1)[0][0] for votes in votes_per_sample])

    def evaluate(self, X, y_true):
        """Return the 0-1 loss of the ensemble's predictions against ``y_true``."""
        return np.mean(self.predict(X) != y_true)  # 0-1 loss


In [None]:
# Train a random forest on the full dataset (n_estimators sets the number of trees: 5 here)
rf = RandomForestPredictor(
    n_estimators=5,             # 👈 you can change this to 10 if you like
    impurity="gini",
    max_depth=12,
    min_samples_split=500,
    bootstrap=True
)

print("🚀 Training Random Forest...")
rf.fit(X, y)

# 0-1 loss on the same data the forest was fitted on (training error).
train_error_rf = rf.evaluate(X, y)
print(f"🌲 Random Forest training error (0-1 loss): {train_error_rf:.4f}")

In [None]:
from sklearn.model_selection import KFold
import numpy as np

# Random forest hyperparameters
n_estimators = 5
max_depth = 12
min_split = 500
impurity = "gini"

# 5 shuffled folds with a fixed seed.
kf = KFold(n_splits=5, shuffle=True, random_state=42)
rf_scores = []

print("🔁 5-Fold Cross-Validation on Random Forest...")

for fold, (train_idx, val_idx) in enumerate(kf.split(X), 1):
    print(f"\n🔸 Fold {fold}/5 - Training Random Forest...")

    X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
    y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

    rf = RandomForestPredictor(
        n_estimators=n_estimators,
        impurity=impurity,
        max_depth=max_depth,
        min_samples_split=min_split,
        bootstrap=True,
        random_state=fold  # vary the bootstrap samples from fold to fold
    )

    rf.fit(X_train, y_train)
    # evaluate() returns the 0-1 loss, so accuracy is its complement.
    acc = 1 - rf.evaluate(X_val, y_val)
    rf_scores.append(acc)

    print(f"  ✅ Fold {fold} accuracy: {acc:.4f}")

# Final summary
mean_acc = np.mean(rf_scores)
std_acc = np.std(rf_scores)

print(f"\n📊 Random Forest CV Accuracy: {mean_acc:.4f} ± {std_acc:.4f}")
