<a href="https://colab.research.google.com/github/Ni7070/Random-Forest/blob/master/RF_work.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from collections import Counter

import numpy as np


def entropy(y):
    hist = np.bincount(y)
    ps = hist / len(y)
    return -np.sum([p * np.log2(p) for p in ps if p > 0])


class Node:
    def __init__(
        self, feature=None, threshold=None, left=None, right=None, *, value=None
    ):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value

    def is_leaf_node(self):
        return self.value is not None


class DecisionTree:
    def __init__(self, min_samples_split=2, max_depth=100, n_feats=None):
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.n_feats = n_feats
        self.root = None

    def fit(self, X, y):
        self.n_feats = X.shape[1] if not self.n_feats else min(self.n_feats, X.shape[1])
        self.root = self._grow_tree(X, y)

    def predict(self, X):
        return np.array([self._traverse_tree(x, self.root) for x in X])

    def _grow_tree(self, X, y, depth=0):
        n_samples, n_features = X.shape
        n_labels = len(np.unique(y))

        # stopping criteria
        if (
            depth >= self.max_depth
            or n_labels == 1
            or n_samples < self.min_samples_split
        ):
            leaf_value = self._most_common_label(y)
            return Node(value=leaf_value)

        feat_idxs = np.random.choice(n_features, self.n_feats, replace=False)

        # greedily select the best split according to information gain
        best_feat, best_thresh = self._best_criteria(X, y, feat_idxs)

        # grow the children that result from the split
        left_idxs, right_idxs = self._split(X[:, best_feat], best_thresh)
        left = self._grow_tree(X[left_idxs, :], y[left_idxs], depth + 1)
        right = self._grow_tree(X[right_idxs, :], y[right_idxs], depth + 1)
        return Node(best_feat, best_thresh, left, right)

    def _best_criteria(self, X, y, feat_idxs):
        best_gain = -1
        split_idx, split_thresh = None, None
        for feat_idx in feat_idxs:
            X_column = X[:, feat_idx]
            thresholds = np.unique(X_column)
            for threshold in thresholds:
                gain = self._information_gain(y, X_column, threshold)

                if gain > best_gain:
                    best_gain = gain
                    split_idx = feat_idx
                    split_thresh = threshold

        return split_idx, split_thresh

    def _information_gain(self, y, X_column, split_thresh):
        # parent loss
        parent_entropy = entropy(y)

        # generate split
        left_idxs, right_idxs = self._split(X_column, split_thresh)

        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return 0

        # compute the weighted avg. of the loss for the children
        n = len(y)
        n_l, n_r = len(left_idxs), len(right_idxs)
        e_l, e_r = entropy(y[left_idxs]), entropy(y[right_idxs])
        child_entropy = (n_l / n) * e_l + (n_r / n) * e_r

        # information gain is difference in loss before vs. after split
        ig = parent_entropy - child_entropy
        return ig

    def _split(self, X_column, split_thresh):
        left_idxs = np.argwhere(X_column <= split_thresh).flatten()
        right_idxs = np.argwhere(X_column > split_thresh).flatten()
        return left_idxs, right_idxs

    def _traverse_tree(self, x, node):
        if node.is_leaf_node():
            return node.value

        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)

    def _most_common_label(self, y):
        counter = Counter(y)
        most_common = counter.most_common(1)[0][0]
        return most_common

In [2]:
import numpy as np
from joblib import dump
# from decisiontree import DecisionTree
from collections import Counter

#import GA

