# Notebook to solve the kaggle competition "Child Mind Institute - Detect Sleep States"
Link to the competition: https://www.kaggle.com/competitions/child-mind-institute-detect-sleep-states


### Install packages

In [1]:
%pip install matplotlib plotly pandas numpy tqdm scikit-learn pyarrow

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.1.2 -> 23.3.2
[notice] To update, run: python.exe -m pip install --upgrade pip


### Imports

In [2]:
from itertools import product
from tqdm import tqdm
from score import score
#from sklearn.svm import SVC
#from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import recall_score, precision_score, average_precision_score, f1_score
import wandb
from abc import ABC, abstractmethod #, classmethod
from math import ceil, floor
import os
from pathlib import Path
from joblib import dump, load
from custom_enums import ModelTrainingType
import pandas as pd
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import gc
from scipy.special import softmax

In [3]:

# wandb.login() requires you to get your API key from your account settings
# open the weights and biases website https://wandb.ai/login and login to your account
# then go to your account settings and copy the API key
# paste it in the input box and hit enter

#wandb.login() #TODO: uncomment this line to login to your account

In [4]:
# # start a new wandb run to track this script
# wandb.init(
#     # set the wandb project where this run will be logged
#     project="my-awesome-project",
    
#     # track hyperparameters and run metadata
#     config={
#     "learning_rate": 0.02,
#     "architecture": "CNN",
#     "dataset": "CIFAR-100",
#     "epochs": 10,
#     }
# )

To be able to switch between different models from different libraries at a glance, we implement an interface called `IPipelineRequirements`. This allows us to make the pipleine even more robust and easier to extend upon. 

In [5]:
class IPipelineRequirements(ABC):
    @abstractmethod
    def train(self):
        pass

    @abstractmethod
    def predict(self):
        pass

    @abstractmethod
    def save(self):
        pass

    @abstractmethod
    def load(self):
        pass

    @abstractmethod
    def evaluate(self):
        pass

Our first model is the baseline model, which just takes the mean over all onset and wakeup times and tries to predict `onset` and `wakeup` events with the calculated time.

In [6]:
class BaselineModel(IPipelineRequirements):
    def __init__(self):
        pass

    def train(self):
        raise NotImplementedError("Please Implement this method")

    def predict(self):
        raise NotImplementedError("Please Implement this method")

    def save(self):
        raise NotImplementedError("Please Implement this method")

    def load(self):
        raise NotImplementedError("Please Implement this method")

    def evaluate(self):
        raise NotImplementedError("Please Implement this method")

In [7]:
g = BaselineModel()

Numerous studies have concentrated on applications using `RandomForest`. The primary motivation for this preference is the model's inherent transparency in decision-making processes, which are readily identifiable in such models. Subsequent to the BaselineModel, efforts have been made to abstract models from the Scikit-learn library. Fortunately, the majority of models within their API exhibit consistent implementation patterns, facilitating their integration into the processing pipeline.

