In [None]:
import math
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
@dataclass
class VenuePoint:
    """One price/liquidity observation from a single venue at a single time.

    Not instantiated in the visible cells; the row dicts built by
    ``generate_synthetic_series`` carry the same four fields (plus ``idx``).
    """
    ts: datetime  # observation timestamp
    venue: str  # venue label, e.g. "CEX_A", "CEX_B" or "DEX_POOL" in the synthetic data
    price: float  # price quoted by this venue at ``ts``
    liquidity: float  # liquidity figure for this venue; units are not shown here — TODO confirm

@dataclass
class BenchmarkPoint:
    """Aggregated benchmark value and anomaly flags for a single timestamp.

    Not instantiated in the visible cells. NOTE(review): the aggregation loop
    stores the equivalent record as a dict keyed ``stale`` / ``thin_liq`` /
    ``flash_pattern`` rather than the flag names used here — confirm which
    naming is meant to be canonical.
    """
    ts: datetime  # timestamp the benchmark refers to
    benchmark_price: float  # benchmark price for this timestamp
    total_liq: float  # total liquidity behind the benchmark
    staleness_flag: bool  # presumably the detect_staleness result — confirm
    thin_liquidity_flag: bool  # presumably the detect_thin_liquidity result — confirm
    flash_loan_flag: bool  # presumably the detect_flash_loan_pattern result — confirm


In [None]:
def generate_synthetic_series(
    start: datetime,
    n_points: int,
    base_price: float = 2000.0,
    drift_per_step: float = 0.2,
    noise_std: float = 3.0,
    seed: "int | None" = None,
) -> pd.DataFrame:
    """Simulate per-venue price/liquidity quotes along a drifting random walk.

    A shared mid price starts at ``base_price`` and moves every step by
    ``drift_per_step`` plus Gaussian noise of scale ``noise_std``. Each of the
    three venues quotes that mid price plus its own Gaussian noise and draws a
    liquidity value from a venue-specific uniform range. Steps are 10 seconds
    apart, starting at ``start``.

    During a short window starting at 55% of the series (four consecutive
    steps, clipped to the series length) the ``DEX_POOL`` price is inflated by
    12% and its liquidity is redrawn from a much lower range.

    Args:
        start: Timestamp of the first step.
        n_points: Number of time steps; each step yields one row per venue.
        base_price: Mid price before the first step is applied.
        drift_per_step: Deterministic change added to the mid price per step.
        noise_std: Standard deviation of the per-step mid-price noise.
        seed: When given, draws come from a private
            ``np.random.default_rng(seed)`` so the output is reproducible and
            the global NumPy random state is untouched. When ``None``
            (default) the global ``np.random`` stream is used, so code that
            relies on ``np.random.seed(...)`` keeps getting the same draws.

    Returns:
        DataFrame with columns ``ts``, ``venue``, ``price``, ``liquidity`` and
        ``idx`` (the step number), one row per (step, venue). The columns are
        present even when ``n_points`` is 0, so downstream ``groupby("ts")``
        does not fail on an empty series.
    """
    columns = ["ts", "venue", "price", "liquidity", "idx"]

    # Both the np.random module and a Generator expose normal()/uniform().
    rng = np.random if seed is None else np.random.default_rng(seed)

    # venue -> (price noise std, (min liquidity, max liquidity))
    venue_profiles = {
        "CEX_A": (1.5, (800_000, 1_200_000)),
        "CEX_B": (2.0, (400_000, 800_000)),
        "DEX_POOL": (3.0, (150_000, 350_000)),
    }

    flash_start = int(n_points * 0.55)
    flash_end = flash_start + 3  # inclusive bound, i.e. four consecutive steps

    rows = []
    price = base_price
    for i in range(n_points):
        price += drift_per_step + rng.normal(0, noise_std)
        ts = start + timedelta(seconds=i * 10)
        in_flash_window = flash_start <= i <= flash_end

        for venue, (venue_noise, (liq_low, liq_high)) in venue_profiles.items():
            # Draw order (price noise, then liquidity) is kept stable so that
            # a globally seeded run yields the same numbers as before.
            p = price + rng.normal(0, venue_noise)
            liq = rng.uniform(liq_low, liq_high)

            if venue == "DEX_POOL" and in_flash_window:
                # Simulated manipulation: price spike on a drained pool.
                p *= 1.12
                liq = rng.uniform(30_000, 60_000)

            rows.append(
                {
                    "ts": ts,
                    "venue": venue,
                    "price": p,
                    "liquidity": liq,
                    "idx": i,
                }
            )

    return pd.DataFrame(rows, columns=columns)


