In [1]:
import pandas as pd
import polars as pl
import numpy as np
import os
import pyarrow as pa
from tqdm import tqdm
from matplotlib import pyplot as plt
import pickle

from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import KNeighborsRegressor
from sklearn.metrics import mean_squared_error, r2_score
from concurrent.futures import ProcessPoolExecutor

In [2]:
class CONFIG:
    """Notebook-wide settings: random seed plus the column groups used below."""

    seed = 2025
    target_col = "responder_6"

    # Raw feature columns (feature_00 .. feature_78) followed by the lag-1
    # responders (responder_0_lag_1 .. responder_8_lag_1). Identifier columns
    # such as `date_id` are intentionally not part of this list.
    feature_cols = (
        [f"feature_{n:02d}" for n in range(79)]
        + [f"responder_{n}_lag_1" for n in range(9)]
    )

    # Columns of the pre-encoded feature set: encoded_feature_0 .. encoded_feature_15.
    encoded_cols = list(map("encoded_feature_{}".format, range(16)))

    # No categorical columns are currently used.
    categorical_cols = []

In [3]:
# train = pl.scan_parquet("/root/autodl-tmp/jane-street-2024/training.parquet").collect().to_pandas()
# valid = pl.scan_parquet("/root/autodl-tmp/jane-street-2024/validation.parquet").collect().to_pandas()
# train.shape, valid.shape

In [4]:
# Load the pre-encoded train/validation splits. Override the directory with the
# JANE_STREET_DATA_DIR environment variable instead of editing this cell.
DATA_DIR = os.environ.get("JANE_STREET_DATA_DIR", "/root/autodl-tmp/jane-street-2024")
train = pl.scan_parquet(os.path.join(DATA_DIR, "encoded_train.parquet")).collect().to_pandas()
valid = pl.scan_parquet(os.path.join(DATA_DIR, "encoded_valid.parquet")).collect().to_pandas()
train.shape, valid.shape

((33290488, 21), (1643664, 21))

In [5]:
# Peek at the first five rows of the encoded training frame.
train.head(n=5)

Unnamed: 0,encoded_feature_0,encoded_feature_1,encoded_feature_2,encoded_feature_3,encoded_feature_4,encoded_feature_5,encoded_feature_6,encoded_feature_7,encoded_feature_8,encoded_feature_9,...,encoded_feature_11,encoded_feature_12,encoded_feature_13,encoded_feature_14,encoded_feature_15,responder_6,weight,symbol_id,date_id,time_id
0,-0.168524,-0.057372,0.102818,-0.243955,-0.202454,-0.104967,0.321823,-0.222371,-0.26224,-0.18043,...,-0.258119,-0.242541,-0.157287,-0.14576,0.0056,0.496563,3.324375,0,1000,0
1,-0.162565,-0.060451,0.094373,-0.243832,-0.207173,-0.107619,0.317791,-0.22277,-0.263774,-0.184074,...,-0.257707,-0.245912,-0.156141,-0.148324,-0.013658,0.529877,4.711303,1,1000,0
2,-0.164049,-0.259337,-0.201851,-0.278407,-0.252687,-0.257245,0.225601,-0.24953,-0.273658,-0.24442,...,-0.042617,-0.223356,-0.231683,-0.271294,0.407031,0.746983,3.028847,2,1000,0
3,-0.271798,-0.055786,-0.239978,-0.229345,-0.060237,-0.220229,0.733118,-0.208833,-0.147414,-0.217443,...,-0.012005,-0.107234,-0.278464,-0.162567,-0.258254,0.941218,2.099438,3,1000,0
4,-0.269069,-0.106385,-0.235731,-0.240414,-0.079836,-0.237433,0.665825,-0.209488,-0.180263,-0.203104,...,-0.017777,-0.128438,-0.277465,-0.186881,-0.27737,0.204584,3.166049,4,1000,0


In [6]:
# # Trick of boosting LB score, data leakage on the validation set
# train = pd.concat([train, valid]).reset_index(drop=True)
# train.shape

In [7]:
# # 2.3 Use KNN imputation (yes, KNN can be used to impute missing values!)
# from sklearn.impute import KNNImputer

# imputer = KNNImputer(n_neighbors=5)
# X_train = imputer.fit_transform(train[CONFIG.feature_cols])
# X_valid = imputer.transform(valid[CONFIG.feature_cols])

In [8]:
# X_train = train[ CONFIG.feature_cols ]
# X_train = X_train.ffill().fillna(0)
# y_train = train[ CONFIG.target_col ]
# w_train = train["weight"]

