In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import sklearn
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, auc, roc_auc_score, average_precision_score
import catboost
from catboost import CatBoostClassifier
from sklearn.preprocessing import StandardScaler
import optuna
from optuna.samplers import TPESampler
from catboost.utils import eval_metric
import xgboost as xgb
from sklearn.ensemble import RandomForestClassifier
import lightgbm as lgb
from sklearn.ensemble import VotingClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, confusion_matrix
from sklearn.model_selection import FixedThresholdClassifier


EDA & Preprocessing

In [None]:
def calculate_percent_empty(df):
    """Summarise the columns of ``df`` that contain missing values.

    Returns a DataFrame with one row per column whose share of NaN values is
    above zero. Each row holds the column name ('Column'), the share formatted
    as a percentage string with three decimals ('Percent Empty') and the
    column dtype ('Data Type'). Columns without missing values are left out.
    """
    # Lazily pair every column name with its percentage of missing values.
    missing_share = ((name, df[name].isna().mean() * 100) for name in df.columns)
    rows = [
        (name, f"{share:.3f}%", df[name].dtype)
        for name, share in missing_share
        if share > 0
    ]
    return pd.DataFrame(rows, columns=['Column', 'Percent Empty', 'Data Type'])

# Missing-value summary for `df`; the bare name on the last line renders it as the cell output.
# NOTE(review): `df` is not defined in the visible cells — presumably loaded in a cell that is not shown; confirm.
result = calculate_percent_empty(df)
result

In [None]:
# Correlation heatmap over the int64/float64 columns of `df_te`.
# NOTE(review): `df_te` is not defined in the visible cells — confirm where it is created.
numerical_features = df_te.select_dtypes(include=['int64', 'float64']).columns
corr_matrix = df_te.loc[:, numerical_features].corr()

fig, ax = plt.subplots(figsize=(14, 8))
sns.heatmap(corr_matrix, annot=False, cmap='coolwarm', square=True, ax=ax)
ax.set_title('Correlation Matrix')
plt.show()

In [None]:
def correlation_matrix(df, threshold): # > threshold
    """List the feature pairs whose absolute correlation exceeds ``threshold``.

    Parameters
    ----------
    df : pandas.DataFrame
        Input data. Only numeric/boolean columns take part in the
        correlation (``numeric_only=True``), so non-numeric columns no longer
        make ``DataFrame.corr`` fail.
    threshold : float
        A pair is kept when ``abs(correlation) > threshold`` (strict).

    Returns
    -------
    pandas.DataFrame
        Columns 'Feature 1', 'Feature 2' and 'Correlation'. Each unordered
        pair appears once (the first occurrence in row-major order of the
        correlation matrix), self-pairs are excluded, and rows are sorted by
        correlation in descending order. The frame is empty when no pair
        passes the threshold.
    """
    columns = ['Feature 1', 'Feature 2', 'Correlation']

    corr = df.corr(numeric_only=True)
    if corr.empty:
        # No numeric columns at all: nothing to pair up.
        return pd.DataFrame(columns=columns)

    pairs = corr.stack().reset_index()
    pairs.columns = columns
    pairs = pairs[pairs['Correlation'].abs() > threshold]
    pairs = pairs[pairs['Feature 1'] != pairs['Feature 2']]

    # Order-independent key so (a, b) and (b, a) collapse into one row.
    # Built with a plain list instead of a row-wise apply, which breaks on an
    # empty frame and is slow on large ones.
    pair_keys = [tuple(sorted(pair)) for pair in zip(pairs['Feature 1'], pairs['Feature 2'])]
    pairs = pairs.assign(Pair=pd.Series(pair_keys, index=pairs.index, dtype=object))

    unique_pairs = pairs.drop_duplicates(subset='Pair')
    return unique_pairs[columns].sort_values(by='Correlation', ascending=False)

# Feature pairs of `df_te` whose absolute correlation is above 0.6; the bare name displays the table.
data_correlation_matrix = correlation_matrix(df_te, 0.6)
data_correlation_matrix

