In [None]:
import pandas as pd
from sklearn.base import BaseEstimator, ClassifierMixin, clone
from sklearn.ensemble import BaseEnsemble
from scipy.stats import mode
import numpy as np

In [None]:
from rolling_lookahead_dt_pulp import rollo_oct_pulp
from sklearn.utils.validation import check_is_fitted
#from rolling_lookahead_dt_pulp.oct.optimal_tree_pulp import predict_model_pulp

from rolling_lookahead_dt_pulp.rolling_tree.rolling_optimize_pulp import rolling_optimize_pulp
from rolling_lookahead_dt_pulp.oct.tree import *
from rolling_lookahead_dt_pulp.oct.optimal_tree_pulp import *
from helpers.helpers import preprocess_dataframes

# Limitation: with this approach you cannot train on the training data alone (which should yield just the model),
# because the test data is unfortunately predicted on that model at the same time.
# As a result, the finished model cannot be evaluated on different test data.

class CustomTreeWrapper(BaseEstimator, ClassifierMixin):
    """Estimator-style wrapper around the rolling look-ahead tree built with PuLP.

    A base model of depth 2 is generated in the constructor (via ``construct``)
    and trained in ``fit``. When the requested depth is greater than 2, ``fit``
    hands the base model to ``rolling_optimize_pulp`` to extend it.

    Parameters
    ----------
    train_data, test_data : pd.DataFrame
        Frames passed to ``preprocess_dataframes``. ``train_data`` is
        overwritten in ``fit`` by the concatenation of ``y`` and ``X``.
    depth : int or None
        Requested tree depth. ``None`` is treated as the base depth (2).
    criterion : str
        Forwarded to ``generate_model_pulp`` and ``rolling_optimize_pulp``.
    target_label, features
        Forwarded to ``preprocess_dataframes``.
    time_limit : int
        Forwarded to ``rolling_optimize_pulp``.
    big_m : int
        Forwarded to ``generate_model_pulp``.

    Attributes
    ----------
    result_dict : dict
        ``result_dict['tree'][d]`` holds the per-depth artefacts
        (``'trained_dict'``, ``'train'``, ``'test'``) and ``result_dict[d]``
        the accuracies. Entries for depths above 2 are presumably filled by
        ``rolling_optimize_pulp`` -- TODO confirm against that function.
    """

    # Depth of the base model that is always trained before any rolling extension.
    _BASE_DEPTH = 2

    def __init__(self, train_data, test_data, depth=None, criterion='gini', target_label=None, features=None, time_limit = 1800, big_m = 99):
        self.depth = depth
        self.criterion = criterion
        self.test_data = test_data
        self.train_data = train_data
        self.target_label = target_label
        self.features = features
        self.time_limit = time_limit
        self.big_m = big_m
        # NOTE(review): building the model in __init__ departs from the
        # scikit-learn convention of parameter-only constructors; kept because
        # fit() relies on self.main_model already existing.
        self.construct()

    def _target_depth(self):
        """Return the requested depth, using the base depth when ``depth`` is None."""
        return self._BASE_DEPTH if self.depth is None else self.depth

    def construct(self):
        """Preprocess the stored frames and generate the untrained base model."""
        train, test = preprocess_dataframes(  # ./rollo_oct/utils/helpers.py
            train_df=self.train_data,
            test_df=self.test_data,
            target_label=self.target_label,
            features=self.features)

        # The class set is taken from train and test together.
        df = pd.concat([train, test])
        self.P = [int(i) for i in
                  list(train.loc[:, train.columns != 'y'].columns)]
        train.columns = ["y", *self.P]
        test.columns = ["y", *self.P]
        self.K = sorted(list(set(df.y)))

        # Per-depth storage for solutions; the base-depth slot is created up front.
        self.result_dict = {}
        self.result_dict['tree'] = {}
        self.result_dict['tree'][self._BASE_DEPTH] = {}

        # generate model
        self.main_model = generate_model_pulp(P=self.P, K=self.K, data=train, y_idx=0, big_m=self.big_m, criterion=self.criterion)

    def fit(self, X, y):
        """Train the base model on ``(X, y)`` and optionally extend it to ``depth``.

        The stored ``test_data`` is predicted as part of fitting, and both
        train and test results are recorded in ``result_dict``.

        Returns
        -------
        self
        """
        base_depth = self._BASE_DEPTH

        self.train_data = pd.concat([y, X], axis=1, ignore_index=False)

        train, test = preprocess_dataframes(  # ./rollo_oct/utils/helpers.py
            train_df=self.train_data,
            test_df=self.test_data,
            target_label=self.target_label,
            features=self.features)

        self.P = [int(i) for i in
                  list(train.loc[:, train.columns != 'y'].columns)]

        self.main_model = train_model_pulp(model_dict=self.main_model, data=train, P=self.P)

        self.result_dict['tree'][base_depth]['trained_dict'] = self.main_model

        # predict model
        result_train = predict_model_pulp(data=train, model_dict=self.main_model, P=self.P)

        misclassified_leafs = find_misclassification(df=result_train)

        result_test = predict_model_pulp(data=test, model_dict=self.main_model, P=self.P)

        train_acc = len(result_train.loc[result_train["prediction"] == result_train["y"]]) / \
                    len(result_train["y"])

        test_acc = len(result_test.loc[result_test["prediction"] == result_test["y"]]) / \
                   len(result_test["y"])

        self.result_dict['tree'][base_depth]['train'] = result_train[['y', 'prediction', 'leaf']]
        self.result_dict['tree'][base_depth]['test'] = result_test[['y', 'prediction', 'leaf']]

        self.result_dict[base_depth] = {
            "training_accuracy": train_acc,
            "test_accuracy": test_acc
        }

        # Presumably predict_model_pulp added these columns to the frames it
        # was given; they are removed before the frames are reused -- TODO confirm.
        train = train.drop(["prediction", "leaf"], axis=1)
        test = test.drop(["prediction", "leaf"], axis=1)

        # depth=None previously raised TypeError on this comparison; it now
        # means "stop at the base depth".
        target_depth = self._target_depth()
        if target_depth > base_depth:
            self.result_dict = rolling_optimize_pulp(predefined_model=self.main_model,
                                                     train_data=train,
                                                     test_data=test,
                                                     main_depth=base_depth,
                                                     target_depth=target_depth,
                                                     features=self.P,
                                                     time_limit=self.time_limit,
                                                     to_go_deep_nodes=misclassified_leafs,
                                                     result_dict=self.result_dict,
                                                     criterion=self.criterion)

        self.is_fitted_ = True
        return self

    def predict(self, X):
        """Predict labels for ``X`` with the model stored for the requested depth.

        Returns
        -------
        pd.Series
            The ``'prediction'`` column of the frame returned by
            ``predict_model_pulp``.
        """
        check_is_fitted(self, 'is_fitted_')

        model_dict = self.result_dict['tree'][self._target_depth()]['trained_dict']

        # predict_model_pulp is given a frame with a leading 'y' column, as in
        # fit; the true labels are unknown here, so a placeholder is used.
        dummy = pd.DataFrame({'y': [None]*len(X)}, index=X.index)
        test = pd.concat([dummy, X], axis=1)

        res = predict_model_pulp(data=test, model_dict=model_dict, P=self.P)

        return res['prediction']

