In [None]:
import warnings
from functools import partial
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import optuna
import polars as pl
import polars.selectors as cs

from catboost import CatBoostRegressor, MultiTargetCustomMetric
from xgboost import XGBRegressor

from numpy.typing import ArrayLike, NDArray
from polars.testing import assert_frame_equal
from sklearn.base import BaseEstimator
from sklearn.metrics import cohen_kappa_score
from sklearn.model_selection import StratifiedKFold, RepeatedKFold

# Hide the recurring "Failed to optimize method" warning for the rest of the notebook.
warnings.filterwarnings("ignore", message="Failed to optimize method")


# Folder holding train.csv / test.csv. When running on Kaggle, point this at
# /kaggle/input/child-mind-institute-problematic-internet-use instead.
DATA_DIR = Path("./data")

# Target columns: the 20 zero-padded PCIAT item columns, followed by their total and `sii`.
TARGET_COLS = [f"PCIAT-PCIAT_{item:02d}" for item in range(1, 21)]
TARGET_COLS += ["PCIAT-PCIAT_Total", "sii"]

# Columns fed to the models as input features.
FEATURE_COLS = [
    "Basic_Demos-Age",
    "PreInt_EduHx-computerinternet_hoursday",
    "SDS-SDS_Total_Raw",
    "Physical-Height",
    "FGC-FGC_TL",
    "Physical-Waist_Circumference",
    "BIA-BIA_FMI",
    "BIA-BIA_Fat",
    "Fitness_Endurance-Time_Sec",
    "Fitness_Endurance-Max_Stage",
    "FGC-FGC_CU_Zone",
]

In [None]:
# Read both competition tables and stack them (train rows first, then test rows).
# how="diagonal" takes the union of the columns and null-fills whatever one side lacks.
train = pl.read_csv(DATA_DIR / "train.csv")
test = pl.read_csv(DATA_DIR / "test.csv")
train_test = pl.concat([train, test], how="diagonal")

# True when the test file has at most 100 rows.
# NOTE(review): presumably a flag for a small placeholder test set — confirm intent.
IS_TEST = test.height <= 100

# Sanity check: the stacked frame must reproduce each input exactly when sliced back apart.
assert_frame_equal(train, train_test.head(train.height).select(train.columns))
assert_frame_equal(test, train_test.slice(train.height).select(test.columns))

In [None]:
# Cast string columns to categorical on the stacked frame so train and test share one
# encoding; missing strings become an explicit "NAN" category.
train_test = train_test.with_columns(cs.string().cast(pl.Categorical).fill_null("NAN"))
train = train_test[: train.height]
test = train_test[train.height :]

# Keep only fully labelled training rows (no null in any TARGET_COLS column).
# Filter `train`, not `train_test`: the training set must never depend on test rows
# happening to have null targets.
train_without_null = train.drop_nulls(subset=TARGET_COLS)
X = train_without_null.select(FEATURE_COLS)
X_test = test.select(FEATURE_COLS)
y = train_without_null.select(TARGET_COLS)
y_sii = y.get_column("sii").to_numpy()  # ground truth
cat_features = X.select(cs.categorical()).columns

# Features and targets come from the same filtered frame, so they must stay row-aligned.
assert X.height == y.height == len(y_sii)

print("Categorical features selected:")
print(cat_features)  # Should be none

# Tubotubo's Quadratic Weighted Kappa metric & Optuna optimizer

In [None]:
class MultiTargetQWK(MultiTargetCustomMetric):
    """CatBoost multi-target custom metric: Quadratic Weighted Kappa on the last target.

    Only the last target row is scored; the prediction for it is clipped to [0, 3]
    and rounded to the nearest integer before being compared with the true values.
    """

    def get_custom_metric_name(self):
        """Return the name under which the metric is reported."""
        return "MultiTargetQWK"

    def is_max_optimal(self):
        """Return True: larger values of this metric are better."""
        return True

    def evaluate(self, approxes, targets, weight):
        """Score one batch and return ``(qwk, 1)``.

        Per the original author's notes:
        - approxes: predicted values, shape [n_targets, n_samples]
        - targets: actual values, shape [n_targets, n_samples]
        - weight: per-sample weights (may be None); not used here
        """
        last_pred = np.clip(approxes[-1], 0, 3)
        last_pred = last_pred.round().astype(int)
        last_true = targets[-1]

        score = cohen_kappa_score(last_true, last_pred, weights="quadratic")

        # The second element is the weight paired with this score; it is fixed at 1.
        return score, 1

    def get_final_error(self, error, weight):
        """Return the summed error; it is deliberately not divided by the summed weight."""
        return np.sum(error)