In [8]:
class SkLearnModel(IPipelineRequirements):

    def __init__(self, model_path):
        self.load(model_path)

    def __init__(self, model, identifier, scaler=StandardScaler, sk_model_params=None):
        self._model = model
        self._scaler = scaler if not callable(scaler) else scaler() # if scaler is a classpointer, instantiate it
        self.identifier = identifier
        self._model_params = sk_model_params

    def train(self, X, y, not_scaled=False, **kwargs):
        if not_scaled:
            X = self._scaler.transform(X)

        # due to the big dataset we need to check wich model was instantiated and do some model
        # specific stuff to enable us to train the model in batches
        if isinstance(self._model, RandomForestClassifier):
            new_estimators = kwargs['add_estimators'] if 'add_estimators' in kwargs else 50
            if self._model.warm_start: self._model.n_estimators += new_estimators
            #print(f'Current estimator increased to {self._model.n_estimators}, {new_estimators} added this round.')
        # if isinstance(self._model, SVC):
        #     pass

        self._model.fit(X, y)

    def predict(self, X, not_scaled=False):
        if not self._model: raise ValueError('Please load or train a model first.')
        if not_scaled:
            if not self._scaler: raise ValueError('Please load or fit a scaler first.')
            X = self._scaler.transform(X)
        return self._model.predict(X)

    def save(self):
        try:
            Path.mkdir(Path('models'), exist_ok=True)
            with open(f'models/model_{self.identifier}.jlb', 'wb') as f:
                dump(self._model, f)
            with open(f'models/scaler_{self.identifier}.jlb', 'wb') as f:
                dump(self._scaler, f)
        except:
            raise ValueError('Unable to save model and scaler.')

    def load(self, filepath):
        try:
            # load model and scaler
            if os.path.isfile(filepath):
                print(f'Loading model from {filepath}')
                with open(filepath, 'rb') as f:
                    self._model = load(f)
            scaler_path = f'{os.path.split(filepath)[0]}/scaler_{os.path.split(filepath)[1].split(".")[0].split("_")[1]}.pkl'
            if os.path.isfile(scaler_path):
                print(f'Loading scaler from {scaler_path}')
                with open(filepath, 'rb') as f:
                    self._scaler = load(f)

            # extract identifier from filename
            self._identifier = filepath.split('_')[1].split('.')[0]
        except (FileNotFoundError) as e:
                print(f'File {e} not found')
                raise ValueError('Filepath is not valid. Unable to load model and scaler.')
        except (IndexError) as e:
                print(f'The name of the file does not implement the convention "<model|scaler>_<some identifier>".')
                raise ValueError('Filepath is not valid. Unable to load model and scaler.')
        except (Exception) as e:
            print(f'Something went wrong. {e}')
            raise ValueError('Unknown Error.')

    def evaluate(self, X, y, scoreFx=None, not_scaled=False, prepredicted=None):
        if not scoreFx:
            raise ValueError('Please provide a score function.')
        if not self._model:
            raise ValueError('Please load or train a model first.')
        if not_scaled and prepredicted is None:
            if not self._scaler: raise ValueError('Please load or fit a scaler first.')
            X = self._scaler.transform(X)

        return scoreFx(y, prepredicted if prepredicted is not None else self._model.predict(X))

### Import the training and the validation data

In [10]:
NEW_TRAIN_DATA = '../data/train/'
NEW_VALIDATION_DATA = '../data/val/'
TRAIN_DATA = '../data/train_20231021'
#TRAIN_DATA = '../data/train_20231021_20M'
VALIDATION_DATA = '../data/validation_20231021'

# figure out series id mapping
SERIES_MAPPING_ENABLED = False
if not 'series_id_mapping' in vars() and SERIES_MAPPING_ENABLED:
    series_id_mapping = {'train': dict(), 'validation': dict()}
    t = ds.dataset(NEW_TRAIN_DATA).to_table(columns=['series_id'])
    #t = pq.ParquetDataset(TRAIN_DATA).read(columns=['series_id'])
    for i, data in enumerate(t.to_pandas()['series_id'].unique()):
        series_id_mapping['train'][data] = i
    #v = pq.ParquetDataset(VALIDATION_DATA).read(columns=['series_id'])
    v = ds.dataset(NEW_VALIDATION_DATA).to_table(columns=['series_id'])
    for i, data in enumerate(v.to_pandas()['series_id'].unique()):
        series_id_mapping['validation'][data] = i
    del t, v
    gc.collect()

#train_dataset_length = pq.ParquetFile(TRAIN_DATA).metadata.num_rows
train_dataset_length = ds.dataset(NEW_TRAIN_DATA).count_rows()
validation_dataset_length = ds.dataset(NEW_VALIDATION_DATA).count_rows()

def dataloader_full_dataset(validation=False):
    return pd.read_parquet(VALIDATION_DATA if validation else TRAIN_DATA)

def dataloader(validation=False, batch_size=5_000_000):
    parquet_file = pq.ParquetFile(VALIDATION_DATA if validation else TRAIN_DATA)
    for i in parquet_file.iter_batches(batch_size=batch_size):
        yield i.to_pandas()

