In [1]:
import polars as pl
import pandas as pd
import numpy as np
import json
import sys
from datetime import datetime, timedelta
from pathlib import Path
from sklearn.model_selection import KFold
from xgboost import XGBRegressor
import gc

project_root = Path().resolve().parent
sys.path.insert(0, str(project_root / "src"))

from dspy.hdb import get_dataset
from dspy.sim.market_simulator import MarketSimulator
from dspy.utils import to_ns, ts_to_str
from dspy.features.feature_utils import apply_batch_features, extract_features , flatten_features
from dspy.agents.agent_utils import get_agent
from dspy.features.utils import get_products
from dspy.utils import add_ts_dt



# ---------- Load run config file ----------

def load_config(path: Path) -> dict:
    with open(path, "r") as f:
        return json.load(f)

config_path = project_root / "run/run_config.json"
config = load_config(config_path)

dataset_name     = config["dataset"]
product          = config["product"]
depth            = config["depth"]
latency_ns       = config["latency_micros"] * 1_000
max_inventory    = config["max_inventory"]
inv_penalty      = config["inventory_penalty"]
initial_cash     = config["initial_cash"]
agent_config     = config["agent"]
intervals        = config["intervals"]
min_order_size   = config["min_order_size"]
tick_size        = config["tick_size"]
initial_cash     = config["initial_cash"]
cost_in_bps      = config["cost_in_bps"]
fixed_cost       = config["fixed_cost"]
simulator_mode   = config["simulator_mode"]
system_pc        = config["system_pc"]

loader = get_dataset(dataset_name)
all_books, all_ts = [], []
feature_path = project_root / "run/features.json"
feature_config = load_config(feature_path)
inventory_feature_flag = "inventory" in feature_config.keys()


from dspy.features.book_features import add_mid,add_vwap,add_ts_dt
# df=add_mid(df)
# df.head()

########


#function to add targets
def add_target_ret_time(
    df: pl.DataFrame,
    delta: int = 50,  # in milliseconds
    base_col: str = "mid",  # 'mid' or 'vwap'
    levels: int = 1,
    depth: int = 1,
    time_col: str = "ts_dt",
    products: list[str] | None = None
) -> pl.DataFrame:

    if time_col not in df.columns:
        df = add_ts_dt(df)

    # Ensure time_col is datetime[ns]
    if df[time_col].dtype != pl.Datetime("ns"):
        df = df.with_columns(pl.col(time_col).cast(pl.Datetime("ns")))

    # Ensure price column exists
    if base_col == "mid":
        col_prefix = "mid"
        if col_prefix not in df.columns:
            df = add_mid(df, products=products)
    elif base_col == "vwap":
        col_prefix = f"vwap_level{levels}"
        if col_prefix not in df.columns:
            df = add_vwap(df, levels=levels, depth=depth, products=products)
    else:
        raise ValueError("base_col must be 'mid' or 'vwap'.")

    

    price_col = col_prefix
    future_df = (
        df.select([time_col, price_col])
        .with_columns(
            (pl.col(time_col) + pl.duration(milliseconds=-delta))
            .cast(pl.Datetime("ns"))
            .alias(time_col)
        )
        .rename({price_col: f"{price_col}_fut"})
    )
    ret_col_name = (
            f"ret_{delta}ms_fut"
        )
    df = df.join_asof(
        future_df,
        left_on=time_col,
        right_on=time_col,
        strategy="backward",
        tolerance=timedelta(milliseconds=1000000)  # generous tolerance
    ).with_columns([
        (pl.col(f"{price_col}_fut")/ (pl.col(price_col) ) - 1).alias(ret_col_name)
    ]).drop([f"{price_col}_fut"])


    return df.drop_nulls()


# --- All your existing function definitions and config loading can stay the same ---
# correlation_xgb_feature_selection(), add_target_ret_time(), etc.

# --- MAIN EXECUTION LOGIC (REVISED WITH YOUR CHUNKING STRATEGY) ---

# 1. Load the single, massive interval as you are doing now
print('Loading the full dataset...')
# ... (your existing code to load the single interval into `df`) ...
for interval in intervals:
        start_str = interval["start"]
        end_str   = interval["end"]
        print('dataframe from:', start_str,'to:',end_str)

        start_ts = datetime.strptime(interval["start"], "%Y-%m-%d %H:%M:%S").strftime("%y%m%d.%H%M%S")
        end_ts   = datetime.strptime(interval["end"],   "%Y-%m-%d %H:%M:%S").strftime("%y%m%d.%H%M%S")

        df = loader.load_book(
            product=product,
            times=[start_ts, end_ts],
            depth=depth,
            type="book_snapshot_25",
            lazy=False
        )
print('Full dataset loaded. Shape:', df.shape)
print( ' Estimated size of df in MB: ', df.estimated_size("mb"))

# 2. Define the number of chunks and the directory for processed files
num_chunks = 10
chunk_size = len(df) // num_chunks # Use integer division
processed_data_dir = Path(__file__).parent/ "chunk_data"
print(processed_data_dir)
processed_data_dir.mkdir(parents=True, exist_ok=True)
print(f"Splitting the DataFrame into {num_chunks} chunks of approximately {chunk_size} rows each.")

# 3. Loop through each chunk, process it, and save it to disk
feature_cols = []
target_list = []
for i in range(num_chunks):
    print(f"--- Processing chunk {i+1}/{num_chunks} ---")
    
    # Calculate the start offset for the slice
    offset = i * chunk_size
    
    # Get the current chunk using a simple slice
    df_chunk = df.slice(offset, chunk_size)

    # Apply feature engineering to the chunk
    print(f"  - Applying batch features...")
    df_processed_chunk, feature_cols = apply_batch_features(df_chunk, feature_config)
    feature_cols = [col for col in feature_cols if df_processed_chunk[col].dtype != pl.Datetime]
    
    # Add target variables to the chunk
    print(f"  - Adding targets...")
    time_horizons = [50, 500, 5000]
    price = 'mid'
    for t in time_horizons:
        if i==0:
            target_list.append(f"ret_{t}ms_fut")
        df_processed_chunk = add_target_ret_time(df_processed_chunk, t, price)

    # Save the processed chunk to disk
    output_path = processed_data_dir / f"processed_chunk_{i}.parquet"
    print(f"  - Saving processed chunk to {output_path}")
    df_processed_chunk.write_parquet(output_path)

# 4. Release memory of the huge raw DataFrame
print("\nReleasing memory of the original raw DataFrame...")
del df
gc.collect()


# 5. Load all the processed chunks from disk
print("--- Loading all processed chunks from disk ---")
all_processed_files = list(processed_data_dir.glob("*.parquet"))
# df = pl.read_parquet(all_processed_files)
df = pl.scan_parquet(all_processed_files) 

Loading the full dataset...
dataframe from: 2025-04-01 00:00:00 to: 2025-05-20 23:59:59
Full dataset loaded. Shape: (75432606, 22)
 Estimated size of df in MB:  12661.11245727539


NameError: name '__file__' is not defined