<a href="https://colab.research.google.com/github/MinaBeric/Stat_method_for_Machine_Learning/blob/main/tuning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from collections import Counter
import graphviz
from graphviz import Digraph

In [2]:
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score,precision_score, recall_score, f1_score

In [3]:
from joblib import Parallel, delayed
from itertools import product

In [4]:
import random

In [13]:
import pickle
import multiprocessing

In [5]:
# Mount Google Drive at /content/drive; the data files read below live under
# /content/drive/MyDrive/machinelearning.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
# Read the four pre-split partitions (train/test features and labels) from
# their pickle files, in the same order as the names on the left-hand side.
# NOTE: pd.read_pickle executes pickle under the hood; only load files you trust.
X_train, X_test, y_train, y_test = map(
    pd.read_pickle,
    (
        "/content/drive/MyDrive/machinelearning/X_train.pkl",
        "/content/drive/MyDrive/machinelearning/X_test.pkl",
        "/content/drive/MyDrive/machinelearning/y_train.pkl",
        "/content/drive/MyDrive/machinelearning/y_test.pkl",
    ),
)


In [7]:
class Node():
    """A single node of the decision tree.

    A decision node carries ``feature``, ``threshold`` and the two children
    ``left`` / ``right``; a leaf carries only ``value`` (the predicted label).
    """

    def __init__(self,feature=None, threshold=None,left=None,right=None, value=None):
        """Store the split description (decision node) and/or the leaf value."""
        # Split description, used by decision nodes.
        self.feature, self.threshold = feature, threshold
        self.left, self.right = left, right
        # Predicted label, set only on leaves.
        self.value = value

    def is_leaf(self):
        """Return True when the node stores a value, i.e. it is a leaf."""
        has_no_value = self.value is None
        return not has_no_value