In [None]:
class Transfomer(object):
    """Preprocessing helper: NaN filling, column dropping, optional one-hot encoding and scaling.

    The (misspelled) class name is kept because callers reference it.

    Parameters
    ----------
    fill_value : bool
        When true, ``fit`` learns a mean / mode per configured column and
        ``transform`` drops ``drop_columns`` and fills NaN with those values.
        When false, neither the drop nor the mean/mode fill happens.
    one_hot_encoder_flg : bool
        When true, ``categorical_features`` are replaced by one-hot columns.
    drop_columns : list
        Extra columns to drop, appended to the built-in drop list.
    scale_features : list or None
        Columns standardised with ``StandardScaler``; ``None`` means none.
    nan_new_value_column : list
        Columns whose NaN values are replaced by the string "Unknown".
    categorical_features : list
        Columns to one-hot encode; entries that are also dropped are ignored.
    """

    def __init__(self, fill_value=True, one_hot_encoder_flg=False, drop_columns=[], scale_features=None,
                 nan_new_value_column=["////////////"],
                 categorical_features=["//////////////"]
                 ):
        # The list defaults are shared between calls, so they are only read
        # and copied below, never mutated or stored directly.
        self.drop_columns = ["///////////"] + list(drop_columns)

        # Columns filled with the mean learned in fit().
        self.nan_mean_column = ["/////////"]
        self.nan_mean_dic = {}

        # Columns filled with the mode learned in fit().
        self.nan_mode_column = ["//////////"]
        self.nan_mode_dic = {}

        self.nan_new_value_column = list(nan_new_value_column)
        self.nan_new_value_dic = {column: "Unknown" for column in self.nan_new_value_column}

        self.fill_value = fill_value

        # Scaler
        self.scale_features = list(scale_features) if scale_features is not None else []
        self.scaler = None

        # OneHotEncoder: drop duplicates and dropped columns while keeping the
        # caller's order, so the encoded column order is the same on every run
        # (a set difference would make it depend on string hashing).
        dropped = set(self.drop_columns)
        self.categorical_features = [
            column for column in dict.fromkeys(categorical_features) if column not in dropped
        ]
        self.one_hot_encoder_flg = one_hot_encoder_flg
        self.one_hot_encoder = None

    def _categorical_frame(self, X):
        """Return the categorical columns of ``X`` with NaN replaced by "Unknown"."""
        return X[self.categorical_features].fillna("Unknown")

    def fit(self, X):
        """Learn fill values, the scaler and the one-hot encoder from ``X``.

        Returns ``self`` so calls can be chained.
        """
        if self.fill_value:
            self.nan_mean_dic = {column: X[column].mean() for column in self.nan_mean_column}

            self.nan_mode_dic = {}
            for column in self.nan_mode_column:
                modes = X[column].mode()
                # An all-NaN column has no mode; leave it unfilled instead of crashing.
                if not modes.empty:
                    self.nan_mode_dic[column] = modes.iloc[0]

        if self.scale_features:
            self.scaler = StandardScaler()
            self.scaler.fit(X[self.scale_features])

        # The encoder is only needed (and only fitted) when encoding is enabled.
        if self.one_hot_encoder_flg:
            self.one_hot_encoder = OneHotEncoder(handle_unknown='ignore')
            self.one_hot_encoder.fit(self._categorical_frame(X))

        return self

    def transform(self, X):
        """Return a transformed copy of ``X``; ``X`` itself is not modified."""
        res = X.copy()
        if self.nan_new_value_dic:
            res = res.fillna(self.nan_new_value_dic)

        if self.fill_value:
            # Continue from `res` (not a fresh copy of X) so the "Unknown"
            # fill above is kept.
            res = res.drop(columns=self.drop_columns)
            learned_fill = {**self.nan_mean_dic, **self.nan_mode_dic}
            if learned_fill:
                res = res.fillna(learned_fill)

        if self.one_hot_encoder_flg:
            # Same NaN -> "Unknown" mapping as in fit(), so missing values hit
            # the category the encoder learned for them.
            res_one_hot = self.one_hot_encoder.transform(self._categorical_frame(X))
            res_one_hot = pd.DataFrame(
                res_one_hot.toarray(),
                index=X.index,
                columns=self.one_hot_encoder.get_feature_names_out(),
            )
            res = res.drop(self.categorical_features, axis=1)
            res = pd.concat([res, res_one_hot], axis=1)

        if self.scale_features:
            res[self.scale_features] = self.scaler.transform(res[self.scale_features])

        return res

    def fit_transform(self, X):
        """Fit on ``X`` and return the transformed copy of ``X``."""
        self.fit(X)
        return self.transform(X)

