## main

In [10]:
import logging
import configparser
import argparse
import warnings

from src.utils import utils
from src.modelling import training as train
from src.data_processing import augmentation as aug

warnings.filterwarnings('ignore')


def main(model_name:str, tune_trials=10, balance_data=False):
    logger = logging.getLogger(__name__)
    proj_root = utils.get_proj_root()

    config = configparser.ConfigParser(interpolation=None)
    config.read(proj_root.joinpath('config/data_config.ini'))

    final_year = int(config['year_limits']['end_year'])

    training_data_rel_path = config['data_paths']['preprocessed_data_path']
    training_data_path =  proj_root.joinpath(training_data_rel_path)
    feature_set_path = proj_root.joinpath(config['modelling_paths']['optimal_features'])
    model_output_dir = proj_root.joinpath(config['modelling_paths']['model_output'])

    label_col_name = 'dps_change_next_year'
    optimal_features = train.get_features(feature_set_path) 

    model_params = config._sections[model_name]
    
    model_class = train.get_model_class(model_name=model_name)
    model = model_class(**model_params)

    # get data
    training_data = train.get_training_data(file_path=training_data_path)

    # split dataset
    training_data_subset, testing_data_subset = train.train_test_split(df=training_data, final_year=final_year)

    if balance_data:
    # balance data
        training_data_subset = aug.balance_data(training_data_subset, label_col_name=label_col_name)

    training_data_subset = training_data_subset[optimal_features+[label_col_name]]
    testing_data_subset = testing_data_subset[optimal_features+[label_col_name]]

    model_output_path = model_output_dir.joinpath(model_name+'.pkl')
    trainer = train.ModelTrainer(model_class=model,
                                    training_data=training_data_subset,
                                    testing_data=testing_data_subset,
                                    label_col_name=label_col_name,
                                    model_output_path=model_output_path)

    print('tuning model')
    trainer.tune_model(n_trials=tune_trials)
    logger.info('tuning completed')
    model = trainer.train_model(save_model=True)
    logger.info('training completed')

    score = trainer.evaluate_model(show_report=True)
    logger.info(f'test score:{score}')
    print(training_data_subset.columns)
    print(testing_data_subset.columns)

if __name__ == '__main__':
    log_fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    logging.basicConfig(level=logging.INFO, format=log_fmt)

    parser = argparse.ArgumentParser(description='training model')
    parser.add_argument("--model_name", type=str)
    parser.add_argument("--tune_trials", type=int, default=1)
    parser.add_argument("--balance_data", action=argparse.BooleanOptionalAction)
    # args = parser.parse_args()

    args = parser.parse_args(['--model_name', 'random_forest', '--tune_trials', '1', '--balance_data'])

    main(model_name=args.model_name,
         tune_trials=args.tune_trials, balance_data=args.balance_data)


NameError: name 'sklearn' is not defined

## transforms

In [None]:
# transformers.py

""" List of functions for data processing. """

from typing import Literal, Sequence
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import OrdinalEncoder
from sklearn.utils import resample
from sklearn.compose import ColumnTransformer
from imblearn.over_sampling import SMOTE
import pandas as pd
import numpy as np
import logging

logger = logging.getLogger(__name__)



class CollinearColsRemover(BaseEstimator, TransformerMixin):

    def __init__(self, thresh, label_col) -> None:
        self.thresh = thresh
        self.label_col = label_col
    
    
    def fit(self, X:pd.DataFrame, y=None):
        X = X.copy()
        X.drop(labels=self.label_col, axis=1, inplace=True)
        self.cols_to_drop = self._get_collinear_cols(df=X, thresh=self.thresh)
        return self
    
    def transform(self, X:pd.DataFrame):
        n_cols = X.shape[1]
        X = X.drop(self.cols_to_drop, axis=1)
        new_n_cols = X.shape[1]
        n_cols_dropped = n_cols - new_n_cols
        # print(type(X))
        logging.getLogger(self.__class__.__name__).info(f'dropped {n_cols_dropped} cols')
        return X

    @staticmethod
    def _get_collinear_cols(df:pd.DataFrame, thresh:np.float_):

        df = df.select_dtypes(include=np.float_)

        corr_mat = df.corr().abs()
        corr_mat_u = corr_mat.where(np.triu(np.ones(corr_mat.shape), k=1)
                                    .astype(np.bool_))

        cols_to_drop = [col for col in corr_mat_u.columns \
                    if any(corr_mat_u[col] > thresh)]
        
        return cols_to_drop
    

class ColumnsOrdinalEncoder(OrdinalEncoder):


    def __init__(self, col_names) -> None:
        self.col_names = col_names
        
        super().__init__(dtype=int)

    def fit(self, X, y=None):
        data_subset = X[self.col_names]

        return super().fit(X=data_subset)
    
    def transform(self, X):
        data_subset = X[self.col_names]
        transformed_data = super().transform(data_subset)

        X[self.col_names] = transformed_data

        return X
    