In [9]:
class Decision_Tree:
    """Binary decision tree classifier with axis-aligned ``x[feature] <= threshold`` splits.

    Every impurity measure is derived from ``p = sum(y) / len(y)``, so the
    labels must be encoded as 0 and 1.

    Parameters
    ----------
    min_samples_split : int
        A node holding fewer samples than this becomes a leaf.
    max_depth : int
        A node at this depth (root is depth 0) becomes a leaf.
    n_features : int or None
        Number of features drawn without replacement as split candidates at
        each node. A falsy value means "all features". ``fit`` overwrites this
        attribute with the resolved count.
    criterion : {'entropy', 'gini', 'squared_impurity'}
        Impurity measure used to score splits.
    min_impurity_decrease : float
        A node whose own impurity is below this value becomes a leaf. Note
        that the comparison is made against the node impurity, not against
        the gain of the best split.
    """

    def __init__(self, min_samples_split=2, max_depth=20, n_features=None, criterion='entropy', min_impurity_decrease=0.001):
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.n_features = n_features
        self.criterion = criterion
        self.min_impurity_decrease=min_impurity_decrease
        self.root = None
        # Mapping from the 0/1 label to the display label used in to_dict/visualize.
        self.class_labels={0: 'e', 1: 'p'}
        self.feature_names=None

    def get_params(self,deep=True):
        """Return the hyperparameters as a dict (``deep`` is accepted but unused)."""
        return {
            'min_samples_split': self.min_samples_split,
            'max_depth': self.max_depth,
            'n_features': self.n_features,
            'criterion': self.criterion,
            'min_impurity_decrease':self.min_impurity_decrease
        }

    def set_params (self, **params):
        """Set every given parameter that is an existing attribute; others are ignored."""
        for param,value in params.items():
            if hasattr(self,param):
                setattr(self,param,value)
        return self

    def fit(self, X, y,feature_names=None):
        """Grow the tree on ``X`` (n_samples x n_features) and 0/1 labels ``y``.

        ``X`` and ``y`` are converted with ``np.asarray`` so pandas objects
        are accepted as well as arrays.

        Raises
        ------
        ValueError
            If ``X`` is not 2-D, the data is empty, the lengths of ``X`` and
            ``y`` differ, or ``criterion`` is not a supported name.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        if X.ndim != 2:
            raise ValueError(f"X must be 2-dimensional, got {X.ndim} dimension(s)")
        if len(y) == 0 or X.shape[0] != len(y):
            raise ValueError(
                f"X and y must be non-empty and of equal length, got {X.shape[0]} and {len(y)}"
            )

        self.feature_names=feature_names
        self.n_features = X.shape[1] if not self.n_features else min(X.shape[1], self.n_features)

        self.root = self._grow_tree(X, y)

    def _grow_tree(self, X, y, depth=0):
        """Recursively build the subtree for the samples ``X``/``y`` at ``depth``."""
        n_samples, n_feats = X.shape
        n_labels = len(np.unique(y))
        parent_impurity = self._impurity(y)

        # Stopping rules: depth limit, pure node, too few samples, or a node
        # that is already (almost) pure according to min_impurity_decrease.
        if (depth >= self.max_depth or n_labels == 1 or n_samples < self.min_samples_split or parent_impurity< self.min_impurity_decrease):
            return Node(value=self._most_common_label(y))

        feat_idxs = np.random.choice(n_feats, self.n_features, replace=False)
        best_feature, best_thresh = self._best_split(X, y, feat_idxs)
        if best_feature is None:
            # No candidate split at all (e.g. no features): stop here.
            return Node(value=self._most_common_label(y))

        left_idxs, right_idxs = self._split(X[:, best_feature], best_thresh)
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            # The best available split does not separate the samples (all
            # candidate features are constant here, or no split has positive
            # gain). Recursing would hand an empty partition to a child, so
            # the node becomes a leaf instead.
            return Node(value=self._most_common_label(y))

        left = self._grow_tree(X[left_idxs, :], y[left_idxs], depth + 1)
        right = self._grow_tree(X[right_idxs, :], y[right_idxs], depth + 1)
        return Node(best_feature, best_thresh, left, right)

    def _best_split(self, X, y, feat_idxs):
        """Return ``(feature_index, threshold)`` with the highest gain among ``feat_idxs``.

        Ties keep the first candidate encountered. Returns ``(None, None)``
        when there are no candidates.
        """
        best_gain = -1
        split_idx, split_threshold = None, None
        # The parent impurity is the same for every candidate; compute it once.
        parent_impurity = self._impurity(y)

        for feat_idx in feat_idxs:
            X_column = X[:, feat_idx]
            thresholds = np.unique(X_column)

            for thr in thresholds:
                gain = self._information_gain(y, X_column, thr, parent_impurity)

                if gain > best_gain:
                    best_gain = gain
                    split_idx = feat_idx
                    split_threshold = thr

        return split_idx, split_threshold

    def _information_gain(self, y, X_column, threshold, parent_impurity=None):
        """Impurity decrease obtained by splitting ``y`` on ``X_column <= threshold``.

        Returns 0 when one side of the split is empty. ``parent_impurity`` may
        be passed in to avoid recomputing it; when omitted it is computed
        from ``y``.
        """
        if parent_impurity is None:
            parent_impurity = self._impurity(y)

        left_idxs, right_idxs = self._split(X_column, threshold)

        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return 0

        n = len(y)
        n_l, n_r = len(left_idxs), len(right_idxs)

        e_l = self._impurity(y[left_idxs])
        e_r = self._impurity(y[right_idxs])

        # Children impurities weighted by their share of the samples.
        child_impurity = (n_l / n) * e_l + (n_r / n) * e_r

        information_gain = parent_impurity - child_impurity
        return information_gain

    def _impurity(self, y):
        """Impurity of ``y`` under ``self.criterion``.

        Raises
        ------
        ValueError
            If ``self.criterion`` is not one of the supported names.
        """
        if self.criterion == 'entropy':
            return self._scaled_entropy(y)
        if self.criterion == 'gini':
            return self._gini(y)
        if self.criterion == 'squared_impurity':
            return self._squared_impurity(y)
        raise ValueError(
            f"Unknown criterion {self.criterion!r}; expected 'entropy', 'gini' or 'squared_impurity'"
        )

    def _split(self, X_column, split_thresh):
        """Return the index arrays of samples with value ``<=`` and ``>`` the threshold."""
        left_idxs = np.argwhere(X_column <= split_thresh).flatten()
        right_idxs = np.argwhere(X_column > split_thresh).flatten()
        return left_idxs, right_idxs

    def _scaled_entropy(self, y):
        """Binary entropy of ``y`` multiplied by 2; 0 for a pure node."""
        p=np.sum(y)/len(y)
        if p==0 or p==1:
            return 0
        return -2 * p * np.log2(p) - 2 * (1 - p) * np.log2(1 - p)

    def _gini(self, y):
        """Gini impurity ``2 * p * (1 - p)`` with ``p`` the proportion of class 1."""
        p=np.sum(y)/len(y) #Proportion of class 1
        return 2 * p * (1 - p)

    def _squared_impurity(self,y):
        """Impurity ``sqrt(p * (1 - p))`` with ``p`` the proportion of class 1."""
        p=np.sum(y)/len(y)
        return np.sqrt(p*(1-p))

    def _most_common_label(self, y):
        """Return the most frequent label in ``y`` (first seen wins a tie)."""
        counter = Counter(y)
        value = counter.most_common(1)[0][0]
        return value

    def predict(self, X):
        """Predict a label for every row of ``X`` and return them as an array."""
        X = np.asarray(X)
        return np.array([self._traverse_tree(x, self.root) for x in X])

    def _traverse_tree(self, x, node):
        """Follow the split tests from ``node`` down to a leaf and return its value."""
        if node.is_leaf():
            return node.value

        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)

    def accuracy(self, y_test, y_pred):
        """
        Compute accuracy of predictions.
        """
        return np.sum(y_test == y_pred) / len(y_test)

    def evaluate(self, X_test, y_test,data_type='test'):
        """
        Predict on the given set and print its accuracy.

        ``data_type='train'`` labels the printed line as training accuracy;
        any other value labels it as test accuracy.
        """
        predictions = self.predict(X_test)
        acc = self.accuracy(y_test, predictions)
        if data_type == 'train':
            print(f"Training Accuracy with {self.criterion}: {acc}")
        else:
            print(f"Test Accuracy with {self.criterion}: {acc}")


    def compute_zero_one_loss(self, X, y,data_type='training'):
        """
        Print the zero-one loss (fraction of misclassified samples) on ``X``/``y``.
        """
        predictions = self.predict(X)
        errors = np.sum(predictions != y)
        total_samples = len(y)
        error_rate = errors / total_samples
        print(f"The {data_type} error with {self.criterion} is {error_rate}")

    def _feature_label(self, feature_idx):
        """Display name of a feature: its entry in ``feature_names`` or ``Feature <idx>``.

        Uses an explicit None/length check so that NumPy arrays and pandas
        Index objects (whose truth value is ambiguous) are accepted.
        """
        names = self.feature_names
        if names is not None and len(names) > 0:
            return names[feature_idx]
        return f"Feature {feature_idx}"

    def to_dict(self, node=None):
        """Convert the tree into a dictionary format for visualization."""
        if node is None:
            node = self.root

        if node.is_leaf():
            return self.class_labels[node.value]

        feature_name = self._feature_label(node.feature)

        return {
            f"{feature_name} <= {node.threshold}": {
                "Left": self.to_dict(node.left),
                "Right": self.to_dict(node.right),
            }
        }

    def visualize(self,filename="tree"):
        """Render the tree with graphviz to ``<filename>.png`` and print the file name."""

        def add_nodes_edges(node, graph, node_id=0):
            """Add ``node`` and its subtree to ``graph``; return the last id used."""

            if node.is_leaf():

                label=self.class_labels.get(node.value,f"Class {node.value}")

                graph.node(str(node_id), f"Leaf: {label}", shape="box",style="filled", color="lightsalmon")
                return node_id

            feature_name = self._feature_label(node.feature)

            graph.node(str(node_id), f"{feature_name} <= {node.threshold}",shape="ellipse",style="filled",color="powderblue")

            # Ids are assigned in pre-order: the left subtree takes the ids
            # right after this node, the right subtree continues after it.
            left_id = node_id + 1
            graph.edge(str(node_id), str(left_id), label=f"<= {node.threshold}", color="lightgreen")
            left_id = add_nodes_edges(node.left, graph, left_id)

            right_id = left_id + 1
            graph.edge(str(node_id), str(right_id), label=f"> {node.threshold}", color="red")
            right_id = add_nodes_edges(node.right, graph, right_id)

            return right_id

        graph = Digraph()

        add_nodes_edges(self.root, graph)

        # Render and save the tree visualization
        graph.render(filename, format="png", cleanup=True)
        print(f"Tree visualization saved as {filename}.png")



# **HYPERPARAMETER TUNING**

In [14]:
# Report how many CPU cores the runtime exposes (informational only here).
num_cores = multiprocessing.cpu_count()
print(f"Number of available CPU cores: {num_cores}")

Number of available CPU cores: 2


In [16]:
def save_results(results, filename="search_results.pkl"):
    """Pickle ``results`` into ``filename``, overwriting any existing file."""
    with open(filename, 'wb') as sink:
        sink.write(pickle.dumps(results))

def load_results(filename="search_results.pkl"):
    """Return the object pickled in ``filename``, or None if the file does not exist.

    Unpickling can execute arbitrary code, so only point this at files
    written by this notebook.
    """
    loaded = None
    try:
        with open(filename, 'rb') as source:
            loaded = pickle.load(source)
    except FileNotFoundError:
        # No earlier results: signal that with None.
        pass
    return loaded



def random_search_cv(X, y, model_class, param_distributions, cv, scoring, n_iter=10, n_jobs=2, random_state=None,saved_results_file="search_results.pkl", primary_metric='accuracy'):
    """
    Perform random search cross-validation.

    Results are accumulated on top of whatever ``saved_results_file`` already
    contains: the earlier ``results`` list is extended and the earlier best
    score is the one to beat. Delete the file to start a fresh search.

    Parameters:
    - X: Feature matrix, indexed with the integer arrays produced by ``cv.split``.
    - y: Target vector, indexed the same way.
    - model_class: The class of the model to be tuned; instantiated as
      ``model_class(**params)`` and used through ``fit`` / ``predict``.
    - param_distributions: Dictionary mapping each parameter name to the
      sequence of values to sample from.
    - cv: Splitter exposing ``split(X, y)`` (e.g. StratifiedKFold).
    - scoring: Dictionary mapping a metric name to ``func(y_true, y_pred)``.
    - n_iter: Number of random combinations to test (drawn independently, so
      the same combination can be drawn more than once).
    - n_jobs: Number of parallel jobs.
    - random_state: Seed for reproducibility.
    - saved_results_file: Pickle file used to load earlier results and to
      store the updated ones.
    - primary_metric: Key of ``scoring`` used to pick the best combination.

    Returns:
    - best_params: Parameters of the best model.
    - best_score: Best mean score achieved on ``primary_metric``.
    - results: Detailed results of all tested combinations (including those
      loaded from ``saved_results_file``).

    Raises:
    - ValueError: if ``primary_metric`` is not a key of ``scoring``.
    """
    # Fail before the (expensive) search rather than after it.
    if primary_metric not in scoring:
        raise ValueError(
            f"primary_metric {primary_metric!r} is not a key of scoring: {list(scoring)}"
        )

    # NOTE(review): these seed the global generators of this process; whether
    # joblib worker processes see the same state depends on the backend - confirm.
    np.random.seed(random_state)
    random.seed(random_state)

    previous_results = load_results(saved_results_file)
    best_params = previous_results['best_params'] if previous_results else None
    best_score = previous_results['best_score'] if previous_results else -np.inf
    results = previous_results['results'] if previous_results else []

    def random_sample():
        """Draw one value per parameter."""
        return {key: random.choice(values) for key, values in param_distributions.items()}

    def evaluate_combination(params):
        """Cross-validate one parameter combination and collect every metric per fold."""
        fold_scores = {metric: [] for metric in scoring.keys()}

        for train_idx, val_idx in cv.split(X, y):
            X_train, X_val = X[train_idx], X[val_idx]
            y_train, y_val = y[train_idx], y[val_idx]

            model = model_class(**params)
            model.fit(X_train, y_train)

            y_pred = model.predict(X_val)
            for metric, func in scoring.items():
                fold_scores[metric].append(func(y_val, y_pred))

        mean_score = {metric: np.mean(scores) for metric, scores in fold_scores.items()}
        return {'params': params, 'mean_score': mean_score, 'fold_scores': fold_scores}

    random_combinations=[random_sample() for _ in range(n_iter)]

    parallel_results=Parallel(n_jobs=n_jobs)(delayed(evaluate_combination)(combination) for combination in random_combinations)

    for result in parallel_results:
        results.append(result)

        candidate_score = result['mean_score'][primary_metric]
        if candidate_score > best_score:
            best_score = candidate_score
            best_params = result['params']

    # Every combination has already been evaluated at this point, so one
    # write of the final state is enough.
    if parallel_results:
        save_results({'best_params': best_params, 'best_score': best_score, 'results': results},saved_results_file)

    return best_params, best_score, results



# Candidate values per hyperparameter; each search iteration picks one value
# per key.
param_distributions = dict(
    max_depth=[5,10,20],
    min_impurity_decrease=np.linspace(0.0, 0.01, 3),
    criterion=['gini', 'entropy','squared_impurity'],
    min_samples_split=list(range(2, 8)),
)

# Metrics computed on every validation fold.
scoring = dict(
    accuracy=accuracy_score,
    precision=precision_score,
    recall=recall_score,
    f1=f1_score,
)

# Three shuffled, stratified folds with a fixed seed.
cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)

# Perform Random Search CV
best_params, best_score, results = random_search_cv(
    X=X_train.values,
    y=y_train.values,
    model_class=Decision_Tree,
    param_distributions=param_distributions,
    cv=cv,
    scoring=scoring,
    n_iter=80,
    n_jobs=2,
    random_state=42,
    saved_results_file="search_results.pkl",
)

print("Best Parameters:", best_params)
print("Best Accuracy:", best_score)


Best Parameters: {'max_depth': 20, 'min_impurity_decrease': 0.01, 'criterion': 'entropy', 'min_samples_split': 3}
Best Accuracy: 0.9934547991300423


In [17]:

# Find the first recorded run whose mean accuracy equals the best score.
best_result = None
for result in results:
    if result['mean_score']['accuracy'] != best_score:
        continue
    best_result = result
    break

# Report the remaining mean metrics of that run, if one was found.
if best_result:
    print("Best Precision:", best_result['mean_score']['precision'])
    print("Best Recall:", best_result['mean_score']['recall'])
    print("Best F1 Score:", best_result['mean_score']['f1'])

Best Precision: 0.994075470310264
Best Recall: 0.9941095963025849
Best F1 Score: 0.9940909130375141


In [21]:
def save_best_params(best_params, filename=None):
    """Save best parameters to a pickle file.

    Parameters:
    - best_params: Object to pickle.
    - filename: Destination path. When omitted, the module-level
      ``save_path`` is used; it is looked up at call time, so this function
      can be defined before ``save_path`` is assigned.
    """
    if filename is None:
        filename = save_path
    with open(filename, 'wb') as f:
        pickle.dump(best_params, f)

In [20]:
# Destination of the pickled best hyperparameters on the mounted Drive.
save_path = "/content/drive/MyDrive/machinelearning/best_params.pkl"

In [22]:
# Print whether a best-params pickle is already present on Drive.
import os
print(os.path.exists("/content/drive/MyDrive/machinelearning/best_params.pkl"))


True


In [23]:
# Persist the best hyperparameters found by the search.
save_best_params(best_params)

In [24]:
# Dump every recorded run: its parameters, mean scores and per-fold scores.
print(results)

[{'params': {'max_depth': 20, 'min_impurity_decrease': 0.0, 'criterion': 'gini', 'min_samples_split': 7}, 'mean_score': {'accuracy': 0.9929418523533998, 'precision': 0.9942876957910017, 'recall': 0.9929612094237467, 'f1': 0.993623099058157}, 'fold_scores': {'accuracy': [0.9938446386802905, 0.9919980302843777, 0.9929828880955313], 'precision': [0.9940033314825097, 0.9946452476572959, 0.9942145082331998], 'recall': [0.9948871846170946, 0.9908868637474995, 0.993109579906646], 'f1': [0.9944450616598156, 0.9927624986081728, 0.9936617369064829]}}, {'params': {'max_depth': 10, 'min_impurity_decrease': 0.0, 'criterion': 'gini', 'min_samples_split': 3}, 'mean_score': {'accuracy': 0.8693421970536338, 'precision': 0.9420200607595305, 'recall': 0.8142111187046296, 'f1': 0.8734626984498525}, 'fold_scores': {'accuracy': [0.8723378062292256, 0.8698141080881447, 0.8658746768435307], 'precision': [0.947164449037592, 0.9413877132230345, 0.9375080200179648], 'recall': [0.8149383127709237, 0.8157368304067