In [None]:
def compute_liquidity_weighted_benchmark(df_slice: pd.DataFrame) -> tuple:
    """Return the liquidity-weighted average price of a group of quotes.

    Rows missing either ``price`` or ``liquidity`` are dropped before
    weighting. Otherwise a row with a NaN price would still add its liquidity
    to the denominator while contributing nothing to the numerator, which
    silently pulls the benchmark toward zero.

    Args:
        df_slice: Frame with numeric ``price`` and ``liquidity`` columns,
            typically all venue rows for one timestamp.

    Returns:
        ``(benchmark_price, total_liquidity)`` as floats, where
        ``total_liquidity`` is the summed liquidity of the rows actually used.
        When no usable liquidity remains (no complete rows, or a non-positive
        sum) the result is ``(nan, 0.0)``.

    Raises:
        KeyError: If ``price`` or ``liquidity`` is not a column of ``df_slice``.
    """
    usable = df_slice[["price", "liquidity"]].dropna()
    liq_sum = float(usable["liquidity"].sum())
    # `not (x > 0)` also rejects a NaN sum, unlike `x <= 0`.
    if not liq_sum > 0:
        return math.nan, 0.0
    weights = usable["liquidity"] / liq_sum
    bench = float((weights * usable["price"]).sum())
    return bench, liq_sum


def detect_staleness(current_ts, last_update_ts, max_staleness_seconds=40):
    """Tell whether the last update is older than the allowed gap.

    Args:
        current_ts: Timestamp being evaluated.
        last_update_ts: Timestamp of the most recent update, or ``None`` when
            there has been none yet.
        max_staleness_seconds: Largest tolerated gap in seconds; a gap exactly
            equal to it is not stale.

    Returns:
        ``True`` when more than ``max_staleness_seconds`` separate the two
        timestamps; ``False`` otherwise, including when ``last_update_ts`` is
        ``None``.
    """
    if last_update_ts is not None:
        elapsed = current_ts - last_update_ts
        return elapsed.total_seconds() > max_staleness_seconds
    return False


def detect_thin_liquidity(total_liq, liq_history, quantile_threshold=0.2, min_history=10):
    """Flag a liquidity reading that falls in the low tail of its history.

    Args:
        total_liq: Liquidity value to evaluate.
        liq_history: Sequence of reference liquidity values.
        quantile_threshold: Quantile of ``liq_history`` (0..1) used as the
            cut-off.
        min_history: Minimum number of reference values required before any
            reading can be flagged. Defaults to 10, the warm-up length that
            was previously hard-coded.

    Returns:
        ``True`` when ``total_liq`` is strictly below the
        ``quantile_threshold`` quantile of ``liq_history``; ``False`` otherwise
        or while fewer than ``min_history`` reference values are available.
        Always a plain ``bool``, even for NumPy scalar inputs.
    """
    # An empty history has no quantile, whatever min_history says.
    if len(liq_history) < max(min_history, 1):
        return False
    cutoff = float(np.quantile(liq_history, quantile_threshold))
    return bool(total_liq < cutoff)