class  OptimalColumnSelector(BaseEstimator, TransformerMixin):

    def __init__(self, n_min_cols=2, optimal_cols_path=None) -> None:
        super().__init__()
        self.n_min_cols = n_min_cols
        self.optimal_cols_path = optimal_cols_path
        self.cols_to_drop = None

    def fit(self, X:pd.DataFrame, y=None):
        if self.optimal_cols_path is not None:
            try:
                optimal_col_names = utils.load_value(self.optimal_cols_path)
            except(FileNotFoundError):
                raise FileNotFoundError('File for optimal columns not available \
                                        optimal columns should be determined first.')
        return self
    
    def transform(self, X:pd.DataFrame):

        X = X.drop(self.cols_to_drop, axis=1)

        logging.getLogger(self.__class__.__name__).info(f'selected columns: {self.cols_to_drop}')

        return X


## models.py

In [None]:
# models.py
"""Scripts for constituting models"""

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score
import numpy as np
import optuna
from abc import ABC, abstractmethod
import configparser
import logging

from src.utils import utils



class WrappedModel(ABC):
    @abstractmethod
    def train(self, X_train, y_train, transform_pipeline:Pipeline=None):
        """
        Train the model on the given training data.

        Parameters:
        - X_train: Input features for training.
        - y_train: Target labels for training.
        """
        pass

    @abstractmethod
    def predict(self, X):
        """
        Make predictions using the trained model.

        Parameters:
        - X: Input features for prediction.

        Returns:
        - Predicted labels.
        """
        pass

    @abstractmethod
    def objective_function(self, trial, X, y, transform_pipeline:Pipeline=None):
        """
        Objective function for optimization tasks.

        Parameters:
        - X: Input features for objective evaluation.
        - y: Target labels for objective evaluation.

        Returns:
        - Objective value (e.g., accuracy, loss).
        """
        pass

    @abstractmethod
    def init_model(self, params):

        pass


class LogisticWrapper(WrappedModel):
    
    def __init__(self, solver, n_jobs) -> None:
        self.solver = solver
        self.n_jobs=int(n_jobs)
        self.model = LogisticRegression
        self.is_tuned=False
        self.tunable_params =('C', 'penalty')
        self.constant_params = ('solver', 'n_jobs')
        self.tuned_params=None


    def objective_function(self, trial, X, y, transform_pipeline:Pipeline):
        C = trial.suggest_float('C', 0.1, 10, log=True)
        penalty = trial.suggest_categorical('penalty', ['l1', 'l2'])

        model = self.model(
            C=C,
            penalty=penalty,
            solver=self.solver,
            n_jobs=self.n_jobs
        )

        if transform_pipeline is not None:
            model = transform_pipeline.steps.append(['logistic-regressor-model', model])
        # Using cross_val_score to get the average precision score for each fold
        scores = cross_val_score(model, X, y, cv=5, scoring='roc_auc')
        roc_auc = np.mean(scores)
        # Printing intermediate results
        print(f"Trial {trial.number}, C: {C}, penalty: {penalty}, ROC-AUC: {roc_auc}")
        return roc_auc
    
    def init_model(self):

        if self.tuned_params is not None:
            print('using tuned params')
            model = self.model(**self.tuned_params, 
                               solver=self.solver, n_jobs=self.n_jobs)
        else:
            print("not using tuned")
            model = self.model(solver=self.solver, n_jobs=self.n_jobs)

        self.model = model

        
    def train(self, X, y, transform_pipeline:Pipeline=None):

        if not self.is_tuned:
            self.init_model()

        logging.getLogger(self.__class__.__name__).info(f'training with {type(self.model)}')

        if transform_pipeline is not None:
            pipeline_w_model = transform_pipeline.steps.append(['logistic model', self.model])
            model = pipeline_w_model.fit(X, y)
        else:
            model = self.model.fit(X, y)
        
        return model

    def predict(self, X, return_prob=False):
        if return_prob:
            y_pred = self.model.predict_proba(X)[:, 1]
        else:
            y_pred = self.model.predict(X)
        return y_pred

    
