In [30]:
import numpy as np
import pandas as pd
import polars as pl
from sklearn.linear_model import LinearRegression
import lightgbm as lgb
from pathlib import Path
import os
import kaggle_evaluation.default_inference_server

In [31]:
DATA_PATH = Path('data/')
TARGET_COLUMN = 'forward_returns'
REMOVE_COLUMNS = ['date_id', 'forward_returns','risk_free_rate','market_forward_excess_returns']

LGBM_PARAMS = {
    'objective': 'regression',
    'metric': 'rmse',
    'num_leaves': 31,
    'learning_rate': 0.05,
    'n_estimators': 500,
    'random_state': 42
}

In [32]:
df_train = pd.read_csv(DATA_PATH / "train.csv").iloc[:-180]
df_test = pd.read_csv(DATA_PATH / "test.csv")

df_train['lagged_forward_returns'] = df_train['forward_returns'].shift(1)
df_train['lagged_risk_free_rate'] = df_train['risk_free_rate'].shift(1)
df_train['lagged_market_forward_excess_returns'] = df_train['market_forward_excess_returns'].shift(1)

SELECTED_FEATURES = [col for col in df_train.columns if col not in REMOVE_COLUMNS]

X_train = df_train[SELECTED_FEATURES]
y_train = df_train[TARGET_COLUMN]
y_train = y_train.apply(lambda x: 1 if x > 0 else 0)

model = lgb.LGBMRegressor(**LGBM_PARAMS)
model.fit(X_train, y_train)

[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.007229 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 22340
[LightGBM] [Info] Number of data points in the train set: 8810, number of used features: 97
[LightGBM] [Info] Start training from score 0.538593


0,1,2
,boosting_type,'gbdt'
,num_leaves,31
,max_depth,-1
,learning_rate,0.05
,n_estimators,500
,subsample_for_bin,200000
,objective,'regression'
,class_weight,
,min_split_gain,0.0
,min_child_weight,0.001


In [33]:
MIN_SIGNAL: float = 0.0        
MAX_SIGNAL: float = 2.0        
SIGNAL_MULTIPLIER: float = 400.0  

In [34]:
def predict(test: pl.DataFrame) -> float:
    
    if model is None:
        return 0.0

    try:
        test_pd = test.to_pandas()
        X_test = test_pd[SELECTED_FEATURES]
        X_test = X_test.fillna(0)
        pred = model.predict(X_test)[0]
        position = np.clip(1.0 + SIGNAL_MULTIPLIER * pred, 0.0, 2.0)
    
        return float(position)

    except Exception as e:
        print(e)
        return 0.0

In [35]:
inference_server = kaggle_evaluation.default_inference_server.DefaultInferenceServer(predict)

if os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    inference_server.serve()
else:
    inference_server.run_local_gateway(('data',))

In [36]:
submission_df = pd.read_parquet('submission.parquet')
print(submission_df.head())

   date_id  prediction
0     8810         2.0
1     8811         2.0
2     8812         2.0
3     8813         2.0
4     8814         2.0


In [37]:
import pandas.api.types

MIN_INVESTMENT = 0
MAX_INVESTMENT = 2


class ParticipantVisibleError(Exception):
    pass


def score(solution: pd.DataFrame, submission: pd.DataFrame, row_id_column_name: str) -> float:
    """
    Calculates a custom evaluation metric (volatility-adjusted Sharpe ratio).

    This metric penalizes strategies that take on significantly more volatility
    than the underlying market.

    Returns:
        float: The calculated adjusted Sharpe ratio.
    """

    if not pandas.api.types.is_numeric_dtype(submission['prediction']):
        raise ParticipantVisibleError('Predictions must be numeric')

    solution = solution
    solution['position'] = submission['prediction']

    if solution['position'].max() > MAX_INVESTMENT:
        raise ParticipantVisibleError(f'Position of {solution["position"].max()} exceeds maximum of {MAX_INVESTMENT}')
    if solution['position'].min() < MIN_INVESTMENT:
        raise ParticipantVisibleError(f'Position of {solution["position"].min()} below minimum of {MIN_INVESTMENT}')

    solution['strategy_returns'] = solution['risk_free_rate'] * (1 - solution['position']) + solution['position'] * solution['forward_returns']

    # Calculate strategy's Sharpe ratio
    strategy_excess_returns = solution['strategy_returns'] - solution['risk_free_rate']
    strategy_excess_cumulative = (1 + strategy_excess_returns).prod()
    strategy_mean_excess_return = (strategy_excess_cumulative) ** (1 / len(solution)) - 1
    strategy_std = solution['strategy_returns'].std()

    trading_days_per_yr = 252
    if strategy_std == 0:
        raise ParticipantVisibleError('Division by zero, strategy std is zero')
    sharpe = strategy_mean_excess_return / strategy_std * np.sqrt(trading_days_per_yr)
    strategy_volatility = float(strategy_std * np.sqrt(trading_days_per_yr) * 100)

    # Calculate market return and volatility
    market_excess_returns = solution['forward_returns'] - solution['risk_free_rate']
    market_excess_cumulative = (1 + market_excess_returns).prod()
    market_mean_excess_return = (market_excess_cumulative) ** (1 / len(solution)) - 1
    market_std = solution['forward_returns'].std()

    market_volatility = float(market_std * np.sqrt(trading_days_per_yr) * 100)

    if market_volatility == 0:
        raise ParticipantVisibleError('Division by zero, market std is zero')

    # Calculate the volatility penalty
    excess_vol = max(0, strategy_volatility / market_volatility - 1.2) if market_volatility > 0 else 0
    vol_penalty = 1 + excess_vol

    # Calculate the return penalty
    return_gap = max(
        0,
        (market_mean_excess_return - strategy_mean_excess_return) * 100 * trading_days_per_yr,
    )
    return_penalty = 1 + (return_gap**2) / 100

    # Adjust the Sharpe ratio by the volatility and return penalty
    adjusted_sharpe = sharpe / (vol_penalty * return_penalty)
    return min(float(adjusted_sharpe), 1_000_000)

In [38]:
try:
    solution_df = (
        pl.read_csv("data/train.csv")
        .tail(180)
        .to_pandas() # score 함수는 pandas DataFrame을 사용
    )

    submission_df = pd.read_parquet("submission.parquet")
    
    local_score = score(
        solution=solution_df, 
        submission=submission_df, 
        row_id_column_name="batch_id" 
    )
    
    print("---" * 10)
    print(f"(정답: {len(solution_df)}, 예측: {len(submission_df)})")
    print(f"Score(Adjusted Sharpe Ratio): {local_score:.5f}")
    print("---" * 10)

except Exception as e:
    print(e)

------------------------------
(정답: 180, 예측: 180)
Score(Adjusted Sharpe Ratio): 0.20703
------------------------------
