In [47]:
from dataclasses import dataclass
from typing import Iterable, Literal
import pandas as pd
import math
from sklearn.preprocessing import OrdinalEncoder
import numpy as np

In [48]:
class Node:

    def __init__(self, feature_name: str, value: any, depth_index: int = 0):
        self.feature_name = feature_name
        self.value = value
        self.depth_index = depth_index
        self.children: Iterable[Node] = []
        self.class_label = None

    def __str__(self) -> str:
        return '-'*self.depth_index + f'{self.feature_name}!{self.value}!{self.class_label}' + '\n'.join([str(child) for child in self.children])

    def parent_equal(self, other):
        if self is other:
            return self
        for child in self.children:
            child.parent_equal(other)

    def add_level(self, data: Iterable):
        self.children = [Node(None, d, self.depth_index + 1) for d in data]
        return self.children

    def split_epoch(self, X: pd.DataFrame, y: np.array, rand_features:bool=True):
        if rand_features:
            X = X[np.random.choice(X.columns, size=math.floor(math.sqrt(len(X.columns))), replace=False)]
        unique, counts = np.unique(y, return_counts=True)
        if len(X.columns) == 0 or len(unique) == 1:
            class_index = 0
            max_count = 0
            for index, c in enumerate(counts):
                if c > max_count:
                    max_count = c
                    class_index = index
            self.class_label = unique[class_index]
            return

        labels_length = len(y)
        positive_labels_prop = counts[1]/labels_length
        negative_labels_prop = counts[0]/labels_length
        whole_entropy = DecisionTreeID3.entropy(positive_labels_prop, negative_labels_prop)
        feature_unique_values: dict[str, np.array] = {}
        num_of_rows = len(X.index)
        best_feature = FeatureInfo('', 0)
        
        for feature_name in X.columns:
            positive_pairs: dict[str, int] = {}
            information_gain = whole_entropy
            for index, value in enumerate(X[feature_name]):
                if value not in positive_pairs.keys():
                    positive_pairs[value] = 0
                if y[index] == 1:
                    positive_pairs[value] += 1
            unique, counts = np.unique(X[feature_name], return_counts=True)
            for index, value in enumerate(unique):
                positive_prop = positive_pairs[value]/counts[index]
                entropy = DecisionTreeID3.entropy(positive_prop, 1-positive_prop)
                information_gain -= counts[index]/num_of_rows*entropy
            feature_unique_values[feature_name] = unique
            if best_feature.information_gain <= information_gain:
                best_feature = FeatureInfo(feature_name, information_gain)
        self.feature_name = best_feature.feature_name
        for child in self.add_level(feature_unique_values[self.feature_name]):
            new_indexes = X[self.feature_name] == child.value
            child.split_epoch(X.drop(self.feature_name, axis=1).loc[new_indexes], y[list(new_indexes)])

    def predict_cascade(self, row: pd.Series) -> any:
        if self.class_label is not None:
            return self.class_label
        for child in self.children:
            if row[self.feature_name] == child.value:
                return child.predict_cascade(row)

@dataclass
class FeatureInfo:
    feature_name: str
    information_gain: float

class DecisionTreeID3:

    def __init__(self):
        self.root = Node(None, None)
        self.depth = 0

    def __str__(self) -> str:
        return str(self.root)

    def fit(self, X: pd.DataFrame, y: np.array, rand_features:bool=True):
        self.root.split_epoch(X, y, rand_features)
        return self

    def predict(self, X: pd.DataFrame) -> np.array:
        predict_result = []
        for row in range(len(X.index)):
            predict_result.append(self.root.predict_cascade(X.loc[row]))
        return np.array(predict_result)
         
    @staticmethod
    def entropy(positive_proportion: float, negative_proportion: float) -> float:
        return (-positive_proportion*math.log2(positive_proportion) if positive_proportion != 0 else 0) -(negative_proportion*math.log2(negative_proportion) if negative_proportion != 0 else 0) 

In [49]:
class RandomForestClassifier:
    
    def __init__(self, n_estimators:int=100, rand_features:bool=True):
        self.n_estimators: int = n_estimators
        self.estimators: list[DecisionTreeID3] = []
        self.rand_features = rand_features

    def fit(self, X: pd.DataFrame, y: np.array):
        n = len(X.index)
        for i in range(self.n_estimators):
            bootstrap_samples = np.random.randint(n, size=n)
            new_estimator = DecisionTreeID3().fit(X.iloc[bootstrap_samples], y[bootstrap_samples], self.rand_features)
            self.estimators.append(new_estimator)
        return self

    def predict(self, X: pd.DataFrame) -> np.array:
        counters: list[dict] = [{}]*len(X.index)
        for tree in self.estimators:
            prediction = tree.predict(X)
            for index, label in enumerate(prediction):
                if label in counters[index]:
                    counters[index][label] += 1
                else:
                    counters[index][label] = 1

        return np.array(list(map(lambda elem: max(elem, key=elem.get), counters)))