class RandomForestWrapper(WrappedModel):
    
    def __init__(self, n_jobs) -> None:
        self.n_jobs=int(n_jobs)
        self.model = RandomForestClassifier
        self.is_tuned=False
        self.tuned_params=None


    def objective_function(self, trial, X, y):
        n_estimators = trial.suggest_int('n_estimators', 2, 150)
        max_depth = trial.suggest_int('max_depth', 1, 50)
        min_samples_split = trial.suggest_int('min_samples_split', 2, 15)
        min_samples_leaf = trial.suggest_int('min_samples_leaf', 1, 15)

        model = RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            n_jobs=-1
        )

        # Using cross_val_score to get the average ROC-AUC score for each fold
        scores = cross_val_score(model, X, y, cv=5, scoring='roc_auc')
        roc_auc = np.mean(scores)
        # Printing intermediate results
        print(f"Trial {trial.number}, n_estimators: {n_estimators}, max_depth: {max_depth}, "
            f"min_samples_split: {min_samples_split}, min_samples_leaf: {min_samples_leaf}, ROC-AUC: {roc_auc}")
        return roc_auc

    def init_model(self):

        if self.tuned_params is not None:
            model = self.model(**self.tuned_params, 
                                n_jobs=self.n_jobs)
        else:
            model = self.model(n_jobs=self.n_jobs)

        self.model = model
        
    def train(self, X, y):

        if not self.is_tuned:
            self.init_model()

        print(type(self.model))
        self.model.fit(X, y)
        return self.model

    def predict(self, X, return_prob=False):
        if return_prob:
            y_pred = self.model.predict_proba(X)[:, 1]
        else:
            y_pred = self.model.predict(X)
        return y_pred
    

## traini.py

In [None]:
# training.py

"""utilities for training model with training data"""

import pandas as pd
import configparser
import logging
import pathlib
import optuna
from sklearn.metrics import roc_auc_score, classification_report
import sklearn
from src.modelling import models


from src.utils import utils

logger = logging.getLogger(__name__)

class ModelTrainer():

    def __init__(self, model_class:models.WrappedModel, transform_pipeline,
                 training_data, testing_data, label_col_name, model_output_path, 
                 balance_data:bool=True, n_trials:int=100) -> None:

        # self.direction=None
        self.best_params = None
        self.model_output_path = model_output_path
        self.model_class = model_class
        self.n_trials = n_trials
        self.tune_best_score = None
        self.training_data = training_data
        self.testing_data = testing_data
        self.label_col_name = label_col_name
        self.transform_pipeline = transform_pipeline
        self.trained_model=None

        if balance_data:
            self.training_data = aug.balance_data(df=self.training_data, label_col_name=self.label_col_name)

    def get_optimal_features():
        pass


    def tune_model(self, n_trials, maximize=True):
        
        if maximize:
            direction='maximize'
        else:
            direction = 'minimize'
        self.direction=direction

        X_train, y_train = split_Xy(self.training_data, label_col_name=self.label_col_name)

        

        study = optuna.create_study(direction=direction)
        study.optimize(lambda trial: self.model_class.objective_function(
            trial=trial, X=X_train, y=y_train),
            n_trials=n_trials, transform_pipeline=self.transform_pipeline
        )

        self.best_params = study.best_params
        self.tune_best_score = study.best_value
        self.model_class.tuned_params = study.best_params
        # model = self.model_class.init_model(params=self.best_params)
        self.model_class.init_model()
        # self.model_class.model = model
        self.model_class.is_tuned=True
        print(f'best score is: {self.best_score}')

    def train_model(self,  save_model=True) -> Pipeline:

        X_train, y_train = split_Xy(self.training_data, label_col_name=self.label_col_name)

        model_is_tuned = self.model_class.is_tuned
        if not model_is_tuned:
            logging.getLogger(self.__class__.__name__).info('model is not tuned')

        trained_model = self.model_class.train(X=X_train, y=y_train, transform_pipeline=self.transform_pipeline)

        if save_model:
            utils.save_value(trained_model, fname=self.model_output_path)

        self.trained_model = trained_model

        return trained_model
    
    def evaluate_model(self, show_report=True):
        
        X_test, y_test = split_Xy(self.testing_data, label_col_name=self.label_col_name)

        y_pred_prob = self.trained_model.predict_proba(X_test)
        y_pred = self.trained_model.predict(X_test)
        score = roc_auc_score(y_true=y_test, y_score=y_pred_prob)

        if show_report:
            print(classification_report(y_true=y_test, y_pred=y_pred))
        return score    
    
    
    

def get_features(path:pathlib.Path):

    features_list = utils.load_value(path)

    return features_list


def get_training_data(file_path:pathlib.Path=None):

    df = pd.read_csv(file_path)

    return df


def train_test_split(df:pd.DataFrame, final_year:int, save_data=False):

    # final_year = int(config['year_limits']['end_year'])

    training_data = df.loc[df['year'] != final_year]
    testing_data = df.loc[df['year'] == final_year]

    logger.info(f"data split into: training ({training_data.shape}) and test ({testing_data.shape}) sets ")


    return training_data, testing_data




def split_Xy(df:pd.DataFrame, label_col_name:str):

    X = df.drop(label_col_name, axis=1)
    y = df[label_col_name]

    return X, y

def get_model_class(model_name:str):
    if model_name == 'logistic_regression':
        model = models.LogisticWrapper
    elif model_name == 'random_forest':
        model = models.RandomForestWrapper

    return model



