In [1]:
!pip install dask dask_ml xgboost

Collecting dask_ml
  Downloading dask_ml-2024.4.4-py3-none-any.whl.metadata (5.9 kB)
Collecting dask-glm>=0.2.0 (from dask_ml)
  Downloading dask_glm-0.3.2-py2.py3-none-any.whl.metadata (1.5 kB)
Collecting distributed>=2.4.0 (from dask_ml)
  Downloading distributed-2024.11.2-py3-none-any.whl.metadata (3.3 kB)
Collecting sparse>=0.7.0 (from dask-glm>=0.2.0->dask_ml)
  Downloading sparse-0.15.4-py2.py3-none-any.whl.metadata (4.5 kB)
Collecting dask-expr<1.2,>=1.1 (from dask[array,dataframe]>=2.4.0->dask_ml)
  Downloading dask_expr-1.1.19-py3-none-any.whl.metadata (2.6 kB)
INFO: pip is looking at multiple versions of distributed to determine which version is compatible with other requirements. This could take a while.
Collecting distributed>=2.4.0 (from dask_ml)
  Downloading distributed-2024.11.1-py3-none-any.whl.metadata (3.3 kB)
  Downloading distributed-2024.11.0-py3-none-any.whl.metadata (3.3 kB)
  Downloading distributed-2024.10.0-py3-none-any.whl.metadata (3.3 kB)
Collecting sorted

In [3]:
import os

kaggle_dir = '/root/.kaggle'
os.makedirs(kaggle_dir, exist_ok=True)

!cp kaggle.json {kaggle_dir}/

!kaggle competitions download -c jane-street-real-time-market-data-forecasting

!unzip jane-street-real-time-market-data-forecasting.zip

!rm jane-street-real-time-market-data-forecasting.zip

Downloading jane-street-real-time-market-data-forecasting.zip to /content
100% 11.5G/11.5G [01:29<00:00, 156MB/s]
100% 11.5G/11.5G [01:29<00:00, 137MB/s]
Archive:  jane-street-real-time-market-data-forecasting.zip
  inflating: features.csv            
  inflating: kaggle_evaluation/__init__.py  
  inflating: kaggle_evaluation/core/__init__.py  
  inflating: kaggle_evaluation/core/base_gateway.py  
  inflating: kaggle_evaluation/core/generated/__init__.py  
  inflating: kaggle_evaluation/core/generated/kaggle_evaluation_pb2.py  
  inflating: kaggle_evaluation/core/generated/kaggle_evaluation_pb2_grpc.py  
  inflating: kaggle_evaluation/core/kaggle_evaluation.proto  
  inflating: kaggle_evaluation/core/relay.py  
  inflating: kaggle_evaluation/core/templates.py  
  inflating: kaggle_evaluation/jane_street_gateway.py  
  inflating: kaggle_evaluation/jane_street_inference_server.py  
  inflating: lags.parquet/date_id=0/part-0.parquet  
  inflating: responders.csv          
  inflating: sam

In [4]:
import dask.dataframe as dd
from xgboost.dask import DaskXGBRegressor
from dask_ml.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from dask_ml.model_selection import train_test_split
import pandas as pd
import numpy as np

In [5]:
# Define features and target
target = 'responder_6'
features = [f"feature_{i:02d}" for i in range(79)] + ["date_id", "time_id", "symbol_id"]

# Constant-fill imputer: missing values are replaced with 0.
imputer = SimpleImputer(strategy='constant', fill_value=0)

# Read only the columns used downstream (features, target and sample weight).
data_dir = '/content/train.parquet'
data = dd.read_parquet(f"{data_dir}/*/*", columns=features + [target] + ['weight'])


def impute_partition(df):
    """Return one partition with its missing values filled by ``imputer``.

    Parameters
    ----------
    df : pandas.DataFrame
        A single partition, as passed in by ``map_partitions``.

    Returns
    -------
    pandas.DataFrame
        The imputed values labelled with the columns of ``df``.

    Notes
    -----
    The imputer is applied to every column of the partition, so the target
    and ``weight`` columns are filled as well, not only the features.
    """
    imputed = imputer.fit_transform(df)
    # Re-wrap so the result carries the original column labels even when the
    # imputer hands back a bare array.
    return pd.DataFrame(imputed, columns=df.columns)


data = data.map_partitions(impute_partition)

# Save the imputed dataset. The path is absolute so that it matches the
# location the training step reads from regardless of the working directory,
# and overwrite=True clears part files left behind by an earlier run.
imputed_dir = '/content/imputed_dataset'
data.to_parquet(imputed_dir, overwrite=True)


In [6]:
from dask.distributed import Client, LocalCluster

# Start a cluster on this machine with default settings and attach a client
# to it; the client summary is printed as a quick sanity check.
cluster = LocalCluster()
client = Client(address=cluster)

print(client)

INFO:distributed.http.proxy:To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:38169
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:33069'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:38063'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:42823', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:42823
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:56162
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:45539', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker co

<Client: 'tcp://127.0.0.1:38169' processes=2 threads=2, memory=12.67 GiB>


