# Mushroom classification with a manual AdaBoost implementation

In [24]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score, confusion_matrix, f1_score
from typing import Literal

### Read data

In [25]:
# Load the mushrooms CSV from the local data folder and preview the first two rows.
data = pd.read_csv('./data/mushrooms.csv')
print(data.head(2))


  class cap-shape cap-surface cap-color bruises odor gill-attachment  \
0     p         x           s         n       t    p               f   
1     e         x           s         y       t    a               f   

  gill-spacing gill-size gill-color  ... stalk-surface-below-ring  \
0            c         n          k  ...                        s   
1            c         b          k  ...                        s   

  stalk-color-above-ring stalk-color-below-ring veil-type veil-color  \
0                      w                      w         p          w   
1                      w                      w         p          w   

  ring-number ring-type spore-print-color population habitat  
0           o         p                 k          s       u  
1           o         p                 n          n       g  

[2 rows x 23 columns]


In [26]:
def generate_dummy_columns(data: pd.DataFrame, columns : list) -> pd.DataFrame :
    """One-hot encode ``columns`` of ``data`` and return a new frame.

    Every value is converted with ``str`` before encoding, so missing values
    (NaN/None) get their own indicator column instead of being silently
    dropped by ``pd.get_dummies``.

    Args:
        data: Input frame; it is not modified.
        columns: Names of the columns to replace by indicator columns.

    Returns:
        A frame holding the untouched columns first (original order), followed
        by the indicator columns of each encoded column. Indicators are named
        ``<column>_<value>`` and ordered by first appearance of the value.
    """
    columns = list(columns)
    encoded_parts = []
    for col in columns:
        # Stringify once and use the same values for naming AND encoding, so the
        # requested column names always exist in the get_dummies output.
        as_text = data[col].map(str)
        ordered_names = [f'{col}_{value}' for value in as_text.unique()]
        encoded_parts.append(pd.get_dummies(as_text, prefix=col)[ordered_names])
    # A single concat avoids re-copying the growing frame on every iteration.
    return pd.concat([data.drop(columns=columns), *encoded_parts], axis=1)

# Treat missing values as their own category so they are one-hot encoded too.
data = data.fillna('missing')
data_x = data.iloc[:,1:]
X = generate_dummy_columns(data_x, data_x.columns)
# Labels must be +1 / -1: the tree leaves predict 1 or -1 and the AdaBoost
# weight update exp(-alpha * y_pred * y) only works with signed labels.
# 'p' is the positive class (presumably "poisonous" - confirm against the dataset docs).
Y = data['class'].map(lambda x: 1 if x == 'p' else -1)

print(X.columns)
print(X.head(2))
X['cap-shape_k'].unique()

Index(['cap-shape_x', 'cap-shape_b', 'cap-shape_s', 'cap-shape_f',
       'cap-shape_k', 'cap-shape_c', 'cap-surface_s', 'cap-surface_y',
       'cap-surface_f', 'cap-surface_g',
       ...
       'population_v', 'population_y', 'population_c', 'habitat_u',
       'habitat_g', 'habitat_m', 'habitat_d', 'habitat_p', 'habitat_w',
       'habitat_l'],
      dtype='object', length=117)
   cap-shape_x  cap-shape_b  cap-shape_s  cap-shape_f  cap-shape_k  \
0         True        False        False        False        False   
1         True        False        False        False        False   

   cap-shape_c  cap-surface_s  cap-surface_y  cap-surface_f  cap-surface_g  \
0        False           True          False          False          False   
1        False           True          False          False          False   

   ...  population_v  population_y  population_c  habitat_u  habitat_g  \
0  ...         False         False         False       True      False   
1  ...         False 

array([False,  True])

### Build decision tree with sample weights ###

In [27]:
def get_weighted_error_count(data:pd.DataFrame, sample_weights:pd.DataFrame, label_column:str) -> float:
    """Return the weighted error of the best constant prediction on ``data``.

    Predicting the heavier class misclassifies exactly the lighter class, so
    the result is the smaller of the two per-class weight totals.

    Args:
        data: Rows of the node; ``data[label_column] == 1`` marks positives.
        sample_weights: One weight per row of ``data``, in the same row order
            (array-like or Series).
        label_column: Name of the label column in ``data``.

    Returns:
        min(total weight of positives, total weight of the other rows).
    """
    # Work positionally so an ndarray and an index-carrying Series behave alike.
    weights = np.asarray(sample_weights, dtype=float)
    is_positive = (data[label_column] == 1).to_numpy()
    positive_weight = weights[is_positive].sum()
    # Compare weight against weight (not against the row count).
    negative_weight = weights[~is_positive].sum()
    return min(positive_weight, negative_weight)