def batched_dataloader(validation=False, batch_size=100_000):
    dataset = ds.dataset(NEW_VALIDATION_DATA if validation else NEW_TRAIN_DATA)
    batch = pd.DataFrame()
    for file_batch in dataset.to_batches(batch_size=batch_size):
        batch = pd.concat([batch, file_batch.to_pandas()])
        if len(batch) >= batch_size:
            yield batch.reset_index(drop=True)
            batch = pd.DataFrame()
    yield batch.reset_index(drop=True)


In [11]:
def load_model_from_back(path: str) -> RandomForestClassifier:
    with open(path, 'rb') as f:
        model = load(f)
    return model

In [12]:
columns_to_drop = ['step', 'series_id', 'awake', 'wearable_on', 'seconds', 'year']

In [1]:
raise(ValueError('Forced stop, allows restarting the kernel and using the runn all button.'))

ValueError: Forced stop, allows restarting the kernel and using the runn all button.

### Run the pipeline

This pipline trains two classifiers with all possible combinations of hyperparameters, the sensor worn classifier and the sleep state classifier

In [None]:
# define all the combinations of models and features
models_and_hyperparams = {
    RandomForestClassifier: {
        'params': {
            'n_estimators': [500], #, 600, 400],
            'max_depth': [None, 20, 40, 60],
            'min_samples_leaf': [5, 15, 25],
            'random_state': [42],
            'n_jobs': [10],
            #'criterion': ['gini', 'entropy', 'log_loss'],
            'warm_start': [True]
        },
        'modeltype': SkLearnModel,
        'scaler': StandardScaler,
        'training_type': ModelTrainingType.BATCH,
        'batch_size': 3_000_000
    }
}

# define scaler
pretrained_scalers = dict() # used to store trained scalers for later use

