In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import polars as pl # data processing, CSV file I/O (e.g. pd.read_csv)
import pandas as pd
import os
from pathlib import Path
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import gc
import re
import random
import itertools

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
np.random.seed(15234)
random.seed(15234)
os.environ['PYTHONHASHSEED'] = '42'
tf.random.set_seed(15234)

## Create Data Pipeline

In [3]:
base_path = Path("/kaggle/input/jane-street-real-time-market-data-forecasting/")
preprocess_path = Path('/kaggle/input/js-preprocessing/')
train_path = preprocess_path / Path("train/")
train_read_paths = [train_path / Path(f"partition_id={i}/part-0.parquet") for i in [0, 1, 2, 3, 4, 5, 6, 7]]
val_read_paths = [train_path / Path(f"partition_id={i}/part-0.parquet") for i in [8]]
test_read_paths = [train_path / Path(f"partition_id={i}/part-0.parquet") for i in [9]]

data_base_path = Path("/kaggle/input/js-preprocessing/")
feature_data_path = data_base_path / Path("feature_data.csv")

target = "responder_6"

In [4]:
class Config:
    input_format = {
        "date_id": tf.TensorSpec(shape=(None,), dtype=tf.float32),
        "time_id": tf.TensorSpec(shape=(None,), dtype=tf.float32),
        "symbol_id": tf.TensorSpec(shape=(None,), dtype=tf.float32),
        "weight": tf.TensorSpec(shape=(None,), dtype=tf.float32),
        "features": tf.TensorSpec(shape=(None, 79), dtype=tf.float32),
        "responders": tf.TensorSpec(shape=(None, 9), dtype=tf.float32),
        "lags": tf.TensorSpec(shape=(None, 9), dtype=tf.float32),
        "target": tf.TensorSpec(shape=(None,), dtype=tf.float32)
    }
    train_batch_size = 32768
    val_batch_size = 512

config = Config()

In [5]:
feature_format = 'feature_\d\d'
responder_format = 'responder_\d'
lag_format = 'responder_\d_lag'

def chunk_features(chunk):
    return chunk[[i for i in chunk.columns if re.fullmatch(feature_format, i)]]
def chunk_lags(chunk):
    return chunk[[i for i in chunk.columns if re.fullmatch(lag_format, i)]]

def from_files(paths):
    def to_ret():
        for filepath in paths:
            chunk = pl.read_parquet(filepath)
            yield {
                "date_id": chunk["date_id"],
                "time_id": chunk["time_id"],
                "symbol_id": chunk["symbol_id"],
                "weight": chunk["weight"],
                "features": chunk_features(chunk),
                "responders": chunk[[i for i in chunk.columns if re.fullmatch(responder_format, i)]],
                "lags": chunk_lags(chunk),
                "target": chunk[target]
            }
    return to_ret

train_raw = tf.data.Dataset.from_generator(
    from_files(train_read_paths),
    output_signature=config.input_format
).prefetch(tf.data.AUTOTUNE)

val_raw = tf.data.Dataset.from_generator(
    from_files(val_read_paths),
    output_signature=config.input_format
).cache()

test_raw = tf.data.Dataset.from_generator(
    from_files(test_read_paths),
    output_signature=config.input_format
)

In [6]:
train_raw.element_spec

{'date_id': TensorSpec(shape=(None,), dtype=tf.float32, name=None),
 'time_id': TensorSpec(shape=(None,), dtype=tf.float32, name=None),
 'symbol_id': TensorSpec(shape=(None,), dtype=tf.float32, name=None),
 'weight': TensorSpec(shape=(None,), dtype=tf.float32, name=None),
 'features': TensorSpec(shape=(None, 79), dtype=tf.float32, name=None),
 'responders': TensorSpec(shape=(None, 9), dtype=tf.float32, name=None),
 'lags': TensorSpec(shape=(None, 9), dtype=tf.float32, name=None),
 'target': TensorSpec(shape=(None,), dtype=tf.float32, name=None)}

## Filter & Clean Data

