# From Rolling Compare To Data Requirements (Delta Sharing vs Metadata)

This notebook starts from the downstream goal, analyzing routing-trajectory differences in the rolling compare, and works backward to the data that goal requires.

Backward reasoning:
1. The compare task needs per-transaction path behavior and amount facts.
2. Amount facts can be fetched from Delta Sharing tables.
3. Intra-transaction AMM/CLOB execution order cannot be reliably reconstructed from Delta Sharing fields alone.
4. Transaction metadata (`AffectedNodes`) is required to reconstruct execution sequence.

This notebook demonstrates that flow using one target tx selected from rolling compare outputs.
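Point 3 can be checked mechanically. In the Delta Sharing rows shown in section 3, every leg of the target tx shares the same ledger and close time (2025-12-15 14:59:22), and none of the selected columns is a per-leg sequence number. The sketch below uses a hypothetical helper, `legs_without_order`, to flag ordering keys shared by more than one leg. The row dicts are simplified stand-ins for the real tables, with the tx hash shortened for brevity.

```python
from collections import Counter
from collections.abc import Hashable
from typing import Any


def legs_without_order(rows: list[dict[str, Any]], key_cols: tuple[str, ...]) -> dict[tuple[Hashable, ...], int]:
    """Return ordering keys shared by more than one leg (hypothetical helper).

    If several legs map to the same key, sorting by `key_cols` leaves their
    relative order undefined.
    """
    counts = Counter(tuple(r[c] for c in key_cols) for r in rows)
    return {k: n for k, n in counts.items() if n > 1}


# Simplified legs for the target tx (close time taken from the section 3 output).
legs = [
    {'tx_hash': '6AA281DD', 'close_time': '2025-12-15 14:59:22', 'src': 'AMM'},
    {'tx_hash': '6AA281DD', 'close_time': '2025-12-15 14:59:22', 'src': 'CLOB'},
    {'tx_hash': '6AA281DD', 'close_time': '2025-12-15 14:59:22', 'src': 'CLOB'},
]
print(legs_without_order(legs, ('tx_hash', 'close_time')))
# {('6AA281DD', '2025-12-15 14:59:22'): 3}
```

Any non-empty result means the AMM/CLOB interleaving of those legs has to come from another source.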



In [1]:
from __future__ import annotations

import json
import urllib.request
from typing import Any

import pandas as pd
from pyspark.sql import SparkSession, functions as F

pd.set_option('display.max_columns', 200)
pd.set_option('display.width', 200)


In [2]:
# Config
from pathlib import Path

REPO_ROOT = Path.cwd().resolve().parent if Path.cwd().name == 'notebooks' else Path.cwd().resolve()
PROFILE = str(REPO_ROOT / 'data' / 'config.share')
SHARE = 'ripple-ubri-share'
SCHEMA = 'ripplex'
TARGET_DATE = '2025-12-15'
RPC_URL = 'https://s1.ripple.com:51234/'
RLUSD_HEX = '524C555344000000000000000000000000000000'
XRP = 'XRP'
TARGET_TX_HASH = '6AA281DD3ED17153F5149EE82965563D8B44BAB69226717EAD4E68326CF96A64'
LEDGER_WINDOW = 300
DELTA_SHARING_SPARK_PACKAGE = 'io.delta:delta-sharing-spark_2.12:3.1.0'
CACHE_FORCE_REFRESH = False
SHOW_SOURCE_DATA_IN_SECTION4 = True

def ds_url(table: str) -> str:
    return f"{PROFILE}#{SHARE}.{SCHEMA}.{table}"

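def _fetch_tx_metadata(tx_hash: str, rpc_url: str) -> dict[str, Any]:
    """Call the rippled `tx` JSON-RPC method and return the decoded response."""
    payload = {
        'method': 'tx',
        'params': [{'transaction': tx_hash, 'binary': False}],
    }
    req = urllib.request.Request(
        rpc_url,
        data=json.dumps(payload).encode('utf-8'),
        headers={'Content-Type': 'application/json'},
        method='POST',
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        body = json.loads(resp.read().decode('utf-8'))
    # rippled reports lookup failures (e.g. txnNotFound) inside `result`, not via the HTTP status.
    result = body.get('result') or {}
    if result.get('status') == 'error':
        raise RuntimeError(f"rippled tx lookup failed: {result.get('error')} ({result.get('error_message')})")
    return body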


## 1) Read metadata context for fixed tx_hash

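Query the rippled `tx` JSON-RPC method first to get the exact ledger index and close time of the target tx; they define the ledger window and the cache date used in the following sections.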
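Two fields of the `tx` response carry this context: `ledger_index` and `date`. The `date` field counts seconds from the Ripple epoch (2000-01-01 00:00:00 UTC), so it must be shifted by 946684800 seconds to become a Unix timestamp. The sketch below isolates that conversion and the ledger-window arithmetic used in the next cell; the helper names are illustrative and not part of the project code.

```python
from datetime import datetime, timezone

# Seconds between the Unix epoch (1970-01-01) and the Ripple epoch (2000-01-01), both UTC.
RIPPLE_EPOCH_OFFSET = 946_684_800


def ripple_date_to_utc(ripple_date: int) -> datetime:
    """Convert an XRPL `date` (seconds since the Ripple epoch) to an aware UTC datetime."""
    return datetime.fromtimestamp(int(ripple_date) + RIPPLE_EPOCH_OFFSET, tz=timezone.utc)


def ledger_window(ledger_index: int, half_width: int) -> tuple[int, int]:
    """Inclusive [start, end] ledger range centred on `ledger_index`."""
    return ledger_index - half_width, ledger_index + half_width


print(ripple_date_to_utc(0))          # 2000-01-01 00:00:00+00:00
print(ledger_window(100894775, 300))  # (100894475, 100895075)
```

The window printed here matches the `ledger_window` reported for the target tx below.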



In [3]:
from datetime import datetime, timezone

tx_hash = TARGET_TX_HASH.strip()
if not tx_hash:
    raise RuntimeError('TARGET_TX_HASH is empty. Please set a valid hash in config.')

tx_response = _fetch_tx_metadata(tx_hash, RPC_URL)
tx_result = tx_response.get('result') or {}

target_ledger_index = tx_result.get('ledger_index')
ripple_date = tx_result.get('date')

if target_ledger_index is None:
    raise RuntimeError('metadata does not include ledger_index for TARGET_TX_HASH')

target_ledger_index = int(target_ledger_index)
ledger_window = LEDGER_WINDOW
ledger_start = target_ledger_index - ledger_window
ledger_end = target_ledger_index + ledger_window

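if ripple_date is not None:
    # XRPL `date` counts seconds since the Ripple epoch (2000-01-01T00:00:00Z), i.e. Unix time 946684800.
    tx_dt = datetime.fromtimestamp(int(ripple_date) + 946684800, tz=timezone.utc)
    cache_date = str(tx_dt.date())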
else:
    tx_dt = None
    cache_date = TARGET_DATE

print('chosen tx_hash:', tx_hash)
print('target_ledger_index:', target_ledger_index)
print('ledger_window:', f'[{ledger_start}, {ledger_end}]')
print('metadata tx datetime (UTC):', tx_dt)
print('cache_date for offers/facts:', cache_date)


chosen tx_hash: 6AA281DD3ED17153F5149EE82965563D8B44BAB69226717EAD4E68326CF96A64
target_ledger_index: 100894775
ledger_window: [100894475, 100895075]
metadata tx datetime (UTC): 2025-12-15 14:59:22+00:00
cache_date for offers/facts: 2025-12-15


## 2) Reconstruct full intra-tx trajectory from metadata

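Treat the order of `AffectedNodes` as the transaction-internal step sequence, classify each node as an AMM or CLOB step, and keep the per-step fields (node type, offer `TakerGets`/`TakerPays` before and after) for the downstream sections. Caveat: rippled may serialize `AffectedNodes` sorted by ledger entry index rather than in application order, so this ordering assumption is worth validating against an independent replay before relying on it.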
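The classification rules of the next cell can be exercised on a hand-written metadata fragment. The sketch below is a simplified, hypothetical restatement (`classify_node` and `collapse_adjacent` are not project functions). An `AMM` ledger entry and the AMM's own `AccountRoot`, whose `AMMID` field holds the ID of that `AMM` entry, both map to the same AMM source, so the adjacent-dedupe step merges them into one segment. Unrelated entries such as trust lines (`RippleState`) are dropped.

```python
from typing import Any, Optional


def classify_node(node: dict[str, Any]) -> Optional[tuple[str, str]]:
    """Map one AffectedNodes entry to ('AMM' | 'CLOB', source id), or None.

    Simplified rules: AMM entries and AccountRoots carrying AMMID count as AMM;
    Offer entries that were deleted or whose TakerGets/TakerPays changed count as CLOB.
    """
    kind = next((k for k in ('CreatedNode', 'ModifiedNode', 'DeletedNode') if k in node), None)
    if kind is None:
        return None
    body = node[kind]
    entry = body.get('LedgerEntryType')
    prev = body.get('PreviousFields') or {}
    final = body.get('FinalFields') or body.get('NewFields') or {}
    if entry == 'AMM':
        return 'AMM', str(body.get('LedgerIndex'))
    if entry == 'AccountRoot' and ('AMMID' in final or 'AMMID' in prev):
        return 'AMM', str(final.get('AMMID') or prev.get('AMMID'))
    if entry == 'Offer':
        changed = any(f in prev and prev[f] != final.get(f) for f in ('TakerGets', 'TakerPays'))
        if kind == 'DeletedNode' or changed:
            return 'CLOB', str(body.get('LedgerIndex'))
    return None


def collapse_adjacent(steps: list[tuple[str, str]]) -> list[tuple[str, str]]:
    """Drop a step when it repeats the previous step's (kind, source)."""
    out: list[tuple[str, str]] = []
    for s in steps:
        if not out or out[-1] != s:
            out.append(s)
    return out


affected = [
    {'ModifiedNode': {'LedgerEntryType': 'AMM', 'LedgerIndex': 'A1'}},
    {'ModifiedNode': {'LedgerEntryType': 'AccountRoot', 'LedgerIndex': 'R1',
                      'FinalFields': {'AMMID': 'A1'}, 'PreviousFields': {'Balance': '1'}}},
    {'DeletedNode': {'LedgerEntryType': 'Offer', 'LedgerIndex': 'O1',
                     'FinalFields': {'TakerGets': '0', 'TakerPays': '0'}}},
    {'ModifiedNode': {'LedgerEntryType': 'RippleState', 'LedgerIndex': 'T1'}},
]
steps = [s for n in affected if (s := classify_node(n)) is not None]
print(collapse_adjacent(steps))  # [('AMM', 'A1'), ('CLOB', 'O1')]
```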




In [4]:
def _offer_order_key(body: dict[str, Any]) -> str:
    ledger_index = body.get('LedgerIndex')
    if ledger_index:
        return str(ledger_index)

    final = body.get('FinalFields', {}) or body.get('NewFields', {}) or {}
    account = final.get('Account')
    seq = final.get('Sequence')
    if account is not None and seq is not None:
        return f"{account}:{seq}"

    return 'unknown-offer'

def _amount_to_text(v: Any) -> str | None:
    if v is None:
        return None
    if isinstance(v, dict):
        ccy = v.get('currency')
        val = v.get('value')
        iss = v.get('issuer')
        if ccy is not None and val is not None:
            return f"{ccy}:{val}:{iss or ''}"
        return json.dumps(v, ensure_ascii=False)
    return str(v)

def _extract_step_detail(node: dict[str, Any], step_idx: int) -> dict[str, Any] | None:
    node_kind = next((k for k in ('CreatedNode', 'ModifiedNode', 'DeletedNode') if k in node), None)
    if node_kind is None:
        return None

    body = node[node_kind]
    entry = body.get('LedgerEntryType')
    prev = body.get('PreviousFields', {}) or {}
    final = body.get('FinalFields', {}) or body.get('NewFields', {}) or {}

    if entry == 'AMM':
        source = str(body.get('LedgerIndex') or final.get('Account') or 'amm-pool')
        return {
            'step_idx': step_idx,
            'kind': 'AMM',
            'source': source,
            'node_type': node_kind,
            'ledger_entry_type': entry,
            'offer_gets_before': None,
            'offer_gets_after': None,
            'offer_pays_before': None,
            'offer_pays_after': None,
        }

    if entry == 'AccountRoot' and ('AMMID' in final or 'AMMID' in prev):
        source = str(final.get('AMMID') or prev.get('AMMID') or final.get('Account') or 'amm-accountroot')
        return {
            'step_idx': step_idx,
            'kind': 'AMM',
            'source': source,
            'node_type': node_kind,
            'ledger_entry_type': entry,
            'offer_gets_before': None,
            'offer_gets_after': None,
            'offer_pays_before': None,
            'offer_pays_after': None,
        }

    if entry == 'Offer':
        gets_before = _amount_to_text(prev.get('TakerGets'))
        gets_after = _amount_to_text(final.get('TakerGets'))
        pays_before = _amount_to_text(prev.get('TakerPays'))
        pays_after = _amount_to_text(final.get('TakerPays'))

        changed_gets = gets_before is not None and gets_after is not None and gets_before != gets_after
        changed_pays = pays_before is not None and pays_after is not None and pays_before != pays_after
        removed_offer = node_kind == 'DeletedNode'

        if changed_gets or changed_pays or removed_offer:
            return {
                'step_idx': step_idx,
                'kind': 'CLOB',
                'source': _offer_order_key(body),
                'node_type': node_kind,
                'ledger_entry_type': entry,
                'offer_gets_before': gets_before,
                'offer_gets_after': gets_after,
                'offer_pays_before': pays_before,
                'offer_pays_after': pays_after,
            }

    return None

def _dedupe_adjacent_same_source(steps: list[dict[str, Any]]) -> list[dict[str, Any]]:
    out: list[dict[str, Any]] = []
    for s in steps:
        if not out or not (out[-1]['kind'] == s['kind'] and out[-1]['source'] == s['source']):
            out.append(s)
    return out

affected = (tx_result.get('meta') or {}).get('AffectedNodes') or []
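# step_idx keeps each node's position in AffectedNodes; nodes that are neither AMM nor CLOB steps are skipped.
raw_steps = [s for i, n in enumerate(affected) if (s := _extract_step_detail(n, i)) is not None]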
segments = _dedupe_adjacent_same_source(raw_steps)

raw_df = pd.DataFrame(raw_steps)
seg_df = pd.DataFrame(segments)

# Keep metadata dataframes for downstream sections; no inline display here.


## 3) Build local cache for full-tx facts and query by tx_hash

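Cache the Delta Sharing slices once (AMM swaps and AMM fees for the ledger window around the target tx, `offers_fact_tx` for the whole cache date), then query the target tx from the local Parquet copies. The AMM tables are not filtered to the RLUSD/XRP pair, so legs through any pool touched by the tx are kept.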
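The build-or-reuse pattern of `_build_cache_if_needed` does not depend on Spark. Because the swaps and fees slices depend on the ledger window while the cache directory is keyed only by date, one option is to put the window into the file name. The sketch below is a hypothetical JSON-based variant of that idea (`window_cache_path` and `build_cache_if_needed` are illustrative names, not the notebook's Parquet writer).

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Callable


def window_cache_path(root: Path, table: str, ledger_start: int, ledger_end: int) -> Path:
    """Hypothetical naming scheme: the ledger window is part of the cache file name."""
    return root / f'{table}_ledgers_{ledger_start}_{ledger_end}.json'


def build_cache_if_needed(path: Path, builder: Callable[[], Any], force: bool = False) -> str:
    """Materialize builder() at `path` unless a copy already exists (or force=True)."""
    if force or not path.exists():
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(builder()))
        return 'rebuilt'
    return 'reused'


with tempfile.TemporaryDirectory() as tmp:
    p = window_cache_path(Path(tmp), 'fact_amm_swaps', 100894475, 100895075)
    print(p.name)  # fact_amm_swaps_ledgers_100894475_100895075.json
    print(build_cache_if_needed(p, lambda: [{'ledger_index': 100894775}]))  # rebuilt
    print(build_cache_if_needed(p, lambda: []))  # reused
```

With this naming, a different target tx produces a different window and therefore a fresh cache file instead of silently reusing an unrelated slice.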




In [5]:
import pyspark

if not pyspark.__version__.startswith('3.'):
    raise RuntimeError(
        f"pyspark {pyspark.__version__} detected. Please use Spark 3.x for Delta Sharing connector."
    )

spark = globals().get('spark') or (
    SparkSession.builder
    .appName('delta_sharing_tx_demo')
    .config('spark.jars.packages', DELTA_SHARING_SPARK_PACKAGE)
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
    .config('spark.sql.shuffle.partitions', '8')
    .getOrCreate()
)

cache_root = REPO_ROOT / 'artifacts' / 'delta_sharing_cache' / f'date_{cache_date}'
cache_root.mkdir(parents=True, exist_ok=True)

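# NOTE: the swaps/fees slices cover only [ledger_start, ledger_end] around the current TARGET_TX_HASH,
# but the cache directory is keyed by date alone. When switching to another tx on the same date, or
# after changing the selected columns, set CACHE_FORCE_REFRESH = True; otherwise the old files are reused.
swaps_cache = cache_root / 'fact_amm_swaps_fulltx_window.parquet'
offers_cache = cache_root / 'offers_fact_tx_fulltx_day.parquet'
fees_cache = cache_root / 'fact_amm_fees_fulltx_window.parquet'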

def _read_remote(table: str):
    return spark.read.format('deltaSharing').load(ds_url(table))

def _build_cache_if_needed(path: Path, builder_fn) -> str:
    if CACHE_FORCE_REFRESH or (not path.exists()):
        df = builder_fn()
        df.write.mode('overwrite').parquet(str(path))
        return 'rebuilt'
    return 'reused'

sw_mode = _build_cache_if_needed(
    swaps_cache,
    lambda: (_read_remote('fact_amm_swaps')
        .where((F.col('ledger_index') >= F.lit(int(ledger_start))) & (F.col('ledger_index') <= F.lit(int(ledger_end))))
        .select('transaction_hash', 'ledger_index', 'transaction_index', 'close_time_datetime',
                'asset_in_currency', 'asset_out_currency', 'asset_in_value', 'asset_out_value', 'amm_account')
    ),
)

of_mode = _build_cache_if_needed(
    offers_cache,
    lambda: (_read_remote('offers_fact_tx')
        .where(F.col('close_time_date') == F.lit(cache_date))
        .select('tx_hash', 'close_time', 'close_time_date', 'transaction_type',
                'offer_base_currency', 'offer_counter_currency', 'offer_base_amount', 'offer_counter_amount',
                'base_currency', 'counter_currency', 'base_amount', 'counter_amount', 'price', 'amm_account', 'account')
    ),
)

fe_mode = _build_cache_if_needed(
    fees_cache,
    lambda: (_read_remote('fact_amm_fees')
        .where((F.col('ledger_index') >= F.lit(int(ledger_start))) & (F.col('ledger_index') <= F.lit(int(ledger_end))))
        .select('transaction_hash', 'ledger_index', 'transaction_index', 'close_time_datetime',
                'trading_fee', 'discounted_fee', 'amm_account', 'amm_asset_currency', 'amm_asset2_currency')
    ),
)

sw_local = spark.read.parquet(str(swaps_cache)).where(F.col('transaction_hash') == F.lit(tx_hash))
of_by_tx = spark.read.parquet(str(offers_cache)).where(F.col('tx_hash') == F.lit(tx_hash))
fe_local = spark.read.parquet(str(fees_cache)).where(F.col('transaction_hash') == F.lit(tx_hash))

print('pyspark version:', pyspark.__version__)
print('spark package:', DELTA_SHARING_SPARK_PACKAGE)
print('cache_date:', cache_date)
print('swaps cache:', swaps_cache, '|', sw_mode)
print('offers cache:', offers_cache, '|', of_mode)
print('fees cache:', fees_cache, '|', fe_mode)
print('swaps rows (local by tx):', sw_local.count())
print('offers rows (tx_hash == tx):', of_by_tx.count())
print('fees rows (local by tx):', fe_local.count())

print('\n[AMM swap legs from Delta Sharing for this tx]')
display(sw_local.orderBy(F.col('transaction_index').asc()).toPandas())

print('\n[CLOB offers_fact_tx rows where tx_hash == target tx]')
display(of_by_tx.orderBy(F.col('close_time').asc_nulls_last()).toPandas())

print('\n[AMM fees rows from Delta Sharing for this tx]')
display(fe_local.orderBy(F.col('transaction_index').asc()).toPandas())



pyspark version: 3.5.3
spark package: io.delta:delta-sharing-spark_2.12:3.1.0
cache_date: 2025-12-15
swaps cache: /Users/guohanze/Documents/Codebase/xrpl-amm-clob/artifacts/delta_sharing_cache/date_2025-12-15/fact_amm_swaps_fulltx_window.parquet | reused
offers cache: /Users/guohanze/Documents/Codebase/xrpl-amm-clob/artifacts/delta_sharing_cache/date_2025-12-15/offers_fact_tx_fulltx_day.parquet | reused
fees cache: /Users/guohanze/Documents/Codebase/xrpl-amm-clob/artifacts/delta_sharing_cache/date_2025-12-15/fact_amm_fees_fulltx_window.parquet | reused



swaps rows (local by tx): 1
offers rows (tx_hash == tx): 13
fees rows (local by tx): 1

[AMM swap legs from Delta Sharing for this tx]


| | transaction_hash | ledger_index | transaction_index | close_time_datetime | asset_in_currency | asset_out_currency | asset_in_value | asset_out_value | amm_account |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 6AA281DD3ED17153F5149EE82965563D8B44BAB6922671... | 100894775 | 4 | 2025-12-15 14:59:22 | XRP | 524C555344000000000000000000000000000000 | 526.257185 | 1020.051024 | rhWTXC2m2gGGA9WozUaoMm6kLAVPb1tcS3 |



[CLOB offers_fact_tx rows where tx_hash == target tx]


| | tx_hash | fulfilled_by | close_time | close_time_date | transaction_type | offer_base_currency | offer_counter_currency | offer_base_amount | offer_counter_amount | base_currency | counter_currency | base_amount | counter_amount | price | amm_account | account |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6AA281DD3ED17153F5149EE82965563D8B44BAB6922671... | Offer | 2025-12-15 14:59:22 | 2025-12-15 | OfferCreate | XRP | 524C555344000000000000000000000000000000 | 25000000000 | 48424.326635966 | 524C555344000000000000000000000000000000 | XRP | 2860.75869 | 1475.72848 | 0.515852 | | rneAz9jqhCgGyBmDtCwn3Cu5zqgwuWT3mN |
| 1 | 6AA281DD3ED17153F5149EE82965563D8B44BAB6922671... | Offer | 2025-12-15 14:59:22 | 2025-12-15 | OfferCreate | XRP | 524C555344000000000000000000000000000000 | 25000000000 | 48424.326635966 | 524C555344000000000000000000000000000000 | XRP | 1910.99952 | 983.81899 | 0.514819 | | rneAz9jqhCgGyBmDtCwn3Cu5zqgwuWT3mN |
| 2 | 6AA281DD3ED17153F5149EE82965563D8B44BAB6922671... | Offer | 2025-12-15 14:59:22 | 2025-12-15 | OfferCreate | XRP | 524C555344000000000000000000000000000000 | 25000000000 | 48424.326635966 | 524C555344000000000000000000000000000000 | XRP | 2238.938272 | 1155.76 | 0.516209 | | rneAz9jqhCgGyBmDtCwn3Cu5zqgwuWT3mN |
| 3 | 6AA281DD3ED17153F5149EE82965563D8B44BAB6922671... | Offer | 2025-12-15 14:59:22 | 2025-12-15 | OfferCreate | XRP | 524C555344000000000000000000000000000000 | 25000000000 | 48424.326635966 | 524C555344000000000000000000000000000000 | XRP | 11640.0 | 6000.0 | 0.515464 | | rneAz9jqhCgGyBmDtCwn3Cu5zqgwuWT3mN |
| 4 | 6AA281DD3ED17153F5149EE82965563D8B44BAB6922671... | Offer | 2025-12-15 14:59:22 | 2025-12-15 | OfferCreate | XRP | 524C555344000000000000000000000000000000 | 25000000000 | 48424.326635966 | 524C555344000000000000000000000000000000 | XRP | 9.883 | 5.099133 | 0.51595 | | rneAz9jqhCgGyBmDtCwn3Cu5zqgwuWT3mN |
| 5 | 6AA281DD3ED17153F5149EE82965563D8B44BAB6922671... | Offer | 2025-12-15 14:59:22 | 2025-12-15 | OfferCreate | XRP | 524C555344000000000000000000000000000000 | 25000000000 | 48424.326635966 | 524C555344000000000000000000000000000000 | XRP | 5.42432 | 2.796041 | 0.515464 | | rneAz9jqhCgGyBmDtCwn3Cu5zqgwuWT3mN |
| 6 | 6AA281DD3ED17153F5149EE82965563D8B44BAB6922671... | Offer | 2025-12-15 14:59:22 | 2025-12-15 | OfferCreate | XRP | 524C555344000000000000000000000000000000 | 25000000000 | 48424.326635966 | 524C555344000000000000000000000000000000 | XRP | 9.883 | 5.091257 | 0.515153 | | rneAz9jqhCgGyBmDtCwn3Cu5zqgwuWT3mN |
| 7 | 6AA281DD3ED17153F5149EE82965563D8B44BAB6922671... | Offer | 2025-12-15 14:59:22 | 2025-12-15 | OfferCreate | XRP | 524C555344000000000000000000000000000000 | 25000000000 | 48424.326635966 | 524C555344000000000000000000000000000000 | XRP | 8.412263 | 4.336217 | 0.515464 | | rneAz9jqhCgGyBmDtCwn3Cu5zqgwuWT3mN |
| 8 | 6AA281DD3ED17153F5149EE82965563D8B44BAB6922671... | Offer | 2025-12-15 14:59:22 | 2025-12-15 | OfferCreate | XRP | 524C555344000000000000000000000000000000 | 25000000000 | 48424.326635966 | 524C555344000000000000000000000000000000 | XRP | 38.828862 | 20.0 | 0.515081 | | rneAz9jqhCgGyBmDtCwn3Cu5zqgwuWT3mN |
| 9 | 6AA281DD3ED17153F5149EE82965563D8B44BAB6922671... | Offer | 2025-12-15 14:59:22 | 2025-12-15 | OfferCreate | XRP | 524C555344000000000000000000000000000000 | 25000000000 | 48424.326635966 | 524C555344000000000000000000000000000000 | XRP | 106.0 | 54.611306 | 0.515201 | | rneAz9jqhCgGyBmDtCwn3Cu5zqgwuWT3mN |



[AMM fees rows from Delta Sharing for this tx]


| | transaction_hash | ledger_index | transaction_index | close_time_datetime | trading_fee | discounted_fee | amm_account | amm_asset_currency | amm_asset2_currency |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 6AA281DD3ED17153F5149EE82965563D8B44BAB6922671... | 100894775 | 4 | 2025-12-15 14:59:22 | 0.00529 | 0.000529 | rhWTXC2m2gGGA9WozUaoMm6kLAVPb1tcS3 | 524C555344000000000000000000000000000000 | XRP |
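In the `offers_fact_tx` rows above, `price` appears to equal `counter_amount / base_amount` (XRP per RLUSD here) rounded to six decimals; rows 0 and 3 are spot-checked below. Assuming that definition, per-leg prices and a volume-weighted price over several legs can be recomputed with `Decimal`. `leg_price` is a hypothetical helper, and the definition of `price` is inferred from the data rather than from the table documentation.

```python
from decimal import Decimal

# (base_amount RLUSD, counter_amount XRP, displayed price) copied from rows 0 and 3 above.
rows = [
    (Decimal('2860.75869'), Decimal('1475.72848'), Decimal('0.515852')),
    (Decimal('11640.0'), Decimal('6000.0'), Decimal('0.515464')),
]


def leg_price(base: Decimal, counter: Decimal, places: int = 6) -> Decimal:
    """counter/base rounded to `places` decimals (assumed definition of `price`)."""
    return (counter / base).quantize(Decimal(1).scaleb(-places))


for base, counter, price in rows:
    print(leg_price(base, counter), price, leg_price(base, counter) == price)

total_base = sum((b for b, _, _ in rows), Decimal('0'))
total_counter = sum((c for _, c, _ in rows), Decimal('0'))
print(total_counter / total_base)  # volume-weighted XRP per RLUSD over these two legs
```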


## 4) Single-tx dual replay (Real engine vs Single-path model)

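This cell defines helpers for building CLOB tiers from order-book snapshots, loads the rolling-compare row for the target tx, and reads the compare run's `manifest.json` to locate its input tables. It then checks that the Delta Sharing AMM and CLOB leg counts match the metadata segment counts (refusing to fall back if they differ) and fills the real trajectory with Delta Sharing amounts in metadata order. `SHOW_SOURCE_DATA_IN_SECTION4` (set to `True` in the config cell) controls whether the metadata and Delta Sharing source rows used by this section are displayed.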
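The Delta Sharing CLOB legs carry no offer ID. The cell therefore converts each base/counter row into an (out, in) pair for the tx direction and buckets the pairs by amounts floored to 1e-12, so they can be matched against the metadata offer changes parsed in the loop. A standalone sketch of those two steps follows. `orient_leg` and `pair_key` are illustrative restatements of the notebook logic, and the XRP to RLUSD direction is an assumption based on the AMM leg shown above (XRP in, RLUSD out). Because the key is floor-based, matching assumes the DS amounts and the metadata deltas agree to 12 decimal places.

```python
from decimal import ROUND_FLOOR, Decimal

XRP = 'XRP'
RLUSD = '524C555344000000000000000000000000000000'


def orient_leg(base_cur: str, counter_cur: str, base_amt: Decimal, counter_amt: Decimal,
               in_cur: str, out_cur: str) -> tuple[Decimal, Decimal]:
    """Return (out, in) for a base/counter leg, given the tx direction in_cur -> out_cur."""
    if base_cur == out_cur and counter_cur == in_cur:
        return base_amt, counter_amt
    if counter_cur == out_cur and base_cur == in_cur:
        return counter_amt, base_amt
    raise ValueError(f'leg {base_cur}/{counter_cur} does not match {in_cur}->{out_cur}')


def pair_key(out_d: Decimal, in_d: Decimal, scale: int = 12) -> tuple[int, int]:
    """Floor both amounts to units of 10**-scale so equal legs land in the same bucket."""
    q = Decimal(1).scaleb(-scale)
    return (int((out_d / q).to_integral_value(rounding=ROUND_FLOOR)),
            int((in_d / q).to_integral_value(rounding=ROUND_FLOOR)))


# Row 3 of the offers_fact_tx output: base = RLUSD 11640.0, counter = XRP 6000.0.
out_d, in_d = orient_leg(RLUSD, XRP, Decimal('11640.0'), Decimal('6000.0'), in_cur=XRP, out_cur=RLUSD)
print(out_d, in_d)  # 11640.0 6000.0
print(pair_key(out_d, in_d))
```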


In [6]:
import glob
import re
from decimal import Decimal, ROUND_FLOOR

from xrpl_router.amm import AMM
from xrpl_router.book_step import RouterQuoteView
from xrpl_router.core import IOUAmount, XRPAmount
from xrpl_router.core.constants import IOU_QUANTUM, XRP_QUANTUM
from xrpl_router.core.datatypes import Segment
from xrpl_router.core.fmt import amount_to_decimal, quantize_down
from xrpl_router.clob import from_levels as clob_from_levels

# Reuse the config constants (XRP is already defined there); 'rUSD' is the short label
# that the compare direction strings use for RLUSD.
RUSD_HEX = RLUSD_HEX
IOU_IN_TRANSFER_RATE = Decimal('1')

def _to_decimal(x: Any) -> Decimal:
    try:
        return Decimal(str(x))
    except Exception:
        return Decimal('0')

def _fmt_user_request_amount(a: Any) -> str:
    if a is None:
        return 'None'
    if isinstance(a, str):
        return f"XRP: {Decimal(a) / Decimal(1000000)}"
    if isinstance(a, dict):
        cur = str(a.get('currency') or '')
        val = a.get('value')
        pretty_cur = 'rUSD' if cur == RUSD_HEX else cur
        return f"{pretty_cur}: {val}"
    return str(a)

def _load_compare_row(txh: str) -> tuple[pd.Series | None, str | None]:
    compare_root = REPO_ROOT / 'artifacts' / 'compare' / 'rlusd_xrp'
    patterns = [
        str(compare_root / 'ledger_*' / 'compare_rolling_*pairscoped_v2.parquet'),
        str(compare_root / 'ledger_*' / 'compare_rolling_*_v1.parquet'),
    ]
    txu = txh.upper()
    cols = [
        'transaction_hash', 'ledger_index', 'transaction_index', 'direction',
        'used_prebook_ledger', 'real_in', 'real_out', 'model_in', 'model_out',
        'diff_in', 'path_sig_real', 'path_sig_model', 'real_clob_legs',
    ]
    for pat in patterns:
        for path in sorted(glob.glob(pat), reverse=True):
            try:
                df = pd.read_parquet(path, columns=cols)
            except Exception:
                continue
            m = df[df['transaction_hash'].astype(str).str.upper() == txu]
            if len(m) > 0:
                return m.iloc[0], path
    return None, None

def _load_offers_for_ledger(book_path: Path, ledger_idx: int) -> list[dict[str, Any]]:
    with open(book_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            obj = json.loads(line)
            li = obj.get('ledger_index') or obj.get('ledger') or obj.get('ledger_current_index')
            if li is None or int(li) != int(ledger_idx):
                continue
            offers = obj.get('offers')
            if offers is None:
                offers = (obj.get('result') or {}).get('offers', [])
            return offers or []
    return []

def _direction_to_currencies(direction: str) -> tuple[str, str]:
    d = str(direction or '')
    if 'XRP->rUSD' in d:
        return XRP, RUSD_HEX
    if 'rUSD->XRP' in d:
        return RUSD_HEX, XRP
    raise RuntimeError(f'Unexpected direction label: {direction}')

def _parse_amount(a: Any) -> tuple[str, Decimal]:
    if isinstance(a, str):
        return XRP, Decimal(a) * XRP_QUANTUM
    if isinstance(a, dict):
        return str(a.get('currency')), Decimal(str(a.get('value')))
    raise ValueError(f'Unsupported amount payload: {a!r}')

def _amt_floor(cur: str, v: Decimal):
    if cur == XRP:
        drops = int((v / XRP_QUANTUM).to_integral_value(rounding=ROUND_FLOOR))
        return XRPAmount(value=drops)
    units = int((v / IOU_QUANTUM).to_integral_value(rounding=ROUND_FLOOR))
    return IOUAmount.from_components(units, -15)

def _book_offer_id(o: dict[str, Any]) -> str:
    if o.get('index'):
        return str(o.get('index'))
    if o.get('Account') is not None and o.get('Sequence') is not None:
        return f"{o.get('Account')}:{o.get('Sequence')}"
    return 'unknown-offer-id'

def _build_tiers_with_offer_ids(offers: list[dict[str, Any]], in_cur: str, out_cur: str) -> list[tuple[Decimal, Decimal, str]]:
    tiers: list[tuple[Decimal, Decimal, str]] = []
    for o in offers:
        gets_cur, gets_val = _parse_amount(o.get('TakerGets'))
        pays_cur, pays_val = _parse_amount(o.get('TakerPays'))

        if 'taker_gets_funded' in o:
            try:
                fg_cur, fg_val = _parse_amount(o.get('taker_gets_funded'))
                if fg_cur == gets_cur:
                    gets_val = fg_val
            except Exception:
                pass
        if 'taker_pays_funded' in o:
            try:
                fp_cur, fp_val = _parse_amount(o.get('taker_pays_funded'))
                if fp_cur == pays_cur:
                    pays_val = fp_val
            except Exception:
                pass

        try:
            if 'owner_funds' in o and Decimal(str(o.get('owner_funds'))) <= 0:
                continue
        except Exception:
            pass

        if gets_cur == out_cur and pays_cur == in_cur:
            out_max_dec = gets_val
            in_need_dec = pays_val
        elif pays_cur == out_cur and gets_cur == in_cur:
            out_max_dec = pays_val
            in_need_dec = gets_val
        else:
            continue

        if out_max_dec <= 0 or in_need_dec <= 0:
            continue

        if in_cur != XRP and IOU_IN_TRANSFER_RATE > 0:
            in_need_dec = in_need_dec * IOU_IN_TRANSFER_RATE

        out_q = XRP_QUANTUM if out_cur == XRP else IOU_QUANTUM
        out_max_dec = quantize_down(out_max_dec, out_q)
        if out_max_dec <= 0:
            continue

        tiers.append((out_max_dec, in_need_dec, _book_offer_id(o)))

    tiers.sort(key=lambda t: (t[0] / t[1]), reverse=True)
    return tiers

def _tiers_to_segments(tiers: list[tuple[Decimal, Decimal, str]], in_cur: str, out_cur: str) -> list[Segment]:
    if not tiers:
        return []

    levels: list[tuple[Decimal, Decimal]] = []
    for out_dec, in_dec, _offer_id in tiers:
        if out_dec <= 0 or in_dec <= 0:
            continue
        levels.append((out_dec / in_dec, out_dec))

    segs_base = clob_from_levels(
        levels,
        in_is_xrp=(in_cur == XRP),
        out_is_xrp=(out_cur == XRP),
    )
    if len(segs_base) != len(tiers):
        raise RuntimeError(
            f'src clob.from_levels length mismatch: segs={len(segs_base)} tiers={len(tiers)}'
        )

    out: list[Segment] = []
    for seg_b, (_out_dec, _in_dec, offer_id) in zip(segs_base, tiers):
        out.append(
            Segment(
                src='CLOB',
                quality=seg_b.quality,
                out_max=seg_b.out_max,
                in_at_out_max=seg_b.in_at_out_max,
                raw_quality=seg_b.raw_quality or seg_b.quality,
                source_id=offer_id,
            )
        )
    return out

def _load_tx_rows(manifest_inputs: dict[str, str], txh: str) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    swaps = pd.read_parquet(REPO_ROOT / manifest_inputs['amm_swaps'])
    fees = pd.read_parquet(REPO_ROOT / manifest_inputs['amm_fees'])
    clob = pd.read_parquet(REPO_ROOT / manifest_inputs['clob_legs'])
    txu = txh.upper()
    sw = swaps[swaps['transaction_hash'].astype(str).str.upper() == txu].copy()
    fe = fees[fees['transaction_hash'].astype(str).str.upper() == txu].copy()
    clob_key = 'tx_hash' if 'tx_hash' in clob.columns else 'transaction_hash'
    cl = clob[clob[clob_key].astype(str).str.upper() == txu].copy()
    return sw, fe, cl

def _orient_reserves(sw_row: pd.Series, in_cur: str, out_cur: str) -> tuple[Decimal, Decimal]:
    a = str(sw_row['amm_asset_currency'])
    b = str(sw_row['amm_asset2_currency'])
    a_bal = Decimal(str(sw_row['amm_asset_balance_before']))
    b_bal = Decimal(str(sw_row['amm_asset2_balance_before']))
    if in_cur == a and out_cur == b:
        return a_bal, b_bal
    if in_cur == b and out_cur == a:
        return b_bal, a_bal
    raise RuntimeError(f'Pool currencies do not match tx direction: in={in_cur}, out={out_cur}, pool=({a},{b})')

def _short_offer_id(v: Any) -> str | None:
    if v is None:
        return None
    s = str(v)
    return s[:4] if s else None

def _parse_meta_amt_text(s: Any) -> Decimal | None:
    if s is None:
        return None
    txt = str(s)
    if ':' in txt:
        parts = txt.split(':')
        if len(parts) >= 2:
            try:
                return Decimal(parts[1])
            except Exception:
                return None
        return None
    try:
        return Decimal(txt) / Decimal(1_000_000)
    except Exception:
        return None

def _pair_key(out_d: Decimal, in_d: Decimal, scale: int = 12) -> tuple[int, int]:
    q = Decimal(1).scaleb(-scale)
    return (
        int((out_d / q).to_integral_value(rounding=ROUND_FLOOR)),
        int((in_d / q).to_integral_value(rounding=ROUND_FLOOR)),
    )

def _to_dec_or_zero(v: Any) -> Decimal:
    if v is None:
        return Decimal('0')
    try:
        return Decimal(str(v))
    except Exception:
        try:
            return Decimal(str(amount_to_decimal(v)))
        except Exception:
            return Decimal('0')

def _sum_track_by_kind(df: pd.DataFrame, kind: str) -> tuple[Decimal, Decimal]:
    if df is None or len(df) == 0:
        return Decimal('0'), Decimal('0')
    m = df[df['segment_kind'].astype(str).str.upper() == kind.upper()]
    in_sum = sum((_to_dec_or_zero(v) for v in m.get('in_take', [])), Decimal('0'))
    out_sum = sum((_to_dec_or_zero(v) for v in m.get('out_take', [])), Decimal('0'))
    return in_sum, out_sum

def _fmt_dec(v: Decimal | None, digits: int = 6) -> str | None:
    if v is None:
        return None
    q = Decimal(1).scaleb(-digits)
    return str(v.quantize(q))

def _quality_from_in_out(out_v: Decimal | None, in_v: Decimal | None, digits: int = 9) -> str | None:
    if out_v is None or in_v is None or in_v == 0:
        return None
    return _fmt_dec(out_v / in_v, digits)

def _infer_model_node_type(kind: str, offer_id_full: str | None, out_take_raw: Any, out_cap_map: dict[str, Decimal]) -> str | None:
    k = str(kind).upper()
    if k == 'AMM':
        return 'ModifiedNode(model-AMM)'
    if k != 'CLOB':
        return None
    if offer_id_full is None:
        return 'ModifiedNode(model-CLOB?)'
    cap = out_cap_map.get(str(offer_id_full))
    if cap is None:
        return 'ModifiedNode(model-CLOB?)'
    out_dec = Decimal(str(amount_to_decimal(out_take_raw)))
    tol = Decimal('1e-15')
    return 'DeletedNode(model)' if abs(out_dec - cap) <= tol else 'ModifiedNode(model)'

TRACK_COLUMNS = ['segment_no', 'segment_kind', 'node_type', 'clob_offer_id', 'out_take', 'in_take', 'avg_quality']

real_track_df = seg_df[['step_idx', 'kind', 'source', 'node_type']].copy()
real_track_df = real_track_df.rename(columns={'kind': 'segment_kind', 'source': 'segment_source'})
real_track_df.insert(0, 'segment_no', list(range(1, len(real_track_df) + 1)))
real_track_df['clob_offer_id_full'] = [s if k == 'CLOB' else None for k, s in zip(real_track_df['segment_kind'], real_track_df['segment_source'])]
real_track_df['clob_offer_id'] = [_short_offer_id(v) for v in real_track_df['clob_offer_id_full']]
real_track_df['out_take'] = None
real_track_df['in_take'] = None
real_track_df['avg_quality'] = None
real_track_df = real_track_df[TRACK_COLUMNS + ['clob_offer_id_full']]

tx_type = tx_result.get('TransactionType')
cmp_row, cmp_path = _load_compare_row(tx_hash)
if cmp_row is None:
    print('compare row not found for this tx in artifacts/compare')
else:
    real_in = _to_decimal(cmp_row.get('real_in'))
    real_out = _to_decimal(cmp_row.get('real_out'))
    model_in = _to_decimal(cmp_row.get('model_in'))
    model_out = _to_decimal(cmp_row.get('model_out'))
    extra_in = _to_decimal(cmp_row.get('diff_in'))
    rel_bps = (extra_in / real_in * Decimal('10000')) if real_in != 0 else Decimal('0')
    gap_pct = (extra_in / real_in * Decimal('100')) if real_in != 0 else Decimal('0')

    used_prebook = cmp_row.get('used_prebook_ledger')

    print('[Context]')
    print(f"tx_hash: {tx_hash}")
    print(f"transaction_type: {tx_type}")
    print(f"direction: {cmp_row.get('direction')}")
    if tx_type == 'OfferCreate':
        print(f"user_request_gets: {_fmt_user_request_amount(tx_result.get('TakerGets'))}")
        print(f"user_request_pays: {_fmt_user_request_amount(tx_result.get('TakerPays'))}")
        print('mapping: user_request_* are request bounds; real/model *_in/*_out are executed replay totals.')

    print(f"used_prebook_ledger: {int(used_prebook) if used_prebook is not None and not pd.isna(used_prebook) else None}")

    model_track_df = pd.DataFrame()

    if cmp_path is None or used_prebook is None or pd.isna(used_prebook):
        raise RuntimeError('Missing compare source or used_prebook_ledger; cannot run DS-only real metric fill.')

    cmp_dir = Path(cmp_path).parent
    manifest_path = cmp_dir / 'manifest.json'
    if not manifest_path.exists():
        raise RuntimeError('manifest.json not found for compare output; cannot run DS-only real metric fill.')

    manifest = json.loads(manifest_path.read_text())
    inputs = manifest.get('inputs', {})
    required_inputs = ['amm_swaps', 'amm_fees', 'clob_legs', 'book_gets_xrp', 'book_gets_rusd']
    for req in required_inputs:
        if req not in inputs:
            raise RuntimeError(f'manifest inputs missing required key: {req}')

    in_cur, out_cur = _direction_to_currencies(str(cmp_row.get('direction')))

    # DS-only fill for REAL in/out/quality
    sw_tx, fe_tx, clob_tx = _load_tx_rows(inputs, tx_hash)
    n_real_clob = int((real_track_df['segment_kind'] == 'CLOB').sum())
    n_real_amm = int((real_track_df['segment_kind'] == 'AMM').sum())

    if len(clob_tx) != n_real_clob:
        raise RuntimeError(
            f'DS CLOB rows mismatch metadata segment count: ds={len(clob_tx)}, real_clob_segments={n_real_clob}. DS-only mode refuses fallback.'
        )
    if len(sw_tx) != n_real_amm:
        raise RuntimeError(
            f'DS AMM rows mismatch metadata segment count: ds={len(sw_tx)}, real_amm_segments={n_real_amm}. DS-only mode refuses fallback.'
        )

    # Build DS CLOB numeric legs; map to metadata offer ids by (out,in) key (DS has no offer-id column).
    ds_clob_rows: list[dict[str, Decimal]] = []
    for _, r in clob_tx.iterrows():
        bc = str(r.get('base_currency'))
        cc = str(r.get('counter_currency'))
        ba = Decimal(str(r.get('base_amount')))
        ca = Decimal(str(r.get('counter_amount')))
        if bc == out_cur and cc == in_cur:
            out_d, in_d = ba, ca
        elif cc == out_cur and bc == in_cur:
            out_d, in_d = ca, ba
        else:
            raise RuntimeError(f'DS CLOB row currency mismatch for tx direction: base={bc}, counter={cc}, direction={in_cur}->{out_cur}')
        ds_clob_rows.append({'out': out_d, 'in': in_d})

    ds_clob_bucket: dict[tuple[int, int], list[dict[str, Decimal]]] = {}
    for row in ds_clob_rows:
        k = _pair_key(row['out'], row['in'], scale=12)
        ds_clob_bucket.setdefault(k, []).append(row)

    amm_pairs: list[tuple[Decimal, Decimal]] = []
    sw_tx = sw_tx.sort_values([c for c in ['ledger_index', 'transaction_index'] if c in sw_tx.columns]).reset_index(drop=True)
    for _, r in sw_tx.iterrows():
        ain_c = str(r.get('asset_in_currency'))
        aout_c = str(r.get('asset_out_currency'))
        ain_v = Decimal(str(r.get('asset_in_value')))
        aout_v = Decimal(str(r.get('asset_out_value')))
        if ain_c == in_cur and aout_c == out_cur:
            in_d, out_d = ain_v, aout_v
        elif ain_c == out_cur and aout_c == in_cur:
            in_d, out_d = aout_v, ain_v
        else:
            raise RuntimeError(f'DS AMM row currency mismatch for tx direction: asset_in={ain_c}, asset_out={aout_c}, direction={in_cur}->{out_cur}')
        amm_pairs.append((out_d, in_d))

    # Assign DS numeric values to real trajectory rows in metadata order.
    a_i = 0
    for idx in real_track_df.index:
        kind = str(real_track_df.at[idx, 'segment_kind']).upper()
        if kind == 'CLOB':
            md_out_before = _parse_meta_amt_text(seg_df.iloc[idx].get('offer_gets_before'))
            md_out_after = _parse_meta_amt_text(seg_df.iloc[idx].get('offer_gets_after'))
            md_in_before = _parse_meta_amt_text(seg_df.iloc[idx].get('offer_pays_before'))
            md_in_after = _parse_meta_amt_text(seg_df.iloc[idx].get('offer_pays_after'))
            if None in (md_out_before, md_out_after, md_in_before, md_in_after):
                raise RuntimeError('Cannot build metadata key for CLOB segment; DS-only mode refuses positional fallback.')
            key = _pair_key(md_out_before - md_out_after, md_in_before - md_in_after, scale=12)
            cand = ds_clob_bucket.get(key) or []
            if not cand:
                raise RuntimeError(f'DS CLOB leg not found for metadata key={key}; DS-only mode refuses positional fallback.')
            row = cand.pop(0)
            out_d, in_d = row['out'], row['in']
        elif kind == 'AMM':
            out_d, in_d = amm_pairs[a_i]
            a_i += 1
        else:
            out_d, in_d = None, None

        real_track_df.at[idx, 'out_take'] = _fmt_dec(out_d, 6) if out_d is not None else None
        real_track_df.at[idx, 'in_take'] = _fmt_dec(in_d, 6) if in_d is not None else None
        real_track_df.at[idx, 'avg_quality'] = _quality_from_in_out(out_d, in_d, 9)

    # Build model table
    book_xrp = REPO_ROOT / inputs.get('book_gets_xrp', '')
    book_rusd = REPO_ROOT / inputs.get('book_gets_rusd', '')
    selected_book = book_rusd if (in_cur == XRP and out_cur == RUSD_HEX) else book_xrp
    offers = _load_offers_for_ledger(selected_book, int(used_prebook))
    tiers = _build_tiers_with_offer_ids(offers, in_cur, out_cur)
    segs = _tiers_to_segments(tiers, in_cur, out_cur)

    tier_out_cap_map: dict[str, Decimal] = {}
    for seg_i in segs:
        if seg_i.source_id is not None:
            tier_out_cap_map[str(seg_i.source_id)] = Decimal(str(amount_to_decimal(seg_i.out_max)))

    if len(sw_tx) == 0:
        raise RuntimeError('No DS AMM swap row found for tx; cannot run model replay in DS-only mode.')

    sw_row = sw_tx.iloc[0]
    fee_rate = Decimal('0')
    if len(fe_tx) > 0 and 'trading_fee' in fe_tx.columns and pd.notna(fe_tx.iloc[0].get('trading_fee')):
        fee_rate = Decimal(str(fe_tx.iloc[0].get('trading_fee')))

    x_res, y_res = _orient_reserves(sw_row, in_cur, out_cur)
    amm = AMM(x_res, y_res, fee_rate, x_is_xrp=(in_cur == XRP), y_is_xrp=(out_cur == XRP))
    target_out_amt = _amt_floor(out_cur, real_out)
    q = RouterQuoteView(lambda: segs, amm=amm).preview_out(target_out_amt)

    model_slice_df = pd.DataFrame(q.get('slices', []) or [])
    if not model_slice_df.empty:
        model_track_df = model_slice_df.copy()
        model_track_df.insert(0, 'segment_no', list(range(1, len(model_track_df) + 1)))
        model_track_df['segment_kind'] = [str(x).upper() for x in model_track_df['src']]
        model_track_df['clob_offer_id_full'] = [
            sid if str(kind).upper() == 'CLOB' else None
            for kind, sid in zip(model_track_df['segment_kind'], model_track_df.get('source_id', [None] * len(model_track_df)))
        ]
        model_track_df['clob_offer_id'] = [_short_offer_id(v) for v in model_track_df['clob_offer_id_full']]
        model_track_df['node_type'] = [
            _infer_model_node_type(kind, oid, out_raw, tier_out_cap_map)
            for kind, oid, out_raw in zip(
                model_track_df['segment_kind'],
                model_track_df['clob_offer_id_full'],
                model_track_df.get('out_take', [None] * len(model_track_df)),
            )
        ]
        out_raw_dec = [Decimal(str(amount_to_decimal(v))) if v is not None else None for v in model_track_df.get('out_take', [None] * len(model_track_df))]
        in_raw_dec = [Decimal(str(amount_to_decimal(v))) if v is not None else None for v in model_track_df.get('in_take', [None] * len(model_track_df))]
        model_track_df['out_take'] = [_fmt_dec(v, 6) if v is not None else None for v in out_raw_dec]
        model_track_df['in_take'] = [_fmt_dec(v, 6) if v is not None else None for v in in_raw_dec]
        model_track_df['avg_quality'] = [_quality_from_in_out(o, i, 9) for o, i in zip(out_raw_dec, in_raw_dec)]
        for c in TRACK_COLUMNS:
            if c not in model_track_df.columns:
                model_track_df[c] = None
        model_track_df = model_track_df[['segment_no', 'segment_kind', 'node_type', 'clob_offer_id', 'out_take', 'in_take', 'avg_quality', 'clob_offer_id_full']]

    print('[Real trajectory table]')
    display(real_track_df[TRACK_COLUMNS].style.hide(axis='index'))

    print('[Model trajectory table]')
    if not model_track_df.empty:
        display(model_track_df[TRACK_COLUMNS].style.hide(axis='index'))
    else:
        display(pd.DataFrame(columns=TRACK_COLUMNS).style.hide(axis='index'))

    real_offer_ids = [x for x in real_track_df['clob_offer_id_full'].tolist() if x is not None]
    model_offer_ids = [x for x in model_track_df['clob_offer_id_full'].tolist() if x is not None] if not model_track_df.empty else []
    overlap_count = len(set(real_offer_ids) & set(model_offer_ids))

    real_amm_in, _ = _sum_track_by_kind(real_track_df, 'AMM')
    model_amm_in, _ = _sum_track_by_kind(model_track_df, 'AMM')

    real_offer_unique = len(real_offer_ids) == len(set(real_offer_ids))
    model_offer_unique = len(model_offer_ids) == len(set(model_offer_ids))

    def _pct(part: Decimal, total: Decimal) -> str:
        if total == 0:
            return '0.00%'
        return f"{_fmt_dec((part / total) * Decimal('100'), 2)}%"

    print('[Comparison]')
    print(f"path: real={cmp_row.get('path_sig_real')} | model={cmp_row.get('path_sig_model')}")
    # print(f"amm_count_compare: real={real_amm_count} | model={model_amm_count}")
    print(
        f"amm_in_compare: real={_fmt_dec(real_amm_in, 6)} [{_pct(real_amm_in, real_in)}] | "
        f"model={_fmt_dec(model_amm_in, 6)} [{_pct(model_amm_in, model_in)}]"
    )
    print(
        f"offer_id_compare: real_unique={real_offer_unique} | model_unique={model_offer_unique} | "
        f"model_overlap_with_real={int(overlap_count)}"
    )
    print(f"segment_count: real={len(real_track_df)} | model={len(model_track_df)}")
    print(f"in_gap_pct_vs_real: model={_fmt_dec(gap_pct, 9)}%")

    if SHOW_SOURCE_DATA_IN_SECTION4:
        export_dir = REPO_ROOT / 'artifacts' / 'notebook_exports' / f'source_data_tx_{tx_hash[:12].lower()}'
        export_dir.mkdir(parents=True, exist_ok=True)

        # metadata source: keep one full raw tx response JSON
        (export_dir / 'metadata__tx_response.json').write_text(
            json.dumps(tx_response, ensure_ascii=False, indent=2)
        )

        # delta sharing source rows actually used in section 4
        sw_tx.to_csv(export_dir / 'delta_sharing__fact_amm_swaps__tx_rows.csv', index=False)
        clob_tx.to_csv(export_dir / 'delta_sharing__clob_legs_input__tx_rows.csv', index=False)
        fe_tx.to_csv(export_dir / 'delta_sharing__fact_amm_fees__tx_rows.csv', index=False)

        print(f"source_data_saved_to: {export_dir}")









[Context]
tx_hash: 6AA281DD3ED17153F5149EE82965563D8B44BAB69226717EAD4E68326CF96A64
transaction_type: OfferCreate
direction: XRP->rUSD
user_request_gets: XRP: 25000
user_request_pays: rUSD: 48424.326635966
mapping: user_request_* are request bounds; real/model *_in/*_out are executed replay totals.
used_prebook_ledger: 100894774
[Real trajectory table]


| segment_no | segment_kind | node_type | clob_offer_id | out_take | in_take | avg_quality |
|---|---|---|---|---|---|---|
| 1 | CLOB | DeletedNode | 1626 | 2860.75869 | 1475.72848 | 1.938540002 |
| 2 | CLOB | DeletedNode | 27CC | 1910.99952 | 983.81899 | 1.942429999 |
| 3 | CLOB | DeletedNode | 2D60 | 2238.938272 | 1155.76 | 1.9372 |
| 4 | CLOB | DeletedNode | 3695 | 11640.0 | 6000.0 | 1.94 |
| 5 | CLOB | DeletedNode | 4A8B | 9.883 | 5.099133 | 1.938172627 |
| 6 | CLOB | DeletedNode | 4D56 | 5.42432 | 2.796041 | 1.940000128 |
| 7 | CLOB | DeletedNode | 655A | 9.883 | 5.091257 | 1.941170913 |
| 8 | CLOB | DeletedNode | 9075 | 8.412263 | 4.336217 | 1.940000363 |
| 9 | AMM | ModifiedNode |  | 1020.051024 | 526.257185 | 1.938312773 |
| 10 | CLOB | DeletedNode | AE48 | 38.828862 | 20.0 | 1.941443123 |


[Model trajectory table]


| segment_no | segment_kind | node_type | clob_offer_id | out_take | in_take | avg_quality |
|---|---|---|---|---|---|---|
| 1 | CLOB | DeletedNode(model) | 27CC | 1910.99952 | 983.818991 | 1.942429997 |
| 2 | CLOB | DeletedNode(model) | AE48 | 38.828862 | 20.0 | 1.941443123 |
| 3 | CLOB | DeletedNode(model) | 655A | 9.883 | 5.091257 | 1.941170913 |
| 4 | CLOB | DeletedNode(model) | C576 | 106.0 | 54.611306 | 1.940990021 |
| 5 | CLOB | DeletedNode(model) | FABB | 1909.08107 | 983.81899 | 1.940479996 |
| 6 | CLOB | DeletedNode(model) | 4D56 | 5.42432 | 2.796041 | 1.940000128 |
| 7 | CLOB | DeletedNode(model) | EBC1 | 3571.946277 | 1841.209421 | 1.94 |
| 8 | CLOB | DeletedNode(model) | 3695 | 11640.0 | 6000.0 | 1.94 |
| 9 | CLOB | DeletedNode(model) | 9075 | 8.412263 | 4.336218 | 1.939999916 |
| 10 | AMM | ModifiedNode(model-AMM) |  | 856.202188 | 441.673727 | 1.938540003 |


[Comparison]
path: real=AMM+CLOB | model=AMM+CLOB
amm_in_compare: real=526.257185 [4.02%] | model=526.257187 [4.02%]
offer_id_compare: real_unique=True | model_unique=True | model_overlap_with_real=13
segment_count: real=14 | model=16
in_gap_pct_vs_real: model=-0.035263127%
source_data_saved_to: /Users/guohanze/Documents/Codebase/xrpl-amm-clob/artifacts/notebook_exports/source_data_tx_6aa281dd3ed1
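Section 4 cannot join Delta Sharing CLOB legs to metadata offers by id, because the DS CLOB rows carry no offer-id column. Instead it orients each DS row to `(out, in)` for the tx direction, buckets rows by an amount key at 12 decimals, and for each metadata offer (the `offer_gets`/`offer_pays` before-minus-after deltas) pops one candidate from the matching bucket. The sketch below reproduces that idea on plain `Decimal` values; `orient_leg`, `pair_key`, and `match_ds_legs` are hypothetical names, and the rounding mode assumed for `pair_key` may differ from the notebook's own `_pair_key`, which is not shown here.

```python
from __future__ import annotations

from collections import defaultdict
from decimal import ROUND_HALF_EVEN, Decimal


def orient_leg(base_cur: str, counter_cur: str, base_amt: Decimal, counter_amt: Decimal,
               in_cur: str, out_cur: str) -> tuple[Decimal, Decimal]:
    # Return (out, in) for the tx direction in_cur -> out_cur, whichever side is the base.
    if base_cur == out_cur and counter_cur == in_cur:
        return base_amt, counter_amt
    if counter_cur == out_cur and base_cur == in_cur:
        return counter_amt, base_amt
    raise ValueError(f'leg {base_cur}/{counter_cur} does not match {in_cur}->{out_cur}')


def pair_key(out_amt: Decimal, in_amt: Decimal, scale: int = 12) -> tuple[int, int]:
    # Assumed behaviour: scale both amounts to integers at a fixed number of decimals.
    factor = Decimal(10) ** scale
    return (int((out_amt * factor).to_integral_value(rounding=ROUND_HALF_EVEN)),
            int((in_amt * factor).to_integral_value(rounding=ROUND_HALF_EVEN)))


def match_ds_legs(metadata_legs: list[tuple[str, Decimal, Decimal]],
                  ds_legs: list[tuple[Decimal, Decimal]],
                  scale: int = 12) -> list[tuple[str, Decimal, Decimal]]:
    # metadata_legs: (offer_id, out_delta, in_delta) in AffectedNodes order.
    # ds_legs: (out, in) rows from Delta Sharing, in arbitrary order and without offer ids.
    buckets: dict[tuple[int, int], list[tuple[Decimal, Decimal]]] = defaultdict(list)
    for out_amt, in_amt in ds_legs:
        buckets[pair_key(out_amt, in_amt, scale)].append((out_amt, in_amt))
    matched = []
    for offer_id, md_out, md_in in metadata_legs:
        candidates = buckets.get(pair_key(md_out, md_in, scale))
        if not candidates:
            # Mirror the notebook: refuse a positional fallback when no key matches.
            raise RuntimeError(f'no DS leg matches offer {offer_id}')
        out_amt, in_amt = candidates.pop(0)
        matched.append((offer_id, out_amt, in_amt))
    return matched


# Two legs of the example tx (XRP -> rUSD), listed in a different order on the DS side.
ds = [
    orient_leg('XRP', 'rUSD', Decimal('6000'), Decimal('11640'), 'XRP', 'rUSD'),
    orient_leg('rUSD', 'XRP', Decimal('1910.99952'), Decimal('983.81899'), 'XRP', 'rUSD'),
]
meta = [('27CC', Decimal('1910.99952'), Decimal('983.81899')),
        ('3695', Decimal('11640.0'), Decimal('6000'))]
print(match_ds_legs(meta, ds))
```

Because the offer id always comes from the metadata side and only amounts come from Delta Sharing, two legs that share a key carry identical amounts, so popping them in either order fills the same values.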


## 5) What-if Test: Force AMM To One Execution

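This is an isolated what-if analysis: the router may use the AMM in at most one iteration, the resulting AMM `out_take` is then pinned to the real AMM fill (with `in_take` recomputed from the same AMM curve), and the other rows are trimmed so that total output equals `real_out`. It does not modify the main replay pipeline outputs.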
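To see what the one-shot constraint does in isolation, here is a deliberately simplified greedy router. It is not the `xrpl_router` engine; its assumptions are: quality is out/in, as in the tables above; CLOB tiers are fixed-quality offers consumed best-first; the pool uses the constant-product in-given-out formula with the fee charged on the input side, `dx = (x*y/(y - dy) - x) / (1 - fee)`; and when the AMM beats the best CLOB tier, it is sized so that its marginal quality falls to that tier's quality, ignoring the fee retained in the pool. `ToyAMM` and `route_one_shot_amm` are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass
from decimal import Decimal


@dataclass
class ToyAMM:
    """Hypothetical constant-product pool: x = input-asset reserve, y = output-asset reserve."""
    x: Decimal
    y: Decimal
    fee: Decimal  # fraction of the input, e.g. Decimal('0.001') for 0.1%

    def spot_quality(self) -> Decimal:
        # Marginal out/in for an infinitesimal trade, after the input-side fee.
        return self.y * (1 - self.fee) / self.x

    def in_given_out(self, dy: Decimal) -> Decimal:
        # Input needed to withdraw dy of the output asset.
        return (self.x * self.y / (self.y - dy) - self.x) / (1 - self.fee)

    def out_to_reach_quality(self, q: Decimal) -> Decimal:
        # Output after which the marginal quality falls to q, assuming x*y stays constant.
        y_after = (q * self.x * self.y / (1 - self.fee)).sqrt()
        return max(Decimal(0), self.y - y_after)


def route_one_shot_amm(need: Decimal, clob_tiers: list[tuple[str, Decimal, Decimal]],
                       amm: ToyAMM) -> list[tuple[str, str | None, Decimal, Decimal]]:
    # clob_tiers: (offer_id, out_max, quality); returned slices: (kind, offer_id, out, in).
    book = sorted(clob_tiers, key=lambda t: t[2], reverse=True)
    slices = []
    amm_locked = False
    while need > 0:
        best_q = book[0][2] if book else None
        if not amm_locked and (best_q is None or amm.spot_quality() > best_q):
            # AMM beats the book top: take it once, only until it matches best_q.
            dy = need if best_q is None else min(need, amm.out_to_reach_quality(best_q))
            if dy >= amm.y:
                raise ValueError('toy pool cannot supply the remaining need')
            dx = amm.in_given_out(dy)
            slices.append(('AMM', None, dy, dx))
            amm.x, amm.y = amm.x + dx, amm.y - dy
            amm_locked = True  # one-shot constraint: never consult the AMM again
            need -= dy
        elif book:
            offer_id, out_max, q = book.pop(0)
            take = min(need, out_max)
            slices.append(('CLOB', offer_id, take, take / q))
            need -= take
        else:
            break  # liquidity exhausted; the caller sees an under-filled route
    return slices


pool = ToyAMM(x=Decimal(10000), y=Decimal(19500), fee=Decimal('0.001'))
tiers = [('A', Decimal(500), Decimal('1.94')), ('B', Decimal(800), Decimal('1.93'))]
for s in route_one_shot_amm(Decimal(1000), tiers, pool):
    print(s[0], s[1], round(s[2], 6), round(s[3], 6))
```

In this example, tier `B` (quality 1.93) is consumed even though the pool, after its single slice, still quotes about 1.94 at the margin. Without the lock, a multi-iteration router would return to the AMM at that point, which is the behaviour the what-if suppresses.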



In [7]:
from decimal import Decimal
from typing import Any
import pandas as pd

from xrpl_router.amm import AMM
from xrpl_router.book_step import RouterQuoteView
from xrpl_router.core import IOUAmount, XRPAmount
from xrpl_router.core.fmt import amount_to_decimal

required_names = [
    'TRACK_COLUMNS', 'real_track_df', 'tx_hash', 'REPO_ROOT', 'XRP', 'RUSD_HEX',
    '_load_compare_row', '_direction_to_currencies', '_load_tx_rows', '_load_offers_for_ledger',
    '_build_tiers_with_offer_ids', '_tiers_to_segments', '_orient_reserves',
    '_amt_floor', '_short_offer_id', '_fmt_dec', '_quality_from_in_out', '_infer_model_node_type',
]
missing = [n for n in required_names if n not in globals()]
if missing:
    raise RuntimeError(f'Please run Section 4 first. Missing globals: {missing}')

cmp_row, cmp_path = _load_compare_row(tx_hash)
if cmp_row is None:
    raise RuntimeError('compare row not found; cannot run section 5 what-if')
used_prebook = cmp_row.get('used_prebook_ledger')
if used_prebook is None or pd.isna(used_prebook):
    raise RuntimeError('used_prebook_ledger is missing; cannot run section 5 what-if')
if cmp_path is None:
    raise RuntimeError('compare source path is missing; cannot run section 5 what-if')

manifest_path = Path(cmp_path).parent / 'manifest.json'
if not manifest_path.exists():
    raise RuntimeError('manifest.json not found for compare output')
manifest = json.loads(manifest_path.read_text())
inputs = manifest.get('inputs', {})
for k in ['amm_swaps', 'amm_fees', 'clob_legs', 'book_gets_xrp', 'book_gets_rusd']:
    if k not in inputs:
        raise RuntimeError(f'manifest inputs missing key: {k}')

in_cur, out_cur = _direction_to_currencies(str(cmp_row.get('direction')))
book_xrp = REPO_ROOT / inputs.get('book_gets_xrp', '')
book_rusd = REPO_ROOT / inputs.get('book_gets_rusd', '')
# Same book selection as Section 4 (a missing RUSD_HEX should fail loudly, not silently pick book_xrp).
selected_book = book_rusd if (in_cur == XRP and out_cur == RUSD_HEX) else book_xrp
offers = _load_offers_for_ledger(selected_book, int(used_prebook))
tiers = _build_tiers_with_offer_ids(offers, in_cur, out_cur)
segs = _tiers_to_segments(tiers, in_cur, out_cur)

tier_out_cap_map: dict[str, Decimal] = {}
for seg_i in segs:
    if seg_i.source_id is not None:
        tier_out_cap_map[str(seg_i.source_id)] = Decimal(str(amount_to_decimal(seg_i.out_max)))

sw_tx, fe_tx, _clob_tx = _load_tx_rows(inputs, tx_hash)
if len(sw_tx) == 0:
    raise RuntimeError('No DS AMM swap row found for tx; cannot run what-if')
sw_row = sw_tx.iloc[0]
fee_rate = Decimal('0')
if len(fe_tx) > 0 and 'trading_fee' in fe_tx.columns and pd.notna(fe_tx.iloc[0].get('trading_fee')):
    fee_rate = Decimal(str(fe_tx.iloc[0].get('trading_fee')))

x_res, y_res = _orient_reserves(sw_row, in_cur, out_cur)
amm_obj = AMM(x_res, y_res, fee_rate, x_is_xrp=(in_cur == XRP), y_is_xrp=(out_cur == XRP))
target_out_amt = _amt_floor(out_cur, Decimal(str(cmp_row.get('real_out'))))

# What-if replay: engine-internal one-shot AMM constraint (AMM allowed at most once).
def _preview_out_one_shot_amm(view: RouterQuoteView, out_req: Any):
    try:
        segs_local = list(view._provider())
    except Exception:
        segs_local = []

    need = out_req
    total_out = out_req - out_req
    if segs_local:
        total_in = segs_local[0].in_at_out_max - segs_local[0].in_at_out_max
    else:
        total_in = XRPAmount(0) if in_cur == 'XRP' else IOUAmount.zero()

    shadow_amm = view._amm.clone() if view._amm is not None else None
    slices = []
    amm_locked = False

    for _ in range(64):
        if need.is_zero():
            break
        shadow_for_iter = None if amm_locked else shadow_amm
        filled, spent, next_segs, amm_used, _tier_q, amm_dx, amm_dy, itr_trace = view._pick_iteration(
            segs_local, need, shadow_for_iter
        )

        for t in itr_trace:
            rec = {
                'src': t.get('src'),
                'source_id': t.get('source_id'),
                'out_take': t.get('take_out'),
                'in_take': t.get('take_in'),
                'avg_quality': t.get('quality'),
            }
            slices.append(rec)
            if str(t.get('src')).upper() == 'AMM':
                amm_locked = True

        if filled.is_zero():
            break

        total_in = spent if total_in.is_zero() else (total_in + spent)
        total_out = filled if total_out.is_zero() else (total_out + filled)
        need = out_req - total_out
        segs_local = next_segs

        if (not amm_locked) and amm_used and (shadow_amm is not None) and (not amm_dx.is_zero() or not amm_dy.is_zero()):
            shadow_amm.apply_fill_st(amm_dx, amm_dy)

    return {
        'slices': slices,
        'summary': {'total_out': total_out, 'total_in': total_in},
    }

one_view = RouterQuoteView(lambda: segs, amm=amm_obj)
one_quote = _preview_out_one_shot_amm(one_view, target_out_amt)
one_slices = one_quote.get('slices', []) or []

def _build_model_track_from_slices(slices: list[dict[str, Any]]) -> pd.DataFrame:
    d = pd.DataFrame(slices)
    if d.empty:
        return pd.DataFrame(columns=TRACK_COLUMNS + ['clob_offer_id_full'])
    d.insert(0, 'segment_no', list(range(1, len(d) + 1)))
    d['segment_kind'] = [str(x).upper() for x in d['src']]
    d['clob_offer_id_full'] = [
        sid if str(kind).upper() == 'CLOB' else None
        for kind, sid in zip(d['segment_kind'], d.get('source_id', [None] * len(d)))
    ]
    d['clob_offer_id'] = [_short_offer_id(v) for v in d['clob_offer_id_full']]
    d['node_type'] = [
        _infer_model_node_type(kind, oid, out_raw, tier_out_cap_map)
        for kind, oid, out_raw in zip(
            d['segment_kind'],
            d['clob_offer_id_full'],
            d.get('out_take', [None] * len(d)),
        )
    ]

    out_raw = [Decimal(str(amount_to_decimal(v))) if v is not None else None for v in d.get('out_take', [None] * len(d))]
    in_raw = [Decimal(str(amount_to_decimal(v))) if v is not None else None for v in d.get('in_take', [None] * len(d))]
    d['out_take'] = [_fmt_dec(v, 6) if v is not None else None for v in out_raw]
    d['in_take'] = [_fmt_dec(v, 6) if v is not None else None for v in in_raw]
    d['avg_quality'] = [_quality_from_in_out(o, i, 9) for o, i in zip(out_raw, in_raw)]

    for c in TRACK_COLUMNS:
        if c not in d.columns:
            d[c] = None
    return d[['segment_no', 'segment_kind', 'node_type', 'clob_offer_id', 'out_take', 'in_take', 'avg_quality', 'clob_offer_id_full']]

model_track_one_amm = _build_model_track_from_slices(one_slices)

# Enforce one-shot AMM out_take to align with real AMM out_take for this experiment.
# Recompute AMM in_take from the same AMM curve so in/out/quality remain internally consistent.
real_amm_out_vals = [Decimal(str(v)) for v in real_track_df.loc[real_track_df['segment_kind'].astype(str).str.upper() == 'AMM', 'out_take'] if v is not None]
if len(real_amm_out_vals) == 1 and not model_track_one_amm.empty:
    amm_mask = model_track_one_amm['segment_kind'].astype(str).str.upper() == 'AMM'
    if int(amm_mask.sum()) == 1:
        idx_amm = model_track_one_amm[amm_mask].index[0]
        aligned_out = real_amm_out_vals[0]

        aligned_out_amt = _amt_floor(out_cur, aligned_out)
        amm_for_align = amm_obj.clone()
        aligned_in_amt = amm_for_align.swap_in_given_out_st(aligned_out_amt)
        aligned_in = Decimal(str(amount_to_decimal(aligned_in_amt)))

        model_track_one_amm.at[idx_amm, 'out_take'] = _fmt_dec(aligned_out, 6)
        model_track_one_amm.at[idx_amm, 'in_take'] = _fmt_dec(aligned_in, 6)
        model_track_one_amm.at[idx_amm, 'avg_quality'] = _quality_from_in_out(aligned_out, aligned_in, 9)

# Enforce total out == real_out by adjusting model rows safely.
# Adjust from front to back, prefer non-AMM first, and allow zeroed rows to be dropped.
real_out_total = Decimal(str(cmp_row.get('real_out')))
if not model_track_one_amm.empty:
    out_vals = [Decimal(str(v)) for v in model_track_one_amm.get('out_take', []) if v is not None]
    model_out_total = sum(out_vals, Decimal('0'))
    out_delta = model_out_total - real_out_total
    tol = Decimal('0.000001')

    if abs(out_delta) > tol:
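        non_amm_idxs = model_track_one_amm.index[
            model_track_one_amm['segment_kind'].astype(str).str.upper() != 'AMM'
        ].tolist()
        # Named amm_idxs (not amm) so the AMM object from Section 4 is not shadowed.
        amm_idxs = model_track_one_amm.index[
            model_track_one_amm['segment_kind'].astype(str).str.upper() == 'AMM'
        ].tolist()
        # Non-AMM (CLOB) rows are adjusted first; AMM rows only absorb any residual.
        candidate_idxs = non_amm_idxs + amm_idxs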
        remaining = out_delta
        rows_to_drop: list[int] = []

        for idx_adj in candidate_idxs:
            if abs(remaining) <= tol:
                break

            old_out = Decimal(str(model_track_one_amm.at[idx_adj, 'out_take']))
            old_in = Decimal(str(model_track_one_amm.at[idx_adj, 'in_take']))

            if remaining > 0:
                cut = min(remaining, max(Decimal('0'), old_out))
                if cut <= 0:
                    continue
                new_out = old_out - cut
                remaining -= cut
            else:
                add = -remaining
                new_out = old_out + add
                remaining = Decimal('0')

            if new_out <= tol:
                rows_to_drop.append(idx_adj)
                continue

            ratio_in_per_out = (old_in / old_out) if old_out != 0 else None
            new_in = (new_out * ratio_in_per_out) if ratio_in_per_out is not None else old_in

            model_track_one_amm.at[idx_adj, 'out_take'] = _fmt_dec(new_out, 6)
            model_track_one_amm.at[idx_adj, 'in_take'] = _fmt_dec(new_in, 6)
            model_track_one_amm.at[idx_adj, 'avg_quality'] = _quality_from_in_out(new_out, new_in, 9)

        if rows_to_drop:
            model_track_one_amm = model_track_one_amm.drop(index=rows_to_drop).reset_index(drop=True)
            model_track_one_amm['segment_no'] = list(range(1, len(model_track_one_amm) + 1))

        if abs(remaining) > tol:
            raise RuntimeError(
                f'Cannot enforce total-out alignment exactly; residual={_fmt_dec(remaining, 9)}'
            )

# Metrics: only what-if vs real
real_in = Decimal(str(cmp_row.get('real_in')))
one_in = sum((Decimal(str(v)) for v in model_track_one_amm.get('in_take', []) if v is not None), Decimal('0'))

def _pct_gap(curr: Decimal, ref: Decimal) -> str:
    if ref == 0:
        return '0.00%'
    return f"{_fmt_dec(((curr - ref) / ref) * Decimal('100'), 9)}%"

def _seq_tokens(df: pd.DataFrame) -> list[str]:
    out = []
    for _, r in df.iterrows():
        k = str(r.get('segment_kind') or '').upper()
        if k == 'CLOB':
            out.append(f"CLOB:{r.get('clob_offer_id_full')}")
        elif k == 'AMM':
            out.append('AMM')
    return out

def _lcs_len(a: list[str], b: list[str]) -> int:
    n, m = len(a), len(b)
    dp = [[0] * (m + 1) for _ in range(n + 1)]
    for i in range(1, n + 1):
        ai = a[i - 1]
        row = dp[i]
        prev = dp[i - 1]
        for j in range(1, m + 1):
            if ai == b[j - 1]:
                row[j] = prev[j - 1] + 1
            else:
                row[j] = max(prev[j], row[j - 1])
    return dp[n][m]

real_tokens = _seq_tokens(real_track_df)
one_tokens = _seq_tokens(model_track_one_amm)
one_lcs = _lcs_len(real_tokens, one_tokens)

real_amm_count = int((real_track_df['segment_kind'].astype(str).str.upper() == 'AMM').sum())
one_amm_count = int((model_track_one_amm['segment_kind'].astype(str).str.upper() == 'AMM').sum()) if not model_track_one_amm.empty else 0

real_offer_ids = [
    str(v) for v in real_track_df.loc[
        (real_track_df['segment_kind'].astype(str).str.upper() == 'CLOB') & real_track_df['clob_offer_id_full'].notna(),
        'clob_offer_id_full',
    ].tolist()
]
one_offer_ids = [
    str(v) for v in model_track_one_amm.loc[
        (model_track_one_amm['segment_kind'].astype(str).str.upper() == 'CLOB') & model_track_one_amm['clob_offer_id_full'].notna(),
        'clob_offer_id_full',
    ].tolist()
] if not model_track_one_amm.empty else []

real_offer_unique = len(real_offer_ids) == len(set(real_offer_ids))
one_offer_unique = len(one_offer_ids) == len(set(one_offer_ids))
one_overlap_count = len(set(real_offer_ids) & set(one_offer_ids))

print('[Context]')
print('experiment: engine-internal constraint, AMM can be used at most once')
print(f'amm_count: real={real_amm_count} | whatif_model={one_amm_count}')
print('scope: section 5 is isolated; section 4 outputs stay unchanged')
print('constraint tweak: AMM out_take aligned to real; total out aligned front-to-back with zero-row drop')

print('[Real trajectory table]')
display(real_track_df[TRACK_COLUMNS].style.hide(axis='index'))

print('[What-if model trajectory table]')
display(model_track_one_amm[TRACK_COLUMNS].style.hide(axis='index'))

print('[Comparison]')
print(f'order_alignment_lcs_ratio: whatif={one_lcs}/{max(len(real_tokens),1)}')
print(
    f'offer_id_compare: real_unique={real_offer_unique} | whatif_unique={one_offer_unique} | '
    f'whatif_overlap_with_real={one_overlap_count}'
)
print(f'in_gap_pct_vs_real: whatif={_pct_gap(one_in, real_in)}')




[Context]
experiment: engine-internal constraint, AMM can be used at most once
amm_count: real=1 | whatif_model=1
scope: section 5 is isolated; section 4 outputs stay unchanged
constraint tweak: AMM out_take aligned to real; total out aligned front-to-back with zero-row drop
[Real trajectory table]


| segment_no | segment_kind | node_type | clob_offer_id | out_take | in_take | avg_quality |
|---|---|---|---|---|---|---|
| 1 | CLOB | DeletedNode | 1626 | 2860.75869 | 1475.72848 | 1.938540002 |
| 2 | CLOB | DeletedNode | 27CC | 1910.99952 | 983.81899 | 1.942429999 |
| 3 | CLOB | DeletedNode | 2D60 | 2238.938272 | 1155.76 | 1.9372 |
| 4 | CLOB | DeletedNode | 3695 | 11640.0 | 6000.0 | 1.94 |
| 5 | CLOB | DeletedNode | 4A8B | 9.883 | 5.099133 | 1.938172627 |
| 6 | CLOB | DeletedNode | 4D56 | 5.42432 | 2.796041 | 1.940000128 |
| 7 | CLOB | DeletedNode | 655A | 9.883 | 5.091257 | 1.941170913 |
| 8 | CLOB | DeletedNode | 9075 | 8.412263 | 4.336217 | 1.940000363 |
| 9 | AMM | ModifiedNode |  | 1020.051024 | 526.257185 | 1.938312773 |
| 10 | CLOB | DeletedNode | AE48 | 38.828862 | 20.0 | 1.941443123 |


[What-if model trajectory table]


| segment_no | segment_kind | node_type | clob_offer_id | out_take | in_take | avg_quality |
|---|---|---|---|---|---|---|
| 1 | CLOB | DeletedNode(model) | 27CC | 1747.150684 | 899.466486 | 1.942429997 |
| 2 | CLOB | DeletedNode(model) | AE48 | 38.828862 | 20.0 | 1.941443123 |
| 3 | CLOB | DeletedNode(model) | 655A | 9.883 | 5.091257 | 1.941170913 |
| 4 | CLOB | DeletedNode(model) | C576 | 106.0 | 54.611306 | 1.940990021 |
| 5 | CLOB | DeletedNode(model) | FABB | 1909.08107 | 983.81899 | 1.940479996 |
| 6 | CLOB | DeletedNode(model) | 4D56 | 5.42432 | 2.796041 | 1.940000128 |
| 7 | CLOB | DeletedNode(model) | EBC1 | 3571.946277 | 1841.209421 | 1.94 |
| 8 | CLOB | DeletedNode(model) | 3695 | 11640.0 | 6000.0 | 1.94 |
| 9 | CLOB | DeletedNode(model) | 9075 | 8.412263 | 4.336218 | 1.939999916 |
| 10 | AMM | ModifiedNode(model-AMM) |  | 1020.051024 | 526.256901 | 1.93831382 |


[Comparison]
order_alignment_lcs_ratio: whatif=5/14
offer_id_compare: real_unique=True | whatif_unique=True | whatif_overlap_with_real=13
in_gap_pct_vs_real: whatif=0.001880381%
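The "total out aligned front-to-back with zero-row drop" step can be isolated from pandas. The sketch below, using the hypothetical name `align_total_out`, applies the same rules to plain `Decimal` rows: adjust non-AMM rows before AMM rows, rescale `in` proportionally so each row keeps its quality, and drop rows whose output falls to the tolerance. The proportional rescaling is why the trimmed `27CC` row in the what-if table keeps quality 1.942429997 while its `out_take` falls from 1910.99952 to 1747.150684.

```python
from decimal import Decimal


def align_total_out(rows: list[dict], target_out: Decimal,
                    tol: Decimal = Decimal('0.000001')) -> list[dict]:
    """rows: dicts with 'kind' ('CLOB' or 'AMM'), 'out' and 'in' as Decimal; returns adjusted copies."""
    rows = [dict(r) for r in rows]  # never mutate the caller's rows
    remaining = sum((r['out'] for r in rows), Decimal(0)) - target_out  # > 0 means too much output
    order = ([i for i, r in enumerate(rows) if r['kind'] != 'AMM']
             + [i for i, r in enumerate(rows) if r['kind'] == 'AMM'])
    dropped = set()
    for i in order:
        if abs(remaining) <= tol:
            break
        old_out, old_in = rows[i]['out'], rows[i]['in']
        if remaining > 0:
            cut = min(remaining, max(Decimal(0), old_out))
            if cut <= 0:
                continue
            new_out = old_out - cut
            remaining -= cut
        else:
            new_out = old_out - remaining  # remaining < 0, so this extends the row
            remaining = Decimal(0)
        if new_out <= tol:
            dropped.add(i)  # row fully consumed by the trim
            continue
        # Scale input by the same factor so the row's out/in quality is unchanged.
        rows[i]['in'] = old_in * new_out / old_out if old_out != 0 else old_in
        rows[i]['out'] = new_out
    if abs(remaining) > tol:
        raise RuntimeError(f'cannot align total out; residual={remaining}')
    return [r for i, r in enumerate(rows) if i not in dropped]


track = [
    {'kind': 'CLOB', 'out': Decimal('100'), 'in': Decimal('50')},
    {'kind': 'AMM', 'out': Decimal('40'), 'in': Decimal('21')},
    {'kind': 'CLOB', 'out': Decimal('60'), 'in': Decimal('30')},
]
print(align_total_out(track, Decimal('150')))  # trims the first CLOB row to 50 out / 25 in
print(align_total_out(track, Decimal('90')))   # drops the first CLOB row, trims the second
```

When the model under-fills, the extension lands on the first CLOB row without checking that offer's remaining capacity; this mirrors the notebook's rule rather than a liquidity-aware refill.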


### Observation

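- The primary outputs are the real and model trajectory tables above: `real_track_df` vs `model_track_df` in Section 4, and `real_track_df` vs `model_track_one_amm` in Section 5.
- Compare segment order and CLOB `offer_id` directly between the real and model paths.
- For this tx, every real CLOB offer also appears in the model path (`model_overlap_with_real=13`, and the real path has 13 CLOB segments plus 1 AMM segment), while the model uses 16 segments in total. The unconstrained model's input is 0.035% below the real input; under the one-shot AMM constraint it is 0.0019% above, with only 5 of 14 real segments matched in order.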
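`model_overlap_with_real` and `order_alignment_lcs_ratio` answer different questions: the first counts shared CLOB offer ids regardless of position, while the second is the length of the longest common subsequence of segment tokens (`CLOB:<offer_id>` or `AMM`) divided by the real segment count. A toy example with hypothetical offer ids `A`, `B`, `C` shows how full overlap can coexist with a low order ratio:

```python
def lcs_len(a: list[str], b: list[str]) -> int:
    # Longest common subsequence length: O(len(a) * len(b)) time, one DP row of memory.
    prev = [0] * (len(b) + 1)
    for x in a:
        cur = [0] * (len(b) + 1)
        for j, y in enumerate(b, start=1):
            cur[j] = prev[j - 1] + 1 if x == y else max(prev[j], cur[j - 1])
        prev = cur
    return prev[-1]


real = ['CLOB:A', 'CLOB:B', 'AMM', 'CLOB:C']
model = ['CLOB:B', 'CLOB:A', 'CLOB:C', 'AMM']

real_ids = {t for t in real if t.startswith('CLOB:')}
model_ids = {t for t in model if t.startswith('CLOB:')}
overlap = len(real_ids & model_ids)
print(f'overlap={overlap} order_alignment_lcs_ratio={lcs_len(real, model)}/{max(len(real), 1)}')
# overlap=3 order_alignment_lcs_ratio=2/4
```

All three CLOB offers are shared, yet only two tokens appear in the same relative order; the Section 5 result (overlap 13, LCS 5/14) is the same situation at a larger scale.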




## Conclusion

- In this notebook, the key deliverable is a segment-by-segment trajectory comparison for one tx.
- `real_track_df` follows metadata order (`AffectedNodes`), with per-segment amounts filled from Delta Sharing rows.
- `model_track_df` is built from model replay slices on the same pre-book snapshot (`used_prebook_ledger`).