In [None]:
# First-stage preprocessing on the full frame: one-hot encoding and scaling,
# without the mean/mode fill (fill_value=False).
# NOTE(review): this is fitted on all of `df` before the train/valid/test split
# below, so the scaler statistics also see validation/test rows — confirm this is acceptable.
transformer_main = Transfomer(
    fill_value=False,
    one_hot_encoder_flg=True,
    scale_features=['///'],
)
df_encode = transformer_main.fit_transform(df)

Base CatBoost Model

In [None]:
# Features / target taken from the encoded frame.
X = df_encode.drop(columns=["target"])
Y = df_encode["target"]

# 80% train; the held-out 20% is split again into validation (67%) and test (33%).
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42, stratify=Y)
X_valid, X_test, Y_valid, Y_test = train_test_split(X_test, Y_test, test_size=0.33, random_state=42, stratify=Y_test)

# Fill values are learned from the training split only and reused for the
# validation and test splits. Fitting a separate transformer on each held-out
# split would impute them with their own statistics, i.e. preprocess them
# differently from the data the model was trained on.
transformer_train = Transfomer(one_hot_encoder_flg=False, nan_new_value_column=[], categorical_features=[])
X_train_transformed = transformer_train.fit_transform(X_train)
X_test_transformed = transformer_train.transform(X_test)
X_valid_transformed = transformer_train.transform(X_valid)

# Kept as aliases in case another cell still refers to these names.
transformer_test = transformer_train
transformer_valid = transformer_train

# Sanity check: feature and target shapes per split.
for split_part in (X_train_transformed, Y_train,
                   X_test_transformed, Y_test,
                   X_valid_transformed, Y_valid):
    print(split_part.shape)

In [None]:
# Per-column count of remaining NaN values in the transformed training features.
X_train_transformed.isna().sum()

In [None]:
# Wrap each split (features, labels) in a catboost Pool: train, validation, test.
cat_train_pool, cat_val_pool, cat_test_pool = (
    catboost.Pool(features, labels)
    for features, labels in (
        (X_train_transformed, Y_train),
        (X_valid_transformed, Y_valid),
        (X_test_transformed, Y_test),
    )
)

Model Tuning

In [None]:
def objective(trial):
    """Optuna objective: validation AUC of a 100-iteration CatBoost model.

    Samples learning rate, depth, L2 leaf regularisation, boosting type and
    max_ctr_complexity from ``trial``, fits on ``cat_train_pool`` with
    ``cat_val_pool`` as the evaluation set, and returns the AUC that
    catboost's ``eval_metric`` computes from the predicted positive-class
    probabilities on the validation pool.
    """
    # Keyword order matches the order in which the trial is asked for values.
    search_space = dict(
        learning_rate=trial.suggest_float('learning_rate', 0.01, 0.1),
        depth=trial.suggest_int('depth', 3, 10),
        l2_leaf_reg=trial.suggest_float('l2_leaf_reg', 1, 10),
        boosting_type=trial.suggest_categorical('boosting_type', ['Ordered', 'Plain']),
        max_ctr_complexity=trial.suggest_int('max_ctr_complexity', 0, 8),
        iterations=100,
    )

    clf = catboost.CatBoostClassifier(random_seed=42, **search_space)
    clf.fit(cat_train_pool, eval_set=cat_val_pool, verbose=0)

    positive_proba = clf.predict_proba(cat_val_pool)[:, 1]
    return eval_metric(cat_val_pool.get_label(), positive_proba, 'AUC')

# Seeded TPE sampler; the study maximises the value returned by `objective` over 20 trials.
sampler = TPESampler(seed=123)
study = optuna.create_study(direction='maximize', sampler=sampler)
study.optimize(objective, n_trials=20)