In [7]:
feature_data_df = pl.read_csv(feature_data_path)
means = np.asarray(feature_data_df['mean']).astype('float32')
stds = np.asarray(feature_data_df['std']).astype('float32')

In [8]:
def clean_features(features):
    return tf.where(
        tf.logical_or(tf.math.is_nan(features), tf.math.is_inf(features)), 
        means, 
        features)

def normalize_features(features):
    return (features - means) / tf.math.maximum(1.0, stds)

def clean_lags(lags):
    return tf.where(
        tf.logical_or(tf.math.is_nan(lags), tf.math.is_inf(lags)), 
        0.0, 
        lags)

def format_data(features, lags):
    return {
        'feature': normalize_features(clean_features(features)),
        'lags': clean_lags(lags)
    }

train_ds = train_raw.map(lambda i: (
                        format_data(i['features'], i['lags']),
                        i['target']
                        )).unbatch().shuffle(10000).batch(config.train_batch_size)
val_ds = val_raw.map(lambda i: (
    format_data(i['features'], i['lags']),
    i['target']
)).unbatch().batch(config.val_batch_size)

## Train a Model!

In [9]:
feat_layer = keras.Input(shape=(79,), name='feature')
lag_layer = keras.Input(shape=(9,), name='lags')
inp = keras.layers.Concatenate()([feat_layer, lag_layer])
x = layers.Dropout(rate=0.2)(inp)
x = layers.Dense(units=128, activation="silu", kernel_regularizer="l1l2")(x)
x = layers.Dropout(rate=0.1)(x)
x = layers.Dense(units=64, activation="silu", kernel_regularizer="l1l2")(x)
x = layers.Dropout(rate=0.2)(x)
x = layers.Dense(units=32, activation="silu", kernel_regularizer="l1l2")(x)
x = layers.Dense(units=16, activation="silu")(x)
x = layers.Dense(units=4, activation="silu")(x)
x = layers.Dense(units=1)(x)
model = keras.Model(inputs=[feat_layer, lag_layer], outputs=x)

In [10]:
def r2_loss(y_true, y_pred):
    return tf.math.reduce_sum((y_true - y_pred) ** 2) / tf.math.reduce_sum(y_true ** 2)

model.compile(optimizer="rmsprop", loss="mse", metrics=[keras.metrics.MeanAbsoluteError(), keras.metrics.R2Score()])

In [11]:
callbacks = [
    keras.callbacks.ModelCheckpoint('/kaggle/working/fitted.keras', save_best_only=True),
    keras.callbacks.ModelCheckpoint('/kaggle/working/intermediate.keras', save_best_only=False),
    keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=3, min_delta=5e-5),
    keras.callbacks.EarlyStopping(patience=8)
]

history = model.fit(x=train_ds, validation_data=val_ds, epochs=1000, callbacks = callbacks)