In [50]:
df = pd.DataFrame({'Opady': ['brak', 'mżawka', 'burza', 'burza', 'brak', 'brak'], 'Temperatura': ['ciepło', 'ciepło', 'ciepło', 'zimno', 'zimno', 'zimno'], 'Mgła': ['brak', 'lekka', 'brak', 'lekka', 'duża', 'brak'], 'Stan pogody': ['dobra', 'dobra', 'zła', 'zła', 'zła', 'dobra']})
X = df.drop('Stan pogody', axis=1)
y = df['Stan pogody'].copy()
enc = OrdinalEncoder(categories=[['zła', 'dobra']], dtype=np.int8)
y = enc.fit_transform(y.values.reshape(-1,1)).flatten()

In [51]:
forest = RandomForestClassifier()
forest.fit(X, y)

<__main__.RandomForestClassifier at 0x154e428ad90>

In [52]:
forest.predict(pd.DataFrame({'Opady': ['burza', 'brak'], 'Temperatura': ['zimno', 'ciepło'], 'Mgła': ['duża', 'brak']}))

array([0, 0], dtype=int8)

In [53]:
tree = DecisionTreeID3()
tree.fit(X, y)
print(tree)

Temperatura!None!None-None!ciepło!1
-None!zimno!0


In [54]:
print(X, y)

    Opady Temperatura   Mgła
0    brak      ciepło   brak
1  mżawka      ciepło  lekka
2   burza      ciepło   brak
3   burza       zimno  lekka
4    brak       zimno   duża
5    brak       zimno   brak [1 1 0 0 0 1]


In [55]:
tree.predict(pd.DataFrame({'Opady': ['burza', 'brak'], 'Temperatura': ['zimno', 'ciepło'], 'Mgła': ['duża', 'brak']}))

array([0, 1], dtype=int8)

In [56]:
n = len(X.index)
X.iloc[np.random.randint(n, size=n)] #bootstrap

Unnamed: 0,Opady,Temperatura,Mgła
3,burza,zimno,lekka
3,burza,zimno,lekka
5,brak,zimno,brak
4,brak,zimno,duża
5,brak,zimno,brak
0,brak,ciepło,brak


In [57]:
sample = np.random.randint(n, size=n)
print(X, y, X.iloc[sample], y[sample])

    Opady Temperatura   Mgła
0    brak      ciepło   brak
1  mżawka      ciepło  lekka
2   burza      ciepło   brak
3   burza       zimno  lekka
4    brak       zimno   duża
5    brak       zimno   brak [1 1 0 0 0 1]    Opady Temperatura   Mgła
2  burza      ciepło   brak
4   brak       zimno   duża
2  burza      ciepło   brak
2  burza      ciepło   brak
0   brak      ciepło   brak
3  burza       zimno  lekka [0 0 0 0 1 0]


In [58]:
%timeit np.array(list(map(lambda elem: max(elem, key=elem.get), [{'a':1, 'b':2}])))

1.67 µs ± 33.6 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)


In [59]:
%timeit np.fromiter(map(lambda elem: max(elem, key=elem.get), [{1:1, 2:2}]), dtype=np.int64)

1.39 µs ± 19.1 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)


In [60]:
print(forest.estimators[0])

Mgła!None!None-None!brak!1
-None!duża!0
-None!lekka!0


In [61]:
n = len(X.index)
elo = np.random.randint(n, size=n)
test = X.loc[elo]
test2 = y[elo]
test.loc[pd.Series([True, True, False, True, False, False])]
test2[pd.Series([True, True, False, True, False, False])]

array([1, 1, 1], dtype=int8)

In [62]:
test2[pd.Series([True, True, False, True, False, False])]

array([1, 1, 1], dtype=int8)

In [63]:
test2[pd.Series([False, False, False, False, True, False], index=[4, 4 ,2, 5, 1, 0])]

array([0], dtype=int8)

In [64]:
tuple(pd.Series([False, False, False, False, True, False], index=[4, 4 ,2, 5, 1, 0]))#.reset_index(drop=True)

(False, False, False, False, True, False)

In [65]:
test2[list(test['Opady'] == 'brak')]

array([1, 1, 1, 1], dtype=int8)

In [66]:
test2

array([1, 1, 1, 1, 0, 0], dtype=int8)

In [67]:
list(test['Opady'] == 'brak')

[True, True, True, True, False, False]