In [1]:
from __future__ import annotations
from pathlib import Path
from typing import Dict, Tuple, List
import os
import json
import pandas as pd
import numpy as np
from loguru import logger

In [2]:
BASE_DIR = os.getcwd()
HOME_DIR = os.path.dirname(BASE_DIR)
new_data_dates = os.listdir(os.path.join(HOME_DIR, "hl-node-fills"))

# --- Config/paths ---
DATA_DIR = Path(os.path.join(HOME_DIR, "data"))
DATA_DIR.mkdir(parents=True, exist_ok=True)

WALLETS_CSV = DATA_DIR / "wallet_db.csv"

In [3]:
# --- Wallet DB helpers ---
def load_wallet_db(csv_path: Path = WALLETS_CSV) -> Tuple[Dict[str, int], int]:
    """
    Load wallets from CSV into a dict {wallet: wallet_id}, return dict and next_id.
    If file doesn't exist, start fresh at 1.
    """
    mapping: Dict[str, int] = {}
    next_id = 1
    if csv_path.exists():
        df = pd.read_csv(csv_path, dtype={"wallet_id": "uint32", "wallet": "string"})
        if not df.empty:
            for wid, wal in zip(df["wallet_id"].astype("uint32"), df["wallet"].astype("string")):
                mapping[str(wal)] = int(wid)
            next_id = int(df["wallet_id"].max()) + 1
    else:
        csv_path.parent.mkdir(parents=True, exist_ok=True)
        pd.DataFrame(columns=["wallet_id", "wallet"]).to_csv(csv_path, index=False)
    return mapping, next_id

def append_wallet(csv_path: Path, wallet: str, wallet_id: int) -> None:
    """Append a single wallet row to the CSV."""
    pd.DataFrame([{"wallet_id": wallet_id, "wallet": wallet}]).to_csv(
        csv_path, mode="a", header=False, index=False
    )

def get_wallet_id(wallet: str, mapping: Dict[str, int], next_id_ref: List[int], csv_path: Path) -> int:
    """
    Return wallet_id for wallet, creating a new id if needed.
    next_id_ref is a single-item list to allow in-place increment.
    """
    w = str(wallet)
    wid = mapping.get(w)
    if wid is not None:
        return wid
    wid = next_id_ref[0]
    mapping[w] = wid
    next_id_ref[0] += 1
    append_wallet(csv_path, w, wid)
    return wid

# --- ETL helpers ---
def _infer_is_ask(trade: dict, idx_in_side_info: int) -> bool:
    """
    Decide if the row belongs to the ask side.
    Heuristic:
      1) If top-level 'side' is present ('A'/'B'), we tag A as asks, B as bids.
      2) Otherwise fall back to index parity inside side_info: even->A(ask), odd->B(bid).
    Adjust here if your ground truth differs.
    """
    side_top = trade.get("side")
    if side_top in ("A", "B"):
        # We still need to label both rows A/B for the two entries—we'll mirror top-level:
        is_ask = (side_top == "A")
    else:
        is_ask = (idx_in_side_info % 2 == 0)  # even -> "A" (ask), odd -> "B" (bid)
    return bool(is_ask)

def retrieve_data(file_path: Path, wallet_map: Dict[str, int], next_id_ref: List[int], wallets_csv: Path = WALLETS_CSV) -> pd.DataFrame:
    """
    Read a newline-delimited JSON file of trades and produce a normalized DataFrame
    for later partitioned saving.
    Output columns: coin, price, size, time, is_ask, wallet_id
    """
    records = []
    with open(file_path) as f:
        append = records.append
        for line in f:
            if not line.strip():
                continue
            wallet, trade = json.loads(line)
            wallet_id = get_wallet_id(wallet, wallet_map, next_id_ref, wallets_csv)
    
            px = trade.get("px")
            sz = trade.get("sz")
            # skip malformed
            if px is None or sz is None:
                continue
    
            append(
                {
                    "coin": trade.get("coin"),
                    "price": float(px),
                    "size": float(sz),
                    "time": trade.get("time"),
                    "is_ask": trade.get("side") == "A",
                    "wallet_id": wallet_id,
                }
            )
    
    df = pd.DataFrame.from_records(records)
    if df.empty:
        return df
    
    # Types & cleaning
    df["time"] = pd.to_datetime(df["time"], errors="coerce", unit="ms")
    df = df.dropna(subset=["time"])
    # enforce dtypes
    df["price"] = df["price"].astype("float32")
    df["size"] = df["size"].astype("float32")
    df["is_ask"] = df["is_ask"].astype("bool")
    df["wallet_id"] = df["wallet_id"].astype("uint32")
    
    return df[["coin", "price", "size", "time", "is_ask", "wallet_id"]]