class OptimizedRounder:
    """Learn thresholds that turn continuous predictions into discrete class labels.

    Optuna searches for ``n_classes - 1`` non-decreasing thresholds that maximize the
    Quadratic Weighted Kappa between the binned predictions and the true labels.

    Predictions are rescaled to ``[0, n_classes - 1]`` using the minimum and maximum seen
    during ``fit()``. The same range is reused by ``predict()``, so the learned thresholds
    keep their meaning on new data (including single-sample or constant inputs, which
    would otherwise normalize to NaN).

    Args:
        n_classes (int): The number of discrete class labels.
        n_trials (int, optional): The number of Optuna trials. Defaults to 100.
        seed (int | None, optional): Seed for the Optuna TPE sampler. ``None`` (default)
            leaves the search unseeded.

    Attributes:
        n_classes (int): The number of discrete class labels.
        labels (NDArray[np.int_]): Class labels from 0 to ``n_classes - 1``.
        n_trials (int): The number of optimization trials.
        seed (int | None): Sampler seed.
        metric (Callable): Quadratic Weighted Kappa scorer used as the objective.
        thresholds (list[float]): Optimized thresholds; available after ``fit()``.
    """

    def __init__(self, n_classes: int, n_trials: int = 100, seed: "int | None" = None):
        self.n_classes = n_classes
        self.labels = np.arange(n_classes)
        self.n_trials = n_trials
        self.seed = seed
        self.metric = partial(cohen_kappa_score, weights="quadratic")

    def fit(self, y_pred: NDArray[np.float64], y_true: NDArray[np.int_]) -> None:
        """Fit the rounding thresholds.

        Args:
            y_pred: Continuous predictions that need to be rounded.
            y_true: Ground truth class labels.
        """
        y_pred = np.asarray(y_pred, dtype=np.float64)

        # Remember the training range: predict() must apply the very same rescaling.
        self._y_min = float(y_pred.min())
        self._y_max = float(y_pred.max())
        y_scaled = self._normalize(y_pred)

        lowest = float(self.labels.min())
        highest = float(self.labels.max())

        def objective(trial: optuna.Trial) -> float:
            thresholds: list = []
            for i in range(self.n_classes - 1):
                # Each threshold is searched above the previous one, keeping bins ordered.
                low = thresholds[-1] if thresholds else lowest
                thresholds.append(trial.suggest_float(f"threshold_{i}", low, highest))
            try:
                y_pred_rounded = np.digitize(y_scaled, thresholds)
            except ValueError:
                # np.digitize rejects non-monotonic bins; score such trials as very bad.
                return -100
            return self.metric(y_true, y_pred_rounded)

        optuna.logging.disable_default_handler()
        # TPE is Optuna's default single-objective sampler; creating it explicitly only
        # adds the ability to seed it.
        sampler = optuna.samplers.TPESampler(seed=self.seed)
        study = optuna.create_study(direction="maximize", sampler=sampler)
        study.optimize(
            objective,
            n_trials=self.n_trials,
        )
        self.thresholds = [study.best_params[f"threshold_{i}"] for i in range(self.n_classes - 1)]

    def predict(self, y_pred: NDArray[np.float64]) -> NDArray[np.int_]:
        """Bin continuous predictions with the fitted thresholds.

        Args:
            y_pred: Continuous predictions to be rounded.

        Returns:
            Predicted class labels.

        Raises:
            AssertionError: If ``fit()`` has not been called.
        """
        assert hasattr(self, "thresholds"), "fit() must be called before predict()"
        y_pred = self._normalize(y_pred)
        return np.digitize(y_pred, self.thresholds)

    def _normalize(self, y: NDArray[np.float64]) -> NDArray[np.float64]:
        """Rescale ``y`` so the fitted range maps onto ``[0, n_classes - 1]``.

        Uses the min/max recorded by ``fit()``. If none has been recorded yet, falls back
        to the range of ``y`` itself. A zero-width range maps every value to 0 instead of
        dividing by zero.
        """
        y = np.asarray(y, dtype=np.float64)
        y_min = getattr(self, "_y_min", None)
        y_max = getattr(self, "_y_max", None)
        if y_min is None or y_max is None:
            y_min, y_max = float(y.min()), float(y.max())

        span = y_max - y_min
        if span == 0:
            return np.zeros_like(y)
        return (y - y_min) / span * (self.n_classes - 1)