# loop over all combinations and append it to the configurations list
# if there are no hyperparams, just instantiate the model without params
for model_type, hyperparams in models_and_hyperparams.items():
    if len(hyperparams['params']) > 0: # if there are hyperparams, build a dict and pass it to the model as parameters
        fx_param_names, fx_param_values = zip(*hyperparams['params'].items())
        for cartesian_product_values in product(*fx_param_values):
            hyperparam_dict = dict(zip(fx_param_names, cartesian_product_values))

            # create or reuse the scaler specified in the models_and_hyperparams dictionary
            if hyperparams['scaler'] in pretrained_scalers:
                print(f'Using pretrained scaler {hyperparams["scaler"].__name__}')
                scaler = pretrained_scalers[hyperparams['scaler']]
            else:
                scaler = hyperparams['scaler']()
                print(f'Start fitting scaler {hyperparams["scaler"].__name__}')
                for batch in tqdm(batched_dataloader(batch_size=hyperparams['batch_size']) if hyperparams['training_type'] == ModelTrainingType.BATCH else dataloader_full_dataset()):
                    X = batch.drop(columns_to_drop, axis=1)
                    #X = batch.drop(['wearable_on', 'awake', 'wakeup', 'onset'], axis=1)
                    #X['series_id'] = X['series_id'].map(series_id_mapping['train'])
                    scaler.partial_fit(X)
                    del X
                    gc.collect()
                # save the scaler for later use
                pretrained_scalers[hyperparams['scaler']] = scaler

            # create the model from the modeltype and the hyperparam_dict
            m = hyperparams['modeltype'](model_type(**hyperparam_dict), \
                                        f'{model_type.__name__}-wrist-{"-".join([f"{na}__{str(va)}" for na, va in hyperparam_dict.items()])}'.lower(), \
                                        scaler, \
                                        hyperparam_dict)
            m2 = hyperparams['modeltype'](model_type(**hyperparam_dict), \
                                        f'{model_type.__name__}-sleep-{"-".join([f"{na}__{str(va)}" for na, va in hyperparam_dict.items()])}'.lower(), \
                                        scaler, \
                                        hyperparam_dict)

            # init wandb
            # start a new wandb run to track this script
            #cfg = m._model_params.copy()
            #cfg['model'] = m._model.__name__
            #wandb.init(project="classic_models", config=cfg)

            # train model
            rounds = 1
            if hyperparams['training_type'] == ModelTrainingType.BATCH:
                print(f'Model will be traind in batches of {hyperparams["batch_size"]} samples.')
                print(f'Dataset contains {train_dataset_length} samples. Batchsize is {hyperparams["batch_size"]}. That means {(rounds := train_dataset_length / hyperparams["batch_size"])} batches will be used to fit the model.')
                if model_type == RandomForestClassifier:
                    # print(f'At the end {hyperparam_dict["n_estimators"]} estimators will be fitted. That means, {hyperparam_dict["n_estimators"]//rounds} estimators per batch.')
                    m._model.n_estimators = 0 #set to 0. model will increase the number of estimators in each round
                    m2._model.n_estimators = 0 #set to 0. model will increase the number of estimators in each round
                    print(f'Build estimator distribution for {rounds} rounds.')
                    est_anteil = [1 for _ in range(floor(rounds))]
                    est_anteil.append(rounds % floor(rounds))
                    est_anteil = softmax(est_anteil)
                    est_anteil *= hyperparam_dict['n_estimators']
                    est_anteil = est_anteil.round().astype(int)
                    print(f'Estimator distribution per round: {est_anteil}')

            print(f'Start training model {m.identifier}')
            for i, batch in enumerate(tqdm(batched_dataloader(batch_size=hyperparams['batch_size']) if hyperparams['training_type'] == ModelTrainingType.BATCH else dataloader_full_dataset(), total=floor(rounds)+1)):
                y = batch['wearable_on']
                X = batch.drop(columns_to_drop, axis=1)
                #X = batch.drop(['wearable_on', 'awake'], axis=1)
                #X['series_id'] = X['series_id'].map(series_id_mapping['train'])
                args = {}
                if model_type == RandomForestClassifier and hyperparams['training_type'] == ModelTrainingType.BATCH: args['add_estimators'] = est_anteil[i]
                m.train(X, y, not_scaled=True, **args)
                del X
                del y
                gc.collect()

            #  evaluate model
            print(f'Start evaluating model {m.identifier}')
            score_value_average_precision_score, score_value_recall, score_value_precision, score_f1 = [], [], [], []
            for validation in tqdm(batched_dataloader(batch_size=hyperparams['batch_size'], validation=True) if hyperparams['training_type'] == ModelTrainingType.BATCH else dataloader_full_dataset(validation=True), total = ceil(validation_dataset_length / hyperparams["batch_size"])):
                #validation['series_id'] = validation['series_id'].map(series_id_mapping['validation'])
                validation_y = validation['wearable_on']
                validation.drop(columns_to_drop, axis=1, inplace=True)
                #validation.drop(['wearable_on', 'awake'], axis=1, inplace=True)
                validation_y_hat = m.predict(validation, not_scaled=True)
                # this does the same as scoreFX(y, y_hat). It is wrapped in the models evaluate function.
                # the model itself could also do the prediction if prepredicted is None
                # then, the first argument of the evaluate function would be the X_validation data
                score_value_average_precision_score.append(m.evaluate(None, validation_y, scoreFx=average_precision_score, not_scaled=True, prepredicted=validation_y_hat))
                score_value_recall.append(m.evaluate(None, validation_y, scoreFx=recall_score, not_scaled=True, prepredicted=validation_y_hat))
                score_value_precision.append(m.evaluate(None, validation_y, scoreFx=precision_score, not_scaled=True, prepredicted=validation_y_hat))
                score_f1.append(m.evaluate(None, validation_y, scoreFx=f1_score, not_scaled=True, prepredicted=validation_y_hat))
                del validation, validation_y

            print(sum(score_value_average_precision_score) / len(score_value_average_precision_score))
            print(sum(score_value_recall) / len(score_value_recall))
            print(sum(score_value_precision) / len(score_value_precision))
            print(sum(score_f1) / len(score_f1))
            #  save model
            m.save()

            print(f'Start training model {m2.identifier}')
            for i, batch in enumerate(tqdm(batched_dataloader(batch_size=hyperparams['batch_size']) if hyperparams['training_type'] == ModelTrainingType.BATCH else dataloader_full_dataset(), total=floor(rounds)+1)):
                y = batch['awake']
                X = batch.drop(columns_to_drop, axis=1)
                #X = batch.drop(['wearable_on', 'awake'], axis=1)
                #X['series_id'] = X['series_id'].map(series_id_mapping['train'])
                args = {}
                if model_type == RandomForestClassifier and hyperparams['training_type'] == ModelTrainingType.BATCH: args['add_estimators'] = est_anteil[i]
                worn_y_hat = m.predict(X, not_scaled=True)
                # add worn_y_hat as new column to X
                X['pred_worn'] = worn_y_hat
                # filter out all rows where worn_y_hat is 0
                X = X[X['pred_worn'] == 1]
                y = y[y.index.isin(X.index)]
                X = X.drop(['pred_worn'], axis=1)
                m2.train(X, y, not_scaled=True, **args)
                del X, y
                gc.collect()

            print(f'Start evaluating model {m2.identifier}')
            score_value_average_precision_score_m2, score_value_recall_m2, score_value_precision_m2, f1_score_m2 = [], [], [], []
            for validation in tqdm(batched_dataloader(batch_size=hyperparams['batch_size'], validation=True) if hyperparams['training_type'] == ModelTrainingType.BATCH else dataloader_full_dataset(validation=True), total = ceil(validation_dataset_length / hyperparams["batch_size"])):
                #validation['series_id'] = validation['series_id'].map(series_id_mapping['validation'])
                validation_y = validation['awake']
                validation.drop(columns_to_drop, axis=1, inplace=True)
                #validation.drop(['wearable_on', 'awake'], axis=1, inplace=True)
                worn_y_hat = m.predict(validation, not_scaled=True)
                validation['pred_worn'] = worn_y_hat
                validation = validation[validation['pred_worn'] == 1]
                validation_y = validation_y[validation_y.index.isin(validation.index)]
                validation = validation.drop(['pred_worn'], axis=1)
                validation_y_hat = m2.predict(validation, not_scaled=True)
                # this does the same as scoreFX(y, y_hat). It is wrapped in the models evaluate function.
                # the model itself could also do the prediction if prepredicted is None
                # then, the first argument of the evaluate function would be the X_validation data
                score_value_average_precision_score_m2.append(m2.evaluate(None, validation_y, scoreFx=average_precision_score, not_scaled=True, prepredicted=validation_y_hat))
                score_value_recall_m2.append(m2.evaluate(None, validation_y, scoreFx=recall_score, not_scaled=True, prepredicted=validation_y_hat))
                score_value_precision_m2.append(m2.evaluate(None, validation_y, scoreFx=precision_score, not_scaled=True, prepredicted=validation_y_hat))
                f1_score_m2.append(m2.evaluate(None, validation_y, scoreFx=f1_score, not_scaled=True, prepredicted=validation_y_hat))

                del validation, validation_y

            print(sum(score_value_average_precision_score_m2) / len(score_value_average_precision_score_m2))
            print(sum(score_value_recall_m2) / len(score_value_recall_m2))
            print(sum(score_value_precision_m2) / len(score_value_precision_m2))
            print(sum(f1_score_m2) / len(f1_score_m2))
            #  save model
            m2.save()

            #break

            #wandb.finish()
    else:
        print("params in the dictionary cannot be empty. Use the standard values in the dictionary for the model", model_type.__name__)