# _get_best_features
def get_best_feature(remaining_features: list, data: pd.DataFrame, sample_weights:pd.DataFrame, label_column: str) -> str:
    """Pick the feature to split on.

    Single entry point for the split criterion; it currently delegates to the
    weighted-error-rate criterion and returns whatever that returns.
    """
    criterion = get_best_feature_by_weighted_error_rate
    return criterion(remaining_features, data, sample_weights, label_column)

def get_best_feature_by_weighted_error_rate(remaining_features: list, data: pd.DataFrame, sample_weights:pd.DataFrame, label_column: str) -> str:
    """Return the feature whose 0/1 split has the lowest weighted error rate.

    For each candidate the rows are split into ``feature == 0`` and
    ``feature == 1``; the weighted errors of both sides (as computed by
    ``get_weighted_error_count``) are added and divided by the total weight.

    Args:
        remaining_features: Candidate feature (column) names.
        data: Frame holding the features and the label column.
        sample_weights: One weight per row of ``data``.
        label_column: Name of the label column.

    Returns:
        The best feature name; the first one wins on ties. An empty string is
        returned when there are no candidates.
    """
    # The normaliser does not depend on the feature, so compute it once.
    total_weight = sum(sample_weights)
    min_error_rate = float('inf')
    best_feature = ''
    for feature in remaining_features:
        # Build each mask once and reuse it for both the rows and the weights.
        is_left = data[feature] == 0
        is_right = data[feature] == 1
        weighted_error_count_0 = get_weighted_error_count(data[is_left], sample_weights[is_left], label_column)
        weighted_error_count_1 = get_weighted_error_count(data[is_right], sample_weights[is_right], label_column)

        error_rate = (weighted_error_count_0 + weighted_error_count_1) * 1.0 / total_weight
        if error_rate < min_error_rate:
            min_error_rate = error_rate
            best_feature = feature
    return best_feature


In [None]:
# fit()
# predict()
    # dfs visit from the root feature and use the leaf node for prediction 
# _create_tree()
    # termination: max_depth reached, no more features to split on, or the split is already good enough (leaf)
    # find the best feature
    # split and build left/right tree


class TreeNode:
    """Node of a binary decision tree over 0/1 features."""

    def __init__(self, feature_name: str) -> None:
        # Feature tested at this node ('' for leaves created at the depth/feature limit).
        self.feature_name = feature_name
        # Subtree followed when the feature value equals 0.
        self.left = None
        # Subtree followed for every other feature value.
        self.right = None
        # Label returned when this node is a leaf.
        self.prediction = 0
        self.is_leaf = False


