In [None]:
# Install the packages this notebook relies on with pip.
# numpy and pandas are pinned to exact versions.
# NOTE(review): scikit-learn and joblib are not pinned, so the installed
# versions may differ from one run to the next.
!pip3 install -q numpy==1.22.4
!pip3 install -q pandas==1.5.3
!pip3 install -q scikit-learn
!pip3 install joblib

In [None]:
import pandas as pd
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV, StratifiedShuffleSplit, train_test_split
from sklearn.metrics import f1_score, roc_auc_score
from sklearn.preprocessing import StandardScaler
import joblib 

import time
import os

---

Чтение данных

In [None]:
# Relative locations of the training and test CSV files.
path1 = 'datasets/regression/train_log.csv'
path2 = 'datasets/regression/test_log.csv'

def read_file(path):
    """Load a comma-separated file into a DataFrame.

    The file is looked up at ``path`` first; if nothing exists there, ``path``
    with its first character removed is tried as a fallback.

    Args:
        path: Location of the CSV file.

    Returns:
        pandas.DataFrame with the parsed file contents.

    Raises:
        FileNotFoundError: If neither candidate location exists. The message
            names the requested path.
    """
    # The second candidate keeps the existing fallback of dropping the first
    # character (presumably to turn a '/'-prefixed path into a relative one —
    # TODO confirm the intent).
    for candidate in (path, path[1:]):
        if os.path.exists(candidate):
            return pd.read_csv(candidate, sep=',')

    raise FileNotFoundError(f'No such file or directory: {path}')

# Load the training and test tables from the configured paths.
df_train = read_file(path1)
df_test  = read_file(path2)

---

## Разбиение тренировочной выборки на тренировочную и валидационную

In [None]:
# Features: every column of the training frame except the target.
X_log = df_train.drop(columns='Transported')
# Target: the 'Transported' column on its own.
y_log = df_train['Transported']

# Number of labelled rows.
print(len(y_log))

In [None]:
# Hold out 30% of the labelled rows for validation; the seed fixes the shuffle.
(X_train_log, X_val_log, y_train_log, y_val_log) = train_test_split(
    X_log,
    y_log,
    test_size=0.3,
    shuffle=True,
    random_state=42,
)

print(y_train_log.shape, y_val_log.shape)

---

In [None]:
# Preview the first rows of the test frame.
df_test.head()

Масштабирование признаков

In [None]:
# Fit the scaler on the training split only, then apply that same fitted
# transform to the training, validation and test features.
scaler = StandardScaler().fit(X_train_log)

X_train_log, X_val_log, X_test_log = (
    scaler.transform(split) for split in (X_train_log, X_val_log, df_test)
)

Сразу сохраняем масштабированные тестовую и валидационную выборки (а также целевую переменную валидационной выборки) для следующего шага — выбора лучшей модели.

In [None]:
# Directory that receives the arrays shared with the model-comparison step.
new_directory = 'datasets/for_comparison'

# exist_ok=True makes this cell safe to re-run when the directory already exists.
os.makedirs(new_directory, exist_ok=True)

In [None]:
# Persist the scaled test and validation feature arrays, plus the validation
# targets (written without the index column).
np.save('datasets/for_comparison/X_test_log.npy', X_test_log)
np.save('datasets/for_comparison/X_val_log.npy', X_val_log)
y_val_log.to_csv('datasets/for_comparison/y_val_log.csv', index=False)

# Drop the notebook's reference to the raw test frame; cells that read df_test
# cannot be re-run after this without loading the data again.
del df_test

---

# Вспомогательные функции