In [72]:
import matplotlib.pyplot as plt
import plotly.express as px
import plotly
from plotly.subplots import make_subplots
import plotly.graph_objects as go
from os import listdir

def plot_all_models_in_models(feature_names):
    # look up all models in the models folder
    models = [f for f in listdir('models') if f.startswith('model') and f.find('random_state__42') != -1 and f.find('both') != -1]
    models = ['model_randomforestclassifier-both-n_estimators__500-max_depth__40-min_samples_leaf__15-random_state__42-n_jobs__10-warm_start__true.jlb']
    models = ['model_randomforestclassifier-wrist-n_estimators__500-max_depth__20-min_samples_leaf__15-random_state__42-n_jobs__10-warm_start__true.jlb']
    for mod in models:
        with open (f'models/{mod}', 'rb') as f:
            m_to_print = load(f)
            fig = px.bar(
                x=feature_names,
                y=m_to_print.feature_importances_,
                title='Sensor Worn Classifier Feature Importance (> 0.025 only)'
                )
            # figure uptade y axis label
            fig.update_yaxes(title_text='Feature Importance')
            # figure update x axis label
            fig.update_xaxes(title_text='Feature')
            # plot only x axis labels if value is greater than 0.03
            fig.update_xaxes(tickvals=[f for i, f in enumerate(feature_names) if m_to_print.feature_importances_[i] >= 0.025])
            # plot x labels bigger
            fig.update_xaxes(tickfont=dict(size=10))
            # reduce area of plot to show only the relevant features
            fig.update_layout(width=1000, height=500)
            plt.tight_layout()
            plotly.offline.plot(fig, filename=f'plots/{mod}.png')
            fig.show()