# X_valid = valid[ CONFIG.feature_cols ]
# X_valid = X_valid.ffill().fillna(0)
# y_valid = valid[ CONFIG.target_col ]
# w_valid = valid["weight"]

In [9]:
# Model inputs are the encoded features; the target and the per-row sample
# weights are kept alongside for training and weighted scoring.
X_train, y_train, w_train = (
    train[CONFIG.encoded_cols],
    train[CONFIG.target_col],
    train["weight"],
)
X_valid, y_valid, w_valid = (
    valid[CONFIG.encoded_cols],
    valid[CONFIG.target_col],
    valid["weight"],
)

In [10]:
class SymbolKNNRegressor:
    """One k-nearest-neighbours regressor per ``symbol_id``, fit on recent dates.

    For every symbol, only the rows from that symbol's ``window_dates`` most
    recent distinct ``date_id`` values are kept, and a distance-weighted
    ``KNeighborsRegressor`` is fit on them. ``predict`` routes each row to the
    model of its symbol; rows whose symbol has no model are predicted as 0.

    Parameters
    ----------
    n_neighbors : int, default 5
        Neighbours used by every per-symbol model. A symbol needs at least this
        many rows inside its window to get a model.
    window_dates : int or None, default 50
        Number of most recent distinct dates kept per symbol. ``None`` keeps
        every date.
    """

    def __init__(self, n_neighbors=5, window_dates=50):
        self.n_neighbors = n_neighbors
        self.window_dates = window_dates
        self.symbol_models = {}  # symbol_id -> fitted KNeighborsRegressor

    def fit(self, X, y, symbol_ids, date_ids):
        """Fit one KNN model per symbol on that symbol's most recent dates.

        Parameters
        ----------
        X : pandas.DataFrame or numpy.ndarray
            Feature rows.
        y : pandas.Series or numpy.ndarray
            Targets, row-aligned with ``X``.
        symbol_ids, date_ids : array-like
            Per-row symbol and date identifiers, row-aligned with ``X``.

        Returns
        -------
        self
        """
        # A re-fit must not keep models for symbols absent from this data.
        self.symbol_models = {}

        # Plain arrays make every mask below positional, so pandas inputs are
        # matched by row order rather than by index labels.
        symbol_arr = np.asarray(symbol_ids)
        date_arr = np.asarray(date_ids)

        for symbol in tqdm(np.unique(symbol_arr), desc="Fitting models for symbols"):
            symbol_mask = symbol_arr == symbol
            X_symbol = X[symbol_mask]
            y_symbol = y[symbol_mask]
            dates_symbol = date_arr[symbol_mask]

            # Distinct dates in ascending order; keep the newest `window_dates`.
            recent_dates = np.unique(dates_symbol)
            if self.window_dates is not None:
                recent_dates = recent_dates[-self.window_dates:]
            recent_mask = np.isin(dates_symbol, recent_dates)

            X_recent = X_symbol[recent_mask]
            y_recent = y_symbol[recent_mask]

            # scikit-learn needs at least `n_neighbors` samples; symbols with
            # fewer rows get no model and are later predicted as 0.
            if len(X_recent) < self.n_neighbors:
                continue

            # NOTE(review): n_jobs=-1 inside each parallel_predict worker process
            # may oversubscribe the CPUs; consider n_jobs=1 when predicting in a pool.
            knn = KNeighborsRegressor(
                n_neighbors=self.n_neighbors,
                weights='distance',
                algorithm='ball_tree',
                n_jobs=-1
            )
            knn.fit(X_recent, y_recent)
            self.symbol_models[symbol] = knn

        return self

    def predict(self, X, symbol_ids):
        """Predict each row with the model fitted for its symbol.

        Rows whose symbol has no model (unseen in ``fit`` or with too few
        recent rows) are predicted as 0.

        Parameters
        ----------
        X : pandas.DataFrame or numpy.ndarray
            Feature rows.
        symbol_ids : array-like
            Per-row symbol identifiers, row-aligned with ``X``.

        Returns
        -------
        numpy.ndarray of shape (len(X),)
        """
        symbol_arr = np.asarray(symbol_ids)
        predictions = np.zeros(len(X))

        for symbol in np.unique(symbol_arr):
            model = self.symbol_models.get(symbol)
            if model is None:
                continue  # prediction stays 0
            symbol_mask = symbol_arr == symbol
            predictions[symbol_mask] = model.predict(X[symbol_mask])

        return predictions

