In [2]:
import pandas as pd
import numpy as np

In [28]:
df_quotes = pd.read_parquet('../data/quotes/GS_quotes_1s_2025-01-01_2025-01-31.parquet')

In [29]:
df_quotes

Unnamed: 0,ts,bid_price,ask_price,bid_size,ask_size,mid,spread,imbalance_l1,microprice,microprice_dev
19800,2025-01-02 09:30:00-05:00,574.56,579.60,200.0,200.0,577.080,5.04,0.000000,577.080000,1.136868e-13
19801,2025-01-02 09:30:01-05:00,577.05,579.60,600.0,600.0,578.325,2.55,0.000000,578.325000,0.000000e+00
19802,2025-01-02 09:30:02-05:00,577.05,579.60,600.0,600.0,578.325,2.55,0.000000,578.325000,0.000000e+00
19803,2025-01-02 09:30:03-05:00,577.05,579.60,600.0,200.0,578.325,2.55,0.500000,578.962500,6.375000e-01
19804,2025-01-02 09:30:04-05:00,577.06,579.60,600.0,600.0,578.330,2.54,0.000000,578.330000,1.136868e-13
...,...,...,...,...,...,...,...,...,...,...
2462395,2025-01-30 15:59:55-05:00,645.77,645.94,700.0,1100.0,645.855,0.17,-0.222222,645.836111,-1.888889e-02
2462396,2025-01-30 15:59:56-05:00,645.51,645.94,100.0,1100.0,645.725,0.43,-0.833333,645.545833,-1.791667e-01
2462397,2025-01-30 15:59:57-05:00,645.42,645.91,800.0,2800.0,645.665,0.49,-0.555556,645.528889,-1.361111e-01
2462398,2025-01-30 15:59:58-05:00,645.42,645.91,1300.0,3000.0,645.665,0.49,-0.395349,645.568140,-9.686047e-02


In [26]:
def ensure_ts_eastern(df: pd.DataFrame) -> pd.DataFrame:
    """
    Ensures:
    - 'ts' exists as a column
    - tz-aware
    - converted to US/Eastern
    """
    df = df.copy()

    # If ts is index, move it to column
    if "ts" not in df.columns:
        if df.index.name == "ts":
            df = df.reset_index()
        else:
            raise ValueError("No 'ts' column or index found.")

    # Parse to datetime if needed
    if not pd.api.types.is_datetime64_any_dtype(df["ts"]):
        df["ts"] = pd.to_datetime(df["ts"], errors="coerce")

    # If tz-naive, assume UTC (consistent with your pipeline)
    if df["ts"].dt.tz is None:
        df["ts"] = df["ts"].dt.tz_localize("UTC")

    # Convert to Eastern
    df["ts"] = df["ts"].dt.tz_convert("US/Eastern")

    return df


def filter_rth(df: pd.DataFrame) -> pd.DataFrame:
    return df[
        (df["ts"].dt.time >= pd.to_datetime("09:30").time()) &
        (df["ts"].dt.time < pd.to_datetime("16:00").time())
    ]


In [27]:
from pathlib import Path
import pandas as pd


# -----------------------
# CONFIG
# -----------------------
QUOTES_DIR = Path("../data/quotes")
DRY_RUN = False

files = sorted(QUOTES_DIR.glob("*.parquet"))
if not files:
    print("No parquet files found.")

for fp in files:
    print(f"\nProcessing: {fp.name}")

    for fp in files:
        print(f"\nProcessing: {fp.name}")

        df = pd.read_parquet(fp)
        n_before = len(df)

        df = (
            df
            .pipe(ensure_ts_eastern)
            .pipe(filter_rth)
            .sort_values("ts")
        )

        n_after = len(df)

        print(f"Rows before: {n_before:,}")
        print(f"Rows after : {n_after:,}")

        if not DRY_RUN:
            df.to_parquet(fp, engine="pyarrow", compression="zstd")
            print("→ Overwritten")

        else:
            print("→ DRY RUN (not saved)")