plot_all_models_in_models(fg.columns)


Your filename `plots/model_randomforestclassifier-wrist-n_estimators__500-max_depth__20-min_samples_leaf__15-random_state__42-n_jobs__10-warm_start__true.jlb.png` didn't end with .html. Adding .html to the end of your file.



<Figure size 640x480 with 0 Axes>

In [32]:
additionally_drop = ['anglez_rolling_mean_1', 'anglez_rolling_max_1', 'anglez_rolling_std_1',
       'anglez_diff_1', 'anglez_diff_rolling_mean_1',
       'anglez_diff_rolling_max_1', 'anglez_diff_rolling_std_1', 'enmo_rolling_max_1', 'enmo_rolling_std_1', 'enmo_diff_1',
       'enmo_diff_rolling_mean_1', 'enmo_diff_rolling_max_1',
       'enmo_diff_rolling_std_1']

bad_performer_drop = ["enmo_rolling_mean_1",
"enmo_rolling_max_5",
"anglez_diff_480",
"enmo_diff_rolling_mean_30",
"anglez_diff_rolling_mean_30",
"enmo_diff_480",
"enmo_diff_rolling_std_5",
"anglez_diff_rolling_max_5",
"anglez_diff_120",
"enmo_rolling_std_5",
"enmo_diff_120",
"anglez_diff_30",
"minute",
"enmo_diff_rolling_max_5",
"enmo_diff_30",
"anglez_diff_rolling_mean_5",
"enmo_diff_rolling_mean_5",
"anglez_diff_5",
"enmo_diff_5", "anglez_abs", "enmo_abs"]
columns_to_drop = ['series_id', 'awake', 'wearable_on', 'seconds', 'year'] + additionally_drop + bad_performer_drop

In [25]:
len(m.feature_importances_)

45

In [None]:
m._model.feature_importances_

In [14]:
NEW_VALIDATION_DATA = '../data/OLD_DATA/'

In [33]:
ff = batched_dataloader(validation=True, batch_size=1)

In [34]:
fg = next(ff)

In [35]:
fg.drop(columns_to_drop, axis=1, inplace=True)

In [36]:
fg.columns