def _target_path_for(coin: str, dt: pd.Timestamp) -> Path:
    return DATA_DIR / str(coin) / f"{dt.date()}.parquet"

def _write_daily_parquet(target: Path, df_day: pd.DataFrame) -> None:
    """
    Write/merge the daily file. If target exists, read, concat, de-dup, sort, write.
    We de-dup on [time, wallet_id, price, size, is_ask] as a reasonable row identity.
    """
    target.parent.mkdir(parents=True, exist_ok=True)

    # Keep only required columns & types
    cols = ["price", "size", "time", "is_ask", "wallet_id"]
    df_day = df_day[cols].copy()

    if target.exists():
        try:
            old = pd.read_parquet(target, engine="pyarrow")
            # Cast to same dtypes to avoid upcasting surprises
            old["price"] = old["price"].astype("float32")
            old["size"] = old["size"].astype("float32")
            old["time"] = pd.to_datetime(old["time"], errors="coerce")
            old["is_ask"] = old["is_ask"].astype("bool")
            old["wallet_id"] = old["wallet_id"].astype("uint32")
            df_day = pd.concat([old, df_day], ignore_index=True)
        except Exception as e:
            logger.warning(f"Failed to read existing parquet {target}: {e}. Overwriting.")

    df_day = df_day.dropna(subset=["time"]).drop_duplicates(
        subset=["time", "wallet_id", "price", "size", "is_ask"], keep="last"
    )
    df_day = df_day.sort_values("time")
    df_day.to_parquet(target, index=False, engine="pyarrow", compression="snappy")

def save_partitioned(df: pd.DataFrame) -> None:
    """
    Save rows to data/<coin>/<YYYY-MM-DD>.parquet, merging per-day files if present.
    """
    if df.empty:
        logger.warning("No data to save.")
        return

    # Add date for grouping
    df = df.copy()
    df["date"] = df["time"].dt.date

    # Group by coin/date
    for (coin, day), g in df.groupby(["coin", "date"], sort=False):
        if pd.isna(coin) or coin == "":
            logger.warning("Skipping rows with empty coin.")
            continue
        target = DATA_DIR / str(coin) / f"{day}.parquet"
        _write_daily_parquet(target, g)

    logger.info("Data has been saved successfully.")

In [None]:
new_data_folders = os.listdir(os.path.join(HOME_DIR, "hl-node-fills"))

wallet_map, next_id = load_wallet_db()
next_id_ref = [next_id]  # mutable holder


for i, date in enumerate(new_data_folders):
    hour_file_names = os.listdir(os.path.join(HOME_DIR, "hl-node-fills", date))

    for file_name in hour_file_names:
        file_full_path = os.path.join(HOME_DIR, "hl-node-fills", date, file_name)

        logger.info(f"{file_full_path} is processing")
        df = retrieve_data(Path(file_full_path), wallet_map, next_id_ref, WALLETS_CSV)
        save_partitioned(df)

    logger.info(f"Processed {i} out of {len(new_data_folders)}")

[32m2025-08-20 17:34:32.495[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m13[0m - [1m/home/debian/hl-node-fills/20250617/10.json is processing[0m
[32m2025-08-20 17:34:36.777[0m | [1mINFO    [0m | [36m__main__[0m:[36msave_partitioned[0m:[36m158[0m - [1mData has been saved successfully.[0m
[32m2025-08-20 17:34:36.788[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m13[0m - [1m/home/debian/hl-node-fills/20250617/14.json is processing[0m
[32m2025-08-20 17:34:41.995[0m | [1mINFO    [0m | [36m__main__[0m:[36msave_partitioned[0m:[36m158[0m - [1mData has been saved successfully.[0m
[32m2025-08-20 17:34:42.009[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m13[0m - [1m/home/debian/hl-node-fills/20250617/12.json is processing[0m
[32m2025-08-20 17:34:46.584[0m | [1mINFO    [0m | [36m__main__[0m:[36msave_partitioned[0m:[36m158[0m - [1mData has been saved successfully.[0m
[32m2025-08-20 17:34:46.