def save_data(data:pd.DataFrame, path:pathlib.Path):

    data.to_csv(path_or_buf=path, index=False)

In [None]:
# training main

import logging
import configparser
import argparse
import warnings

from src.utils import utils
from src.modelling import training as train
from src.data_processing import augmentation as aug

warnings.filterwarnings('ignore')


def main(model_name:str, tune_trials=10, balance_data=False):
    logger = logging.getLogger(__name__)
    proj_root = utils.get_proj_root()

    config = configparser.ConfigParser(interpolation=None)
    config.read(proj_root.joinpath('config/data_config.ini'))

    final_year = int(config['year_limits']['end_year'])

    training_data_rel_path = config['data_paths']['preprocessed_data_path']
    training_data_path =  proj_root.joinpath(training_data_rel_path)
    feature_set_path = proj_root.joinpath(config['modelling_paths']['optimal_features'])
    model_output_dir = proj_root.joinpath(config['modelling_paths']['model_output'])

    label_col_name = 'dps_change_next_year'
    optimal_features = train.get_features(feature_set_path) 

    model_params = config._sections[model_name]
    
    model_class = train.get_model_class(model_name=model_name)
    model = model_class(**model_params)

    # get data
    training_data = train.get_training_data(file_path=training_data_path)

    # split dataset
    training_data_subset, testing_data_subset = train.train_test_split(df=training_data, final_year=final_year)

    if balance_data:
    # balance data
        training_data_subset = aug.balance_data(training_data_subset, label_col_name=label_col_name)

    training_data_subset = training_data_subset[optimal_features+[label_col_name]]
    testing_data_subset = testing_data_subset[optimal_features+[label_col_name]]

    model_output_path = model_output_dir.joinpath(model_name+'.pkl')
    trainer = train.ModelTrainer(model_class=model,
                                    training_data=training_data_subset,
                                    testing_data=testing_data_subset,
                                    label_col_name=label_col_name,
                                    model_output_path=model_output_path)

    logger.info('==========tuning model============')
    trainer.tune_model(n_trials=tune_trials)
    logger.info('========training model============')
    model = trainer.train_model(save_model=True)
    logger.info('training completed')

    score = trainer.evaluate_model(show_report=True)
    logger.info(f'test score:{score}')
    print(training_data_subset.columns)
    print(testing_data_subset.columns)

if __name__ == '__main__':
    log_fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    logging.basicConfig(level=logging.INFO, format=log_fmt)

    parser = argparse.ArgumentParser(description='training model')
    parser.add_argument("--model_name", type=str)
    parser.add_argument("--tune_trials", type=int, default=1)
    parser.add_argument("--balance_data", action=argparse.BooleanOptionalAction)
    # args = parser.parse_args()

    args = parser.parse_args(['--model_name', 'random_forest', '--tune_trials', '1', '--balance_data'])

    main(model_name=args.model_name,
         tune_trials=args.tune_trials, balance_data=args.balance_data)


In [30]:

from src.modelling import transforms

proj_root = utils.get_proj_root()

config = configparser.ConfigParser(interpolation=None)
config.read(proj_root.joinpath('config/data_config.ini'))

feature_set_path = proj_root.joinpath(config['modelling_paths']['optimal_features'])
label_col_name = 'dps_change_next_year'
categorical_features = ['industry', 'symbol']
collinear_thresh = 0.98
pipeline = Pipeline(steps=[
    ('remove collinear columns', transforms.CollinearColsRemover(thresh=collinear_thresh, 
                                                                    label_col=label_col_name)),
    ('cat_to_ordinal_cols', transforms.ColumnsOrdinalEncoder(col_names=categorical_features)),
    ('select optimal cols', transforms.OptimalColumnSelector(optimal_cols_path=feature_set_path))
])

In [27]:
pipeline.steps[-1]

('select optimal cols',
 OptimalColumnSelector(optimal_cols_path=PosixPath('/home/aroge/projects/dividend-cut-predictor/models/artifacts/optimal_features.pkl')))

In [34]:
import sklearn
m = sklearn.clone(pipeline)
m.steps.append(sklearn.linear_model.LogisticRegression())
print(type(m))
print(m.steps)
print(m.steps[-1])

<class 'sklearn.pipeline.Pipeline'>
[('remove collinear columns', CollinearColsRemover(label_col='dps_change_next_year', thresh=0.98)), ('cat_to_ordinal_cols', ColumnsOrdinalEncoder(col_names=['industry', 'symbol'])), ('select optimal cols', OptimalColumnSelector(optimal_cols_path=PosixPath('/home/aroge/projects/dividend-cut-predictor/models/artifacts/optimal_features.pkl'))), LogisticRegression()]
LogisticRegression()


In [16]:
model_pipeline = pipeline.copy

In [14]:
pipeline