In [3]:
import os
import time
import json
import warnings
from dataclasses import dataclass

import joblib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from dotenv import load_dotenv
import mlflow

from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import PowerTransformer, OrdinalEncoder, OneHotEncoder, LabelEncoder, label_binarize
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    balanced_accuracy_score, confusion_matrix, ConfusionMatrixDisplay,
    roc_auc_score, roc_curve, auc, average_precision_score, precision_recall_curve
)
from sklearn.base import clone
import matplotlib.ticker as ticker

from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from catboost import CatBoostClassifier

import optuna

In [4]:
# Global configuration for the whole notebook.
RNG = 42                                 # seed reused for the hold-out split, CV shuffling, Optuna samplers and models
N_SPLITS = 5                             # number of StratifiedKFold folds used inside every tuning objective
N_TRIALS_XGB = 10                        # Optuna trial budget for the XGBoost study
N_TRIALS_LGBM = 12                       # Optuna trial budget for the LightGBM study
N_TRIALS_CAT = 10                        # Optuna trial budget for the CatBoost study
EARLY_STOP = 50                          # early-stopping patience (rounds) handed to each booster
EXPERIMENT_NAME = "raw_data_analysis"    # MLflow experiment that is activated below
CACHE_DIR = "cache_ml"                   # directory given to joblib.Memory for pipeline caching

# NOTE(review): this silences every warning category for the rest of the session,
# including deprecation warnings emitted by the ML libraries.
warnings.filterwarnings("ignore")

In [5]:
# Read variables from a local .env file (if any) into the process environment.
load_dotenv()
# The tracking server comes from MLFLOW_TRACKING_URI; the literal address is only a fallback.
# NOTE(review): the fallback hardcodes a plain-HTTP infrastructure address in the notebook —
# consider requiring the environment variable instead.
TRACKING_URI = os.getenv("MLFLOW_TRACKING_URI", "http://84.201.144.227:8000")
mlflow.set_tracking_uri(TRACKING_URI)
# As the last expression of the cell, the returned Experiment object is echoed as cell output.
mlflow.set_experiment(EXPERIMENT_NAME)

<Experiment: artifact_location='s3://tigran1/artifacts/1', creation_time=1750530849025, experiment_id='1', last_update_time=1750530849025, lifecycle_stage='active', name='raw_data_analysis', tags={}>

In [7]:
# Load the raw table; the identifier-like columns are dropped when they are present.
df = pd.read_csv("data/student.csv").drop(columns=["number", "Id"], errors="ignore")

# Attendance: convert to numbers. The three known labels are mapped explicitly, numeric
# strings such as "3" become 3, and anything else is coerced to NaN (imputed later in the
# numeric pipeline).
df["Attendance"] = (
    df["Attendance"]
    .replace({"Always": 3, "Sometimes": 2, "Never": 1})
    .pipe(pd.to_numeric, errors="coerce")
)

# Scholarship: "0%" -> 0 and so on; missing or unparsable values are treated as 0.
if "Scholarship" in df.columns:
    df["Scholarship"] = df["Scholarship"].fillna("0%").astype(str).str.replace("%", "", regex=False)
    df["Scholarship"] = pd.to_numeric(df["Scholarship"], errors="coerce").fillna(0).astype(int)

# Grade -> ordinal target in 0..8.
grade_mapping = {"Fail": 0, "FD": 1, "DD": 2, "DC": 3, "CC": 4, "CB": 5, "BB": 6, "BA": 7, "AA": 8}
grade_codes = df["Grade"].map(grade_mapping)

# Series.map() turns every missing or unlisted grade into NaN without complaint; such a NaN
# would later be picked up as an extra target class. Fail fast instead of training on it.
unmapped_grades = df.loc[grade_codes.isna(), "Grade"]
if not unmapped_grades.empty:
    raise ValueError(
        f"{len(unmapped_grades)} row(s) have a missing or unknown Grade value: "
        f"{sorted(unmapped_grades.astype(str).unique())}"
    )
df["Grade"] = grade_codes


In [8]:
# Feature matrix: every column except the target.
X = df.drop(columns=["Grade"])
# Target: the integer-coded grade.
y = df["Grade"]

In [9]:
# Re-encode the target with LabelEncoder and keep the original row index so that
# X and the encoded target stay aligned.
le = LabelEncoder()
y_encoded = pd.Series(le.fit_transform(y), index=y.index)

# Stratified 80/20 hold-out split, seeded for reproducibility.
X_train, X_test, y_train, y_test = train_test_split(
    X, y_encoded, test_size=0.2, random_state=RNG, stratify=y_encoded
)