In [None]:
def grid_search_cv(model, cv, param_grid, X_train_set, y_train_set, scores, refit=True):
    """Run an exhaustive grid search and report how long fitting took.

    Args:
        model: Estimator handed to ``GridSearchCV`` as ``estimator``.
        cv: Cross-validation strategy, forwarded as ``cv``.
        param_grid: Parameter grid, forwarded as ``param_grid``.
        X_train_set: Features the search is fitted on.
        y_train_set: Targets the search is fitted on.
        scores: Scoring specification, forwarded as ``scoring``.
        refit: Refit policy, forwarded as ``refit``.

    Returns:
        The fitted ``GridSearchCV`` object.
    """
    grid_search = GridSearchCV(estimator=model, param_grid=param_grid, scoring=scores, cv=cv, refit=refit)

    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments during a long search.
    start_time = time.perf_counter()
    grid_search.fit(X_train_set, y_train_set)
    total_time = time.perf_counter() - start_time

    # The message is built from adjacent literals: continuing a single string
    # with a backslash would embed the next line's indentation in the output.
    minutes, seconds = divmod(total_time, 60)
    print(
        f"Total time spent on grid search (wall-clock): {total_time:.2f} seconds or "
        f"{minutes:.0f} minutes {seconds:.2f} seconds"
    )

    return grid_search

In [None]:
def custom_refit(cv_results):
    """Return the index of the candidate with the best cross-validation scores.

    Candidates are ordered by mean test F1 (highest first); equal F1 values are
    ordered by mean test ROC AUC (highest first).

    Args:
        cv_results: Mapping holding the "mean_test_f1" and "mean_test_roc_auc"
            arrays.

    Returns:
        Position of the top-ranked candidate.
    """
    # np.lexsort uses the LAST key as the primary one and sorts ascending,
    # so the scores are negated to rank the highest values first.
    tie_breaker = -cv_results["mean_test_roc_auc"]
    primary = -cv_results["mean_test_f1"]
    ranking = np.lexsort((tie_breaker, primary))
    return ranking[0]

In [None]:
def print_top_low(grid_search, show_columns, sort_by = ['rank_test_score', 'index'], n_rows = 10):
    """Display the first and last rows of the sorted grid-search results.

    Args:
        grid_search: Object exposing a ``cv_results_`` mapping.
        show_columns: Columns to include in the displayed tables.
        sort_by: Sort keys; 'index' refers to the original row order of
            ``cv_results_`` and can serve as a tie-breaker. The default list is
            only read, never modified.
        n_rows: Number of rows shown in each of the two tables.

    Returns:
        The full results DataFrame, sorted by ``sort_by`` and re-indexed from 0.
    """
    df_results = pd.DataFrame(grid_search.cv_results_)
    # Naming the index is what allows 'index' to be used as a sort key.
    df_results.index.name = 'index'
    df_results = df_results.sort_values(by=sort_by).reset_index(drop=True)

    selected = df_results[show_columns]

    print(f'Top {n_rows}')
    display(selected.head(n_rows))

    print(f'Low {n_rows}')
    # tail() yields an empty table for n_rows == 0, whereas slicing with
    # [-0:] would return every row.
    display(selected.tail(n_rows))

    return df_results

In [None]:
def custom_threshold_scorer(estimator, X, y, threshold=0.5, scorer=f1_score):
    """Score an estimator after cutting its probabilities at a custom threshold.

    Args:
        estimator: Object exposing ``predict_proba``.
        X: Features passed to ``predict_proba``.
        y: Reference labels passed to ``scorer`` as its first argument.
        threshold: Rows whose second-column probability is at or above this
            value are labelled 1, all others 0.
        scorer: Callable invoked as ``scorer(y, predictions)``.

    Returns:
        Whatever ``scorer`` returns for the thresholded predictions.
    """
    # Column 1 of predict_proba is compared against the threshold.
    positive_proba = estimator.predict_proba(X)[:, 1]
    hard_labels = (positive_proba >= threshold).astype(int)
    return scorer(y, hard_labels)