# Start making models!

In [None]:
from sklearn.preprocessing import PolynomialFeatures, PowerTransformer
from sklearn.impute import SimpleImputer
from catboost import CatBoostRegressor
import numpy as np
import pandas as pd

# CatBoost hyper-parameters used when BaselinePolynomial builds its regressor.
params = {
    "loss_function": "MultiRMSE",
    "eval_metric": "MultiRMSE",  # keep consistent with the loss used for training
    "iterations": 8000,  # upper bound on trees; adjust as needed
    "learning_rate": 0.05,
    "depth": 5,
    "early_stopping_rounds": 50,
}

# Baseline model: imputation + polynomial feature expansion feeding a CatBoost regressor
class BaselinePolynomial:
    """CatBoost regressor trained on imputed, polynomially expanded numeric features.

    Numeric columns are imputed and expanded with ``PolynomialFeatures``. Categorical
    columns (if any) bypass that pipeline and are appended unchanged after the expanded
    numeric block. The fitted imputer and expansion are reused at prediction time.
    """

    def __init__(self, degree=2, include_bias=False, imputation_strategy='mean'):
        """
        Initializes the BaselinePolynomial model.

        Parameters:
        - degree (int): Degree of the polynomial features.
        - include_bias (bool): Whether to include a bias column.
        - imputation_strategy (str): Strategy for imputing missing values ('mean', 'median', 'most_frequent', etc.).
        """
        self.imputer = SimpleImputer(strategy=imputation_strategy)
        # Kept for interface compatibility; no scaling step is applied at the moment.
        self.scaler = PowerTransformer()
        self.poly = PolynomialFeatures(degree=degree, include_bias=include_bias)
        self.cat = CatBoostRegressor(**params)
        # Set by fit(); names of the columns treated as categorical.
        self.cat_features = []

    def _preprocess(self, x, fit=False):
        """Turn a feature frame into the matrix handed to CatBoost.

        Parameters:
        - x (pd.DataFrame): Raw features.
        - fit (bool): If True, (re)fit the imputer and polynomial expansion on ``x``;
          otherwise only apply the already fitted transformers.

        Returns:
        - tuple: (processed matrix, positional indices of the categorical columns in it).
        """
        if self.cat_features:
            numerical = x.drop(columns=self.cat_features)
            categorical = x[self.cat_features].values
        else:
            numerical = x
            categorical = None

        if fit:
            expanded = self.poly.fit_transform(self.imputer.fit_transform(numerical))
        else:
            expanded = self.poly.transform(self.imputer.transform(numerical))

        if categorical is None:
            return expanded, []

        # Categorical columns sit right after the expanded numeric block.
        n_numeric = expanded.shape[1]
        cat_indices = list(range(n_numeric, n_numeric + categorical.shape[1]))
        return np.hstack([expanded, categorical]), cat_indices

    def fit(self, x, y, eval_set, cat_features, verbose=False):
        """
        Fits the model with polynomial feature generation and imputation.

        Parameters:
        - x (pd.DataFrame): Training features.
        - y (pd.Series or pd.DataFrame): Training target.
        - eval_set (tuple): Validation set as (X_val, y_val); passed to CatBoost as its eval_set.
        - cat_features (list): Names of categorical columns; they skip imputation and
          polynomial expansion and are passed to CatBoost as categorical features.
        - verbose (bool): Verbosity flag.

        Returns:
        - BaselinePolynomial: The fitted model (self).
        """
        self.cat_features = list(cat_features) if cat_features else []

        # Transformers are fitted on the training frame only, then applied to validation.
        X_train_processed, cat_indices = self._preprocess(x, fit=True)
        X_val_processed, _ = self._preprocess(eval_set[0])

        self.cat.fit(
            X_train_processed,
            y,
            eval_set=(X_val_processed, eval_set[1]),
            cat_features=cat_indices,
            verbose=verbose
        )
        return self

    def predict(self, x):
        """
        Predicts using the fitted model.

        Parameters:
        - x (pd.DataFrame): Features for prediction, with the same columns as in fit().

        Returns:
        - np.ndarray: Predictions.
        """
        # Uses the categorical column names stored by fit(), not a notebook-level global.
        X_processed, _ = self._preprocess(x)
        return self.cat.predict(X_processed)