In [10]:
# Persist the hold-out split to the current working directory.
# index=False drops the row index, so the files do not record which source rows were held out.
X_test.to_csv("X_test.csv", index=False)
y_test.to_csv("y_test.csv", index=False)


In [11]:
# Every object-dtype column of X is treated as categorical.
cat_cols_all = X.select_dtypes(include="object").columns.tolist()

In [12]:
# Columns that get one-hot encoding (only those actually present in X).
explicit_ohe = [c for c in ["Sex", "High_School_Type", "Transportation"] if c in X.columns]
# All remaining categorical columns are ordinal-encoded.
ord_list = [c for c in cat_cols_all if c not in explicit_ohe]
# Every non-object column goes through the numeric pipeline.
num_list = X.select_dtypes(exclude="object").columns.tolist()


In [14]:
# One-hot branch: missing values are first replaced by the literal category "unknown".
ohe_pipe = Pipeline([
    ("imputer", SimpleImputer(strategy="constant", fill_value="unknown")),
    # NOTE(review): the previous comment said sparse=False is used for compatibility, but no
    # sparse-related argument is passed here, so the encoder keeps its default output format.
    # Categories not seen during fit are ignored at transform time.
    ("ohe", OneHotEncoder(handle_unknown="ignore"))
])

# Ordinal branch: same constant imputation; categories unseen during fit are encoded as -1.
ord_pipe = Pipeline([
    ("imputer", SimpleImputer(strategy="constant", fill_value="unknown")),
    ("ord", OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1))
])

# Numeric branch: median imputation followed by a power transform with default settings.
num_pipe = Pipeline([
    ("imputer", SimpleImputer(strategy="median")),
    ("power", PowerTransformer())
])

In [15]:
# Route each column group to its preprocessing branch. Columns that are in none of the
# three lists are handled by ColumnTransformer's default `remainder` setting.
transform = ColumnTransformer([
    ("ord_pipe", ord_pipe, ord_list),
    ("num_pipe", num_pipe, num_list),
    ("ohe_pipe", ohe_pipe, explicit_ohe),
])

In [16]:
# On-disk joblib cache (stored under CACHE_DIR) that make_pipeline passes to Pipeline(memory=...).
memory = joblib.Memory(location=CACHE_DIR, verbose=0)

In [17]:
def transform_fit_transform(X_tr, X_val, transformer):
    """Fit ``transformer`` on the training split and apply it to both splits.

    The transformer only ever sees ``X_tr`` while fitting, so the validation data used
    for early stopping during tuning does not influence the learned preprocessing.

    Args:
        X_tr: Training features; passed to ``transformer.fit_transform``.
        X_val: Validation features; passed to ``transformer.transform``.
        transformer: Object exposing ``fit_transform`` and ``transform``. It is fitted in place.

    Returns:
        A ``(transformed_train, transformed_val, transformer)`` tuple, where the last
        element is the very same (now fitted) object that was passed in.
    """
    train_matrix = transformer.fit_transform(X_tr)
    return train_matrix, transformer.transform(X_val), transformer

def macro_f1(y_true, y_pred):
    """Return the macro-averaged F1 score of ``y_pred`` against ``y_true``.

    Thin wrapper around ``sklearn.metrics.f1_score`` with ``average="macro"``.
    """
    return f1_score(y_true, y_pred, average="macro")

@dataclass
class TuneResult:
    """Outcome of one hyper-parameter search, as returned by the ``tune_*`` functions."""
    # Parameter dictionary of the best Optuna trial (``study.best_params``).
    best_params: dict
    # Objective value of that trial (``study.best_value``), i.e. the mean macro-F1 over the CV folds.
    best_score: float