class RandomForest():
    def __init__ (self, n_trees = 10, sample_sz = None, min_leaf = 5, max_depth=100, n_feats=None, isLLL = False):
        np.random.seed(42)         
                
        self.sample_sz = sample_sz 
        self.min_leaf = min_leaf
        self.n_trees = n_trees
        self.max_depth = max_depth
        self.n_feats = n_feats
        self.trees = []

    
    def fit(self, X, y):
        if self.sample_sz is None:
            self.sample_sz = len(y)
            
        self.x, self.y  = X, y
        self.trees = [self.create_tree() for i in range(self.n_trees)]
                        
        
    def create_tree(self):
        idxs = np.random.choice(len(self.y), replace=True, size = self.sample_sz)      
        
        tree = DecisionTree(
                min_samples_split=self.min_leaf,
                max_depth=self.max_depth,
                n_feats=self.n_feats)
        
        tree.fit(self.x[idxs], self.y[idxs])
        return tree    

    
    def predict(self, x):
        # The predic function is classification
        percents = np.mean([t.predict(x) for t in self.trees], axis=0)
        return [1 if p>0.5 else 0 for p in percents]    
        #tree_preds = np.array([tree.predict(X) for tree in self.trees])
        #tree_preds = np.swapaxes(tree_preds, 0, 1)
        #y_pred = [most_common_label(tree_pred) for tree_pred in tree_preds]
        #return np.array(y_pred)
    
    
    def predict_regressor(self, x):        
        # for each tree, predict for each tree, then return the average
        # using list comprehension
        return np.mean([t.predict(x) for t in self.trees], axis=0)

    
    def continualLearner(self, oldmodel, newmodel=None):
        #TODO: We need to implement this function for continial learning
        # GA algorithm should be used to mutate the weights of the old and new 
        # deciion trees
        
        if newmodel == None:
            newmodel = self.create_tree()
        
        
        return
    
    def update(self,fileName):
        #save existing RF model with weights in a sklearn (joblib) save format
        # TODO: We may need to update this function later
        dump({'model': self.trees}, fileName)
        return
    

In [3]:
from sklearn import datasets
import pandas as pd
# from LLRF2 import RandomForest as RF
import numpy as np
from sklearn.model_selection import train_test_split


def accuracy(y_true, y_pred):
    accuracy = np.sum(y_true == y_pred) / len(y_true)
    return accuracy

Breast cancer Dataset

In [None]:
# Load data from sklearn for classification
breast_cancer = datasets.load_breast_cancer(as_frame=True)
print(breast_cancer['data'])

In [None]:
#into dataframe
df1 = pd.DataFrame(breast_cancer.data, columns = breast_cancer.feature_names)
df1['target'] = breast_cancer.target
df1.head()

In [6]:
# Build RF model
# X1, Y1 = breast_cancer.data, breast_cancer.target

# Data Split
X1_train, X1_test, Y1_train, Y1_test = train_test_split(df1.drop(['target'], axis = 'columns'), breast_cancer.target, test_size = 0.5, random_state=42)

In [7]:
Model1 = RandomForest(n_trees=100, sample_sz=None, min_leaf=5, isLLL = False)

In [8]:
X1_train = X1_train.to_numpy()
Y1_train = Y1_train.to_numpy()
Model1.fit(X1_train, Y1_train)

In [None]:
# Predict with the test dataset
X1_test = X1_test.to_numpy() 
Y1_test = Y1_test.to_numpy() 

In [12]:
y_pred = Model1.predict(X1_test)
acc = accuracy(Y1_test, y_pred)
print("Accuracy:", acc*100, "%")   

Accuracy: 94.73684210526315 %


Test with Iris Dataset

In [13]:
from sklearn.datasets import load_iris

iris = load_iris()
dir(iris)

df2 = pd.DataFrame(iris.data, columns = iris.feature_names)
df2['target'] = iris.target
df2.head()

Unnamed: 0,sepal length (cm),sepal width (cm),petal length (cm),petal width (cm),target
0,5.1,3.5,1.4,0.2,0
1,4.9,3.0,1.4,0.2,0
2,4.7,3.2,1.3,0.2,0
3,4.6,3.1,1.5,0.2,0
4,5.0,3.6,1.4,0.2,0


In [15]:
#split

X2_train, X2_test, Y2_train, Y2_test = train_test_split(df2.drop(['target'], axis = 'columns'), iris.target, test_size = 0.5, random_state=42)

In [None]:
X2_train = X2_train.to_numpy()
# Y2_train = Y2_train.to_numpy()

In [None]:
X2_test = X2_test.to_numpy() 
# Y2_test = Y2_test.to_numpy() 

In [45]:
Model2 = RandomForest(n_trees=80, sample_sz=None, min_leaf=10, isLLL = False)

In [46]:
Model2.fit(X2_train, Y2_train)

In [47]:
y_pred_iris = Model2.predict(X2_test)
acc_iris = accuracy(Y2_test, y_pred_iris)
print("Accuracy:", acc_iris*100, "%")   

Accuracy: 69.33333333333334 %