def detect_flash_loan_pattern(bench_history, spike_threshold_pct=4.0):
    """Flag a sharp upward jump across the last three benchmark values.

    The last three entries are read as ``baseline, previous, current``. The
    pattern fires when both conditions hold:

    * ``current`` is at least ``spike_threshold_pct`` percent above
      ``baseline`` (downward moves are never flagged), and
    * ``current`` lies farther from ``baseline`` than ``previous`` does, i.e.
      the move is still widening at the latest point.

    Args:
        bench_history: Sequence of benchmark prices, oldest first, with the
            value under evaluation last.
        spike_threshold_pct: Minimum rise over the baseline, in percent.

    Returns:
        ``True`` when the pattern is present. ``False`` when it is not, when
        fewer than three values exist, when any of the three is NaN, or when
        the baseline is zero (a percentage change is undefined there; this
        case used to raise ``ZeroDivisionError``). Always a plain ``bool``.
    """
    if len(bench_history) < 3:
        return False

    baseline, previous, current = bench_history[-3:]
    if any(math.isnan(x) for x in (baseline, previous, current)):
        return False
    if baseline == 0:
        return False

    jump_pct = (current - baseline) / baseline * 100.0
    move_is_widening = abs(previous - baseline) < abs(current - baseline)
    return bool(jump_pct >= spike_threshold_pct and move_is_widening)


In [None]:
# Fixed seed and start time so that Restart Kernel -> Run All reproduces the
# same table and figure. (datetime.utcnow() is deprecated since Python 3.12
# and made every run differ.)
SEED = 42
N_POINTS = 60

np.random.seed(SEED)
start = datetime(2024, 1, 1)
raw = generate_synthetic_series(start=start, n_points=N_POINTS)

bench_rows = []
last_update_ts = None  # timestamp of the last tick that produced a valid benchmark
liq_history = []  # total liquidity of every tick seen so far
bench_history = []  # benchmark price of every tick seen so far (may contain NaN)

for ts, group in raw.groupby("ts"):
    bench_price, total_liq = compute_liquidity_weighted_benchmark(group)

    stale_flag = detect_staleness(ts, last_update_ts)

    # Judge the current tick against *earlier* ticks only, then record it;
    # appending first would make each reading part of its own baseline.
    thin_flag = detect_thin_liquidity(total_liq, liq_history)
    liq_history.append(total_liq)

    # The flash detector reads the latest entry as the current value, so the
    # current benchmark has to be appended before the call.
    bench_history.append(bench_price)
    flash_flag = detect_flash_loan_pattern(bench_history)

    # Only a valid benchmark counts as an update for staleness purposes.
    if not math.isnan(bench_price):
        last_update_ts = ts

    bench_rows.append(
        {
            "ts": ts,
            "benchmark_price": bench_price,
            "total_liq": total_liq,
            "stale": stale_flag,
            "thin_liq": thin_flag,
            "flash_pattern": flash_flag,
        }
    )

bench_df = pd.DataFrame(bench_rows)
bench_df.head()


In [None]:
# Rows of the benchmark table where each anomaly flag fired.
flash_points = bench_df.loc[bench_df["flash_pattern"]]
thin_points = bench_df.loc[bench_df["thin_liq"]]

fig, ax = plt.subplots(figsize=(12, 5))
ax.plot(bench_df["ts"], bench_df["benchmark_price"], label="benchmark")

# Overlay one marker series per flag, in the same order as the legend.
for flagged, marker, label in (
    (flash_points, "o", "flash-like spike"),
    (thin_points, "x", "thin-liquidity window"),
):
    ax.scatter(
        flagged["ts"],
        flagged["benchmark_price"],
        marker=marker,
        s=60,
        label=label,
    )

ax.set_title("Benchmark price with anomaly flags (synthetic demo)")
ax.set_xlabel("time")
ax.set_ylabel("price")
ax.legend()
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()


In [None]:
# Show only the ticks where at least one anomaly flag fired.
any_flag_raised = bench_df[["stale", "thin_liq", "flash_pattern"]].any(axis="columns")
bench_df.loc[any_flag_raised]
