# Elliptic2 Data Pre-processing
**Research workflow, reproducible artifacts, and auditable quality checks**


## Abstract
This notebook implements a reproducible pre-processing pipeline for the Elliptic2 dataset for subgraph-level classification. The workflow is structured to be auditable, reproducible (fixed seed, persisted splits, mappings, and train-only statistics), and efficient for iteration (CSV→Parquet conversion and cached subsets).

---

### Objectives of the notebook
Given the raw Elliptic2 dataset files, we will implement a pre-processing pipeline to prepare:
- **Parquet copies** of key tables (fast reads, stable dtypes)
- **Node re-indexing** (map raw IDs → contiguous 0..N-1)
- **Subgraph membership mapping** (component/subgraph id per node)
- **Sanity checks**:
  - edges reference existing nodes
  - labels consistent within subgraphs
  - no accidental leakage when scaling
- **Feature statistics** computed on training set only
- **Train/val/test splits** at the **subgraph level**
- **Model-ready artifacts** saved to `processed/`



### Inputs (expected files)
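Required:
- `nodes.csv`: labeled nodes participating in subgraphs, with the subgraph each node belongs to (`clId`, `ccId`)
- `edges.csv`: edges restricted to labeled subgraphs
- `connected_components.csv`: mapping from component/subgraph identifier to its label (`ccId`, `ccLabel`)
- `background_nodes.csv`: node features for the full background graph; needed to build `node_features.npy` (Section 11)

Optional:
- `background_edges.csv`: edges of the full background graph; used only for advanced context augmentation (not required for initial baselines)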


### Outputs (written to `processed/`)
- `processed/parquet/*.parquet`: Parquet copies of the core tables
- `processed/arrays/node_features.npy`: node feature matrix aligned to `nodes.csv` row order
- `processed/arrays/edge_index.npy`: 2×E edge index in contiguous node index space
- `processed/arrays/node_components.npy`: component/subgraph id per node aligned to `nodes.csv`
- `processed/artifacts/node_index.json`: mapping raw node_id → contiguous idx
- `processed/artifacts/feature_columns.json`: ordered list of feature columns used
- `processed/artifacts/subgraph_labels.json`: mapping subgraph_id → label
- `processed/artifacts/splits.json`: persisted train/val/test split ids
- `processed/artifacts/feature_stats_train.json`: training-only feature statistics

---

All random processes (splitting) are controlled by a fixed RNG seed. Split definitions are serialized to disk and should be reused for all experiments to ensure comparability across models.

In [1]:
from __future__ import annotations

import json
import hashlib
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Tuple, List, Optional

import numpy as np
import pandas as pd

from sklearn.model_selection import StratifiedShuffleSplit
from tqdm import tqdm

pd.set_option("display.max_columns", 200)
pd.set_option("display.width", 160)

RNG_SEED = 42
np.random.seed(RNG_SEED)

print("Environment initialized.")

Environment initialized.


## 1. Configuration

Everything written by this notebook goes under `processed/`.

Set `DATA_DIR` to the folder containing your CSV files.

In [2]:
# Update this path to your local Elliptic2 directory
DATA_DIR = Path("DATA")

OUT_DIR = DATA_DIR / "processed"  # follows DATA_DIR if you change it above
PARQUET_DIR = OUT_DIR / "parquet"
ARTIFACTS_DIR = OUT_DIR / "artifacts"
ARRAYS_DIR = OUT_DIR / "arrays"

for d in [OUT_DIR, PARQUET_DIR, ARTIFACTS_DIR, ARRAYS_DIR]:
    d.mkdir(parents=True, exist_ok=True)

# Expected filenames
FILES = {
    "nodes": DATA_DIR / "nodes.csv",
    "edges": DATA_DIR / "edges.csv",
    "cc": DATA_DIR / "connected_components.csv",
    "bg_nodes": DATA_DIR / "background_nodes.csv",
    "bg_edges": DATA_DIR / "background_edges.csv",
}

present = {k: p for k, p in FILES.items() if p.exists()}
missing = {k: str(p) for k, p in FILES.items() if not p.exists()}

print("Present:", list(present.keys()))
if missing:
    print("Missing (optional where applicable):")
    for k, p in missing.items():
        print(f" - {k}: {p}")

Present: ['nodes', 'edges', 'cc', 'bg_nodes', 'bg_edges']


## 2. Lightweight schema introspection

Let's inspect the first few rows and check:
- node identifier column
- edge endpoint columns (source/target)
- component/subgraph identifier column
- label column (if stored at node-level)


In [None]:
def read_csv_head(path: Path, nrows: int = 5) -> pd.DataFrame:
    return pd.read_csv(path, nrows=nrows, low_memory=False)

nodes_head = read_csv_head(present["nodes"])
edges_head = read_csv_head(present["edges"])
cc_head = read_csv_head(present["cc"])

display(nodes_head.head())
display(edges_head.head())
display(cc_head.head())

Unnamed: 0,clId,ccId
0,515498410,41121
1,630366534,27974
2,903790945,108020
3,449108887,6544
4,877994419,27234


Unnamed: 0,clId1,clId2,txId
0,753456251,753456254,29911377
1,756183927,759736869,51855
2,623574254,622935561,27784128
3,751464959,751464964,76668
4,751464834,751464959,2592471


Unnamed: 0,ccId,ccLabel
0,0,licit
1,1,licit
2,2,licit
3,3,licit
4,4,licit


## 3. Infer key columns

### Rationale
For this Elliptic2 release we can see the main columns are named as follows:

- `nodes.csv` contains node membership: clId (node id) and ccId (subgraph id)

- `edges.csv` contains topology: clId1 and clId2 (edge endpoints), with txId as an optional edge identifier

- `connected_components.csv` contains supervision: ccId and ccLabel (one label per subgraph)

In [None]:
NODE_ID_COL      = "clId"     # nodes.csv
NODES_COMP_COL   = "ccId"     # nodes.csv

SRC_COL          = "clId1"    # edges.csv
DST_COL          = "clId2"    # edges.csv
TX_ID_COL        = "txId"     # edges.csv 

CC_COMP_COL      = "ccId"     # connected_components.csv
CC_LABEL_COL     = "ccLabel"  # connected_components.csv

def require_columns(df: pd.DataFrame, required: list[str], table_name: str) -> None:
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(
            f"{table_name} is missing expected columns: {missing}\n"
            f"Available columns: {df.columns.tolist()}"
        )

require_columns(nodes_head, [NODE_ID_COL, NODES_COMP_COL], "nodes.csv")
require_columns(edges_head, [SRC_COL, DST_COL], "edges.csv")

# txId is optional, so a missing column only triggers a note
if TX_ID_COL not in edges_head.columns:
    print(f"Note: {TX_ID_COL} not found in edges.csv (ok if you don't use edge attributes).")

require_columns(cc_head, [CC_COMP_COL, CC_LABEL_COL], "connected_components.csv")

# Cross-table sanity
if NODES_COMP_COL != CC_COMP_COL:
    raise ValueError(
        f"Component column mismatch: nodes uses '{NODES_COMP_COL}' but connected_components uses '{CC_COMP_COL}'."
    )

print("Inferred columns:")
print(f" - nodes.csv: node_id={NODE_ID_COL}, component_id={NODES_COMP_COL}")
print(f" - edges.csv: src={SRC_COL}, dst={DST_COL}, tx_id={TX_ID_COL if TX_ID_COL in edges_head.columns else '(none)'}")
print(f" - connected_components.csv: component_id={CC_COMP_COL}, label={CC_LABEL_COL}")

Inferred columns:
 - nodes.csv: node_id=clId, component_id=ccId
 - edges.csv: src=clId1, dst=clId2
 - connected_components.csv: component_id=ccId, label=ccLabel


## 4. Full table ingestion with dtype controls

We will:
- load the full tables
- coerce ID columns to integers (after validating missingness)
- keep feature columns as numeric types when possible

If an ID column has missing values or cannot be coerced, the notebook stops with an error.

In [None]:
def read_csv_full(path: Path) -> pd.DataFrame:
    return pd.read_csv(path, low_memory=False)

nodes = read_csv_full(present["nodes"])
edges = read_csv_full(present["edges"])
cc = read_csv_full(present["cc"])

print("Shapes:", {"nodes": nodes.shape, "edges": edges.shape, "cc": cc.shape})

# Enforce non-null for identity columns
identity_checks = [
    ("nodes", nodes, NODE_ID_COL),        # clId
    ("nodes", nodes, NODES_COMP_COL),     # ccId
    ("edges", edges, SRC_COL),            # clId1
    ("edges", edges, DST_COL),            # clId2
    ("cc", cc, CC_COMP_COL),              # ccId
    ("cc", cc, CC_LABEL_COL),             # ccLabel
]
for name, df, col in identity_checks:
    na = int(df[col].isna().sum())
    if na:
        raise ValueError(
            f"Non-null violation: {name}.{col} contains {na} missing values. "
            "Fix upstream before proceeding."
        )

# Enforce integer identifiers
nodes[NODE_ID_COL] = nodes[NODE_ID_COL].astype("int64")
nodes[NODES_COMP_COL] = nodes[NODES_COMP_COL].astype("int64")

edges[SRC_COL] = edges[SRC_COL].astype("int64")
edges[DST_COL] = edges[DST_COL].astype("int64")

cc[CC_COMP_COL] = cc[CC_COMP_COL].astype("int64")
# ccLabel is categorical (strings such as 'licit' and 'suspicious'); do NOT cast to int
cc[CC_LABEL_COL] = cc[CC_LABEL_COL].astype("string")

display(nodes.dtypes)
display(edges.dtypes)
display(cc.dtypes)


Shapes: {'nodes': (444521, 2), 'edges': (367137, 3), 'cc': (121810, 2)}


clId    int64
ccId    int64
dtype: object

clId1    int64
clId2    int64
txId     int64
dtype: object

ccId                int64
ccLabel    string[python]
dtype: object

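## 5. Partial SHA-256 file fingerprints

We hash only the first 5 MB of each input file. This is fast and catches a swapped file, but it cannot detect changes beyond the hashed prefix.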

In [8]:
def partial_sha256(path: Path, nbytes: int = 5_000_000) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        h.update(f.read(nbytes))
    return h.hexdigest()

fingerprints = {}
for k in ["nodes", "edges", "cc"]:
    p = present[k]
    fingerprints[k] = {"path": str(p), "partial_sha256_first5MB": partial_sha256(p)}

(ARTIFACTS_DIR / "input_fingerprints.json").write_text(json.dumps(fingerprints, indent=2))
print("Saved:", ARTIFACTS_DIR / "input_fingerprints.json")
fingerprints

Saved: DATA\processed\artifacts\input_fingerprints.json


{'nodes': {'path': 'DATA\\nodes.csv',
  'partial_sha256_first5MB': '1fc3f79a211e2544f21490f4b4d9a616314f7f64c3e3478189fc678589e12063'},
 'edges': {'path': 'DATA\\edges.csv',
  'partial_sha256_first5MB': 'c026e367ae38ab6a46806d877c9a98310f0fc28a5bb9e1bef1c050d93d55ca10'},
 'cc': {'path': 'DATA\\connected_components.csv',
  'partial_sha256_first5MB': '79511c49f76110d0feae7feda7e85279bc442efe31f5e7a9f53b2caf0019062b'}}
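
The fingerprint above covers only the first 5 MB of each file. When a stricter audit trail is needed, the whole file can be hashed in bounded memory. The following is a minimal sketch; `full_sha256` and `chunk_bytes` are hypothetical names, not part of the original notebook.

```python
import hashlib
from pathlib import Path


def full_sha256(path: Path, chunk_bytes: int = 1 << 20) -> str:
    """Hash the whole file in fixed-size chunks so memory use stays bounded."""
    h = hashlib.sha256()
    with path.open("rb") as f:
        # iter(callable, sentinel) keeps calling read() until it returns b""
        for chunk in iter(lambda: f.read(chunk_bytes), b""):
            h.update(chunk)
    return h.hexdigest()
```

The trade-off is run time: a full hash reads every byte, which may be noticeable for the background files.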

## 6. Parquet conversion 

Parquet benefits:
- **Faster** reads (columnar)
- **Smaller** on disk (compression)
- **Stable dtypes** across sessions
- Easy to select subsets of columns without reading the entire file

We save Parquet copies to `processed/parquet/`.


In [9]:
nodes.to_parquet(PARQUET_DIR / "nodes.parquet", index=False)
edges.to_parquet(PARQUET_DIR / "edges.parquet", index=False)
cc.to_parquet(PARQUET_DIR / "connected_components.parquet", index=False)

for p in [PARQUET_DIR / "nodes.parquet", PARQUET_DIR / "edges.parquet", PARQUET_DIR / "connected_components.parquet"]:
    print(f"{p.name}: {p.stat().st_size/1e6:.1f} MB")

nodes.parquet: 4.4 MB
edges.parquet: 7.0 MB
connected_components.parquet: 0.8 MB


## 7. Graph integrity checks
We validate:
- Every `src` and `dst` in `edges.csv` exists in `nodes.csv`.
- Every `ccId` used in `nodes.csv` has a row in `connected_components.csv`.

In [None]:
# 7.1 Referential integrity: edges to nodes
node_set = set(nodes[NODE_ID_COL].tolist())  
unknown_src = int((~edges[SRC_COL].isin(node_set)).sum()) 
unknown_dst = int((~edges[DST_COL].isin(node_set)).sum())  

print("Unknown endpoints:", {"unknown_src": unknown_src, "unknown_dst": unknown_dst})

if unknown_src or unknown_dst:
    bad = edges.loc[
        (~edges[SRC_COL].isin(node_set)) | (~edges[DST_COL].isin(node_set)),
        [SRC_COL, DST_COL]
    ].head(20)
    raise ValueError(
        "Referential integrity failure: edges reference clIds not present in nodes.csv. Example:\n"
        + str(bad)
    )

# 7.2 Component completeness: every ccId used in nodes.csv must appear in connected_components.csv
ccid_set = set(cc[CC_COMP_COL].tolist())
missing_ccid = int((~nodes[NODES_COMP_COL].isin(ccid_set)).sum())  
print("Nodes whose ccId is missing in connected_components.csv:", missing_ccid)

if missing_ccid:
    ex = nodes.loc[~nodes[NODES_COMP_COL].isin(ccid_set), [NODE_ID_COL, NODES_COMP_COL]].head(20)
    raise ValueError(
        "Component completeness failure: some nodes have ccId values not present in connected_components.csv. Example:\n"
        + str(ex)
    )

print("Integrity checks passed.")


Unknown endpoints: {'unknown_src': 0, 'unknown_dst': 0}
Nodes whose ccId is missing in connected_components.csv: 0
Integrity checks passed.


## 8. Contiguous node re-indexing

Raw node IDs are large and sparse. For model inputs we want:
- compact integer range: `0..N-1`
- stable mapping saved to disk
- fast conversion of edge lists to `edge_index` arrays

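We build:
- `node_ids`: array of raw IDs in `nodes.csv` row order
- `node2idx`: dict mapping raw ID → compact index (the row position in `nodes.csv`)
- `edge_index`: 2×E integer array (row 0: sources, row 1: targets) in the compact index space

In [12]:
# Index nodes by their nodes.csv row position so that edge_index refers to the same
# rows as node_features.npy and node_components.npy (both saved in nodes.csv row order).
# Sorting the IDs first would put edge_index in a different index space.
node_ids = nodes[NODE_ID_COL].to_numpy(dtype=np.int64)
if len(np.unique(node_ids)) != len(node_ids):
    raise ValueError("Duplicate clId values in nodes.csv; cannot build a 1-to-1 node index.")
node2idx: Dict[int, int] = {int(nid): i for i, nid in enumerate(node_ids)}

(ARTIFACTS_DIR / "node_index.json").write_text(json.dumps(node2idx))
print("Saved:", ARTIFACTS_DIR / "node_index.json")

# Map raw endpoints to compact indices (Section 7 guarantees every endpoint is known)
src_idx = edges[SRC_COL].map(node2idx).to_numpy(dtype=np.int64)
dst_idx = edges[DST_COL].map(node2idx).to_numpy(dtype=np.int64)

edge_index = np.vstack([src_idx, dst_idx])
assert edge_index.min() >= 0 and edge_index.max() < len(node_ids)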

np.save(ARRAYS_DIR / "edge_index.npy", edge_index)
print("Saved:", ARRAYS_DIR / "edge_index.npy", "shape:", edge_index.shape)

Saved: DATA\processed\artifacts\node_index.json
Saved: DATA\processed\arrays\edge_index.npy shape: (2, 367137)
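
One thing to keep in mind when reloading `node_index.json` (and `subgraph_labels.json`) in later notebooks: JSON object keys are always strings, so the integer IDs come back as `"515498410"` rather than `515498410`. A minimal sketch of restoring them follows; `load_int_key_mapping` is a hypothetical helper and the two IDs are just the first rows shown in Section 2.

```python
import json


def load_int_key_mapping(text: str) -> dict:
    """Parse a JSON object and restore integer keys and values."""
    raw = json.loads(text)
    return {int(k): int(v) for k, v in raw.items()}


# Round trip on a tiny mapping: json.dumps turns the int keys into strings
example = {515498410: 0, 630366534: 1}
restored = load_int_key_mapping(json.dumps(example))
```

For the label file the values are strings, so only the keys would need the `int(...)` conversion there.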


## 9. Subgraph membership per node

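`nodes.csv` already stores one `ccId` per node row, so the membership array is simply that column, saved in `nodes.csv` row order:
- row `i` of `node_components.npy` is the component of the node in row `i` of `nodes.csv`
- `node_features.npy` is saved in the same row order, so the two arrays can be indexed together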

In [14]:
# Elliptic2: nodes.csv already contains ccId per node row
node_components = nodes[NODES_COMP_COL].to_numpy(dtype=np.int64)  # NODES_COMP_COL = "ccId"

np.save(ARRAYS_DIR / "node_components.npy", node_components)
print("Saved:", ARRAYS_DIR / "node_components.npy")
print("Unique components:", int(np.unique(node_components).shape[0]))


Saved: DATA\processed\arrays\node_components.npy
Unique components: 121810
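
For subgraph-level models it is often convenient to have the node rows of each component available directly. The sketch below groups row indices by component with a single sort; `group_nodes_by_component` is a hypothetical helper and the toy array stands in for `node_components`.

```python
import numpy as np


def group_nodes_by_component(node_components: np.ndarray) -> dict:
    """Return {component_id: array of node row indices} using one stable sort."""
    order = np.argsort(node_components, kind="stable")
    sorted_comps = node_components[order]
    # return_index gives the first position of each component in the sorted array
    comp_ids, starts = np.unique(sorted_comps, return_index=True)
    groups = np.split(order, starts[1:])
    return {int(c): g for c, g in zip(comp_ids, groups)}


toy_components = np.array([7, 3, 7, 3, 9], dtype=np.int64)
members = group_nodes_by_component(toy_components)
```

Because the sort is stable, the indices inside each group stay in their original row order.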


## 10. Label integrity 

We assert that for each component id, there is exactly one unique label.

Then we derive:
- `subgraph_labels`: dict {component_id: label}

In [None]:
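# Each ccId must appear exactly once in connected_components.csv; otherwise
# to_dict() would silently keep only the last label of a duplicated ccId.
dup_ccids = cc.loc[cc[CC_COMP_COL].duplicated(keep=False), CC_COMP_COL].unique()
if len(dup_ccids):
    raise ValueError(
        "Label integrity check failed: duplicated ccIds in connected_components.csv. "
        f"Examples: {dup_ccids[:20].tolist()}"
    )

# Build subgraph label mapping directly from connected_components.csv
subgraph_labels = cc.set_index(CC_COMP_COL)[CC_LABEL_COL].to_dict()  # ccId -> "licit"/"suspicious"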

# Audit: every component used by nodes must have a label row
comps = node_components
used_ccids = set(np.unique(comps).astype(int).tolist())
labeled_ccids = set(cc[CC_COMP_COL].astype(int).tolist())

missing_labels = sorted(list(used_ccids - labeled_ccids))
if missing_labels:
    print("Components present in nodes.csv but missing in connected_components.csv:", len(missing_labels))
    print("Examples:", missing_labels[:20])
    raise ValueError(
        "Label integrity check failed: some ccIds used by nodes have no ccLabel entry. "
        "Verify connected_components.csv completeness."
    )

# Audit: connected_components may contain ccIds not used by nodes
extra_labels = sorted(list(labeled_ccids - used_ccids))
print("Label mapping size (cc):", len(subgraph_labels))
print("Components used by nodes:", len(used_ccids))
print("Components labeled but unused by nodes:", len(extra_labels))

(ARTIFACTS_DIR / "subgraph_labels.json").write_text(json.dumps(subgraph_labels))
print("Saved:", ARTIFACTS_DIR / "subgraph_labels.json")


Label mapping size (cc): 121810
Components used by nodes: 121810
Components labeled but unused by nodes: 0
Saved: DATA\processed\artifacts\subgraph_labels.json


## 11. Feature matrix construction and missingness policy

Elliptic2 features are **binned** (ordinal categories). This affects preprocessing:
- Preserve them as integers where possible.
- Represent missing values as a dedicated bin (e.g., 0 or max+1).
- Avoid converting to float and leaving NaNs, which causes:
  - larger arrays and slower training than compact integer dtypes
  - broken embeddings (NaN is not a valid lookup index)
  - inconsistent scaling (NaNs propagate through mean/std)

We will:
- define `feature_cols` as all columns of `background_nodes.csv` except the ID column
- compute missingness
- choose a missing value policy
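
To make the "dedicated bin" idea concrete, here is a minimal sketch of the max+1 variant for a single binned column. `fill_with_reserved_bin` is a hypothetical helper and the toy column is made up. In a real pipeline the reserved value would be derived from training data only, so that validation and test rows use the same bin.

```python
import numpy as np


def fill_with_reserved_bin(col: np.ndarray):
    """Replace NaNs in a binned float column with max(observed) + 1 and cast to int."""
    observed = col[~np.isnan(col)]
    # Fall back to 0 only when the column has no observed values at all
    reserved = int(observed.max()) + 1 if observed.size else 0
    filled = np.where(np.isnan(col), reserved, col).astype(np.int64)
    return filled, reserved


toy_col = np.array([0.0, 3.0, np.nan, 1.0])
filled, reserved = fill_with_reserved_bin(toy_col)
```

Unlike filling with 0, this keeps "missing" distinguishable from a real bin whenever 0 is itself an observed value.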

In [None]:
# --- Elliptic2 feature construction (node features are in background_nodes.csv) ---
# Fast path: Polars (streaming semi-join)
# Fallback: Pandas chunking with progress-based ETA
# Outputs:
# - processed/parquet/background_nodes_subset.parquet
# - processed/artifacts/feature_columns.json
# - processed/arrays/node_features.npy

import os, time, json
from tqdm import tqdm
import numpy as np
import pandas as pd

# 1) Identify labeled node universe
labeled_clids = nodes[NODE_ID_COL].astype("int64").tolist()
labeled_clid_set = set(labeled_clids)

# 2) Ensure background_nodes.csv exists
if "bg_nodes" not in present:
    raise FileNotFoundError(
        "background_nodes.csv not found. Elliptic2 node features live in background_nodes.csv, "
        "so you need this file to build node_features.npy."
    )

bg_nodes_path = present["bg_nodes"]

# 3) Determine ID column in background_nodes.csv (usually clId)
bg_head = pd.read_csv(bg_nodes_path, nrows=5, low_memory=False)
display(bg_head.head())

BG_NODE_ID_COL = "clId"
if BG_NODE_ID_COL not in bg_head.columns:
    raise ValueError(
        f"Expected '{BG_NODE_ID_COL}' in background_nodes.csv but found: {bg_head.columns.tolist()}"
    )

# 4) If we already cached the subset as Parquet
bg_subset_parquet = PARQUET_DIR / "background_nodes_subset.parquet"
if bg_subset_parquet.exists():
    print(f"Loading cached subset: {bg_subset_parquet}")
    bg_subset = pd.read_parquet(bg_subset_parquet)
else:
    # 5) Create subset (try Polars, else Pandas)
    try:
        import polars as pl

        print("Using Polars streaming semi-join (fast path).")

        # Semi-join keeps only rows whose clId appears in labeled set
        labeled_lazy = pl.DataFrame({BG_NODE_ID_COL: labeled_clids}).lazy()

        bg_subset_pl = (
            pl.scan_csv(str(bg_nodes_path))
              .join(labeled_lazy, on=BG_NODE_ID_COL, how="semi")
              .collect(streaming=True)
        )

        bg_subset = bg_subset_pl.to_pandas()
        print("Polars subset rows:", len(bg_subset))

    except Exception as e:
        print("Polars path unavailable or failed; falling back to Pandas chunking.")
        print("Reason:", repr(e))

        chunk_size = 1_000_000  # tune based on RAM/SSD; 1M is usually safe
        file_size = os.path.getsize(bg_nodes_path)

        kept = []
        bytes_est = 0
        start = time.time()
        last_print = 0.0

        reader = pd.read_csv(bg_nodes_path, chunksize=chunk_size, low_memory=False)

        for chunk in tqdm(reader, desc="Subsetting background_nodes (pandas)"):
            # approximate progress using chunk memory footprint
            bytes_est += int(chunk.memory_usage(deep=True).sum())

            # ensure ID dtype is comparable
            chunk[BG_NODE_ID_COL] = chunk[BG_NODE_ID_COL].astype("int64", copy=False)

            sub = chunk[chunk[BG_NODE_ID_COL].isin(labeled_clid_set)]
            if not sub.empty:
                kept.append(sub)

            elapsed = time.time() - start
            if elapsed - last_print >= 10:  # print every ~10s
                rate = bytes_est / max(elapsed, 1e-9)  # bytes/sec (approx)
                eta_sec = (file_size / max(rate, 1e-9)) - elapsed
                print(f"[ETA] ~{eta_sec/60:.1f} min remaining | ~{rate/1e6:.1f} MB/s (approx)")
                last_print = elapsed

        if not kept:
            raise ValueError("No rows in background_nodes.csv matched labeled clIds. Check ID columns/types.")
        bg_subset = pd.concat(kept, ignore_index=True)

    # 6) Cache subset to Parquet 
    bg_subset.to_parquet(bg_subset_parquet, index=False)
    print(f"Cached subset written to: {bg_subset_parquet} (rows={len(bg_subset)})")

# 7) Define feature columns (exclude the id column)
feature_cols = [c for c in bg_subset.columns if c != BG_NODE_ID_COL]
if len(feature_cols) == 0:
    raise ValueError("No feature columns found in background_nodes subset (only ID column present).")

# 8) Missingness diagnostics (on subset)
missing_counts = bg_subset[feature_cols].isna().sum().sort_values(ascending=False)
missing_rate = (missing_counts / len(bg_subset)).sort_values(ascending=False)
diagnostics = pd.DataFrame({"missing_count": missing_counts, "missing_rate": missing_rate})
display(diagnostics.head(20))

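# 9) Missingness policy (binned/ordinal features → fill with reserved bin)
# Caveat: 0 also occurs as a real bin value in these features, so it is not a dedicated
# missing bin. This is harmless here because the diagnostics above report no missing values;
# switch to a value outside the observed range (e.g. max+1) if that changes.
MISSING_FILL_VALUE = 0
features_df = bg_subset[feature_cols].copy().fillna(MISSING_FILL_VALUE)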

# Conservative integer coercion / downcast
for c in feature_cols:
    if pd.api.types.is_float_dtype(features_df[c]):
        as_int = features_df[c].astype(np.int64)
        if np.allclose(features_df[c].to_numpy(), as_int.to_numpy()):
            features_df[c] = as_int
    if pd.api.types.is_integer_dtype(features_df[c]):
        features_df[c] = pd.to_numeric(features_df[c], downcast="integer")

# 10) Align features to nodes.csv row order
# Create mapping clId -> feature row (expect 1-to-1 for background_nodes)
bg_subset_indexed = pd.concat([bg_subset[[BG_NODE_ID_COL]], features_df], axis=1).set_index(BG_NODE_ID_COL)

# Guard: duplicate clIds in the subset would make .loc return extra rows and break alignment
if not bg_subset_indexed.index.is_unique:
    raise ValueError("background_nodes subset contains duplicate clIds; features cannot be aligned 1-to-1.")

# Assert all labeled nodes have background features
missing_feat = [cl for cl in labeled_clids if cl not in bg_subset_indexed.index]
if missing_feat:
    print("Labeled nodes missing background features:", len(missing_feat))
    print("Examples:", missing_feat[:20])
    raise ValueError(
        "Some labeled clIds have no row in background_nodes.csv (after subsetting). "
        "Verify that background_nodes.csv covers the same clId universe."
    )

# Build X so that row i holds the features of the node in row i of nodes.csv
X = bg_subset_indexed.loc[labeled_clids, feature_cols].to_numpy()

# 11) Persist artifacts
(ARTIFACTS_DIR / "feature_columns.json").write_text(json.dumps(feature_cols))
np.save(ARRAYS_DIR / "node_features.npy", X)

print("Saved:", ARTIFACTS_DIR / "feature_columns.json")
print("Saved:", ARRAYS_DIR / "node_features.npy", "shape:", X.shape)
print("Feature dtype summary:")
display(bg_subset_indexed[feature_cols].dtypes.value_counts())


Unnamed: 0,clId,feat#1,feat#2,feat#3,feat#4,feat#5,feat#6,feat#7,feat#8,feat#9,feat#10,feat#11,feat#12,feat#13,feat#14,feat#15,feat#16,feat#17,feat#18,feat#19,feat#20,feat#21,feat#22,feat#23,feat#24,feat#25,feat#26,feat#27,feat#28,feat#29,feat#30,feat#31,feat#32,feat#33,feat#34,feat#35,feat#36,feat#37,feat#38,feat#39,feat#40,feat#41,feat#42,feat#43
0,284528470,75,68,65,56,56,57,91,42,69,44,69,35,51,5,17,13,16,72,84,68,82,7,85,62,67,2,1,1,4,3,6,59,66,8,85,58,49,2,1,0,1,0,0
1,563997622,41,25,0,4,4,0,50,35,0,37,0,0,0,7,0,15,0,67,0,38,44,7,48,35,32,1,0,0,0,0,0,50,47,9,42,39,26,1,0,0,0,0,0
2,1577338,92,91,88,89,87,80,96,52,85,54,85,46,73,17,54,24,51,82,89,66,80,5,87,66,75,2,2,2,4,3,5,74,85,5,90,70,69,2,2,2,5,2,3
3,725905686,50,36,0,0,6,0,17,46,0,47,0,0,0,1,0,10,0,44,0,56,0,7,0,47,0,2,0,1,0,0,0,62,0,9,0,45,0,2,0,0,0,0,0
4,96159393,99,99,99,99,96,89,93,51,88,53,88,53,81,18,53,24,48,83,90,62,80,3,99,59,70,1,2,1,4,2,4,73,85,7,90,66,64,2,1,4,6,2,4


Using Polars streaming semi-join (fast path).




Polars subset rows: 444521
Cached subset written to: DATA\processed\parquet\background_nodes_subset.parquet (rows=444521)


Unnamed: 0,missing_count,missing_rate
feat#1,0,0.0
feat#2,0,0.0
feat#3,0,0.0
feat#4,0,0.0
feat#5,0,0.0
feat#6,0,0.0
feat#7,0,0.0
feat#8,0,0.0
feat#9,0,0.0
feat#10,0,0.0


Saved: DATA\processed\artifacts\feature_columns.json
Saved: DATA\processed\arrays\node_features.npy shape: (444521, 43)
Feature dtype summary:


int8    43
Name: count, dtype: int64

## 12. Subgraph-level stratified splits (train/validation/test)

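We split at the subgraph level and stratify by label, for two reasons:

1. **Correct unit of evaluation**  
   Each subgraph is one training instance. Splitting at node-level would place nodes of the same subgraph in different splits and leak subgraph structure across them.

2. **Class imbalance**  
   Elliptic2 is strongly imbalanced between licit and suspicious subgraphs. Stratification keeps the class proportions of each split close to those of the full dataset.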

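We will:
- build a dataframe with one row per subgraph: (subgraph_id, label)
- make a stratified 80/20 split into train+val and test
- make a stratified split of train+val into train and val (`test_size=0.125`, i.e. 10% of all subgraphs), giving 70/10/20 overall
- save `splits.json` with lists of subgraph IDs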

In [19]:
subgraph_df = pd.DataFrame({
    "subgraph_id": list(subgraph_labels.keys()),
    "label": list(subgraph_labels.values()),
}).sort_values("subgraph_id").reset_index(drop=True)

display(subgraph_df["label"].value_counts())

sss1 = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=RNG_SEED)
trainval_idx, test_idx = next(sss1.split(subgraph_df["subgraph_id"], subgraph_df["label"]))

trainval = subgraph_df.iloc[trainval_idx].reset_index(drop=True)
test = subgraph_df.iloc[test_idx].reset_index(drop=True)

sss2 = StratifiedShuffleSplit(n_splits=1, test_size=0.125, random_state=RNG_SEED)
train_idx, val_idx = next(sss2.split(trainval["subgraph_id"], trainval["label"]))

train = trainval.iloc[train_idx].reset_index(drop=True)
val = trainval.iloc[val_idx].reset_index(drop=True)

splits = {
    "seed": RNG_SEED,
    "train": train["subgraph_id"].astype(int).tolist(),
    "val": val["subgraph_id"].astype(int).tolist(),
    "test": test["subgraph_id"].astype(int).tolist(),
}

(ARTIFACTS_DIR / "splits.json").write_text(json.dumps(splits))
print("Saved:", ARTIFACTS_DIR / "splits.json")
print({k: len(v) for k, v in splits.items() if k in {"train","val","test"}})

label
licit         119047
suspicious      2763
Name: count, dtype: int64

Saved: DATA\processed\artifacts\splits.json
{'train': 85267, 'val': 12181, 'test': 24362}
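
Since `splits.json` is meant to be reused across experiments, a cheap sanity check on load can catch a corrupted or hand-edited file. The sketch below verifies that the three partitions are pairwise disjoint and together cover the expected subgraph IDs; `check_splits` is a hypothetical helper and the toy split is made up.

```python
def check_splits(splits: dict, all_ids: set) -> None:
    """Raise ValueError if the splits overlap or do not cover all_ids exactly."""
    parts = {name: set(splits[name]) for name in ("train", "val", "test")}
    names = list(parts)
    for i, a in enumerate(names):
        for b in names[i + 1:]:
            overlap = parts[a] & parts[b]
            if overlap:
                raise ValueError(f"{a} and {b} share {len(overlap)} subgraph ids")
    covered = set().union(*parts.values())
    if covered != all_ids:
        raise ValueError(
            f"coverage mismatch: {len(all_ids - covered)} ids unassigned, "
            f"{len(covered - all_ids)} ids unknown"
        )


toy_splits = {"seed": 42, "train": [0, 1, 2], "val": [3], "test": [4, 5]}
check_splits(toy_splits, all_ids=set(range(6)))  # passes silently
```

With the notebook's variables, the call would presumably be `check_splits(splits, set(subgraph_labels))`.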


## 13. Training-only feature statistics (leakage-safe diagnostics)

Any scaling parameters must be computed on **training data only** to avoid leakage.

We compute per-feature:
- min, max
- mean, std
- unique count (useful for binned/categorical features)

In [20]:
train_sg = set(splits["train"])
train_mask = np.array([int(comp) in train_sg for comp in comps], dtype=bool)

X_train = X[train_mask]
print("Training node count:", X_train.shape[0])

feature_stats = {}
for j, col in enumerate(feature_cols):
    col_data = np.asarray(X_train[:, j])
    feature_stats[col] = {
        "min": float(np.nanmin(col_data)),
        "max": float(np.nanmax(col_data)),
        "mean": float(np.nanmean(col_data)),
        "std": float(np.nanstd(col_data)),
        "unique": int(len(np.unique(col_data))),
    }

(ARTIFACTS_DIR / "feature_stats_train.json").write_text(json.dumps(feature_stats))
print("Saved:", ARTIFACTS_DIR / "feature_stats_train.json")

# Display top-cardinality features (often informative for embeddings vs scaling decisions)
top_unique = sorted(feature_stats.items(), key=lambda kv: kv[1]["unique"], reverse=True)[:12]
display(pd.DataFrame([{**{"feature": k}, **v} for k, v in top_unique]))

Training node count: 310253
Saved: DATA\processed\artifacts\feature_stats_train.json


Unnamed: 0,feature,min,max,mean,std,unique
0,feat#19,0.0,98.0,18.971697,30.509784,98
1,feat#23,0.0,96.0,18.516163,27.171481,97
2,feat#35,0.0,96.0,8.538167,22.252337,96
3,feat#20,3.0,96.0,44.941996,13.192262,94
4,feat#7,0.0,98.0,34.971736,25.411256,91
5,feat#21,0.0,94.0,22.506574,28.853333,91
6,feat#11,0.0,93.0,12.985434,20.246851,90
7,feat#9,0.0,93.0,12.993818,20.262075,89
8,feat#25,0.0,86.0,16.362424,21.951214,87
9,feat#33,0.0,89.0,6.868768,16.919134,85
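
If a scaling-based model is used downstream, the saved training statistics can be applied to every split without recomputing anything on validation or test data. The following is a minimal sketch on toy data; `standardize_with_train_stats` and the `eps` guard are assumptions made for illustration, and the toy `feature_stats` dict only mimics the shape of `feature_stats_train.json`.

```python
import numpy as np


def standardize_with_train_stats(X, mean, std, eps=1e-8):
    """Z-score X with precomputed training mean/std; constant features are left unscaled."""
    safe_std = np.where(std < eps, 1.0, std)  # avoid division by zero
    return (X.astype(np.float32) - mean) / safe_std


# Toy data: the second feature is constant in training
feature_cols = ["feat#1", "feat#2"]
X_train = np.array([[0, 5], [2, 5], [4, 5]], dtype=np.int8)
X_test = np.array([[2, 6]], dtype=np.int8)

# Same per-feature layout as the saved statistics file
feature_stats = {
    c: {"mean": float(X_train[:, j].mean()), "std": float(X_train[:, j].std())}
    for j, c in enumerate(feature_cols)
}
mean = np.array([feature_stats[c]["mean"] for c in feature_cols])
std = np.array([feature_stats[c]["std"] for c in feature_cols])

Z_train = standardize_with_train_stats(X_train, mean, std)
Z_test = standardize_with_train_stats(X_test, mean, std)
```

Reading the statistics in `feature_columns.json` order keeps the arrays aligned with the columns of `node_features.npy`.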


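## Summary

### Artifacts produced

- `parquet/*.parquet`: Parquet copies of the core tables, plus the cached `background_nodes_subset.parquet`
- `arrays/edge_index.npy`: graph topology in contiguous index space
- `arrays/node_components.npy`: ccId membership per node (aligned to nodes.csv)
- `arrays/node_features.npy`: node feature matrix X aligned to nodes.csv row order
- `artifacts/node_index.json`: raw clId → contiguous index mapping
- `artifacts/feature_columns.json`: ordered list of feature columns
- `artifacts/subgraph_labels.json`: ccId → ccLabel mapping
- `artifacts/splits.json`: persisted train/val/test ccId partitions
- `artifacts/feature_stats_train.json`: train-only feature diagnostics
- `artifacts/input_fingerprints.json`: partial SHA-256 fingerprints of the input CSVs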


### What is now validated

- The dataset is explicitly subgraph-labelled (ccLabel at component level), so the prediction unit is ccId.
- Structural integrity holds for the labelled subset: edges are closed over labelled nodes and component/label coverage is complete.
- Feature coverage is complete for labelled nodes via background subsetting, producing a consistent (444521, 43) node-feature matrix.

### Immediate implications for modelling
Class imbalance is severe (~2.3% suspicious by component: 2,763 of 121,810), so evaluation should prioritise PR-AUC / minority-class F1 and use stratified, component-level splits.
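
The imbalance figures follow directly from the component counts printed in Section 12. The short sketch below derives the positive rate and a negative-to-positive ratio; using that ratio as a loss weight for the minority class is a common convention and an assumption here, not something the notebook prescribes.

```python
# Component counts as printed by the label value_counts() in Section 12
n_licit, n_suspicious = 119_047, 2_763

positive_rate = n_suspicious / (n_licit + n_suspicious)
# Ratio of negatives to positives, often used as a positive-class loss weight
pos_weight = n_licit / n_suspicious

print(f"positive rate: {positive_rate:.4f} | neg/pos ratio: {pos_weight:.1f}")
```

The positive rate is also a useful reference point for PR-AUC, since a classifier that scores at random is expected to reach an average precision close to the prevalence of the positive class.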

Features are discretised/binned integers; embeddings (per-feature or per-bin) are a natural baseline to compare against scaling-based approaches.
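
One way to set up the embedding baseline is to give every feature its own block of rows in a single shared embedding table, then shift each feature's bin value by the start of its block. The sketch below shows only that index arithmetic; `build_offsets`, `to_global_indices`, and the toy cardinalities are hypothetical, and a real run would take cardinalities from training data (for example max + 1 per feature) and decide how to handle bins unseen in training.

```python
import numpy as np


def build_offsets(cardinalities) -> np.ndarray:
    """Start row of each feature's block in one shared embedding table."""
    return np.concatenate([[0], np.cumsum(cardinalities)[:-1]]).astype(np.int64)


def to_global_indices(X_binned: np.ndarray, offsets: np.ndarray) -> np.ndarray:
    """Shift per-feature bin values into the shared index space (broadcast over rows)."""
    return X_binned.astype(np.int64) + offsets


# Toy example: feature 0 has 100 bins (0..99), feature 1 has 3 bins (0..2)
cardinalities = [100, 3]
offsets = build_offsets(cardinalities)
X_toy = np.array([[5, 2], [99, 0]], dtype=np.int8)
global_idx = to_global_indices(X_toy, offsets)
table_rows = int(sum(cardinalities))  # number of rows the embedding table needs
```

Casting to `int64` before adding the offsets matters because the stored features are `int8` and would overflow once shifted.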