In [None]:
# Core libs
import numpy as np
import pandas as pd
import polars as pl
import pyarrow.parquet as pq
import gc
import os

# Modeling
from lightgbm import LGBMRegressor, early_stopping, log_evaluation
from sklearn.model_selection import train_test_split
import scipy.stats as st



In [None]:
# Sample and filter low-signal features
# Cell 2 – score every candidate feature on a row sample, keep the strongest ones
SAMPLE_MOD = 10  # keep rows whose position is a multiple of 10 → ~10% sample

sampled_scan = (
    pl.scan_parquet("/kaggle/input/drw-crypto-market-prediction/train.parquet")
      .with_row_index("idx")                        # positional index used only for sampling
      .filter((pl.col("idx") % SAMPLE_MOD).eq(0))   # every SAMPLE_MOD-th row
      .drop("idx")                                  # helper column is not a feature
)
sample_df = sampled_scan.collect().to_pandas()

# Everything except the timestamp and the target is a candidate feature.
features_all = [
    col for col in sample_df.columns
    if col != "timestamp" and col != "label"
]

# Order candidates by absolute correlation with the label and keep the top 70%.
abs_corr = sample_df[features_all].corrwith(sample_df["label"]).abs()
keep_n = int(len(abs_corr) * 0.7)
top_feats = abs_corr.sort_values(ascending=False).head(keep_n).index.tolist()
print(f"Selected {len(top_feats)}/{len(features_all)} features")

# The sample is only needed for feature selection; release it before the full load.
del sample_df, abs_corr, sampled_scan
gc.collect()



In [None]:
#Helper Functions
def optimize_dtypes(df):
    """Reduce memory usage by downcasting numeric columns.

    ``float64`` columns are cast to ``float32``. ``int64`` columns are
    narrowed to the first of ``uint16``, ``int16`` or ``int32`` whose range
    covers the column's observed min/max. Columns holding values outside the
    ``int32`` range are left as ``int64`` so that no value is silently
    wrapped by the cast.

    Args:
        df: pandas DataFrame to optimize. Columns are reassigned on this
            object, so the caller's frame is modified.

    Returns:
        The same DataFrame object after downcasting. The percentage memory
        reduction is printed as a side effect.
    """
    initial_memory = df.memory_usage(deep=True).sum() / 1024**3

    # float64 → float32
    for col in df.select_dtypes(include=["float64"]).columns:
        df[col] = df[col].astype("float32")

    # int64 → smallest integer type that can hold the observed range
    for col in df.select_dtypes(include=["int64"]).columns:
        col_min, col_max = df[col].min(), df[col].max()
        if col_min >= 0 and col_max < 65536:
            df[col] = df[col].astype("uint16")
        elif col_min >= -32768 and col_max < 32768:
            df[col] = df[col].astype("int16")
        elif col_min < -2**31 or col_max > 2**31 - 1:
            # Does not fit in int32: casting would wrap values, so keep int64.
            continue
        else:
            df[col] = df[col].astype("int32")

    optimized_memory = df.memory_usage(deep=True).sum() / 1024**3
    print(f"{(initial_memory - optimized_memory)/initial_memory*100:.1f}% reduction in memory use")
    return df

def add_engineered(df):
    """Return ``df`` with three derived quantity columns appended.

    The new columns are assembled in a separate frame and attached with a
    single ``pd.concat`` call, so the input frame is not fragmented by
    repeated column inserts.

    Added columns:
        bid_ask_spread:  ``ask_qty - bid_qty``
        order_imbalance: ``(buy_qty - sell_qty) / (buy_qty + sell_qty + 1e-6)``
        volume_ratio:    ``volume / (buy_qty + sell_qty + 1e-6)``
    """
    buy, sell = df["buy_qty"], df["sell_qty"]
    # The 1e-6 offset keeps the denominator non-zero when buy_qty + sell_qty is 0.
    total_qty = buy + sell + 1e-6

    derived = {
        "bid_ask_spread": df["ask_qty"] - df["bid_qty"],
        "order_imbalance": (buy - sell) / total_qty,
        "volume_ratio": df["volume"] / total_qty,
    }
    extra = pd.DataFrame(derived, index=df.index)
    return pd.concat([df, extra], axis=1)

def rank_norm(df):
    """Replace each column by its within-column percentile rank, stored as float16."""
    pct_ranks = df.rank(axis=0, pct=True)
    return pct_ranks.astype("float16")

