# In [1]:
"""
Simple regression-based model to predict daily excess returns and map them to allocation.

- Train a Linear Regression model on `market_forward_excess_returns`
- Use only numeric feature columns (excluding obvious ID/target columns)
- On each predict() call, take the single-row Polars DataFrame, align features,
  predict excess return, then convert it to an allocation in [0, 2].
"""

import math
import os

import pandas as pd
import polars as pl

import kaggle_evaluation.default_inference_server

from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler

# ---------------------------------------------------------------------
# Competition data location and regression target
# ---------------------------------------------------------------------
DATA_PATH = "/kaggle/input/hull-tactical-market-prediction/"
TRAIN_FILE = os.path.join(DATA_PATH, "train.csv")
TARGET_COL = "market_forward_excess_returns"

# ---------------------------------------------------------------------
# Lazily-populated training artifacts. All three start as None and are
# filled in by train_simple_regression_model().
# ---------------------------------------------------------------------
FEATURE_COLS = None  # list of feature column names used for training
SCALER = None  # StandardScaler fitted on the training features
MODEL = None  # LinearRegression fitted on the scaled features


def train_simple_regression_model():
    """
    Load train.csv and fit a very simple regression model.

    y = market_forward_excess_returns
    X = all numeric columns except obvious ID/target columns.

    Missing values in both X and y are replaced with 0.0, the features are
    standardized, and a LinearRegression is fitted on the scaled matrix.

    On success the module-level FEATURE_COLS, SCALER and MODEL are set.
    They are published only after fitting has succeeded, so a failure
    part-way through never leaves a non-None but unfitted MODEL behind
    (ensure_model_trained() would treat that as "already trained").

    Raises:
        ValueError: if the target column is missing from train.csv, or if
            no numeric feature column is found.
    """
    global MODEL, SCALER, FEATURE_COLS

    # Load as Polars for speed
    train_pl = pl.read_csv(TRAIN_FILE)

    if TARGET_COL not in train_pl.columns:
        raise ValueError(
            f"Expected target column '{TARGET_COL}' in train.csv; got {train_pl.columns}"
        )

    # Columns that must never be used as features (the target included).
    excluded = {
        "row_id",
        "id",
        "date_id",
        "forward_returns",
        "risk_free_rate",
        TARGET_COL,
    }

    # NOTE(review): any column whose dtype Polars infers as something other
    # than these four (e.g. String) is silently skipped -- confirm that
    # read_csv's schema inference yields the expected dtypes for train.csv.
    numeric_types = (pl.Float64, pl.Float32, pl.Int64, pl.Int32)

    feature_cols = [
        name
        for name, dtype in zip(train_pl.columns, train_pl.dtypes)
        if name not in excluded and dtype in numeric_types
    ]

    if not feature_cols:
        raise ValueError("No numeric feature columns found for regression model.")

    # Select features + target, convert to Pandas
    train_pd = train_pl.select(feature_cols + [TARGET_COL]).to_pandas()

    # Handle missing values simply
    X = train_pd[feature_cols].fillna(0.0)
    y = train_pd[TARGET_COL].fillna(0.0)

    # Scale features (simple standardization). The scaler is fitted on a
    # DataFrame so that it sees the same column layout predict() passes in.
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Simple Linear Regression model
    model = LinearRegression()
    model.fit(X_scaled, y)

    # Publish only now that everything succeeded; MODEL goes last because
    # it is the flag ensure_model_trained() checks.
    FEATURE_COLS = feature_cols
    SCALER = scaler
    MODEL = model


def ensure_model_trained():
    """Fit the regression model on first use; do nothing once MODEL is set."""
    if MODEL is None:
        train_simple_regression_model()


def excess_return_to_allocation(ret: float, k: float = 50.0) -> float:
    """
    Map predicted excess return to an allocation in [0, 2].

    - Start from neutral allocation 1.0
    - Tilt linearly: allocation = 1.0 + k * ret
    - Clip to [0, 2]

    This keeps the logic extremely simple while still using a
    regression-based excess return prediction.

    Args:
        ret: Predicted excess return.
        k: Scaling factor applied to ``ret``; the default of 50.0 turns
            small returns into small tilts around 1.0.

    Returns:
        The allocation as a plain ``float`` in [0, 2]. If the tilt is NaN
        (e.g. a NaN prediction) the neutral allocation 1.0 is returned,
        because NaN compares false against both bounds and would otherwise
        slip through the clipping unchanged.
    """
    alloc = 1.0 + k * ret

    # NaN cannot be clipped by comparisons; fall back to the neutral stance.
    if math.isnan(alloc):
        return 1.0

    # Clip to [0, 2] as required by the competition (also handles +/-inf).
    return float(min(2.0, max(0.0, alloc)))


def predict(test: pl.DataFrame) -> float:
    """Turn one single-row test frame into an allocation.

    Steps:
      1) Lazily train the model on the first call.
      2) Validate that `test` is a one-row Polars DataFrame.
      3) Build the feature row in training column order; columns absent
         from `test` and missing values both become 0.0.
      4) Scale, predict the daily excess return, and convert it with
         excess_return_to_allocation().

    Raises:
        TypeError: if `test` is not a Polars DataFrame.
        ValueError: if `test` does not contain exactly one row.
    """
    ensure_model_trained()

    if not isinstance(test, pl.DataFrame):
        raise TypeError("predict(test): expected Polars DataFrame as input")

    n_rows = test.height
    if n_rows != 1:
        raise ValueError(
            f"predict(test): expected a single-row Polars DataFrame, got {n_rows} rows"
        )

    # reindex() both orders the columns like the training matrix and creates
    # any training column missing from `test`, filled with 0.0; fillna() then
    # covers missing values in the columns that were present.
    features = (
        test.to_pandas()
        .reindex(columns=FEATURE_COLS, fill_value=0.0)
        .fillna(0.0)
    )

    # Same scaler as training, then a single-element prediction.
    scaled = SCALER.transform(features)
    predicted_return = float(MODEL.predict(scaled)[0])

    return excess_return_to_allocation(predicted_return)


# When your notebook is run on the hidden test set, inference_server.serve must be called within 15 minutes of the notebook starting
# or the gateway will throw an error. If you need more than 15 minutes to load your model you can do so during the very
# first `predict` call, which does not have the usual 1 minute response deadline.
# (That is why training is done lazily inside `predict` rather than at import time.)
inference_server = kaggle_evaluation.default_inference_server.DefaultInferenceServer(
    predict
)

if os.getenv("KAGGLE_IS_COMPETITION_RERUN"):
    # Competition rerun: hand control to the evaluation gateway.
    inference_server.serve()
else:
    # Otherwise run the local gateway against the same directory train.csv
    # is read from; DATA_PATH is reused so the two locations cannot drift.
    inference_server.run_local_gateway((DATA_PATH,))