Processing: GS_quotes_1s_2025-01-01_2025-01-31.parquet

Processing: GS_quotes_1s_2025-01-01_2025-01-31.parquet
Rows before: 678,629
Rows after : 678,600
→ Overwritten

Processing: GS_quotes_1s_2025-02-01_2025-02-28.parquet
Rows before: 585,025
Rows after : 585,000
→ Overwritten

Processing: GS_quotes_1s_2025-03-01_2025-03-31.parquet
Rows before: 608,426
Rows after : 608,400
→ Overwritten

Processing: GS_quotes_1s_2025-04-01_2025-04-30.parquet
Rows before: 678,629
Rows after : 678,600
→ Overwritten

Processing: GS_quotes_1s_2025-05-01_2025-05-31.parquet
Rows before: 702,030
Rows after : 702,000
→ Overwritten

Processing: GS_quotes_1s_2025-06-01_2025-06-30.parquet
Rows before: 608,426
Rows after : 608,400
→ Overwritten

Processing: IVV_quotes_1s_2025-01-01_2025-01-31.parquet
Rows before: 678,629
Rows after : 678,600
→ Overwritten

Processing: IVV_quotes_1s_2025-02-01_2025-02-28.parquet
Rows before: 585,025
Rows after : 585,000
→ Overwritten

Processing: IVV_quotes_1s_2025-03-01_2025-03-

In [30]:
df_prices = pd.read_parquet('../data/prices/GS_1s_2025-01-01_2025-01-31.parquet')
df_prices

Unnamed: 0,ts,symbol,open,high,low,close,volume,vwap,n_trades
0,2025-01-02 09:30:00-05:00,GS,,,,,0.0,,0.0
1,2025-01-02 09:30:01-05:00,GS,579.3100,579.310,578.080,578.080,20990.0,579.2914,37.0
2,2025-01-02 09:30:02-05:00,GS,578.3250,578.325,578.325,578.325,846.0,578.5489,60.0
3,2025-01-02 09:30:03-05:00,GS,578.3250,578.325,578.325,578.325,231.0,578.4025,24.0
4,2025-01-02 09:30:04-05:00,GS,577.6800,578.330,577.440,578.110,2120.0,578.0346,38.0
...,...,...,...,...,...,...,...,...,...
467995,2025-01-31 15:59:55-05:00,GS,640.4800,640.480,640.480,640.480,319.0,640.4666,11.0
467996,2025-01-31 15:59:56-05:00,GS,640.3150,640.315,639.950,639.980,3306.0,640.0664,59.0
467997,2025-01-31 15:59:57-05:00,GS,640.0400,640.040,639.850,639.870,1436.0,639.9244,20.0
467998,2025-01-31 15:59:58-05:00,GS,640.1200,640.380,640.120,640.370,1292.0,640.2798,22.0


In [25]:
df_trades = pd.read_parquet('../data/trades/IVV_trades_1s_2025-01-01_2025-01-31.parquet')
df_trades

Unnamed: 0,ts,last,volume,n_trades,vwap,symbol
0,2025-01-02 09:30:00-05:00,591.97,99232.0,141,592.164819,IVV
1,2025-01-02 09:30:01-05:00,591.62,2766.0,221,592.154205,IVV
2,2025-01-02 09:30:02-05:00,592.19,2057.0,285,591.772921,IVV
3,2025-01-02 09:30:03-05:00,592.19,1007.0,229,591.878175,IVV
4,2025-01-02 09:30:04-05:00,592.19,1790.0,155,591.755935,IVV
...,...,...,...,...,...,...
2442595,2025-01-30 15:59:55-05:00,607.92,264.0,6,607.994735,IVV
2442596,2025-01-30 15:59:56-05:00,607.75,3481.0,24,607.913371,IVV
2442597,2025-01-30 15:59:57-05:00,607.64,2636.0,31,607.723534,IVV
2442598,2025-01-30 15:59:58-05:00,607.82,3436.0,29,607.780536,IVV