In [None]:
# Hard-coded hyperparameters for the base CatBoost model (300 iterations).
# NOTE(review): presumably copied from an earlier Optuna run — confirm.
params = dict(
    learning_rate=0.0861912916877385,
    depth=10,
    l2_leaf_reg=6.147444224443349,
    boosting_type='Ordered',
    max_ctr_complexity=7,
    iterations=300,
)

In [None]:
# Base CatBoost classifier built from `params` and fitted on the training split.
# No cat_features are passed to fit().
model = catboost.CatBoostClassifier(random_state=42, **params)
model.fit(X_train_transformed, Y_train)

Metrics

In [None]:
# Hard-label predictions on the held-out splits.
Y_pred_valid = model.predict(X_valid_transformed)
Y_pred_test = model.predict(X_test_transformed)

# Only a cell's last expression is displayed, so the confusion matrices are
# printed explicitly instead of being evaluated and discarded.
print("Confusion matrix (valid):")
print(confusion_matrix(Y_valid, Y_pred_valid))
print("Confusion matrix (test):")
print(confusion_matrix(Y_test, Y_pred_test))

print(classification_report(Y_valid, Y_pred_valid))

print(classification_report(Y_test, Y_pred_test))

# Average precision from the probability in column 1 of predict_proba.
Y_pred_prob_valid = model.predict_proba(X_valid_transformed)
ap_score = average_precision_score(Y_valid, Y_pred_prob_valid[:,1])
print("Average precision (valid):", ap_score)

Y_pred_prob_test = model.predict_proba(X_test_transformed)
ap_score = average_precision_score(Y_test, Y_pred_prob_test[:,1])
print("Average precision (test):", ap_score)

In [None]:
# Class probabilities for every split; column 1 is used as the score for the ROC curves.
Y_pred_prob_train = model.predict_proba(X_train_transformed)
Y_pred_prob = model.predict_proba(X_valid_transformed)
Y_pred_prob_test = model.predict_proba(X_test_transformed)

# ROC curve points per split.
fpr_valid, tpr_valid, thresholds_valid = roc_curve(Y_valid, Y_pred_prob[:,1])
fpr_train, tpr_train, thresholds_train = roc_curve(Y_train, Y_pred_prob_train[:,1])
fpr_test, tpr_test, thresholds_test = roc_curve(Y_test, Y_pred_prob_test[:,1])

# Area under each curve.
roc_auc_valid = auc(fpr_valid, tpr_valid)
roc_auc_train = auc(fpr_train, tpr_train)
roc_auc_test = auc(fpr_test, tpr_test)

lw = 2
plt.figure()

# One line per split, drawn in the order valid, train, test.
roc_lines = (
    (fpr_valid, tpr_valid, 'darkorange', 'ROC curve (valid) (area = %0.2f)' % roc_auc_valid),
    (fpr_train, tpr_train, 'blue', 'ROC curve (train) (area = %0.2f)' % roc_auc_train),
    (fpr_test, tpr_test, 'green', 'ROC curve (test) (area = %0.2f)' % roc_auc_test),
)
for fpr, tpr, line_color, line_label in roc_lines:
    plt.plot(fpr, tpr, color=line_color, lw=lw, label=line_label)

# Diagonal reference line.
plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')

plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic')
plt.legend(loc="lower right")
plt.show()

Main part. Modelling

In [None]:
# Features / target taken from the encoded frame.
X = df_encode.drop(columns=["target"])
Y = df_encode["target"]

# Stratified 80/20 train/validation split.
X_train, X_valid, Y_train, Y_valid = train_test_split(X, Y, test_size=0.2, random_state=42, stratify=Y)

# Fill values are learned from the training split only and reused for the
# validation split. Fitting a second transformer on the validation rows would
# impute them with their own statistics, i.e. preprocess them differently from
# the data the models are trained on.
transformer_train = Transfomer(one_hot_encoder_flg=False, nan_new_value_column=[], categorical_features=[])
X_train_transformed = transformer_train.fit_transform(X_train)
X_valid_transformed = transformer_train.transform(X_valid)

# Kept as an alias in case another cell still refers to this name.
transformer_valid = transformer_train

# Sanity check: feature and target shapes per split.
for split_part in (X_train_transformed, Y_train, X_valid_transformed, Y_valid):
    print(split_part.shape)

Optimization and cross-validation