In [18]:
def tune_xgb(X_tr, y_tr, transformer, n_classes) -> TuneResult:
    """Tune XGBoost hyper-parameters with Optuna, scored by cross-validated macro-F1.

    Each trial is evaluated with a seeded, shuffled ``StratifiedKFold`` (``N_SPLITS`` folds).
    Inside every fold a fresh clone of ``transformer`` is fitted on the training part only,
    the model is trained with early stopping on the validation part, and macro-F1 is
    computed on that same validation part. The trial value is the mean over the folds.

    Args:
        X_tr: Training features; indexed positionally with ``.iloc``.
        y_tr: Training labels; indexed positionally with ``.iloc``.
        transformer: Unfitted preprocessing transformer; it is cloned per fold and never
            fitted itself.
        n_classes: Number of target classes, forwarded to XGBoost as ``num_class``.

    Returns:
        TuneResult with the best trial's parameters and its mean macro-F1.
    """
    skf = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=RNG)

    def objective(trial):
        params = {
            "n_estimators": trial.suggest_int("n_estimators", 400, 1200),
            "max_depth": trial.suggest_int("max_depth", 4, 10),
            "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.2, log=True),
            "subsample": trial.suggest_float("subsample", 0.6, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
            "min_child_weight": trial.suggest_float("min_child_weight", 1.0, 10.0),
            "gamma": trial.suggest_float("gamma", 0.0, 2.0),
            "reg_alpha": trial.suggest_float("reg_alpha", 0.0, 2.0),
            "reg_lambda": trial.suggest_float("reg_lambda", 0.0, 2.0),
        }
        f1_scores = []
        for tr_idx, val_idx in skf.split(X_tr, y_tr):
            X_tr_f, X_val_f = X_tr.iloc[tr_idx], X_tr.iloc[val_idx]
            y_tr_f, y_val_f = y_tr.iloc[tr_idx], y_tr.iloc[val_idx]

            # Transform by hand so that eval_set can be passed already preprocessed.
            trf = clone(transformer)
            Xt_tr_f, Xt_val_f, _ = transform_fit_transform(X_tr_f, X_val_f, trf)

            # early_stopping_rounds is configured on the estimator, next to eval_metric:
            # passing it to fit() was deprecated in XGBoost 1.6 and removed in 2.0.
            clf = XGBClassifier(
                objective="multi:softprob",
                num_class=n_classes,
                random_state=RNG,
                n_jobs=-1,
                tree_method="hist",
                eval_metric="mlogloss",
                early_stopping_rounds=EARLY_STOP,
                **params
            )
            clf.fit(
                Xt_tr_f, y_tr_f,
                eval_set=[(Xt_val_f, y_val_f)],
                verbose=False
            )
            y_pred_f = clf.predict(Xt_val_f)
            f1_scores.append(macro_f1(y_val_f, y_pred_f))
        return float(np.mean(f1_scores))

    # NOTE: the objective never calls trial.report(), so the MedianPruner has no
    # intermediate values to act on and no trial is actually pruned.
    study = optuna.create_study(direction="maximize", sampler=optuna.samplers.TPESampler(seed=RNG),
                                pruner=optuna.pruners.MedianPruner())
    study.optimize(objective, n_trials=N_TRIALS_XGB, show_progress_bar=False)
    return TuneResult(study.best_params, study.best_value)

In [19]:
def tune_lgbm(X_tr, y_tr, transformer) -> TuneResult:
    """Tune LightGBM hyper-parameters with Optuna, scored by cross-validated macro-F1.

    Each trial is evaluated with a seeded, shuffled ``StratifiedKFold`` (``N_SPLITS`` folds).
    Inside every fold a fresh clone of ``transformer`` is fitted on the training part only,
    the model is trained with early stopping on the validation part, and macro-F1 is
    computed on that same validation part. The trial value is the mean over the folds.

    Args:
        X_tr: Training features; indexed positionally with ``.iloc``.
        y_tr: Training labels; indexed positionally with ``.iloc``.
        transformer: Unfitted preprocessing transformer; it is cloned per fold and never
            fitted itself.

    Returns:
        TuneResult with the best trial's parameters and its mean macro-F1.
    """
    # Imported locally so this function is self-contained: the notebook's import cell
    # only brings in LGBMClassifier from lightgbm.
    from lightgbm import early_stopping, log_evaluation

    skf = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=RNG)

    def objective(trial):
        params = {
            "n_estimators": trial.suggest_int("n_estimators", 400, 1800),
            "max_depth": trial.suggest_int("max_depth", -1, 12),
            "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.2, log=True),
            "num_leaves": trial.suggest_int("num_leaves", 31, 256),
            "subsample": trial.suggest_float("subsample", 0.6, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
            "reg_alpha": trial.suggest_float("reg_alpha", 0.0, 2.0),
            "reg_lambda": trial.suggest_float("reg_lambda", 0.0, 2.0),
            "min_child_samples": trial.suggest_int("min_child_samples", 10, 200),
            "min_split_gain": trial.suggest_float("min_split_gain", 0.0, 1.0),
        }
        f1_scores = []
        for tr_idx, val_idx in skf.split(X_tr, y_tr):
            X_tr_f, X_val_f = X_tr.iloc[tr_idx], X_tr.iloc[val_idx]
            y_tr_f, y_val_f = y_tr.iloc[tr_idx], y_tr.iloc[val_idx]

            trf = clone(transformer)
            Xt_tr_f, Xt_val_f, _ = transform_fit_transform(X_tr_f, X_val_f, trf)

            clf = LGBMClassifier(
                objective="multiclass",
                random_state=RNG,
                n_jobs=-1,
                **params
            )
            # LightGBM 4.0 removed the `early_stopping_rounds` and `verbose` keyword
            # arguments of fit(); the callbacks below are the supported equivalent
            # (stop after EARLY_STOP rounds without improvement, no per-iteration logging).
            clf.fit(
                Xt_tr_f, y_tr_f,
                eval_set=[(Xt_val_f, y_val_f)],
                eval_metric="multi_logloss",
                callbacks=[
                    early_stopping(stopping_rounds=EARLY_STOP, verbose=False),
                    log_evaluation(period=0),
                ],
            )
            y_pred_f = clf.predict(Xt_val_f)
            f1_scores.append(macro_f1(y_val_f, y_pred_f))
        return float(np.mean(f1_scores))

    # NOTE: the objective never calls trial.report(), so the MedianPruner has no
    # intermediate values to act on and no trial is actually pruned.
    study = optuna.create_study(direction="maximize", sampler=optuna.samplers.TPESampler(seed=RNG),
                                pruner=optuna.pruners.MedianPruner())
    study.optimize(objective, n_trials=N_TRIALS_LGBM, show_progress_bar=False)
    return TuneResult(study.best_params, study.best_value)


In [20]:
def tune_cat(X_tr, y_tr, transformer) -> TuneResult:
    """Tune CatBoost hyper-parameters with Optuna, scored by cross-validated macro-F1.

    Every trial is scored on a seeded, shuffled ``StratifiedKFold`` (``N_SPLITS`` folds);
    the trial value is the mean macro-F1 over the folds.

    Args:
        X_tr: Training features; indexed positionally with ``.iloc``.
        y_tr: Training labels; indexed positionally with ``.iloc``.
        transformer: Unfitted preprocessing transformer; a fresh clone is fitted per fold.

    Returns:
        TuneResult with the best trial's parameters and its mean macro-F1.
    """
    folds = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=RNG)

    def _score_fold(search_params, fit_rows, holdout_rows):
        """Train on one fold's training rows and return macro-F1 on its held-out rows."""
        X_fit, y_fit = X_tr.iloc[fit_rows], y_tr.iloc[fit_rows]
        X_hold, y_hold = X_tr.iloc[holdout_rows], y_tr.iloc[holdout_rows]

        # Preprocessing is learned from the fold's training rows only.
        Xt_fit, Xt_hold, _ = transform_fit_transform(X_fit, X_hold, clone(transformer))

        model = CatBoostClassifier(
            loss_function="MultiClass",
            random_seed=RNG,
            thread_count=-1,
            verbose=False,
            **search_params
        )
        model.fit(
            Xt_fit, y_fit,
            eval_set=(Xt_hold, y_hold),
            use_best_model=True,
            early_stopping_rounds=EARLY_STOP,
            verbose=False
        )
        # CatBoost returns predictions with shape (n, 1); flatten them to integer labels.
        labels = model.predict(Xt_hold).reshape(-1).astype(int)
        return macro_f1(y_hold, labels)

    def objective(trial):
        search_params = dict(
            iterations=trial.suggest_int("iterations", 400, 1500),
            depth=trial.suggest_int("depth", 4, 10),
            learning_rate=trial.suggest_float("learning_rate", 0.01, 0.2, log=True),
            l2_leaf_reg=trial.suggest_float("l2_leaf_reg", 1.0, 10.0),
            random_strength=trial.suggest_float("random_strength", 0.0, 2.0),
            bagging_temperature=trial.suggest_float("bagging_temperature", 0.0, 2.0),
        )
        fold_scores = [
            _score_fold(search_params, fit_rows, holdout_rows)
            for fit_rows, holdout_rows in folds.split(X_tr, y_tr)
        ]
        return float(np.mean(fold_scores))

    study = optuna.create_study(
        direction="maximize",
        sampler=optuna.samplers.TPESampler(seed=RNG),
        pruner=optuna.pruners.MedianPruner(),
    )
    study.optimize(objective, n_trials=N_TRIALS_CAT, show_progress_bar=False)
    return TuneResult(study.best_params, study.best_value)

In [21]:
def make_pipeline(estimator):
    """Wrap ``estimator`` in a Pipeline behind the notebook's shared preprocessing step.

    The pipeline has two steps, ``"transform"`` (the module-level ``transform``
    ColumnTransformer) and ``"model"`` (the given estimator), and uses the
    module-level joblib ``memory`` object as its cache.
    """
    stages = [("transform", transform), ("model", estimator)]
    return Pipeline(steps=stages, memory=memory)