Forest

In [None]:
from sklearn.utils import resample
from scipy.stats import mode

class CustomEnsembleClassifier:
    """Bagging ensemble of ``CustomTreeWrapper`` trees combined by majority vote.

    Each tree is trained on a bootstrap sample of the rows of ``X``; the rows
    not drawn (out-of-bag) are passed to the tree as its ``test_data``.
    """

    def __init__(self, n_estimators=10, tree_kwargs=None, random_state=None):
        """
        n_estimators: number of trees in the ensemble
        tree_kwargs: dictionary of keyword args for CustomTreeWrapper (except train_data and test_data)
        random_state: seed for reproducible bootstrap sampling
        """
        self.n_estimators = n_estimators
        self.tree_kwargs = tree_kwargs if tree_kwargs is not None else {}
        self.random_state = random_state
        self.trees_ = []
        self.bootstrap_indices_ = []

    def fit(self, X, y):
        """
        Fit ``n_estimators`` trees on bootstrap samples.

        X: pd.DataFrame of features
        y: pd.Series of target labels (same index as X)
        Returns: self
        """
        rng = np.random.RandomState(self.random_state)
        self.trees_ = []
        self.bootstrap_indices_ = []
        # Remember the label set so predict_proba has a stable column layout.
        self.classes_ = np.unique(y)

        indices = np.array(X.index)
        # Target first, then features renamed to 0..n_features-1.
        frame_columns = ['y'] + list(range(X.shape[1]))

        for _ in range(self.n_estimators):
            # Bootstrap sample of index labels, plus the labels never drawn.
            sample_indices = rng.choice(indices, size=len(indices), replace=True)
            oob_indices = indices[~np.isin(indices, sample_indices)]
            self.bootstrap_indices_.append((sample_indices, oob_indices))

            X_boot = X.loc[sample_indices]
            y_boot = y.loc[sample_indices]
            train_data = pd.concat([y_boot, X_boot], axis=1)
            train_data.columns = frame_columns

            if len(oob_indices) > 0:
                test_data = pd.concat([y.loc[oob_indices], X.loc[oob_indices]], axis=1)
                test_data.columns = frame_columns
            else:
                # Edge case: every row was drawn, so reuse the training frame.
                test_data = train_data.copy()

            tree = CustomTreeWrapper(train_data=train_data,
                                     test_data=test_data,
                                     **self.tree_kwargs)
            tree.fit(X_boot, y_boot)
            self.trees_.append(tree)

        return self

    def _collect_predictions(self, X):
        """Return a DataFrame with one column of predictions per fitted tree."""
        if not self.trees_:
            raise ValueError("CustomEnsembleClassifier has no fitted trees; call fit first.")
        per_tree = []
        for tree in self.trees_:
            pred = tree.predict(X)
            # Trees may return array-likes; align them to X's index.
            if not isinstance(pred, pd.Series):
                pred = pd.Series(pred, index=X.index)
            per_tree.append(pred)
        # ignore_index relabels the columns 0..n_trees-1 so they are unique.
        return pd.concat(per_tree, axis=1, ignore_index=True)

    def predict(self, X):
        """
        Majority-vote ensemble prediction (ties go to the smallest label).
        Returns: pd.Series with predictions, aligned to X.index
        """
        votes = self._collect_predictions(X)
        # mode(axis=1) sorts tied modes, so column 0 is the smallest one.
        maj_vote = votes.mode(axis=1)[0]
        maj_vote.index = X.index
        return maj_vote

    def predict_proba(self, X):
        """
        Fraction of trees voting for each class.

        Returns an array of shape (n_samples, n_classes). Columns follow
        ``self.classes_`` when the ensemble was fitted through ``fit``;
        otherwise they follow the sorted labels found in the predictions.
        """
        votes = self._collect_predictions(X).to_numpy()

        classes = getattr(self, 'classes_', None)
        if classes is None:
            classes = np.unique(votes)

        n_trees = votes.shape[1]
        proba = np.zeros((votes.shape[0], len(classes)))
        for col, label in enumerate(classes):
            proba[:, col] = (votes == label).sum(axis=1) / n_trees
        return proba

    def oob_score(self, X, y):
        """
        Returns out-of-bag accuracy.

        Each row is scored using only the trees that did not see it during
        fit; rows that were in every bootstrap sample are skipped.
        Raises ValueError when no row has an out-of-bag prediction.
        """
        oob_votes = {idx: [] for idx in X.index}
        for (_, oob_ind), tree in zip(self.bootstrap_indices_, self.trees_):
            if len(oob_ind) == 0:
                continue
            preds = tree.predict(X.loc[oob_ind])
            if not isinstance(preds, pd.Series):
                preds = pd.Series(preds, index=oob_ind)
            for idx, pred in preds.items():
                oob_votes[idx].append(pred)

        final_oob_preds = []
        final_oob_true = []
        for idx, votes in oob_votes.items():
            if votes:
                # Series.mode() is sorted, so ties resolve to the smallest
                # label, consistent with predict().
                final_oob_preds.append(pd.Series(votes).mode().iloc[0])
                final_oob_true.append(y.loc[idx])
        if not final_oob_preds:
            raise ValueError("No OOB predictions collected.")
        accuracy = np.mean(np.array(final_oob_preds) == np.array(final_oob_true))
        return accuracy