# Batched, multi-process prediction for models whose predict() takes (X, symbol_ids)
def predict_batch(model, X_batch, symbol_ids_batch):
    """Run ``model.predict`` on one slice of rows and return its result.

    Defined at module level so it can be pickled by reference when submitted
    to a ``ProcessPoolExecutor``.
    """
    batch_predictions = model.predict(X_batch, symbol_ids_batch)
    return batch_predictions

# Model used by the current worker process; installed once per worker by
# `_init_predict_worker` so it is not re-pickled for every submitted batch.
_PREDICT_WORKER_MODEL = None


def _init_predict_worker(model):
    """ProcessPoolExecutor initializer: store ``model`` in this worker's globals."""
    global _PREDICT_WORKER_MODEL
    _PREDICT_WORKER_MODEL = model


def _predict_batch_in_worker(X_batch, symbol_ids_batch):
    """Predict one batch with the model installed by `_init_predict_worker`."""
    return _PREDICT_WORKER_MODEL.predict(X_batch, symbol_ids_batch)


def _take_rows(data, start, stop):
    """Positional row slice ``[start, stop)`` for pandas objects or arrays."""
    if hasattr(data, "iloc"):
        return data.iloc[start:stop]
    return data[start:stop]


def parallel_predict(model, X, symbol_ids, batch_size=10000, n_workers=8):
    """Predict ``X`` in consecutive row batches, optionally across processes.

    Parameters
    ----------
    model : object
        Anything exposing ``predict(X_batch, symbol_ids_batch)`` that returns one
        value per row (e.g. ``SymbolKNNRegressor``). It is handed to each worker
        once through the pool initializer rather than with every batch.
    X : pandas.DataFrame or numpy.ndarray
        Feature rows.
    symbol_ids : array-like
        Per-row symbol identifiers, row-aligned with ``X``.
    batch_size : int, default 10000
        Rows per batch; must be >= 1.
    n_workers : int or None, default 8
        Number of worker processes. ``1`` predicts in this process without a
        pool; ``None`` lets ``ProcessPoolExecutor`` choose.

    Returns
    -------
    numpy.ndarray of shape (len(X),)
        Predictions in the original row order.

    Raises
    ------
    ValueError
        If ``batch_size`` < 1, or ``X`` and ``symbol_ids`` differ in length.
    """
    # A non-positive step would otherwise yield no batches (all-zero output)
    # or an opaque range() error.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")
    n_samples = len(X)
    if len(symbol_ids) != n_samples:
        raise ValueError(
            f"symbol_ids has {len(symbol_ids)} rows but X has {n_samples}"
        )

    predictions = np.zeros(n_samples)
    if n_samples == 0:
        return predictions

    starts = range(0, n_samples, batch_size)
    X_batches = [_take_rows(X, s, s + batch_size) for s in starts]
    symbol_batches = [_take_rows(symbol_ids, s, s + batch_size) for s in starts]

    def _collect(results):
        # `results` yields batch predictions in submission order.
        progress = tqdm(results, total=len(starts), desc="Processing batches")
        for batch_idx, batch_pred in enumerate(progress):
            start = starts[batch_idx]
            predictions[start:start + batch_size] = batch_pred

    if n_workers == 1:
        # A single worker gains nothing from a process pool.
        _collect(map(model.predict, X_batches, symbol_batches))
    else:
        # NOTE(review): workers use the platform's default start method (fork on
        # Linux). The worker functions live in this notebook's __main__, so a
        # "spawn" context would likely fail to find them.
        with ProcessPoolExecutor(
            max_workers=n_workers,
            initializer=_init_predict_worker,
            initargs=(model,),
        ) as executor:
            _collect(executor.map(_predict_batch_in_worker, X_batches, symbol_batches))

    return predictions

In [11]:
# Fit one 3-neighbour KNN model per symbol, each trained only on that
# symbol's 20 most recent dates.
symbol_knn = SymbolKNNRegressor(window_dates=20, n_neighbors=3)
symbol_knn.fit(
    X_train,
    y_train,
    symbol_ids=train["symbol_id"],
    date_ids=train["date_id"],
)

Fitting models for symbols: 100%|██████████| 39/39 [00:17<00:00,  2.27it/s]


<__main__.SymbolKNNRegressor at 0x7f3838c24940>

In [12]:
# Predict the validation set in row batches across worker processes.
# NOTE(review): the Polars fork() warning captured below is raised when the
# pool forks this kernel after Polars has started its thread pool; watch for
# hangs here and see the Polars multiprocessing guide if they occur.
y_pred_valid = parallel_predict(
    symbol_knn,
    X_valid,
    symbol_ids=valid["symbol_id"],
)

In addition, using fork() with Python in general is a recipe for mysterious
deadlocks and crashes.