def predict_in_chunks(model, test_path, features):
    """Predict on a parquet file one row group at a time.

    Each row group goes through the same steps as the training frame:
    ``optimize_dtypes`` → ``add_engineered`` → select ``features`` →
    ``fillna(0)`` → ``rank_norm``. The resulting DataFrame (not a bare
    ndarray) is passed to ``model.predict`` so the column names seen at
    prediction time match those used when fitting on a DataFrame.

    NOTE(review): ``rank_norm`` is applied per row group here, whereas the
    training cell ranks over the whole training frame — confirm that this
    difference is acceptable for the model.

    Args:
        model: fitted estimator exposing ``predict(X)``.
        test_path: path to the parquet file to score.
        features: ordered list of column names fed to the model; may include
            the columns created by ``add_engineered``.

    Returns:
        DataFrame with columns ``ID`` (1-based running row number across all
        row groups) and ``prediction``. Empty, with the same two columns, if
        the file contains no rows.
    """
    pf = pq.ParquetFile(test_path)
    preds, ids = [], []
    offset = 0

    for rg in range(pf.num_row_groups):
        # 1) Read a single row group; the Arrow table is not kept around
        df = pf.read_row_group(rg).to_pandas()
        n = len(df)
        if n == 0:
            # Nothing to score in this row group.
            del df
            continue

        # 2) Optimize types & engineer features
        df = optimize_dtypes(df)
        df = add_engineered(df)

        # 3) Prepare model input (rank_norm already returns float16)
        X = rank_norm(df[features].fillna(0))

        # 4) Predict on the DataFrame so feature names are preserved
        p = model.predict(X)

        preds.append(p)
        ids.append(np.arange(offset + 1, offset + n + 1))
        offset += n

        # 5) Cleanup before the next row group is loaded
        del df, X, p
        gc.collect()

    # 6) Build submission DataFrame
    if not preds:
        # np.concatenate raises on an empty list, so build the empty frame directly.
        return pd.DataFrame({
            "ID": np.empty(0, dtype=np.int64),
            "prediction": np.empty(0, dtype=np.float64)
        })

    return pd.DataFrame({
        "ID": np.concatenate(ids),
        "prediction": np.concatenate(preds)
    })

In [None]:
# Load and prepare training data
# 1) Load the selected features, the label, and the raw quantity columns that
#    add_engineered() reads. Those raw columns are requested explicitly because
#    the correlation filter may have dropped them from top_feats, in which case
#    add_engineered() would fail with a KeyError. dict.fromkeys removes
#    duplicates while preserving order.
raw_inputs = ["bid_qty", "ask_qty", "buy_qty", "sell_qty", "volume"]
cols = list(dict.fromkeys(top_feats + raw_inputs + ["label"]))
train = pd.read_parquet(
    "/kaggle/input/drw-crypto-market-prediction/train.parquet",
    columns=cols
)

# 2) Memory-optimize
train = optimize_dtypes(train)

# 3) Feature-engineer & normalize. Only top_feats plus the three engineered
#    columns reach the model; raw inputs outside top_feats are not used as features.
train = add_engineered(train)
X = train[top_feats + ["bid_ask_spread","order_imbalance","volume_ratio"]].fillna(0)
X = rank_norm(X)
y = train["label"].astype("float32")

del train
gc.collect()


In [None]:
# Temporal split and outlier clip
# 80/20 split by row position: the first 80% of rows train, the last 20% validate.
# NOTE(review): this is a time-based split only if rows are in chronological order — confirm.
split = int(0.8 * len(X))
X_train, X_val = X.iloc[:split], X.iloc[split:]
y_train, y_val = y.iloc[:split], y.iloc[split:]

# Clip extreme training targets at ±3 standard deviations.
# The threshold is computed from the training rows only, and the validation
# labels are left untouched: otherwise validation statistics leak into training
# and validation RMSE / Pearson are measured against altered targets.
clip_thr = 3 * y_train.std()
y_train = y_train.clip(-clip_thr, clip_thr)

del X, y
gc.collect()

print("Train / Val shapes:", X_train.shape, X_val.shape)


In [None]:
# Train LightGBM
model = LGBMRegressor(
    # task
    objective="regression",
    # boosting schedule (upper bound on trees; early stopping picks the actual count)
    n_estimators=1000,
    learning_rate=0.05,
    # tree shape
    num_leaves=31,
    # column / row subsampling
    feature_fraction=0.8,
    bagging_fraction=0.8,
    bagging_freq=5,
    # L1 / L2 regularisation
    lambda_l1=1.0,
    lambda_l2=1.0,
    # reproducibility and logging
    random_state=42,
    verbosity=-1
)

# Fit with the validation split as the evaluation set: training stops once
# validation RMSE has not improved for 50 rounds; the metric is logged every 50 rounds.
model.fit(
    X_train,
    y_train,
    eval_set=[(X_val, y_val)],
    eval_metric="rmse",
    callbacks=[
        early_stopping(50),
        log_evaluation(50),
    ]
)
print("Best iteration:", model.best_iteration_)


In [None]:
# Validation correlation: Pearson r between validation labels and predictions
pred_val = model.predict(X_val)
val_r = st.pearsonr(y_val, pred_val)[0]  # element 0 is the correlation coefficient
print("Val Pearson:", val_r.round(4))


In [None]:
# Chunked prediction on the test set (read one pyarrow row group at a time)
# Cell 8 – Predict on Test Set in Chunks
submission = predict_in_chunks(
    model,
    test_path='/kaggle/input/drw-crypto-market-prediction/test.parquet',
    features=top_feats + ["bid_ask_spread", "order_imbalance", "volume_ratio"]
)

# Sanity checks before the submission is written out.
assert submission["ID"].is_unique, "duplicate IDs in submission"
assert submission["prediction"].notna().all(), "NaN predictions in submission"

# The CSV is written by the "Save and Preview" cell that follows; this cell
# only builds and previews the frame so the file is not written twice.
submission.head()


In [None]:
# Save and preview: write the submission CSV without the index column,
# list the working directory to confirm the file is there, then show the first rows.
submission.to_csv(path_or_buf="/kaggle/working/submission.csv", index=False)
print("Files:", os.listdir(path="/kaggle/working"))
submission.head(5)