In [None]:
# Class balance of the training target.
class_counts = Y_train.value_counts()

# `scale` is passed as scale_pos_weight to the boosted models below but was
# never assigned in the notebook, so a fresh kernel failed with a NameError.
# It is computed as negatives / positives.
# NOTE(review): assumes the positive class is labelled 1 — confirm.
scale = (Y_train != 1).sum() / (Y_train == 1).sum()

class_counts # scale

In [None]:
# CatBoost: 5-fold cross-validated hyperparameter search, 20 trials, scored by macro recall.
cat_param_distributions = {
    'learning_rate': optuna.distributions.FloatDistribution(0.01, 0.1),
    'depth': optuna.distributions.IntDistribution(3, 10),
    'l2_leaf_reg': optuna.distributions.FloatDistribution(0.1, 10.0),
    'max_ctr_complexity': optuna.distributions.IntDistribution(0, 8),
    'boosting_type': optuna.distributions.CategoricalDistribution(['Ordered', 'Plain']),
}

search = optuna.integration.OptunaSearchCV(
    estimator=CatBoostClassifier(scale_pos_weight=scale, random_state=42, iterations=100, verbose=0),
    param_distributions=cat_param_distributions,
    cv=5,
    n_trials=20,
    random_state=42,
    scoring='recall_macro',
)
search.fit(X_train_transformed, Y_train)

print('Best params:', search.best_params_)
print('Best score:', search.best_score_)

In [None]:
# Hard-coded CatBoost hyperparameters; the final model trains for 300 iterations.
# NOTE(review): presumably copied from a search run — confirm.
params = dict(
    learning_rate=0.07482501462190899,
    depth=8,
    l2_leaf_reg=9.63710157921392,
    max_ctr_complexity=8,
    boosting_type='Plain',
)

model_cat = catboost.CatBoostClassifier(
    scale_pos_weight=scale, random_state=42, iterations=300, verbose=1, **params
)
model_cat.fit(X_train_transformed, Y_train)

In [None]:
# XGBoost: 5-fold cross-validated hyperparameter search, 20 trials, scored by macro recall.
# `verbosity=0` replaces `verbose=0`: XGBClassifier has no `verbose` constructor
# argument, so the old keyword was forwarded as an unknown booster parameter
# instead of silencing the output.
search_xgb = optuna.integration.OptunaSearchCV(
    estimator=xgb.XGBClassifier(scale_pos_weight=scale, random_state=42, verbosity=0, n_estimators=180),  #scale_pos_weight - sum(negative instances) / sum(positive instances)
    param_distributions={
        'max_depth': optuna.distributions.IntDistribution(3, 18),
        'gamma': optuna.distributions.IntDistribution(1, 9),
        'learning_rate': optuna.distributions.FloatDistribution(0.01, 0.2),
        'reg_alpha' : optuna.distributions.IntDistribution(40,180),
        'reg_lambda' : optuna.distributions.FloatDistribution(0,1),
        'colsample_bytree' : optuna.distributions.FloatDistribution(0.5,1),
        'min_child_weight' : optuna.distributions.IntDistribution(0, 10)
    },

    cv=5,
    n_trials=20,
    random_state=42,
    scoring='recall_macro'
)

search_xgb.fit(X_train_transformed, Y_train)

print('Best params', search_xgb.best_params_)
print('Best score:', search_xgb.best_score_)

In [None]:
# Hard-coded XGBoost hyperparameters.
# NOTE(review): presumably copied from a search run — confirm. The search cell
# uses n_estimators=180 while this model uses 200.
params = {
'max_depth': 8, 'gamma': 5, 'learning_rate': 0.052343651876687255, 'reg_alpha': 141, 'reg_lambda': 0.5958114395169417, 'colsample_bytree': 0.9097366100669799, 'min_child_weight': 10
}


# `verbosity=0` replaces `verbose=0`: XGBClassifier has no `verbose` constructor
# argument, so the old keyword was forwarded as an unknown booster parameter.
model_xg = xgb.XGBClassifier(**params,
                              scale_pos_weight=scale, random_state=42, n_estimators=200, verbosity=0)

model_xg.fit(X_train_transformed, Y_train,
         )

