In [1]:
from sklearn.impute import KNNImputer
from sklearn.experimental import enable_halving_search_cv
from sklearn.model_selection import train_test_split, HalvingGridSearchCV, StratifiedKFold
from sklearn.tree import DecisionTreeClassifier
from ucimlrepo import fetch_ucirepo
from NaDropper import HighNaNDropper
from sklearn.pipeline import Pipeline
import numpy as np
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import cross_validate
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

In [2]:
def build_pipeline(**args):
    pipeline = Pipeline([
        ('dropper', HighNaNDropper(threshold=0.3)),
        ('imputer', KNNImputer()),
        ('classifier', DecisionTreeClassifier(**args))
    ])
    return pipeline

In [3]:
def build_preprocess_pipeline():
    pipeline = Pipeline([
        ('imputer', KNNImputer()),
    ])
    return pipeline

In [4]:
def process_dataset(x, y, balance=True):
    x_train, x_test, y_train, y_test = train_test_split(x, y, stratify=y, shuffle=True)
    y_train = np.ravel(y_train)
    preprocess_pipeline = build_preprocess_pipeline()
    x_train_processed = preprocess_pipeline.fit_transform(x_train)
    if balance:
        x_resampled, y_resampled = SMOTE().fit_resample(x_train_processed, y_train)
        return x_resampled, y_resampled, x_test, y_test
    else:
        return x_train_processed, y_train, x_test, y_test

In [5]:
def get_hyperparams_for_dataset(x, y, scoring):
    param_grid = {
    'classifier__ccp_alpha': np.linspace(0, 0.25, 25),
    'classifier__max_depth': [5, 10, 20, None],
    'classifier__min_samples_leaf': [2, 3, 5, 10]
    }
    pipeline = build_pipeline()
    search = HalvingGridSearchCV(pipeline, param_grid=param_grid, aggressive_elimination=True, n_jobs=2, scoring=scoring)
    search.fit(x, y)
    ccp_alpha, max_depth, min_samples = search.best_params_['classifier__ccp_alpha'], search.best_params_['classifier__max_depth'], search.best_params_['classifier__min_samples_leaf']
    return ccp_alpha, max_depth, min_samples

In [6]:
def build_for_hyperparams(ccp, max_depth, min_samples_leaf):
    pipeline = build_pipeline(ccp_alpha=ccp, max_depth=max_depth, min_samples_leaf=min_samples_leaf)
    return pipeline

In [7]:
from sklearn.model_selection import cross_val_score

def dump_cv_metrics(pipeline, x, y, stratified=False, scoring='f1_micro'):
    cv_results = cross_val_score(pipeline, x, y, cv=StratifiedKFold(shuffle=True) if stratified else 5, scoring=scoring)
    result_array = np.array(cv_results)
    print(f"Average F1 Score: {result_array}")

In [8]:
def dump_metrics(pipeline, x_test, y_test):
    y_pred = pipeline.predict(x_test)
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, average='weighted')
    recall = recall_score(y_test, y_pred, average='weighted')
    f1 = f1_score(y_test, y_pred, average='weighted')

    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")

In [11]:
import os.path
import pickle as pkl

def load_cache_dataset(filename: str, id: int):
    if os.path.exists(filename):
        with open(filename, 'rb') as f:
            dataset = pkl.load(f)
    else:
        dataset = fetch_ucirepo(id=id)
        with open(filename, 'wb') as f:
            pkl.dump(dataset, f)
    return dataset

polish_companies_bankruptcy = load_cache_dataset('pcb.pkl', 365)
iris = load_cache_dataset('iris.pkl', 53)
iris.data.targets = LabelEncoder().fit_transform(iris.data.targets)

  y = column_or_1d(y, warn=True)


In [7]:
x_companies, y_companies, x_companies_test, y_companies_test = process_dataset(polish_companies_bankruptcy.data.features, polish_companies_bankruptcy.data.targets)
x_iris, y_iris, x_iris_test, y_iris_test = process_dataset(iris.data.features, iris.data.targets)

In [42]:
hyperparams_iris = get_hyperparams_for_dataset(x_iris, y_iris, 'f1_micro')
hyperparams_iris

(np.float64(0.1875), None, 3)

In [43]:
hyperparams_pcb = get_hyperparams_for_dataset(x_companies, y_companies, 'f1')
hyperparams_pcb

(np.float64(0.0), 5, 2)

In [44]:
iris_pipeline = build_for_hyperparams(*hyperparams_iris)
iris_pipeline.fit(x_iris, y_iris)

Accuracy: 0.8947
Precision: 0.8947
Recall: 0.8947
F1 Score: 0.8947




# Cross-validation stratification effect

In [48]:
iris_pipeline = build_for_hyperparams(*hyperparams_iris)
pcb_pipeline = build_for_hyperparams(*hyperparams_pcb)

In [76]:
dump_cv_metrics(iris_pipeline, x_iris, y_iris, False)

Average F1 Score: 0.9470740569461796


In [77]:
dump_cv_metrics(iris_pipeline, x_iris, y_iris, True)

Average F1 Score: 0.9472478076058639


In [78]:
dump_cv_metrics(pcb_pipeline, x_companies, y_companies, False)

Average F1 Score: 0.7990705365270216


In [79]:
dump_cv_metrics(pcb_pipeline, x_companies, y_companies, True)

Average F1 Score: 0.8013068918841968


# Class weight in decision tree

In [12]:
weighted_pipeline = build_pipeline(class_weight='balanced', ccp_alpha=0.01)
unweighted_pipeline = build_pipeline(ccp_alpha=0.01)

In [13]:
x_train, y_train, x_test, y_test = process_dataset(polish_companies_bankruptcy.data.features, polish_companies_bankruptcy.data.targets, balance=False)

In [14]:
weighted_pipeline.fit(x_train, y_train)
unweighted_pipeline.fit(x_train, y_train)

In [18]:
dump_cv_metrics(weighted_pipeline, x_test, y_test, False, scoring='recall')
dump_cv_metrics(unweighted_pipeline, x_test, y_test, False, scoring='recall')

Average F1 Score: [0.76190476 0.6        0.63461538 0.75961538 0.76190476]
Average F1 Score: [0. 0. 0. 0. 0.]