class MyWeightedDecisionTreeClassifier:
    """Binary decision tree on 0/1 features that honours per-sample weights.

    Weights drive both the split selection and the label of every leaf, which
    is what a boosting loop needs from its weak learner. Label value ``1`` is
    the positive class; leaves predict ``1`` or ``-1``.
    """

    def __init__(self, min_error_rate =0.3, max_depth = 4) -> None:
        # NOTE: min_error_rate is stored but no stopping rule in this class reads it.
        self.min_error_rate = min_error_rate
        self.max_depth = max_depth
        self.label_column = ""

    def fit(self, X : pd.DataFrame, Y : pd.Series, sample_weights: pd.Series):
        """Grow the tree on ``X``/``Y`` using one weight per row.

        ``Y`` and ``sample_weights`` are matched to the rows of ``X`` by
        position; a length mismatch raises ``ValueError``.
        """
        label_name = getattr(Y, 'name', None)
        if label_name is None:
            label_name = 'label'
        # Re-index labels and weights on X so boolean masks built from the
        # training frame always line up, whatever container the caller passed.
        labels = pd.Series(np.asarray(Y), index=X.index, name=label_name)
        weights = pd.Series(np.asarray(sample_weights, dtype=float), index=X.index)
        data = pd.concat([X, labels], axis=1)
        self.label_column = label_name
        self.root = self._create_tree(data, weights, list(X.columns), 0)
        print('finish fitting MyWeightedDecisionTreeClassifier')

    def predict(self, X: pd.DataFrame):
        """Return one prediction (1 or -1) per row of ``X`` as a Series."""
        return X.apply(lambda d: self._predict_single(d, self.root), axis=1)

    def _predict_single(self, row_data, node=None):
        """Walk from ``node`` (default: the root) down to a leaf for one row."""
        if node is None:
            node = self.root
        if node.is_leaf:
            return node.prediction

        if row_data[node.feature_name] == 0:
            return self._predict_single(row_data, node.left)
        else:
            return self._predict_single(row_data, node.right)

    def _create_tree(self, data : pd.DataFrame, sample_weights:pd.Series, remaining_features: list, depth: int) -> TreeNode:
        """Recursively build the subtree for ``data`` at the given ``depth``."""
        labels = data[self.label_column]
        if depth == self.max_depth or len(remaining_features) == 0:
            return self._create_leaf(labels, '', sample_weights)

        best_feature = get_best_feature(remaining_features, data, sample_weights, self.label_column)
        goes_left = data[best_feature] == 0
        goes_right = data[best_feature] == 1
        left_split_data = data[goes_left]
        right_split_data = data[goes_right]

        # The best feature does not separate anything: stop here with a leaf.
        if len(left_split_data) == len(data) or len(right_split_data) == len(data):
            return self._create_leaf(labels, best_feature, sample_weights)

        # Works for plain lists as well as pandas Index objects.
        new_remaining_features = [name for name in remaining_features if name != best_feature]

        node = TreeNode(best_feature)
        node.left = self._create_tree(left_split_data, sample_weights[goes_left], new_remaining_features, depth + 1)
        node.right = self._create_tree(right_split_data, sample_weights[goes_right], new_remaining_features, depth + 1)
        return node

    def _create_leaf(self, Y: pd.Series, feature_name: str = '', sample_weights=None):
        """Create a leaf predicting the class that carries the most weight.

        Args:
            Y: Labels of the rows that reach the leaf.
            feature_name: Stored on the node for reference only.
            sample_weights: Weights of those rows, in the same order. When
                omitted, or when they do not sum to a positive value, every
                row counts equally.

        Returns:
            A leaf ``TreeNode`` predicting 1 when the positive class holds at
            least half of the weight, otherwise -1 (also for an empty leaf).
        """
        is_positive = (Y == 1).to_numpy()
        if sample_weights is None:
            weights = np.ones(len(Y))
        else:
            weights = np.asarray(sample_weights, dtype=float)
            if not weights.sum() > 0:
                # Degenerate weights carry no information; fall back to counting.
                weights = np.ones(len(Y))
        positive_weight = weights[is_positive].sum()
        negative_weight = weights[~is_positive].sum()

        node = TreeNode(feature_name)
        node.prediction = 1 if len(Y) > 0 and positive_weight >= negative_weight else -1
        node.is_leaf = True
        return node

    def count_leaves(self):
        """Return the number of leaves of the fitted tree."""
        return self.count_leaves_helper(self.root)

    def count_leaves_helper(self, tree):
        """Return the number of leaves below (and including) ``tree``."""
        if tree.is_leaf:
            return 1
        return self.count_leaves_helper(tree.left) + self.count_leaves_helper(tree.right)

    def score(self, test_x, test_y):
        """Return the accuracy of the predictions for ``test_x`` against ``test_y``."""
        pred = self.predict(test_x)
        return accuracy_score(test_y, pred)

# model =  MyWeightedDecisionTreeClassifier(max_depth=10, min_error_rate=1e-8)
# sample_weights = np.ones(len(X))
# sample_weights = sample_weights / len(X)
# model.fit(X, Y, sample_weights) 
# print(model.score(X,Y))     
# # print(model.count_leaves())   


### AdaBoost

In [53]:
# fit
    # pick model number
        # initial sample
        # sample weight
        # pick a feature
        # train current model
        # calculate model weight
        # update sample weights

# predict 
    # get the weighted predictions of all the models