In [None]:
# Cross-validation
N_SPLITS = 5
N_REPEATS = 3
CV_SEED = 52
TOTAL_IDX = TARGET_COLS.index("PCIAT-PCIAT_Total")

models: list = []

# With repeated K-fold every row is validated once per repeat. Accumulate the
# out-of-fold predictions and average them, instead of letting each repeat
# overwrite the previous one.
oof_sum = np.zeros((X.height, len(TARGET_COLS)))
oof_count = np.zeros(X.height, dtype=int)

rkf = RepeatedKFold(n_splits=N_SPLITS, n_repeats=N_REPEATS, random_state=CV_SEED)

for train_idx, val_idx in rkf.split(X, y_sii):
    X_train, X_val = X[train_idx], X[val_idx]
    y_train, y_val = y[train_idx], y[val_idx]
    X_val_pd = X_val.to_pandas()

    # Initialize and train model.
    # NOTE: the validation fold is also the early-stopping eval_set, so the
    # out-of-fold scores below are likely somewhat optimistic.
    model = BaselinePolynomial()
    model.fit(
        X_train.to_pandas(),
        y_train.to_pandas(),
        eval_set=(X_val_pd, y_val.to_pandas()),
        cat_features=cat_features,
        verbose=False,
    )
    models.append(model)

    # Predict
    oof_sum[val_idx] += model.predict(X_val_pd)
    oof_count[val_idx] += 1

# Every row must have been validated exactly once per repeat.
assert (oof_count == N_REPEATS).all()
y_pred = oof_sum / oof_count[:, None]
assert np.isnan(y_pred).sum() == 0

# Optimize thresholds on the out-of-fold PCIAT total prediction
optimizer = OptimizedRounder(n_classes=4, n_trials=300)
y_pred_total = y_pred[:, TOTAL_IDX]
optimizer.fit(y_pred_total, y_sii)
y_pred_rounded = optimizer.predict(y_pred_total)

# Calculate QWK
qwk = cohen_kappa_score(y_sii, y_pred_rounded, weights="quadratic")
print(f"Cross-Validated QWK Score: {qwk}")

## Results

- Baseline SMP: cross-validated QWK score 0.4633317017779497
- RepeatedKFold SMP: cross-validated QWK score 0.4673477280860625

In [None]:
class AvgModel:
    """Ensemble that averages the predictions of several already fitted models.

    Args:
        models: Objects exposing ``predict(X)``. Their outputs must share one shape.
    """

    def __init__(self, models: list):
        self.models = models

    def predict(self, X: ArrayLike) -> NDArray[np.float64]:
        """Return the element-wise mean of every model's prediction for ``X``.

        Raises:
            ValueError: If there are no models to average (the mean would be NaN).
        """
        if len(self.models) == 0:
            raise ValueError("AvgModel needs at least one model to predict")

        preds = [model.predict(X) for model in self.models]
        # Average over the model axis; the per-sample (and per-target) layout is kept.
        return np.mean(preds, axis=0)

In [None]:
# Average every cross-validation model on the test features, keep the PCIAT total
# column, and bin it into sii classes with the thresholds fitted on the CV predictions.
avg_model = AvgModel(models)
test_pred = avg_model.predict(X_test.to_pandas())[:, TARGET_COLS.index("PCIAT-PCIAT_Total")]
test_pred_rounded = optimizer.predict(test_pred)

# The submission must contain exactly one prediction per test row.
assert len(test_pred_rounded) == test.height

test.select("id").with_columns(
    pl.Series("sii", test_pred_rounded),
).write_csv("submission.csv")