In [None]:
# Load the imputed copy of the training data.
data_dir = '/content/imputed_dataset'
data = dd.read_parquet(f"{data_dir}/*")
# NOTE(review): sorting the whole dataset is an expensive operation and the
# split below selects rows by date_id membership only — confirm whether the
# ordering is actually required.
data = data.sort_values(by=["date_id", "time_id"]).reset_index(drop=True)


X = data[features]
Y = data[target]
# NOTE(review): `weights` is not passed to fit() below — confirm whether the
# model should be trained with sample weights.
weights = data['weight']


# Hold out the most recent dates for validation.
num_valid_dates = 100
# unique() does not promise any particular order, so sort explicitly before
# slicing off "the last N dates"; .iloc keeps the slice purely positional.
dates = data['date_id'].unique().compute().sort_values(ignore_index=True)
valid_dates = dates.iloc[-num_valid_dates:]
train_dates = dates.iloc[:-num_valid_dates]

train_mask = data['date_id'].isin(train_dates)
valid_mask = data['date_id'].isin(valid_dates)

X_Train = X[train_mask]
Y_Train = Y[train_mask]
X_Valid = X[valid_mask]
Y_Valid = Y[valid_mask]

dask_model = DaskXGBRegressor(
    objective="reg:squarederror",
    n_estimators=1000,
    max_depth=6,
    learning_rate=0.1,
    tree_method="hist"
)

dask_model.fit(X_Train, Y_Train)

INFO:distributed.core:Connection to tcp://127.0.0.1:56162 has been closed.
INFO:distributed.scheduler:Remove worker <WorkerState 'tcp://127.0.0.1:42823', name: 1, status: running, memory: 11, processing: 5> (stimulus_id='handle-worker-cleanup-1732472627.818779')
INFO:distributed.nanny:Worker process 4434 was killed by signal 9
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:42505', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:42505
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:48008
INFO:distributed.nanny:Worker process 7747 was killed by signal 9
INFO:distributed.core:Connection to tcp://127.0.0.1:48008 has been closed.
INFO:distributed.scheduler:Remove worker <WorkerState 'tcp://127.0.0.1:42505', name: 1, status: running, memory: 12, processing: 2> (stimulus_id='handle-worker-cleanup-1732472965.6645908')
INFO:distributed.scheduler:Register worker <WorkerS

In [None]:
def compute_weighted_r2(y_true, y_pred, weights, center=True):
    """Sample-weighted R^2 of ``y_pred`` against ``y_true``.

    Computes ``1 - sum(w * (y - y_hat)^2) / sum(w * (y - b)^2)`` where ``b``
    is the weighted mean of ``y_true`` when ``center`` is true and ``0``
    otherwise.

    Parameters
    ----------
    y_true, y_pred, weights : array-like
        Targets, predictions and per-sample weights of matching shape.
    center : bool, default True
        Subtract the weighted mean of ``y_true`` in the denominator (the
        original behaviour). ``False`` leaves the targets uncentred.
        NOTE(review): the competition score is described as a zero-mean R^2;
        confirm whether it expects ``center=False``.

    Returns
    -------
    float
        The R^2 value. When the denominator is exactly zero (no weighted
        spread in the targets) the result is ``1.0`` for a perfect fit and
        ``0.0`` otherwise, instead of a nan/-inf from dividing by zero.

    Raises
    ------
    ZeroDivisionError
        From ``np.average`` when ``center`` is true and the weights sum to
        zero.
    """
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    weights = np.asarray(weights, dtype=float)

    # Reference level the targets are measured against in the denominator.
    baseline = np.average(y_true, weights=weights) if center else 0.0

    residual_ss = np.sum(weights * (y_true - y_pred) ** 2)
    total_ss = np.sum(weights * (y_true - baseline) ** 2)

    if total_ss == 0:
        # Degenerate denominator: report a finite score rather than dividing by zero.
        return np.float64(1.0) if residual_ss == 0 else np.float64(0.0)

    return 1 - residual_ss / total_ss



# Score the held-out dates. The validation slice is materialised so the
# metric receives concrete values rather than lazy collections, and the
# weights are restricted to the same rows as Y_Valid.
valid_pred = dask_model.predict(X_Valid)
valid_score = compute_weighted_r2(
    Y_Valid.compute(),
    valid_pred.compute(),
    weights[valid_mask].compute(),
)
print(f"Validation weighted R^2: {valid_score:.6f}")


# Read the test file itself (not the imputed training set) and request only
# the columns the model was fitted on.
# NOTE(review): assumes date_id is stored inside the file and not only
# encoded in the `date_id=0` directory name — confirm.
test_data_dir = '/content/test.parquet/date_id=0/part-0.parquet'
test_data = dd.read_parquet(test_data_dir, columns=features)
# The training data had its missing values filled with 0; apply the same
# fill here so the model sees consistently prepared inputs.
test_data = test_data.fillna(0)


prediction = dask_model.predict(test_data)

# single_file=True writes one CSV at this path rather than a directory of
# per-partition part files.
prediction.to_csv("predictions.csv", single_file=True, index=True, header=True)

