# 📓 Cross-DEX Statistical Arbitrage Analysis
**Date:** 2025-04-02

## Overview
This notebook explores intra-day statistical arbitrage opportunities for tokens listed on multiple Solana-based decentralized exchanges (DEXes). The objective is to identify and quantify short-term price inefficiencies between DEXes for the same token, and evaluate the statistical validity of arbitrage signals.


## 🧪 Experiment Scope

- **Chains**: Solana  
- **Exchanges**: Raydium, Orca, Meteora  
- **Token universe**: Tokens listed on ≥2 DEXes  
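- **Data granularity**: Trade-level swaps in 1-day Parquet partitions per token per exchange, resampled to 10-second VWAP buckets  
- **Sources**: token-exchange mapping from `SOL_EXCHANGE_TOKEN_FAST` (PostgreSQL); trades from Parquet files on S3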


## 🎯 Objectives

- Align and compare intra-day token price series across DEXes
- Detect price spread violations and convergence patterns
- Apply statistical tests to validate arbitrage opportunities
- Generate per-token/per-day summary metrics


## 🧭 Notebook Plan

### 1. Token Filtering
- Load token mapping from `SOL_EXCHANGE_TOKEN_FAST`
- Filter tokens with multiple DEX listings

### 2. Time-Aligned Price Series
- Load and align price series from each exchange
- Normalize and clean data

### 3. Statistical Diagnostics
- Calculate price spreads and visualize
- Run cointegration and stationarity tests
- Analyze rolling correlation and spread behavior

### 4. Arbitrage Signal Detection
- Detect statistical deviations from spread equilibrium
- Estimate reversion metrics

### 5. Summary Reporting
- Output summary table of opportunities
- Generate token-level charts and diagnostics


In [2]:
import numpy as np
import os
import pandas as pd
import polars as pl
import psycopg2
import pyarrow
from dotenv import load_dotenv
from statsmodels.tsa.stattools import adfuller, coint
import plotly.graph_objects as go

load_dotenv()

True

In [2]:
# 📥 Load Data
# Load the token-exchange mapping and keep tokens listed on Raydium plus Meteora or Orca.
# Note: tokens listed only on Orca and Meteora are not matched by these filters.

filters = {
    "any raydium and any meteora": "(raydium_clmm = TRUE OR raydium_cpmm = TRUE OR raydium_lp = TRUE) AND (meteora_dlmm = TRUE OR meteora_lp = TRUE)",
    "any raydium and any orca": "(raydium_clmm = TRUE OR raydium_cpmm = TRUE OR raydium_lp = TRUE) AND (orca = TRUE)",
}

# Parenthesize each filter so the OR-combination reads unambiguously
where_clause = " OR ".join(f"({f})" for f in filters.values())
query = f"""
SELECT contract_address
FROM SOL_EXCHANGE_TOKEN_FAST
WHERE {where_clause}
"""

conn = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    database="crypto",
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
)
try:
    with conn.cursor() as cur:  # the cursor is closed when the block exits
        cur.execute(query)
        rows = cur.fetchall()
finally:
    conn.close()

tokens = [row[0] for row in rows]
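The two filters above only match pairs that include Raydium, so the "≥2 DEXes" universe in the scope is not fully covered. A minimal sketch of an alternative that counts DEX families instead, assuming the boolean flag columns are exactly the ones used above, that NULL flags should count as FALSE, and that PostgreSQL's boolean-to-integer cast is available. `DEX_FAMILIES` and `min_listings_clause` are hypothetical names:

```python
# Hypothetical grouping of the flag columns used above into DEX families
DEX_FAMILIES = {
    "raydium": ["raydium_clmm", "raydium_cpmm", "raydium_lp"],
    "orca": ["orca"],
    "meteora": ["meteora_dlmm", "meteora_lp"],
}


def min_listings_clause(families: dict[str, list[str]], min_dexes: int = 2) -> str:
    """Return a SQL boolean expression that is true when at least `min_dexes` families match."""
    terms = []
    for cols in families.values():
        # COALESCE treats NULL flags as FALSE; ::int (PostgreSQL) maps TRUE/FALSE to 1/0
        any_pool = " OR ".join(f"COALESCE({c}, FALSE)" for c in cols)
        terms.append(f"({any_pool})::int")
    return f"({' + '.join(terms)}) >= {min_dexes}"


where_clause = min_listings_clause(DEX_FAMILIES)
query = f"SELECT contract_address FROM SOL_EXCHANGE_TOKEN_FAST WHERE {where_clause}"
print(query)
```

Summing per-family indicators makes the threshold a single parameter, so "listed on all three DEXes" is just `min_dexes=3`.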


In [11]:
tokens[10:20]

['J3iCHHua352xwxm5nCUf5BSX8kVY5jx7QZPh59P25Cbb',
 'Q5UqzubMK2hqoPtT44TbiQRAwy9s8Zaf2AFxpk8pump',
 '4KRHkYYQggF3h54YjsrtD2MGZyjGRhYMASu5AJHXJn8Y',
 'BXg1LkAHk9rWWbF2tquDsZrVJSWDPK69M7HjfDgkGXSL',
 '9pWPUXoZKWNPWyaegPQeR3Kn8aFz9nrGtm5jeAFzpump',
 'HnCFZNScMWycGAnXUVmj8vHgf9AeWfhTWsVTRzzb4L7s',
 '8FGxHMgcSAuCGfvPW3eQncaRfgbiKPhFptj6c7p1pump',
 'D5BFSZSNUkyGtwHgcDjtkTvPVreAcwBW68BBJdVeNk16',
 'SonicxvLud67EceaEzCLRnMTBqzYUUYNr93DBkBdDES',
 '4Yz5zByTwnVe46AXD6hsrccbq4TKLyih2xRqPyLBDT1P']

In [10]:
print(len(tokens))

22848


In [None]:
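# Hive-style layout: one Parquet partition per day, exchange and token
# (no trailing comma: a comma here would turn `path` into a one-element tuple)
path = "s3://iamjakkie-public/normalized/solana/BLOCK_DATE=*/EXCHANGE=*/TOKEN=*/*.parquet"
df = pl.scan_parquet(
    path,
    storage_options={
        "aws_access_key_id": os.getenv("AWS_ACCESS_KEY"),
        "aws_secret_access_key": os.getenv("AWS_SECRET_KEY"),
    },
)
df = df.filter(pl.col("TOKEN").is_in(tokens))

# Trade count per token and exchange (pl.len() replaces the deprecated pl.count())
grouped = (
    df.group_by(["TOKEN", "EXCHANGE"])
    .agg(pl.len().alias("COUNT"))
    .collect()
    .sort("COUNT", descending=True)
)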


In [20]:
with pl.Config(fmt_str_lengths=1000, tbl_width_chars=1000, tbl_cols=-1, tbl_rows=-1):
    print(grouped)

shape: (50, 3)
┌──────────────────────────────────────────────┬──────────┬────────┐
│ TOKEN                                        ┆ EXCHANGE ┆ COUNT  │
│ ---                                          ┆ ---      ┆ ---    │
│ str                                          ┆ str      ┆ u32    │
╞══════════════════════════════════════════════╪══════════╪════════╡
│ mF94n83fqzRDNvs9wQnKAQ62e88fe1JhZwg9MiBpump  ┆ RAYDIUM  ┆ 523142 │
│ CAxoUYhQ4B1kqcsgir8LLLfKPQYbUDP65qA9tE9VMCWq ┆ RAYDIUM  ┆ 265988 │
│ Ch1JYYcXVqfoDgNLDjRMLNHRo4N9NuSJGcgCV8Q8fNs4 ┆ RAYDIUM  ┆ 248133 │
│ 5Jng6jkLKU1o8BNrCzTEMXMFvPjNJZTpdWR3Hq4RHJb6 ┆ RAYDIUM  ┆ 204238 │
│ Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB ┆ RAYDIUM  ┆ 193877 │
│ CSCvQY6t9W6mrWjQ32nvasBo44dc3NqrVA9nSb4ypump ┆ RAYDIUM  ┆ 143707 │
│ 2UBBpp7JEEZQg7ZHTavtL4cbcMXzxXkRiA6twUqGZNM1 ┆ RAYDIUM  ┆ 129434 │
│ DCW9BDV9R3i34fQYgddtyGZA8RpfBHrPxhN9hXfupump ┆ RAYDIUM  ┆ 127311 │
│ BMYiYGitLTdJLQRgcbvruSBMrHuiVvhJuNmiYLrrxEpp ┆ RAYDIUM  ┆ 110671 │
│ HAVVqPQf4MKVjvuFV

In [28]:
df.collect_schema()

Schema([('BLOCK_DATE', String),
        ('block_time', Int64),
        ('block_slot', UInt64),
        ('SIGNATURE', String),
        ('EXCHANGE', String),
        ('TOKEN', String),
        ('SIDE', String),
        ('TOKEN_AMOUNT', Float64),
        ('QUOTE_ASSET', String),
        ('QUOTE_AMOUNT', Float64),
        ('DERIVED_PRICE', Float64),
        ('USD_PRICE', Float64),
        ('VOLUME', Float64)])

## Align Price Series

In [4]:
test = df.limit(100).collect()




In [12]:
# block_time is in milliseconds; scale to microseconds, the default pl.Datetime unit
test.with_columns([
    (pl.col("block_time") * 1_000).cast(pl.Datetime("us")).dt.truncate("10s")
])

BLOCK_DATE,block_time,block_slot,SIGNATURE,EXCHANGE,TOKEN,SIDE,TOKEN_AMOUNT,QUOTE_ASSET,QUOTE_AMOUNT,DERIVED_PRICE,USD_PRICE,VOLUME
str,datetime[μs],u64,str,str,str,str,f64,str,f64,f64,f64,f64
2025-01-01,2025-01-01 10:29:20,311173692,51GSuPYxxaA6MvtVJKdoZZXGvKaYQc…,METEORA,12t8AvZrLWUmDJR9R1E2YzmsrYg9Qg…,SELL,45.613369,So1111111111111111111111111111…,0.000001,3.1263e-8,0.0,0.0
2025-01-01,2025-01-01 19:44:40,311255172,2hMKbeZvUBUiPzAGTLyhtk4gxfRkjA…,METEORA,12t8AvZrLWUmDJR9R1E2YzmsrYg9Qg…,SELL,25.220497,So1111111111111111111111111111…,7.7200e-7,3.0610e-8,0.0,0.0
2025-01-01,2025-01-01 14:01:40,311205029,2gf9ZvNmCiqGcqEZGPhSFAYf7waif5…,METEORA,13zb6Df31wsaBWYiEvB3uYmFdK8q7d…,BUY,2.969961e7,So1111111111111111111111111111…,2.49875,8.4134e-8,0.000016,475.089996
2025-01-01,2025-01-01 16:38:20,311227917,5jMKMjzP35tpTbmr84VojzfoY6BkpV…,METEORA,13zb6Df31wsaBWYiEvB3uYmFdK8q7d…,SELL,75957.976562,So1111111111111111111111111111…,0.005905,7.7734e-8,0.000015,1.12
2025-01-01,2025-01-01 14:11:00,311206378,VY13T8ETDjT62i4ATGFn2dJBYGmD28…,METEORA,13zb6Df31wsaBWYiEvB3uYmFdK8q7d…,SELL,2.968476e7,So1111111111111111111111111111…,2.48833,8.3825e-8,0.000016,473.109985
…,…,…,…,…,…,…,…,…,…,…,…,…
2025-01-01,2025-01-01 19:44:30,311255151,2WZtkgoPjsPiBpRP1hVwAeKSbX9BEy…,METEORA,14zP2ToQ79XWvc7FQpm4bRnp9d6Mp1…,BUY,604.778381,EPjFWdd5AufqSSqeM2qN1xzybapC8G…,333.268005,0.551058,0.551607,333.600006
2025-01-01,2025-01-01 19:53:00,311256407,9RtavedkXEt991RiR1RgYnbLtPnLEK…,METEORA,14zP2ToQ79XWvc7FQpm4bRnp9d6Mp1…,BUY,0.335143,EPjFWdd5AufqSSqeM2qN1xzybapC8G…,0.18352,0.547587,0.537084,0.18
2025-01-01,2025-01-01 19:18:40,311251341,3av9XCtZ7nt6sUtyjFntqNJF7EgffU…,METEORA,14zP2ToQ79XWvc7FQpm4bRnp9d6Mp1…,BUY,86.560333,EPjFWdd5AufqSSqeM2qN1xzybapC8G…,47.25,0.545862,0.54644,47.299999
2025-01-01,2025-01-01 19:49:10,311255840,4DGurCyx79eqsHDWiD6TjNi2p3rVYb…,METEORA,14zP2ToQ79XWvc7FQpm4bRnp9d6Mp1…,BUY,273.883087,EPjFWdd5AufqSSqeM2qN1xzybapC8G…,154.689438,0.564801,0.565351,154.839996


In [14]:
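# 🔄 Align Price Series
# For each token, extract and align price data across all DEXes

# Step 1: Create 10s buckets (block_time is in milliseconds; scale to µs for pl.Datetime)
df = df.with_columns([
    (pl.col("block_time") * 1_000).cast(pl.Datetime("us")).alias("datetime"),
]).with_columns([
    pl.col("datetime").dt.truncate("10s").alias("bucket")
])

# Step 2: VWAP per token, exchange and bucket = sum(price * token amount) / sum(token amount).
# VOLUME appears to be USD notional (it tracks QUOTE_AMOUNT x quote price in the sample above),
# so it is not used as the weight. Trades without a USD price would drag the VWAP toward 0.
agg_df = (
    df.filter((pl.col("USD_PRICE") > 0) & (pl.col("TOKEN_AMOUNT") > 0))
    .group_by(["TOKEN", "EXCHANGE", "bucket"])
    .agg([
        (pl.col("USD_PRICE") * pl.col("TOKEN_AMOUNT")).sum().alias("notional"),
        pl.col("TOKEN_AMOUNT").sum().alias("volume"),
    ])
    .with_columns([
        (pl.col("notional") / pl.col("volume")).alias("vwap")
    ])
)

# Step 3: Wide table for one token (one VWAP column per exchange)
token_df = (
    agg_df
    .filter(pl.col("TOKEN") == "CAxoUYhQ4B1kqcsgir8LLLfKPQYbUDP65qA9tE9VMCWq")
    .select(["bucket", "EXCHANGE", "vwap"])
    .collect()
    .pivot(on="EXCHANGE", index="bucket", values="vwap")  # `columns=` was renamed to `on=`
    .sort("bucket")
    # Carry the last traded price into empty buckets; long gaps can create stale-price spreads
    .fill_null(strategy="forward")
)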





In [35]:
token_df.write_csv("token_df.csv")

In [5]:
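# First 10,000 ten-second buckets (about 28 hours); parse timestamps for the time axis
pandas_df = pd.read_csv("token_df.csv", parse_dates=["bucket"]).head(10000)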

fig = go.Figure()

# Add each DEX price series
for dex in ["RAYDIUM", "ORCA", "METEORA"]:
    if dex in pandas_df.columns:
        fig.add_trace(go.Scatter(
            x=pandas_df["bucket"],
            y=pandas_df[dex],
            mode="lines",
            name=dex
        ))

# Styling
fig.update_layout(
    title="DEX Prices Over Time",
    xaxis_title="Time",
    yaxis_title="VWAP (USD)",
    height=600
)

fig.show()

In [17]:
# 📊 Statistical Analysis
# Run cointegration, ADF, spread stats, correlation
from itertools import combinations

# Pairwise USD spreads, only for exchanges that actually have a column for this token
dexes = [dex for dex in ["RAYDIUM", "ORCA", "METEORA"] if dex in token_df.columns]
spread_df = token_df.with_columns([
    (pl.col(a) - pl.col(b)).alias(f"SPREAD_{a}_{b}")
    for a, b in combinations(dexes, 2)
])
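These spreads are raw USD differences, so their scale depends on the token's price level (the sample table earlier shows USD prices around 1.6e-5 next to ones around 0.55). For the per-token summary metrics in the plan, a scale-free spread is easier to compare across tokens. A sketch using the log price ratio in basis points; `log_spread_bps` is a hypothetical helper:

```python
import numpy as np


def log_spread_bps(price_a: np.ndarray, price_b: np.ndarray) -> np.ndarray:
    """Spread between two venues as 1e4 * ln(price_a / price_b), in basis points.

    Non-positive or NaN prices yield NaN instead of raising or producing inf.
    """
    a = np.asarray(price_a, dtype=float)
    b = np.asarray(price_b, dtype=float)
    out = np.full(a.shape, np.nan)
    ok = (a > 0) & (b > 0)  # NaN comparisons are False, so NaNs stay NaN
    out[ok] = 1e4 * np.log(a[ok] / b[ok])
    return out


spread_bps = log_spread_bps(np.array([1.01, 1.00, 0.0]), np.array([1.00, 1.00, 1.00]))
print(spread_bps)
```

For small gaps the log ratio is close to the percentage difference (a 1% gap is about 99.5 bps), and it is antisymmetric: swapping the venues only flips the sign.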




In [21]:
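# ADF test on the Raydium-Meteora spread (H0: unit root, i.e. the spread does not mean-revert)
spread = spread_df["SPREAD_RAYDIUM_METEORA"].to_numpy()
spread = spread[np.isfinite(spread)]  # drop NaN/inf, e.g. leading buckets before an exchange's first trade
result = adfuller(spread, maxlag=1, regression="c")
print("ADF Statistic:", result[0])
print("p-value:", result[1])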

ADF Statistic: -92.3805730490001
p-value: 0.0
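The plan lists "estimate reversion metrics"; one common metric is the half-life of the spread under an AR(1) (discrete Ornstein-Uhlenbeck) approximation. A minimal sketch using plain NumPy least squares; `mean_reversion_half_life` is a hypothetical name, and in the notebook it could be applied to the same `spread` array passed to `adfuller`:

```python
import numpy as np


def mean_reversion_half_life(spread: np.ndarray) -> float:
    """Half-life (in buckets) of a spread under an AR(1) approximation.

    Fits delta_s[t] = alpha + beta * s[t-1] by least squares. The implied AR(1)
    coefficient is phi = 1 + beta, and a deviation decays to half its size after
    -ln(2) / ln(phi) steps. Returns inf if phi >= 1 (no mean reversion) and NaN if
    phi <= 0 (the spread overshoots every step, so the formula does not apply).
    Non-finite values are dropped first, which joins the points on either side of a gap.
    """
    s = np.asarray(spread, dtype=float)
    s = s[np.isfinite(s)]
    lagged, delta = s[:-1], np.diff(s)
    X = np.column_stack([np.ones_like(lagged), lagged])  # intercept + lagged level
    (alpha, beta), *_ = np.linalg.lstsq(X, delta, rcond=None)
    phi = 1.0 + beta
    if phi >= 1.0:
        return float("inf")
    if phi <= 0.0:
        return float("nan")
    return float(-np.log(2.0) / np.log(phi))


# Deterministic AR(1) with phi = 0.5: every step halves the deviation
demo = 10.0 * 0.5 ** np.arange(30)
half_life = mean_reversion_half_life(demo)
print(half_life)
```

Multiplying the result by the 10-second bucket size converts it to seconds, which is easier to compare against how long a cross-DEX trade takes to execute.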


In [39]:
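pandas_df = token_df.to_pandas()  # Polars' to_pandas() needs pyarrow installed in this kernel
pandas_df["spread"] = pandas_df["RAYDIUM"] - pandas_df["METEORA"]

# Z-score against the full-sample mean/std (uses future data: fine for diagnostics, not for live signals)
mean = pandas_df["spread"].mean()
std = pandas_df["spread"].std()
pandas_df["zscore"] = (pandas_df["spread"] - mean) / std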

# Define thresholds
entry_threshold = 2.0
exit_threshold = 0.5

# Entry/exit signals
pandas_df["long_entry"] = pandas_df["zscore"] < -entry_threshold
pandas_df["short_entry"] = pandas_df["zscore"] > entry_threshold
pandas_df["exit"] = pandas_df["zscore"].abs() < exit_threshold

ModuleNotFoundError: pa.Table requires 'pyarrow' module to be installed
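The z-score above uses the mean and standard deviation of the whole sample, so each signal is computed with information from later in the series (look-ahead bias). A trailing-window version avoids that. A minimal sketch; `rolling_zscore` is a hypothetical helper, and a window such as 360 buckets (one hour of 10-second buckets) is an arbitrary assumption:

```python
import pandas as pd


def rolling_zscore(spread: pd.Series, window: int) -> pd.Series:
    """Z-score of each point against a trailing window that ends at that point.

    Only past and current values are used. The first `window - 1` results are NaN
    because the window is not yet full; a window with zero variance gives inf/NaN.
    """
    roll = spread.rolling(window, min_periods=window)
    mean = roll.mean()
    std = roll.std()  # sample standard deviation (ddof=1), like Series.std()
    return (spread - mean) / std


z = rolling_zscore(pd.Series([1.0, 2.0, 3.0, 4.0, 6.0]), window=3)
print(z.round(4).tolist())
```

In the notebook this could replace the global version with `pandas_df["zscore"] = rolling_zscore(pandas_df["spread"], window=360)`; the entry and exit columns then need no other change.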

In [None]:
# Uses pandas_df (spread, zscore and signal columns) from the previous cell
fig = go.Figure()

# Z-score line
fig.add_trace(go.Scatter(x=pandas_df["bucket"], y=pandas_df["zscore"],
                         mode="lines", name="Z-Score"))

# Threshold bands: (level, legend label, line style, color)
bands = [
    (entry_threshold, f"+{entry_threshold:g}σ", "dot", "red"),
    (-entry_threshold, f"-{entry_threshold:g}σ", "dot", "green"),
    (exit_threshold, f"+{exit_threshold:g}σ", "dash", "gray"),
    (-exit_threshold, f"-{exit_threshold:g}σ", "dash", "gray"),
]
for level, label, dash, color in bands:
    fig.add_trace(go.Scatter(x=pandas_df["bucket"], y=[level] * len(pandas_df),
                             mode="lines", name=label, line=dict(dash=dash, color=color)))

# Signal markers: (boolean column, legend name, marker style)
signals = [
    ("long_entry", "Long Entry", dict(symbol="triangle-up", color="green", size=8)),
    ("short_entry", "Short Entry", dict(symbol="triangle-down", color="red", size=8)),
    ("exit", "Exit", dict(symbol="x", color="black", size=6)),
]
for column, name, marker in signals:
    hits = pandas_df[column]
    fig.add_trace(go.Scatter(x=pandas_df.loc[hits, "bucket"],
                             y=pandas_df.loc[hits, "zscore"],
                             mode="markers", name=name, marker=marker))

fig.update_layout(title="Z-Score of Spread with Entry and Exit Signals",
                  yaxis_title="Z-Score",
                  xaxis_title="Time",
                  height=600)

fig.show()

In [25]:
price1 = token_df["RAYDIUM"].to_numpy()
price2 = token_df["METEORA"].to_numpy()

# Keep buckets where both prices are finite (drops NaN and inf in either series)
mask = np.isfinite(price1) & np.isfinite(price2)

price1_clean = price1[mask]
price2_clean = price2[mask]

score, pvalue, _ = coint(price1_clean, price2_clean)
print("Cointegration test p-value:", pvalue)

Cointegration test p-value: 0.0
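`coint` runs an Engle-Granger test, whose first step regresses one price series on the other; the spread cells above instead assume a one-for-one hedge (`RAYDIUM - METEORA`). A sketch for checking that assumption by estimating the slope directly with `np.polyfit`; `hedge_ratio` is a hypothetical helper:

```python
import numpy as np


def hedge_ratio(price_y: np.ndarray, price_x: np.ndarray) -> tuple[float, float]:
    """OLS fit price_y = intercept + slope * price_x on pairs where both are finite.

    Returns (slope, intercept). A slope far from 1 may indicate the two venues'
    USD prices do not move one-for-one, so a plain difference is a biased spread.
    """
    y = np.asarray(price_y, dtype=float)
    x = np.asarray(price_x, dtype=float)
    ok = np.isfinite(x) & np.isfinite(y)
    slope, intercept = np.polyfit(x[ok], y[ok], deg=1)  # highest degree first
    return float(slope), float(intercept)


slope, intercept = hedge_ratio(np.array([3.0, 5.0, 7.0, 9.0]), np.array([1.0, 2.0, 3.0, 4.0]))
print(slope, intercept)
```

In the notebook the call would be `hedge_ratio(price1_clean, price2_clean)`; the residual `price1_clean - (intercept + slope * price2_clean)` is then the spread that the Engle-Granger test examines.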


In [None]:
# 🚨 Signal Detection
# Detect spread breaks and reversions

# TODO: Add threshold logic and opportunity tracking
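The TODO above calls for threshold logic and opportunity tracking. The boolean `long_entry`/`short_entry`/`exit` columns mark every bucket that crosses a threshold, not distinct trades, so one way to turn them into round trips is a small state machine. A sketch, assuming one open position at a time and the same 2.0/0.5 thresholds; `Trade` and `track_trades` are hypothetical names:

```python
import math
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass
class Trade:
    side: str                # "long" = buy the spread, "short" = sell the spread
    entry_idx: int           # bucket index where the position was opened
    exit_idx: Optional[int]  # bucket index where it was closed; None if still open


def track_trades(zscores: Iterable[float], entry_z: float = 2.0, exit_z: float = 0.5) -> List[Trade]:
    """Convert a z-score series into non-overlapping round trips.

    Opens a long when z < -entry_z, a short when z > entry_z, and closes the open
    position once |z| < exit_z. NaN z-scores (e.g. an unfilled rolling window) are skipped.
    """
    trades: List[Trade] = []
    current: Optional[Trade] = None
    for i, z in enumerate(zscores):
        if math.isnan(z):
            continue
        if current is None:
            if z < -entry_z:
                current = Trade("long", i, None)
            elif z > entry_z:
                current = Trade("short", i, None)
        elif abs(z) < exit_z:
            current.exit_idx = i
            trades.append(current)
            current = None
    if current is not None:  # position still open at the end of the series
        trades.append(current)
    return trades


demo_trades = track_trades([0.0, 2.5, 1.0, 0.2, -3.0, -1.0, 0.1, 2.1])
print(demo_trades)
```

In the notebook this could be called as `track_trades(pandas_df["zscore"].to_numpy())`. It ignores fees and slippage, so the resulting round trips are an upper bound on what is actually capturable.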

In [None]:
# 📈 Reporting
# Generate summary tables and plots

# TODO: Create dashboards and export results