Testing Tree

In [None]:
from sklearn.model_selection import train_test_split

# Load the seismic dataset; the target is stored in column 'y'.
data_seismic = pd.read_csv("datasets/seismic/seismic_bin.csv")

y = data_seismic['y']                 # target only
X = data_seismic.drop(columns=['y'])  # every remaining column is a feature

# Hold out exactly 100 rows, stratified on the target, with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=100,
    stratify=y,
    random_state=42,
)

# Target-first frames in the layout handed to CustomTreeWrapper.
train_data = stacked_train = pd.concat([y_train, X_train], axis=1, ignore_index=False)
test_data = stacked_test = pd.concat([y_test, X_test], axis=1, ignore_index=False)

# Feature names as they appear in the training split.
feature_columns = X_train.columns

In [None]:
# Usage outside:
# Build a depth-3 tree wrapper on the stacked frames and fit it on the training split.
wrapper = CustomTreeWrapper(
    train_data=train_data,
    test_data=test_data,
    target_label='y',
    features=feature_columns,
    depth=3,
    criterion='gini',
)
# The trailing semicolon keeps Jupyter from echoing the estimator's repr.
wrapper.fit(X_train, y_train);


In [None]:
# Predict labels for the held-out features with the fitted wrapper.
predictions = wrapper.predict(X=X_test)
print(predictions)

Testing Forest

In [None]:
# Requires X_train, X_test, y_train, y_test (pandas DataFrame/Series) from the cells above.
# Features are addressed by position because the ensemble renames columns to 0..n-1.
tree_kwargs = {
    'depth': 3,
    'criterion': 'gini',
    'target_label': 'y',
    'features': list(range(X_train.shape[1])),
}

ensemble = CustomEnsembleClassifier(
    n_estimators=10,
    tree_kwargs=tree_kwargs,
    random_state=42,
)
ensemble.fit(X_train, y_train)
y_pred = ensemble.predict(X_test)