Epoch 1/1000
   1059/Unknown [1m292s[0m 268ms/step - loss: 0.8446 - mean_absolute_error: 0.5997 - r2_score: -0.0024

  self.gen.throw(typ, value, traceback)


[1m1060/1060[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m342s[0m 315ms/step - loss: 0.8445 - mean_absolute_error: 0.5997 - r2_score: -0.0024 - val_loss: 0.7436 - val_mean_absolute_error: 0.5556 - val_r2_score: 0.0116 - learning_rate: 0.0010
Epoch 2/1000
[1m1060/1060[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m298s[0m 276ms/step - loss: 0.8346 - mean_absolute_error: 0.5961 - r2_score: 0.0103 - val_loss: 0.7426 - val_mean_absolute_error: 0.5552 - val_r2_score: 0.0129 - learning_rate: 0.0010
Epoch 3/1000
[1m1060/1060[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m314s[0m 291ms/step - loss: 0.8334 - mean_absolute_error: 0.5957 - r2_score: 0.0117 - val_loss: 0.7421 - val_mean_absolute_error: 0.5551 - val_r2_score: 0.0136 - learning_rate: 0.0010
Epoch 4/1000
[1m1060/1060[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m350s[0m 318ms/step - loss: 0.8328 - mean_absolute_error: 0.5956 - r2_score: 0.0124 - val_loss: 0.7421 - val_mean_absolute_error: 0.5550 - val_r2_score: 0.0136 - le

## Evaluate Model

In [12]:
test_ds = test_raw.map(lambda i: (
                        format_data(i['features'], i['lags']),
                        i['target']
                        )).unbatch().batch(512)

In [13]:
model = keras.models.load_model('/kaggle/working/fitted.keras', custom_objects={
    'r2_loss': r2_loss
})

In [14]:
model.evaluate(x=test_ds)

[1m12256/12256[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m52s[0m 3ms/step - loss: 0.6666 - mean_absolute_error: 0.5288 - r2_score: 0.0079


[0.6577768325805664, 0.5324712991714478, 0.007621109485626221]

## Online Retrain Test

In [15]:
old_model = keras.models.load_model('/kaggle/working/fitted.keras', custom_objects={
    'r2_loss': r2_loss
})
new_model = keras.models.load_model('/kaggle/working/fitted.keras', custom_objects={
    'r2_loss': r2_loss
})

In [16]:
original_format = "(date_id)|(time_id)|(symbol_id)|(weight)|(feature_\d\d)|(responder_\d)"
test_df = pl.concat([pl.read_parquet(i) for i in test_read_paths])
test_df = test_df[[i for i in test_df.columns if re.fullmatch(original_format, i)]]

In [17]:
feature_names = ['feature_00',
 'feature_01',
 'feature_02',
 'feature_03',
 'feature_04',
 'feature_05',
 'feature_06',
 'feature_07',
 'feature_08',
 'feature_09',
 'feature_10',
 'feature_11',
 'feature_12',
 'feature_13',
 'feature_14',
 'feature_15',
 'feature_16',
 'feature_17',
 'feature_18',
 'feature_19',
 'feature_20',
 'feature_21',
 'feature_22',
 'feature_23',
 'feature_24',
 'feature_25',
 'feature_26',
 'feature_27',
 'feature_28',
 'feature_29',
 'feature_30',
 'feature_31',
 'feature_32',
 'feature_33',
 'feature_34',
 'feature_35',
 'feature_36',
 'feature_37',
 'feature_38',
 'feature_39',
 'feature_40',
 'feature_41',
 'feature_42',
 'feature_43',
 'feature_44',
 'feature_45',
 'feature_46',
 'feature_47',
 'feature_48',
 'feature_49',
 'feature_50',
 'feature_51',
 'feature_52',
 'feature_53',
 'feature_54',
 'feature_55',
 'feature_56',
 'feature_57',
 'feature_58',
 'feature_59',
 'feature_60',
 'feature_61',
 'feature_62',
 'feature_63',
 'feature_64',
 'feature_65',
 'feature_66',
 'feature_67',
 'feature_68',
 'feature_69',
 'feature_70',
 'feature_71',
 'feature_72',
 'feature_73',
 'feature_74',
 'feature_75',
 'feature_76',
 'feature_77',
 'feature_78',
 'responder_0_lag',
 'responder_1_lag',
 'responder_2_lag',
 'responder_3_lag',
 'responder_4_lag',
 'responder_5_lag',
 'responder_6_lag',
 'responder_7_lag',
 'responder_8_lag']
responder_names = ['responder_0',
 'responder_1',
 'responder_2',
 'responder_3',
 'responder_4',
 'responder_5',
 'responder_6',
 'responder_7',
 'responder_8']


In [18]:
def train_online_model(past_df, num_iterations = 10):
    print("training new model")
    model = keras.models.load_model('/kaggle/working/fitted.keras', custom_objects={
        'r2_loss': r2_loss
    })
    model.optimizer.learning_rate.assign(5e-5)

    x_train = format_data(chunk_features(past_df), chunk_lags(past_df))
    y_train = past_df[target]
    model.fit(x=x_train, y=y_train, batch_size=past_df.shape[0], epochs=num_iterations)

    return model

In [19]:
past_data_cols = feature_names + ['date_id', 'time_id', 'symbol_id']
past_responder_cols = responder_names + ['date_id', 'time_id', 'symbol_id']
past_df_cols = feature_names + responder_names + ['date_id', 'time_id', 'symbol_id']

class PastStorage:
    def __init__(self):
        self.past_df = None
        self.last_day_data = None
        self.last_day_id = None
    def reformat_lags(lags: pl.DataFrame):
        last_day_ans = lags
        for i in range(9):
            for j in [col for col in last_day_ans.columns if re.search(f'responder_{i}', col)]:
                last_day_ans = last_day_ans.rename({j: f"responder_{i}"})
        return last_day_ans
    def append_to(df: pl.DataFrame | None, chunk: pl.DataFrame):
        if df is None:
            return chunk
        else:
            return pl.concat([df, chunk])
        
    def data_inc(self, test_full: pl.DataFrame, lags: pl.DataFrame | None):
        global past_data_cols, past_responder_cols
        if self.last_day_id is None:
            self.last_day_id = test_full['date_id'][0]
        
        # use the data from the previous day for today's information
        if lags is not None:
            assert self.last_day_id != test_full['date_id'][0]

            self.last_day_ans = PastStorage.reformat_lags(lags)
            to_append = self.last_day_data.join(self.last_day_ans, ['date_id', 'time_id', 'symbol_id'], how='left')

            self.past_df = PastStorage.append_to(self.past_df, to_append[past_df_cols])
            print("Check for past dataframe cleanness:", self.past_df.shape, self.past_df['responder_0'].is_null().sum())

            self.last_day_data = None
            self.last_day_id = test_full['date_id'][0]

        # append the data to the previous day... should always be the same as last_day_id
        assert self.last_day_id == test_full['date_id'][0]
        self.last_day_data = PastStorage.append_to(self.last_day_data, test_full[past_data_cols])

In [20]:
lags_ : pl.DataFrame | None = None
storage = PastStorage()
days_per_update = 5
cur_day = 0

# Replace this function with your inference code.
# You can return either a Pandas or Polars dataframe, though Polars is recommended.
# Each batch of predictions (except the very first) must be returned within 1 minute of the batch features being provided.
def predict(test: pl.DataFrame, lags: pl.DataFrame | None) -> pl.DataFrame | pd.DataFrame:
    global lags_, storage, old_model, new_model, cur_day, days_per_update
    
    if lags is not None:
        lags_ = lags
        
    if lags_ is not None:
        last_reading = lags_.group_by(('date_id', 'symbol_id'), maintain_order=True).last()
        for i in range(9):
            for j in [col for col in last_reading.columns if re.search(f'responder_{i}', col)]:
                last_reading = last_reading.rename({j: f"responder_{i}_lag"})

        selected_cols = ['date_id', 'symbol_id'] + [i for i in last_reading.columns if re.fullmatch(lag_format, i)]
        join_to = last_reading.with_columns(last_reading['date_id'] + 1)[selected_cols]
        test_grouped = test.join(join_to, ['date_id', 'symbol_id'], how='left')
    else:
        test_grouped = test
        for i in range(9):
            test_grouped = test_grouped.with_columns(pl.lit(None).cast(pl.Float32).alias(f"responder_{i}_lag"))
    
    input_features = format_data(chunk_features(test_grouped), chunk_lags(test_grouped))
    old_y = np.asarray(old_model.predict(input_features, verbose=0)).reshape((-1,))
    new_y = np.asarray(new_model.predict(input_features, verbose=0)).reshape((-1,))
    output_y = (old_y + new_y) / 2
    predictions = pl.DataFrame({
        "row_id": test['row_id'],
        "responder_6": output_y
    })

    storage.data_inc(test_grouped, lags)
    if lags is not None:
        cur_day += 1
        if cur_day == days_per_update:
            new_model = train_online_model(storage.past_df)
            storage.past_df = None
        cur_day %= days_per_update

    if isinstance(predictions, pl.DataFrame):
        assert predictions.columns == ['row_id', 'responder_6']
    elif isinstance(predictions, pd.DataFrame):
        assert (predictions.columns == ['row_id', 'responder_6']).all()
    else:
        raise TypeError('The predict function must return a DataFrame')
    # Confirm has as many rows as the test data.
    assert len(predictions) == len(test)

    return predictions

In [21]:
by_days = test_df.group_by(['date_id', 'time_id'], maintain_order=True)

In [22]:
last_date = None
last_lag = None

predictions = pl.DataFrame(schema={'responder_6': pl.Float32})
answers = pl.DataFrame(schema={'responder_6': pl.Float32})

In [23]:
lag_send_format = "(date_id)|(time_id)|(symbol_id)|(responder_\d)"
num_samples = 100000
cur_ind = 0

for (date_id, time_id), df in by_days:
    cur_ind += 1
    if cur_ind > num_samples:
        break
    if cur_ind % 100 == 0:
        print(f"Processing {cur_ind}")
    lag_send = None
    if date_id != last_date:
        lag_send = last_lag
    else:
        lag_send = None

    df = df.with_columns(pl.Series(list(range(df.shape[0]))).alias("row_id"))
    result = predict(df, lag_send)
    
    predictions = pl.concat([predictions, result[['responder_6']]])
    answers = pl.concat([answers, df[['responder_6']]])
    
    responders = df[[i for i in df.columns if re.fullmatch(lag_send_format, i)]]

    if date_id != last_date:
        last_date = date_id
        last_lag = responders
    else:
        if last_lag is None:
            last_lag = responders
        else:
            last_lag = pl.concat([last_lag, responders])

Processing 100
Processing 200
Processing 300
Processing 400
Processing 500
Processing 600
Processing 700
Processing 800
Processing 900
Check for past dataframe cleanness: (35816, 100) 0
Processing 1000
Processing 1100
Processing 1200
Processing 1300
Processing 1400
Processing 1500
Processing 1600
Processing 1700
Processing 1800
Processing 1900
Check for past dataframe cleanness: (73568, 100) 0
Processing 2000
Processing 2100
Processing 2200
Processing 2300
Processing 2400
Processing 2500
Processing 2600
Processing 2700
Processing 2800
Processing 2900
Check for past dataframe cleanness: (109384, 100) 0
Processing 3000
Processing 3100
Processing 3200
Processing 3300
Processing 3400
Processing 3500
Processing 3600
Processing 3700
Processing 3800
Check for past dataframe cleanness: (147136, 100) 0
Processing 3900
Processing 4000
Processing 4100
Processing 4200
Processing 4300
Processing 4400
Processing 4500
Processing 4600
Processing 4700
Processing 4800
Check for past dataframe cleanness:

In [24]:
metric = tf.metrics.R2Score()
metric.update_state(np.asarray(answers['responder_6']).reshape((-1, 1)), np.asarray(predictions['responder_6']).reshape((-1, 1)))
metric.result()

<tf.Tensor: shape=(), dtype=float32, numpy=0.008329034>

In [25]:
for i in range(9):
    print((storage.past_df[f'responder_{i}'] - test_df[f'responder_{i}'][:storage.past_df.shape[0]]).max())

5.360012054443359
5.5210065841674805
5.943350315093994
5.845920562744141
6.06838321685791
5.419388294219971
7.608674049377441
7.8821516036987305
7.446694374084473


In [26]:
np.asarray(answers['responder_6']).reshape((-1, 1))

array([[ 3.071231  ],
       [ 1.9790423 ],
       [-0.5062598 ],
       ...,
       [-0.92973065],
       [-0.7898972 ],
       [ 0.21184593]], dtype=float32)

In [27]:
np.asarray(predictions['responder_6']).reshape((-1, 1))

array([[0.31701842],
       [0.33453202],
       [0.26945645],
       ...,
       [0.04558765],
       [0.02127514],
       [0.02893108]], dtype=float32)