In [1]:
import os
from pathlib import Path

import numpy as np
import pandas as pd
import polars as pl

import kaggle_evaluation.default_inference_server

In [2]:
# Define customized score functions
def AdjustedSharpe(
    market_returns: list[float],
    risk_free_rate: list[float],
    positions: list[float],
    return_df: bool = False
):

    market_returns   = pd.Series(market_returns)
    risk_free_rate   = pd.Series(risk_free_rate)
    positions        = pd.Series(positions)
    assert positions.max() <= 2
    assert positions.min() >= 0
    
    market_excess    = market_returns - risk_free_rate
    strategy_excess  = market_excess * positions
    strategy_returns = strategy_excess + risk_free_rate
    
    
    market_exm   = (1 + market_excess).prod() ** (1 / len(positions)) - 1
    market_std   = market_returns.std()
    strategy_exm = (1 + strategy_excess).prod() ** (1 / len(positions)) - 1
    strategy_std = strategy_returns.std()

    if strategy_std == 0:
        sharpe = 0
    else:
        sharpe = strategy_exm / strategy_std * np.sqrt(252)
    return_penalty = 1 + max(0, ((market_exm - strategy_exm) * 252)) ** 2 * 100
    vol_penalty    = 1 + max(0, strategy_std / market_std - 1.2)
    
    score = sharpe / return_penalty / vol_penalty 
    
    if return_df:
        df = pd.DataFrame([{
            'market_excess_mean': market_exm,
            'market_std': market_std,
            'strategy_excess_mean': strategy_exm,
            'strategy_std': strategy_std,
            'sharpe': sharpe,
            'return_penalty': return_penalty,
            'vol_penalty': vol_penalty,
            'score': score,
            'market_return': market_returns.iloc[-1],
            'risk_free_rate': risk_free_rate.iloc[-1],
            'position': positions.iloc[-1]
        }])
        return(df)

    return score

In [3]:
# Define a class to make predictions
class TimeSeriesPredictor:
    def __init__(
        self,
        history: pl.DataFrame,
        window: int = 100,
        defined_positions: list[float] = list()
    ):
        self.history = history
        self.window  = window
        self.defined_positions = defined_positions.copy()
        self.schema = {col: history.schema[col] for col in history.columns}

    def predict(self, observation: pl.DataFrame):
        # update the history
        current_date_id = observation.item(0, "date_id")
        self.history = self.history.filter(pl.col("date_id") < current_date_id)
        expressions = []
        for colname, dtype in self.schema.items():
            if colname in observation.columns:
                if observation[colname].dtype != dtype:
                    expressions.append(pl.col(colname).cast(dtype, strict=False))
                else:
                    expressions.append(pl.col(colname))
        self.history = (
            pl.concat([self.history, observation.select(expressions)])
            .tail(self.window)
        )
        
        # decide the position
        position = 1
        current_is_scored = observation.item(0, "is_scored")
        if current_is_scored:
            position = self.set_position()

        return position

    def set_position(self):
        position = 1
        if self.defined_positions:
            position = self.defined_positions.pop(0)
        return np.clip(position, 0, 2)

In [4]:
PATH    = Path("/kaggle/input/hull-tactical-market-prediction")
COLUMNS = ["date_id", "lagged_risk_free_rate", "lagged_forward_returns"]

history_df = (
    pl.read_csv(PATH / "train.csv").with_columns(
        pl.col("forward_returns").shift(1).alias("lagged_forward_returns"),
        pl.col("risk_free_rate").shift(1).alias("lagged_risk_free_rate"),
        pl.col("market_forward_excess_returns").shift(1).alias("lagged_market_forward_excess_returns"),
        pl.lit(False).alias("is_scored")
    ).select(pl.col(COLUMNS))
)

solution_df = (
    pl.read_csv(PATH / "train.csv")
    .filter(pl.col("date_id").is_between(8810, 8988))
    .select(pl.col("risk_free_rate", "forward_returns"))
)

market_excess_returns = solution_df['forward_returns'] - solution_df['risk_free_rate']
risk_free_rate = solution_df['risk_free_rate']
k = 0.0007409555585545103

defined_positions = ((k - risk_free_rate) / market_excess_returns).clip(0, 2).to_list()

predictor = TimeSeriesPredictor(history_df, defined_positions=defined_positions)

In [5]:
def predict(test: pl.DataFrame) -> float:
    """Replace this function with your inference code.
    You can return either a Pandas or Polars dataframe, though Polars is recommended for performance.
    Each batch of predictions (except the very first) must be returned within 5 minutes of the batch features being provided.
    """
    return predictor.predict(test)

In [6]:
display(AdjustedSharpe(
    market_returns=solution_df["forward_returns"].to_list(),
    risk_free_rate=solution_df["risk_free_rate"].to_list(),
    positions=defined_positions,
    return_df=True
))

Unnamed: 0,market_excess_mean,market_std,strategy_excess_mean,strategy_std,sharpe,return_penalty,vol_penalty,score,market_return,risk_free_rate,position
0,0.000319,0.010962,0.000313,0.000284,17.511204,1.000221,1,17.507338,0.00831,0.000156,0.071742


In [7]:
# FYI: Code for Optimization

from scipy.optimize import minimize

def obj_func(x):
    return - AdjustedSharpe(
        market_returns=solution_df["forward_returns"].to_list(),
        risk_free_rate=solution_df["risk_free_rate"].to_list(),
        positions=(
            (x - solution_df['risk_free_rate']) / (solution_df['forward_returns'] - solution_df['risk_free_rate'])
        ).clip(0, 2).to_list(),
        return_df=False        
    )

EXEC_OPTIM = True
if EXEC_OPTIM:
    res = minimize(obj_func, [0.0007], method='BFGS')
    print(f"RESULT: k = {res.x[0]}, score = {-res.fun}")

RESULT: k = 0.0007409527179574094, score = 17.507337753928343


In [8]:
# When your notebook is run on the hidden test set, inference_server.serve must be called within 15 minutes of the notebook starting
# or the gateway will throw an error. If you need more than 15 minutes to load your model you can do so during the very
# first `predict` call, which does not have the usual 1 minute response deadline.
inference_server = kaggle_evaluation.default_inference_server.DefaultInferenceServer(predict)

if os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    inference_server.serve()
else:
    inference_server.run_local_gateway(('/kaggle/input/hull-tactical-market-prediction/',))