In [None]:
# RandomForest: 5-fold cross-validated hyperparameter search, 20 trials, scored by macro recall.
rf_param_distributions = {
    'min_samples_leaf': optuna.distributions.IntDistribution(1, 10),
    'max_depth': optuna.distributions.IntDistribution(10, 30),
    'min_samples_split': optuna.distributions.IntDistribution(2, 10),
}

search_rf = optuna.integration.OptunaSearchCV(
    estimator=RandomForestClassifier(class_weight='balanced', random_state=42, verbose=0, n_estimators=100),
    param_distributions=rf_param_distributions,
    cv=5,
    n_trials=20,
    random_state=42,
    scoring='recall_macro',
)

# Run the hyperparameter search.
search_rf.fit(X_train_transformed, Y_train)

# Report the best hyperparameters and the best cross-validated score.
print('Best params:', search_rf.best_params_)
print('Best score:', search_rf.best_score_)

In [None]:
# Hard-coded RandomForest hyperparameters.
# NOTE(review): presumably copied from a search run — confirm.
params = dict(min_samples_leaf=1, max_depth=10, min_samples_split=8)

model_rf = RandomForestClassifier(
    class_weight='balanced', random_state=42, n_estimators=100, verbose=0, **params
)
model_rf.fit(X_train_transformed, Y_train)

In [None]:
# LGBM: 5-fold cross-validated hyperparameter search, 20 trials, scored by macro recall.
lgb_param_distributions = {
    'lambda_l1': optuna.distributions.FloatDistribution(1e-8, 10.0),
    'lambda_l2': optuna.distributions.FloatDistribution(1e-8, 10.0),
    'num_leaves': optuna.distributions.IntDistribution(2, 256),
    'feature_fraction': optuna.distributions.FloatDistribution(0.4, 1.0),
    'bagging_fraction': optuna.distributions.FloatDistribution(0.4, 1.0),
    'bagging_freq': optuna.distributions.IntDistribution(1, 7),
    'min_child_samples': optuna.distributions.IntDistribution(5, 100),
}

search_lgb = optuna.integration.OptunaSearchCV(
    estimator=lgb.LGBMClassifier(scale_pos_weight=scale, random_state=42, verbose=0, n_estimators=180),
    param_distributions=lgb_param_distributions,
    cv=5,
    n_trials=20,
    random_state=42,
    scoring='recall_macro',
)

# Run the hyperparameter search.
search_lgb.fit(X_train_transformed, Y_train)

# Report the best hyperparameters and the best cross-validated score.
print('Best params:', search_lgb.best_params_)
print('Best score:', search_lgb.best_score_)

In [None]:
# Hard-coded LightGBM hyperparameters.
# NOTE(review): presumably copied from a search run — confirm.
params = dict(
    lambda_l1=1.0505721904583707,
    lambda_l2=4.857206182144106,
    num_leaves=51,
    feature_fraction=0.9822057161879939,
    bagging_fraction=0.8242312633076625,
    bagging_freq=2,
    min_child_samples=73,
)

model_lgb = lgb.LGBMClassifier(
    scale_pos_weight=scale, random_state=42, n_estimators=180, verbose=0, **params
)
model_lgb.fit(X_train_transformed, Y_train)

Voting Classifier

In [None]:
# Soft-voting ensemble over the four tuned models.
clf1, clf2, clf3, clf4 = model_cat, model_xg, model_lgb, model_rf

voting_estimators = [('cat', clf1), ('xgb', clf2), ('lgb', clf3), ('rf', clf4)]
model_vt = VotingClassifier(estimators=voting_estimators, voting='soft')
model_vt.fit(X_train_transformed, Y_train)

Metrics Script

In [None]:
# Model evaluated by the metric cells below; point this at another fitted model to evaluate it instead.
model = model_lgb

In [None]:
# Hard-label predictions on the validation and training splits.
Y_pred_valid = model.predict(X_valid_transformed)
Y_pred_train = model.predict(X_train_transformed)

# Only a cell's last expression is displayed, so the confusion matrices are
# printed explicitly instead of being evaluated and discarded.
print("Confusion matrix (valid):")
print(confusion_matrix(Y_valid, Y_pred_valid))
print("Confusion matrix (train):")
print(confusion_matrix(Y_train, Y_pred_train))