Index(['step', 'anglez', 'enmo', 'hour', 'day', 'month',
       'anglez_rolling_mean_5', 'anglez_rolling_max_5', 'anglez_rolling_std_5',
       'anglez_diff_rolling_std_5', 'anglez_rolling_mean_30',
       'anglez_rolling_max_30', 'anglez_rolling_std_30',
       'anglez_diff_rolling_max_30', 'anglez_diff_rolling_std_30',
       'anglez_rolling_mean_120', 'anglez_rolling_max_120',
       'anglez_rolling_std_120', 'anglez_diff_rolling_mean_120',
       'anglez_diff_rolling_max_120', 'anglez_diff_rolling_std_120',
       'anglez_rolling_mean_480', 'anglez_rolling_max_480',
       'anglez_rolling_std_480', 'anglez_diff_rolling_mean_480',
       'anglez_diff_rolling_max_480', 'anglez_diff_rolling_std_480',
       'enmo_rolling_mean_5', 'enmo_rolling_mean_30', 'enmo_rolling_max_30',
       'enmo_rolling_std_30', 'enmo_diff_rolling_max_30',
       'enmo_diff_rolling_std_30', 'enmo_rolling_mean_120',
       'enmo_rolling_max_120', 'enmo_rolling_std_120',
       'enmo_diff_rolling_mean_120', 'e

In [None]:
#sorted(list(zip(fg.columns, m2._model.feature_importances_.round(4))), key=lambda x: x[1], reverse=True)

This pipline trains all hyperparameter combinations for only one model, the sleep state classifier.

In [None]:
# define all the combinations of models and features
models_and_hyperparams = {
    RandomForestClassifier: {
        'params': {
            'n_estimators': [400, 600, 200],
            'max_depth': [None],
            'min_samples_leaf': [5, 1],
            'random_state': [42],
            'n_jobs': [10],
            'warm_start': [True]
        },
        'modeltype': SkLearnModel,
        'scaler': StandardScaler,
        'training_type': ModelTrainingType.BATCH,
        'batch_size': 1_500_000
    }
}

# define scaler
pretrained_scalers = dict() # used to store trained scalers for later use

# loop over all combinations and append it to the configurations list
# if there are no hyperparams, just instantiate the model without params
for model_type, hyperparams in models_and_hyperparams.items():
    if len(hyperparams['params']) > 0: # if there are hyperparams, build a dict and pass it to the model as parameters
        fx_param_names, fx_param_values = zip(*hyperparams['params'].items())
        for cartesian_product_values in product(*fx_param_values):
            hyperparam_dict = dict(zip(fx_param_names, cartesian_product_values))

            # create or reuse the scaler specified in the models_and_hyperparams dictionary
            if hyperparams['scaler'] in pretrained_scalers:
                print(f'Using pretrained scaler {hyperparams["scaler"].__name__}')
                scaler = pretrained_scalers[hyperparams['scaler']]
            else:
                scaler = hyperparams['scaler']()
                print(f'Start fitting scaler {hyperparams["scaler"].__name__}')
                for batch in tqdm(batched_dataloader(batch_size=hyperparams['batch_size']) if hyperparams['training_type'] == ModelTrainingType.BATCH else dataloader_full_dataset()):
                    X = batch.drop(columns_to_drop, axis=1)
                    X.dropna(inplace=True)
                    scaler.partial_fit(X)
                    del X
                    gc.collect()
                # save the scaler for later use
                pretrained_scalers[hyperparams['scaler']] = scaler

            # create the model from the modeltype and the hyperparam_dict
            m2 = hyperparams['modeltype'](model_type(**hyperparam_dict), \
                                        f'V2-{model_type.__name__}-BOTH-{"-".join([f"{na}__{str(va)}" for na, va in hyperparam_dict.items()])}'.lower(), \
                                        scaler, \
                                        hyperparam_dict)

            # train model
            rounds = 1
            if hyperparams['training_type'] == ModelTrainingType.BATCH:
                print(f'Model will be traind in batches of {hyperparams["batch_size"]} samples.')
                print(f'Dataset contains {train_dataset_length} samples. Batchsize is {hyperparams["batch_size"]}. That means {(rounds := train_dataset_length / hyperparams["batch_size"])} batches will be used to fit the model.')
                if model_type == RandomForestClassifier:
                    # print(f'At the end {hyperparam_dict["n_estimators"]} estimators will be fitted. That means, {hyperparam_dict["n_estimators"]//rounds} estimators per batch.')
                    m2._model.n_estimators = 0 #set to 0. model will increase the number of estimators in each round
                    # # tune hyperparameter 'n_estimators' based on the number of batches
                    print(f'Build estimator distribution for {rounds} rounds.')
                    est_anteil = [1 for _ in range(floor(rounds))]
                    est_anteil.append(rounds % floor(rounds))
                    est_anteil = softmax(est_anteil)
                    est_anteil *= hyperparam_dict['n_estimators']
                    est_anteil = est_anteil.round().astype(int)
                    print(f'Estimator distribution per round: {est_anteil}')

            print(f'Start training model {m2.identifier}')
            for i, batch in enumerate(tqdm(batched_dataloader(batch_size=hyperparams['batch_size']) if hyperparams['training_type'] == ModelTrainingType.BATCH else dataloader_full_dataset(), total=floor(rounds)+1)):
                batch.dropna(inplace=True)
                y = batch['awake']
                X = batch.drop(columns_to_drop, axis=1)
                args = {}
                if model_type == RandomForestClassifier and hyperparams['training_type'] == ModelTrainingType.BATCH:
                    args['add_estimators'] = est_anteil[i]
                m2.train(X, y, not_scaled=True, **args)
                del X, y
                gc.collect()

            print(f'Start evaluating model {m2.identifier}')
            score_value_average_precision_score_m2, score_value_recall_m2, score_value_precision_m2, f1_score_m2 = [], [], [], []
            for validation in tqdm(batched_dataloader(batch_size=hyperparams['batch_size'], validation=True) if hyperparams['training_type'] == ModelTrainingType.BATCH else dataloader_full_dataset(validation=True), total = ceil(validation_dataset_length / hyperparams["batch_size"])):
                validation.dropna(inplace=True)
                validation_y = validation['awake']
                validation.drop(columns_to_drop, axis=1, inplace=True)
                validation_y_hat = m2.predict(validation, not_scaled=True)
                # this does the same as scoreFX(y, y_hat). It is wrapped in the models evaluate function.
                # the model itself could also do the prediction if prepredicted is None
                # then, the first argument of the evaluate function would be the X_validation data
                score_value_average_precision_score_m2.append(m2.evaluate(None, validation_y, scoreFx=average_precision_score, not_scaled=True, prepredicted=validation_y_hat))
                score_value_recall_m2.append(m2.evaluate(None, validation_y, scoreFx=recall_score, not_scaled=True, prepredicted=validation_y_hat))
                score_value_precision_m2.append(m2.evaluate(None, validation_y, scoreFx=precision_score, not_scaled=True, prepredicted=validation_y_hat))
                f1_score_m2.append(m2.evaluate(None, validation_y, scoreFx=f1_score, not_scaled=True, prepredicted=validation_y_hat))

                del validation, validation_y

            print(sum(score_value_average_precision_score_m2) / len(score_value_average_precision_score_m2))
            print(sum(score_value_recall_m2) / len(score_value_recall_m2))
            print(sum(score_value_precision_m2) / len(score_value_precision_m2))
            print(sum(f1_score_m2) / len(f1_score_m2))
            #  save model
            m2.save()

            break

            #wandb.finish()
    else:
        print("params in the dictionary cannot be empty. Use the standard values in the dictionary for the model", model_type.__name__)

Start fitting scaler StandardScaler


6it [01:28, 14.82s/it]


Model will be traind in batches of 1500000 samples.
Dataset contains 8695770 samples. Batchsize is 1500000. That means 5.79718 batches will be used to fit the model.
Build estimator distribution for 5.79718 rounds.
Estimator distribution per round: [69 69 69 69 69 56]
Start training model v2-randomforestclassifier-both-n_estimators__400-max_depth__none-min_samples_leaf__5-random_state__42-n_jobs__10-warm_start__true


100%|██████████| 6/6 [53:54<00:00, 539.02s/it]


Start evaluating model v2-randomforestclassifier-both-n_estimators__400-max_depth__none-min_samples_leaf__5-random_state__42-n_jobs__10-warm_start__true


100%|██████████| 2/2 [00:35<00:00, 17.58s/it]


0.9674511932512078
0.9550855800187008
0.9770996518824876
0.9659262021309147