In [None]:
def findBestThreshold(model, features, target):
    """Scan decision thresholds and keep the one with the highest F1 score.

    Thresholds run from 0 to 1 in steps of 0.05. For each one, rows whose
    second-column ``predict_proba`` value is at or above the threshold are
    labelled 1. Progress is printed for every threshold.

    Args:
        model: Object exposing ``predict_proba``.
        features: Features passed to ``predict_proba``.
        target: Reference labels.

    Returns:
        Tuple ``(best_threshold, best_result)`` where ``best_result`` is a dict
        with the keys 'f1' and 'roc_auc'.
    """
    # Probabilities and ROC AUC do not depend on the threshold, so they are
    # computed once instead of on every loop iteration.
    positive_proba = model.predict_proba(features)[:, 1]
    roc_auc = roc_auc_score(target, positive_proba)

    best_result = {'f1': 0, 'roc_auc': 0}
    best_threshold = 0

    for threshold in np.arange(0, 1.05, 0.05):
        print(f'threshold: {threshold:.2f}')

        predictions = (positive_proba >= threshold).astype(int)
        f1 = f1_score(target, predictions)
        print(f"f1 score: {f1:.6f}")

        # Strict comparison keeps the lowest threshold among equal F1 scores.
        if f1 > best_result['f1']:
            best_result['f1'] = f1
            best_result['roc_auc'] = roc_auc
            best_threshold = threshold

    return best_threshold, best_result

In [None]:
from sklearn.exceptions import NotFittedError

def is_fitted(estimator, X):
    """Report whether ``estimator`` has been fitted.

    Args:
        estimator: scikit-learn estimator to inspect.
        X: Unused; kept so existing calls that pass a feature matrix keep
            working.

    Returns:
        True when the estimator is fitted. Otherwise the ``NotFittedError`` is
        printed and False is returned.
    """
    # Imported locally so this cell does not depend on an edit to the
    # notebook's import cell.
    from sklearn.utils.validation import check_is_fitted

    try:
        # Inspects the estimator's fitted state directly instead of running a
        # full prediction over X just to see whether it raises.
        check_is_fitted(estimator)
        return True
    except NotFittedError as e:
        print(repr(e))
        return False

# Обучение модели

In [None]:
# Grid explored by the search: L2 penalty, 20 values of C spaced
# logarithmically between 1e-5 and 1e5, and three iteration caps.
# NOTE(review): n_jobs appears here as an estimator parameter with a single
# value rather than as an argument of the search itself — confirm this is
# intended.
param_grid = [
    {
        "n_jobs": [4],
        "penalty": ['l2'],
        "C": np.logspace(-5, 5, 20),
        "max_iter": [100, 1000, 10000]
    }
]

# Cross-validation scheme: 5 stratified shuffle splits with a 20% test share each.
sss = StratifiedShuffleSplit(n_splits=5, test_size=0.2, random_state=42)

# Metrics evaluated for every candidate.
scores = ['f1', 'roc_auc']

# Result columns shown when the search results are inspected.
columns = ['mean_fit_time', 'param_max_iter', 'param_penalty', 'param_C',\
                'mean_test_f1', 'mean_test_roc_auc', 'rank_test_f1', 'rank_test_roc_auc']

# Base estimator; the grid above supplies penalty, C, max_iter and n_jobs.
log_r = LogisticRegression(random_state=42, multi_class='ovr', solver='lbfgs')

In [None]:
# Run the search on the scaled training split; custom_refit is passed as the
# refit argument and decides which candidate is refitted as the best estimator.
grid_search = grid_search_cv(log_r, sss, param_grid, X_train_log, y_train_log, scores, custom_refit)

In [None]:
# Show the first and last candidates ordered by F1 rank (original row order
# breaks ties) and keep the sorted results table.
df_results = print_top_low(grid_search, columns, ['rank_test_f1', 'index'])

In [None]:
# Check whether the search's best estimator reports as fitted.
is_fitted(grid_search.best_estimator_, X_train_log)

In [None]:
%%parameters
best_threshold_log = findBestThreshold(grid_search.best_estimator_, X_train_log, y_train_log)

In [None]:
# Directory that receives the serialized model.
new_directory = 'models/'

# exist_ok=True makes this cell safe to re-run when the directory already exists.
os.makedirs(new_directory, exist_ok=True)

In [None]:
# Serialize the best estimator found by the grid search.
# NOTE(review): only the estimator is written here; the fitted scaler and the
# selected threshold are not saved in this cell — confirm they are persisted
# elsewhere if they are needed at inference time.
joblib_logr = 'models/logistic_regression.pkl'
joblib.dump(grid_search.best_estimator_, joblib_logr)