print(classification_report(Y_valid, Y_pred_valid))

print(classification_report(Y_train, Y_pred_train))

# Average precision from the probability in column 1 of predict_proba.
Y_pred_prob_valid = model.predict_proba(X_valid_transformed)
ap_score = average_precision_score(Y_valid, Y_pred_prob_valid[:,1])
print("Average precision (valid):", ap_score)

Y_pred_prob_train = model.predict_proba(X_train_transformed)
ap_score = average_precision_score(Y_train, Y_pred_prob_train[:,1])
print("Average precision (train):", ap_score)

Precision & Recall

In [None]:
# Precision / recall of `model` on the validation split at several decision thresholds.
# NOTE(review): predict() is called on FixedThresholdClassifier without fit(),
# which relies on it accepting an already-fitted estimator — confirm for the
# installed scikit-learn version.
thresh_values = [0.1, 0.2, 0.3, 0.4, 0.5]
precision_scores = []
recall_scores = []
for threshold in thresh_values:
    model_fixed_threshold = FixedThresholdClassifier(estimator=model, threshold=threshold)
    y_pred = model_fixed_threshold.predict(X_valid_transformed)
    precision, recall = precision_score(Y_valid, y_pred), recall_score(Y_valid, y_pred)
    precision_scores.append(precision)
    recall_scores.append(recall)
    print(f'Threshold: {threshold}, Precision: {precision}, Recall: {recall}')
    print(confusion_matrix(Y_valid, y_pred))

# One curve per metric over the threshold grid.
for metric_scores, curve_label in ((precision_scores, 'Precision'), (recall_scores, 'Recall')):
    plt.plot(thresh_values, metric_scores, label=curve_label)
plt.xlabel('Threshold')
plt.ylabel('Score')
plt.title('Precision and recall at different thresholds')
plt.legend()
plt.show()

ROC_AUC

In [None]:
# Class probabilities for both splits; column 1 is used as the score for the ROC curves.
Y_pred_prob_train = model.predict_proba(X_train_transformed)
Y_pred_prob = model.predict_proba(X_valid_transformed)

# ROC curve points per split.
fpr_valid, tpr_valid, thresholds_valid = roc_curve(Y_valid, Y_pred_prob[:,1])
fpr_train, tpr_train, thresholds_train = roc_curve(Y_train, Y_pred_prob_train[:,1])

# Area under each curve.
roc_auc_valid = auc(fpr_valid, tpr_valid)
roc_auc_train = auc(fpr_train, tpr_train)

lw = 2
plt.figure()

# One line per split, drawn in the order valid, train.
roc_lines = (
    (fpr_valid, tpr_valid, 'darkorange', 'ROC curve (valid) (area = %0.2f)' % roc_auc_valid),
    (fpr_train, tpr_train, 'blue', 'ROC curve (train) (area = %0.2f)' % roc_auc_train),
)
for fpr, tpr, line_color, line_label in roc_lines:
    plt.plot(fpr, tpr, color=line_color, lw=lw, label=line_label)

# Diagonal reference line.
plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')

plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic')
plt.legend(loc="lower right")
plt.show()

Predicted Probabilities

In [None]:
# Histogram of column 1 of `Y_pred_prob` (the validation-split probabilities
# computed in the ROC cell above).
y_pred_proba = np.array(Y_pred_prob[:,1])

fig, ax = plt.subplots()
ax.hist(y_pred_proba, bins=200, alpha=0.7, color='blue', edgecolor='black')
ax.set_title('Predicted Probabilities')
ax.set_xlabel('Probability')
ax.set_ylabel('Count')

plt.show()

Feature importances

In [None]:
# Horizontal bar chart of the selected model's feature importances, sorted ascending.
feature_importance = model.feature_importances_
sorted_idx = np.argsort(feature_importance)
bar_positions = range(len(sorted_idx))
feature_names = np.array(X_train_transformed.columns)[sorted_idx]

fig = plt.figure(figsize=(12, 11))
plt.barh(bar_positions, feature_importance[sorted_idx], align='center')
plt.yticks(bar_positions, feature_names)
plt.title('Feature Importance')