from sklearn.base import BaseEstimator
class MyAdaBoostClassifier(BaseEstimator):
    """Discrete AdaBoost over depth-2 ``MyWeightedDecisionTreeClassifier`` trees.

    Label value ``1`` is the positive class. Internally labels are mapped to
    +1 / -1 (required by the exponential weight update); predictions are
    mapped back to the encoding seen during ``fit`` (for example 0/1 or -1/1).

    Args:
        N: Number of boosting rounds (weak learners).
    """

    def __init__(self, N = 10):
        self.N = N
        self.models = []
        self.model_weights = []

    def fit(self, X, y):
        """Train ``N`` weak learners on ``X``/``y`` and return ``self``.

        Raises:
            ValueError: If ``X`` has no rows.
        """
        # Start from scratch so a second fit() does not keep stale learners.
        self.models = []
        self.model_weights = []

        n_samples = len(X)
        if n_samples == 0:
            raise ValueError('MyAdaBoostClassifier.fit needs at least one sample')

        y_values = np.asarray(y)
        # Remember how the non-positive class was written so predict() can
        # answer in the caller's encoding; default to -1 when it is ambiguous.
        other_labels = pd.unique(y_values[y_values != 1])
        self.negative_label_ = other_labels[0] if len(other_labels) == 1 else -1

        y_signed = np.where(y_values == 1, 1.0, -1.0)
        label_name = getattr(y, 'name', None)
        if label_name is None:
            label_name = 'label'
        y_for_tree = pd.Series(y_signed.astype(int), index=X.index, name=label_name)

        sample_weights = np.full(n_samples, 1.0 / n_samples)
        # Keeps the error strictly inside (0, 1) so the log below stays finite
        # even when a weak learner is perfect (or perfectly wrong).
        eps = 1e-10

        for _ in range(self.N):
            model = MyWeightedDecisionTreeClassifier(max_depth=2)
            model.fit(X, y_for_tree, sample_weights)
            y_pred = np.asarray(model.predict(X), dtype=float)

            # calculate new model weight
            weighted_error_rate = sample_weights[y_pred != y_signed].sum()
            weighted_error_rate = float(np.clip(weighted_error_rate, eps, 1 - eps))
            model_weight = 0.5 * np.log((1 - weighted_error_rate) / weighted_error_rate)

            # new normalized sample weights: misclassified rows gain weight
            new_sample_weights = sample_weights * np.exp(-model_weight * y_pred * y_signed)
            sample_weights = new_sample_weights / new_sample_weights.sum()

            print('sample weights sum: ' , sample_weights.sum())

            self.models.append(model)
            self.model_weights.append(model_weight)

        return self

    def predict(self, X):
        """Return the weighted-vote prediction for every row of ``X``.

        The result is a NumPy array using the label encoding seen in ``fit``;
        an exact tie in the vote resolves to the positive class.

        Raises:
            ValueError: If the ensemble holds no fitted model.
        """
        if not self.models:
            raise ValueError('MyAdaBoostClassifier has no fitted models; call fit first')
        result = np.zeros(len(X))
        # Iterate over what was actually fitted rather than trusting self.N.
        for model, model_weight in zip(self.models, self.model_weights):
            result += model_weight * np.asarray(model.predict(X), dtype=float)
        negative_label = getattr(self, 'negative_label_', -1)
        return np.where(result >= 0, 1, negative_label)

    def score(self, X, y):
        """Return the accuracy of ``predict(X)`` against ``y``."""
        y_pred = self.predict(X)
        return accuracy_score(y, y_pred)

# Estimate the ensemble's accuracy with 3-fold cross-validation on the full data set.
my_adboost = MyAdaBoostClassifier(N=10)
score4 = cross_val_score(my_adboost, X, Y, cv=3)
print('my_adboost score:', np.mean(score4))

finish fitting MyWeightedDecisionTreeClassifier
sample weights sum:  1.0
finish fitting MyWeightedDecisionTreeClassifier
sample weights sum:  1.0
finish fitting MyWeightedDecisionTreeClassifier
sample weights sum:  1.0
finish fitting MyWeightedDecisionTreeClassifier
sample weights sum:  0.9999999999999999
finish fitting MyWeightedDecisionTreeClassifier
sample weights sum:  0.9999999999999999
finish fitting MyWeightedDecisionTreeClassifier
sample weights sum:  0.9999999999999999
finish fitting MyWeightedDecisionTreeClassifier
sample weights sum:  1.0
finish fitting MyWeightedDecisionTreeClassifier
sample weights sum:  1.0
finish fitting MyWeightedDecisionTreeClassifier
sample weights sum:  0.9999999999999999
finish fitting MyWeightedDecisionTreeClassifier
sample weights sum:  0.9999999999999998
finish fitting MyWeightedDecisionTreeClassifier
sample weights sum:  1.0
finish fitting MyWeightedDecisionTreeClassifier
sample weights sum:  1.0
finish fitting MyWeightedDecisionTreeClassifier
s

### Transform data

### Train model

### Evaluate model

### Predict on test dataset


### Output

In [None]:

# df = pd.DataFrame()
# df.to_csv('submission.csv',index = False, header=True)