MNIST

In [24]:
# Fetching MNIST Dataset
from sklearn.datasets import fetch_openml
mnist = fetch_openml('mnist_784', version=1)

In [27]:
# Get the data and target
X3, Y3 = mnist["data"], mnist["target"]

In [28]:
# Split the train and test set
X3_train, X3_test, Y3_train, Y3_test = X3[:60000], X3[60000:], Y3[:60000], Y3[60000:]

In [32]:
Model3 = RandomForest(n_trees=150, sample_sz=None, min_leaf=5, isLLL = False)

In [33]:
X3_train = X3_train.to_numpy()
Y3_train = Y3_train.to_numpy()

In [34]:
X3_test = X3_test.to_numpy() 
Y3_test = Y3_test.to_numpy() 

In [None]:
Model3.fit(X3_train, Y3_train)

In [None]:
y_pred_MNIST = Model3.predict(X3_test)
acc_MNIST = accuracy(Y3_test, y_pred_MNIST)
print("Accuracy:", acc_iris*100, "%")   

Genetic ALgorithm

In [51]:
!pip3 install pygad

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pygad
  Downloading pygad-2.18.1-py3-none-any.whl (56 kB)
[K     |████████████████████████████████| 56 kB 3.7 MB/s 
Installing collected packages: pygad
Successfully installed pygad-2.18.1


In [None]:
import torch
import pygad.torchga
import pygad

class GeneticAlgo():
  def __init__(self, min_samples_split=2, max_depth=100, n_feats=None):
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.n_feats = n_feats
        self.root = None


  def fitness_func(solution, sol_idx):
    global data_inputs, data_outputs, torch_ga, model, loss_function

    predictions = pygad.torchga.predict(model=model, 
                                        solution=solution, 
                                        data=data_inputs)
    abs_error = loss_function(predictions, data_outputs).detach().numpy() + 0.00000001

    solution_fitness = 1.0 / abs_error

    return solution_fitness

  def callback_generation(ga_instance):
    print("Generation = {generation}".format(generation=ga_instance.generations_completed))
    print("Fitness    = {fitness}".format(fitness=ga_instance.best_solution()[1]))

  # Create the PyTorch model.
  input_layer = torch.nn.Linear(3, 2)
  relu_layer = torch.nn.ReLU()
  output_layer = torch.nn.Linear(2, 1)

  model = torch.nn.Sequential(input_layer,
                              relu_layer,
                              output_layer)
  
  # Create an instance of the pygad.torchga.TorchGA class to build the initial population.
  torch_ga = pygad.torchga.TorchGA(model=model,
                                  num_solutions=10)

  loss_function = torch.nn.L1Loss()

  # Data inputs
  data_inputs = torch.tensor([[0.02, 0.1, 0.15],
                              [0.7, 0.6, 0.8],
                              [1.5, 1.2, 1.7],
                              [3.2, 2.9, 3.1]])

  # Data outputs
  data_outputs = torch.tensor([[0.1],
                              [0.6],
                              [1.3],
                              [2.5]])
  
  num_generations = 250 # Number of generations.
  num_parents_mating = 5 # Number of solutions to be selected as parents in the mating pool.
  initial_population = torch_ga.population_weights # Initial population of network weights

  ga_instance = pygad.GA(num_generations=num_generations, 
                        num_parents_mating=num_parents_mating, 
                        initial_population=initial_population,
                        fitness_func=fitness_func,
                        on_generation=callback_generation)

  ga_instance.run()

  # After the generations complete, some plots are showed that summarize how the outputs/fitness values evolve over generations.
  ga_instance.plot_fitness(title="PyGAD & PyTorch - Iteration vs. Fitness", linewidth=4)

  # Returning the details of the best solution.
  solution, solution_fitness, solution_idx = ga_instance.best_solution()
  print("Fitness value of the best solution = {solution_fitness}".format(solution_fitness=solution_fitness))
  print("Index of the best solution : {solution_idx}".format(solution_idx=solution_idx))

  predictions = pygad.torchga.predict(model=model, 
                                      solution=solution, 
                                      data=data_inputs)
  print("Predictions : \n", predictions.detach().numpy())

  abs_error = loss_function(predictions, data_outputs)
  print("Absolute Error : ", abs_error.detach().numpy())