The most likely reason you are seeing this error is because you are using the
multiprocessing module on Linux, which uses fork() by default. This will be
fixed in Python 3.14. Until then, you want to use the "spawn" context instead.

See https://docs.pola.rs/user-guide/misc/multiprocessing/ for details.

or by setting POLARS_ALLOW_FORKING_THREAD=1.

  self.pid = os.fork()
In addition, using fork() with Python in general is a recipe for mysterious
deadlocks and crashes.

The most likely reason you are seeing this error is because you are using the
multiprocessing module on Linux, which uses fork() by default. This will be
fixed in Python 3.14. Until then, you want to use the "spawn" context instead.

See https://docs.pola.rs/user-guide/misc/multiprocessing/ for details.

or by setting POLARS_ALLOW_FORKING_THREAD=1.

  self.pid = os.fork()



In [13]:
# Sample-weighted R^2 of the validation predictions. sklearn's r2_score compares
# residuals against the weighted mean of y_valid.
# NOTE(review): if this should mirror the Jane Street competition metric, that
# metric is believed to be a zero-mean weighted R^2,
# 1 - sum(w * (y - p)**2) / sum(w * y**2) — confirm which one is intended.
valid_score = r2_score(y_true=y_valid, y_pred=y_pred_valid, sample_weight=w_valid)
print(f"Validation Score: {valid_score}")

Validation Score: 0.185186169717505


In [22]:
# Persist the fitted model together with its validation score.
# NOTE: pickle records SymbolKNNRegressor by reference to this notebook's
# __main__ module, so the class must be defined before the file is loaded.
# Only unpickle this file from a trusted source.
result = dict(model=symbol_knn, cv=valid_score)
with open("knn_group.pkl", "wb") as fp:
    pickle.dump(result, fp)

In [15]:
# class RecentKNNRegressor:
#     def __init__(self, n_neighbors=5, window_size=None):
#         self.n_neighbors = n_neighbors
#         self.window_size = window_size
#         self.knn = KNeighborsRegressor(
#             n_neighbors=n_neighbors,
#             weights='distance',
#             algorithm='ball_tree',
#             n_jobs=-1
#         )
#         # self.scaler = StandardScaler()
        
#     def fit(self, X, y, date_ids=None):
#         # Filter recent data if window_size is specified
#         if self.window_size and date_ids is not None:
#             recent_dates = sorted(set(date_ids))[-self.window_size:]
#             mask = np.isin(date_ids, recent_dates)
#             X = X[mask]
#             y = y[mask]
        
#         # # Scale features
#         # X_scaled = self.scaler.fit_transform(X)
        
#         # Fit KNN with sample weights
#         self.knn.fit(X, y)
#         return self
    
#     def predict(self, X):
#         # X_scaled = self.scaler.transform(X)
#         return self.knn.predict(X)

In [16]:
# # Train single model with recent data and weights
# recent_knn = RecentKNNRegressor(n_neighbors=5, window_size=200)
# recent_knn.fit(
#     X_train, 
#     y_train,
#     date_ids=train["date_id"]
# )

In [17]:
# knn_reg = KNeighborsRegressor(
#     n_neighbors=5,
#     weights='distance',
#     algorithm='ball_tree',
#     n_jobs=-1
# )

In [18]:
# knn_reg.fit(X_train, y_train)

In [19]:
# def predict_batch(model, X_batch):
#     return model.predict(X_batch)

# def parallel_predict(model, X, batch_size=10000, n_workers=8):
#     n_samples = len(X)
#     predictions = np.zeros(n_samples)
    
#     # Create batches
#     batches = [(model, X[i:i+batch_size]) 
#                for i in range(0, n_samples, batch_size)]
    
#     with ProcessPoolExecutor(max_workers=n_workers) as executor:
#         futures = []
#         for i, (model, batch) in enumerate(batches):
#             future = executor.submit(predict_batch, model, batch)
#             futures.append((i, future))
        
#         # Collect results
#         for i, future in tqdm(futures, desc="Processing batches"):
#             start_idx = i * batch_size
#             end_idx = min((i + 1) * batch_size, n_samples)
#             predictions[start_idx:end_idx] = future.result()
    
#     return predictions

# # Usage
# y_pred_valid = parallel_predict(recent_knn, X_valid)

In [20]:
# # y_pred_valid = recent_knn.predict(X_valid)
# valid_score = r2_score(y_valid, y_pred_valid, sample_weight=w_valid )
# valid_score

In [21]:
# result = {
#     "model" : recent_knn,
#     "cv" : valid_score
# }
# with open("knn_model.pkl", "wb") as fp:
#     pickle.dump(result, fp)