In [1]:
!pip -q install python-dotenv pandas requests


In [None]:
# =========================================================
# 2) Upload the .env file containing the Uniswap subgraph URL and ETH RPC URL
# =========================================================
import os
from urllib.parse import urlsplit

from google.colab import files
from dotenv import load_dotenv

print("Upload your .env file (must include SUBGRAPH_URL and ETH_RPC_URL):")
uploaded = files.upload()
if not uploaded:
    # Without this guard the fallback below would fail with a bare StopIteration.
    raise ValueError("No file was uploaded; expected a .env file.")

env_name = ".env"
if env_name not in uploaded:
    # fall back to first uploaded if user didn't name it ".env"
    env_name = next(iter(uploaded.keys()))

# override=True: re-running this cell with a corrected .env must replace the
# values loaded by a previous run (load_dotenv keeps existing vars by default).
load_dotenv(env_name, override=True)

required_vars = ["SUBGRAPH_URL", "ETH_RPC_URL"]
missing = [v for v in required_vars if not os.getenv(v)]
if missing:
    raise ValueError(f"Missing required env vars in .env: {missing}")

SUBGRAPH_URL = os.getenv("SUBGRAPH_URL")
ETH_RPC_URL = os.getenv("ETH_RPC_URL")


def _redact_url(url: str) -> str:
    """Return only the scheme and host of ``url``.

    Endpoint URLs frequently carry an API key in the path or query string, and
    anything printed here is stored in the notebook's saved output.
    """
    parts = urlsplit(url)
    return f"{parts.scheme}://{parts.hostname or ''}/<redacted>"


print("✅ .env loaded.")
print("SUBGRAPH_URL =", _redact_url(SUBGRAPH_URL))
print("ETH_RPC_URL  =", _redact_url(ETH_RPC_URL))


In [3]:
# =========================================================
# 3) Upload pools CSV (list of pools to pull swaps for)
# =========================================================
import pandas as pd
import re

print("Upload your pools summary CSV (e.g. top_10_pools_summary.csv):")
uploaded = files.upload()

# Take the first uploaded file whose name ends in ".csv" (case-insensitive).
csv_path = next((fname for fname in uploaded.keys() if fname.lower().endswith(".csv")), None)
if csv_path is None:
    raise ValueError("No CSV file detected in the upload.")

pools_df = pd.read_csv(csv_path)

# Map alternative header spellings onto the canonical column names. An alias is
# only renamed when the canonical column is not already present, so the first
# matching alias wins and existing canonical columns are never clobbered.
rename_map = {
    "pool": "pool_address",
    "address": "pool_address",
    "pair": "token_pair",
    "fee": "fee_tier",
}
for alias, canonical in rename_map.items():
    current_cols = pools_df.columns
    if alias in current_cols and canonical not in current_cols:
        pools_df = pools_df.rename(columns={alias: canonical})

# Columns the rest of the notebook expects to find in the pools table.
needed = ["pool_address", "token_pair", "name", "fee_tier"]
missing = [col for col in needed if col not in pools_df.columns]
if missing:
    raise ValueError(f"CSV missing columns: {missing}. Found: {list(pools_df.columns)}")

# Canonicalise addresses: string type, no surrounding whitespace, lower-case.
address_text = pools_df["pool_address"].astype(str)
pools_df["pool_address"] = address_text.str.strip().str.lower()

def looks_like_addr(x: str) -> bool:
    """Return True when ``x`` is exactly ``0x`` followed by 40 lower-case hex digits."""
    matched = re.fullmatch(r"0x[a-f0-9]{40}", x)
    return matched is not None

# Reject the whole upload if any row carries a malformed address.
address_ok = pools_df["pool_address"].map(looks_like_addr)
bad = pools_df[~address_ok]
if len(bad) > 0:
    offending = bad["pool_address"].to_string(index=False)
    raise ValueError("Invalid pool_address values:\n" + offending)

print("✅ Pools loaded:")
display(pools_df.head())


Upload your pools summary CSV (e.g. top_10_pools_summary.csv):


Saving top_10_pools_summary.csv to top_10_pools_summary.csv
✅ Pools loaded:


Unnamed: 0,pool_address,name,tvl_usd,token_pair,fee_tier,priority
0,0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640,USDC_WETH_0.05%,96607714.21,USDC/WETH,0.05%,1
1,0xcbcdf9626bc03e24f779434178a73a0b4bad62ed,WBTC_WETH_0.05%,82235363.32,WBTC/WETH,0.05%,2
2,0x99ac8ca7087fa4a2a1fb6357269965a2014abc35,WBTC_USDC_0.05%,74676284.7,WBTC/USDC,0.05%,3
3,0x5777d92f208679db4b9778590fa3cab3ac9e2168,DAI_USDC_0.05%,57144995.13,DAI/USDC,0.05%,4
4,0x4e68ccd3e89f51c3074ca5072bbac773960dfa36,WETH_USDT_0.05%,56123958.04,WETH/USDT,0.05%,5


In [13]:
# =========================================================
# 4) GraphQL + helpers for SWAPS
#    - dynamic range support
#    - fetch receipts
#    - fetch tx (to get .to/.from)
#    - tag Uniswap router calls
# =========================================================
import os
import time
import datetime as dt
from typing import List, Dict, Tuple, Optional

import pandas as pd
import requests

# Re-read the endpoints from the process environment so this cell does not
# depend on the variables assigned by the upload cell; fail fast if either is unset.
SUBGRAPH_URL = os.getenv("SUBGRAPH_URL")
ETH_RPC_URL = os.getenv("ETH_RPC_URL")
assert SUBGRAPH_URL, "SUBGRAPH_URL missing in env."
assert ETH_RPC_URL,  "ETH_RPC_URL missing in env."


# Addresses compared against a transaction's `to` field to flag a swap as routed.
# NOTE(review): only these three deployments are listed; transactions sent to any
# other router/aggregator contract will be tagged is_routed=False — confirm the
# list is complete for the period being analysed.
ROUTER_ADDRESSES = {
    "0xe592427a0aece92de3edee1f18e0157c05861564",  # SwapRouter
    "0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45",  # SwapRouter02
    "0x66a9893cc07d91d95644aedd05d03f95e1dba8af",  # UniversalRouter
}
# Normalise to lower-case so membership tests against lower-cased addresses match.
ROUTER_ADDRESSES = {a.lower() for a in ROUTER_ADDRESSES}

# Paged query for one pool's swaps inside the half-open time window
# [tsMin, tsMax), ordered by ascending timestamp. `first`/`skip` drive paging.
SWAPS_Q = """
query SwapsWindow($pool: String!, $tsMin: Int!, $tsMax: Int!, $first: Int!, $skip: Int!) {
  swaps(
    first: $first
    skip: $skip
    orderBy: timestamp
    orderDirection: asc
    where: { pool: $pool, timestamp_gte: $tsMin, timestamp_lt: $tsMax }
  ) {
    id
    sender
    recipient
    origin
    amount0
    amount1
    amountUSD
    sqrtPriceX96
    tick
    token0 { id symbol decimals }
    token1 { id symbol decimals }
    pool  { id feeTier }
    transaction { id blockNumber timestamp gasPrice gasUsed }
  }
}
"""

# Fetches the single oldest swap of a pool (ascending timestamp, first: 1);
# used to find how far back data exists for that pool.
EARLIEST_SWAP_Q = """
query EarliestSwap($pool: String!) {
  swaps(
    first: 1
    orderBy: timestamp
    orderDirection: asc
    where: { pool: $pool }
  ) {
    id
    timestamp
    transaction { id }
  }
}
"""

def run_gql(query: str, variables: dict, max_retries: int = 5, backoff: float = 1.6, timeout: int = 60) -> dict:
    """POST a GraphQL query to ``SUBGRAPH_URL`` and return the ``data`` object.

    Any failure (transport error, non-2xx status, invalid JSON, or an
    ``errors`` entry in the response) is retried. After the k-th failure the
    function sleeps ``(backoff ** k) * 0.5`` seconds; once more than
    ``max_retries`` failures have occurred the last exception is re-raised.

    Args:
        query: GraphQL document.
        variables: Variables for the query.
        max_retries: Number of retries allowed after the first attempt.
        backoff: Base of the exponential delay.
        timeout: Per-request timeout in seconds.

    Returns:
        The value stored under ``"data"`` in the JSON response.
    """
    failures = 0
    while True:
        try:
            response = requests.post(
                SUBGRAPH_URL,
                json={"query": query, "variables": variables},
                timeout=timeout,
            )
            response.raise_for_status()
            parsed = response.json()
            if "errors" in parsed:
                raise RuntimeError(parsed["errors"])
            return parsed["data"]
        except Exception:
            failures += 1
            if failures > max_retries:
                raise
            delay = (backoff ** failures) * 0.5
            time.sleep(delay)

def get_earliest_swap_ts(pool: str) -> Optional[int]:
    """Return the timestamp of the pool's oldest swap, or None if it has none.

    The pool address is lower-cased before it is sent to the subgraph.
    """
    response = run_gql(EARLIEST_SWAP_Q, {"pool": pool.lower()})
    oldest = response.get("swaps", [])
    return int(oldest[0]["timestamp"]) if oldest else None

def flatten_swaps(batch: List[dict]) -> pd.DataFrame:
    """Turn a list of nested swap objects into a flat, one-row-per-swap frame.

    Nested ``transaction``, ``token0``, ``token1`` and ``pool`` objects are
    unpacked into prefixed columns; a missing or null nested object yields
    ``None`` in each of its columns. An empty ``batch`` gives an empty frame
    with no columns.
    """
    column_order = (
        "swap_id", "pool_id", "fee_tier",
        "tx_hash", "block_number", "timestamp",
        "sender", "recipient", "origin",
        "amount0", "amount1", "amountUSD",
        "sqrtPriceX96", "tick",
        "token0_id", "token0_symbol", "token0_decimals",
        "token1_id", "token1_symbol", "token1_decimals",
        "gasUsed", "gasPrice",
    )

    def to_row(swap: dict) -> dict:
        txn = swap.get("transaction") or {}
        tok0 = swap.get("token0") or {}
        tok1 = swap.get("token1") or {}
        pool_info = swap.get("pool") or {}
        return {
            "swap_id": swap.get("id"),
            "pool_id": pool_info.get("id"),
            "fee_tier": pool_info.get("feeTier"),
            "tx_hash": txn.get("id"),
            "block_number": txn.get("blockNumber"),
            "timestamp": txn.get("timestamp"),
            "sender": swap.get("sender"),
            "recipient": swap.get("recipient"),
            "origin": swap.get("origin"),
            "amount0": swap.get("amount0"),
            "amount1": swap.get("amount1"),
            "amountUSD": swap.get("amountUSD"),
            "sqrtPriceX96": swap.get("sqrtPriceX96"),
            "tick": swap.get("tick"),
            "token0_id": tok0.get("id"),
            "token0_symbol": tok0.get("symbol"),
            "token0_decimals": tok0.get("decimals"),
            "token1_id": tok1.get("id"),
            "token1_symbol": tok1.get("symbol"),
            "token1_decimals": tok1.get("decimals"),
            "gasUsed": txn.get("gasUsed"),
            "gasPrice": txn.get("gasPrice"),
        }

    frame = pd.DataFrame([to_row(swap) for swap in batch])
    if len(frame) == 0:
        return frame
    return frame[list(column_order)]

def _rpc_batch_results(
    method: str,
    tx_hashes: List[str],
    batch_size: int = 100,
    max_retries: int = 3,
    pause: float = 0.15,
    timeout: int = 120,
) -> List[dict]:
    """Call one single-argument JSON-RPC ``method`` for every hash, in batches.

    Each batch is POSTed to ``ETH_RPC_URL`` as a JSON-RPC batch request whose
    ids are the positions of the hashes inside the batch. Responses are matched
    back to hashes by id.

    * A response item with a non-empty object ``result`` is collected.
    * A null result without an ``error`` member is treated as a definitive
      "nothing there" answer and is not retried.
    * Items that carry an ``error`` member, or are missing from the response,
      are re-sent (only those hashes) up to ``max_retries`` times with an
      exponentially growing delay. Hashes still failing afterwards are
      skipped with a printed warning.
    * If the server answers the whole batch with a single error object and
      retries do not help, ``RuntimeError`` is raised with that error.

    Returns:
        The collected ``result`` objects, in the order they were received.

    Raises:
        requests.HTTPError: on a non-2xx HTTP status.
        RuntimeError: on a persistent batch-level JSON-RPC error.
    """
    collected: List[dict] = []
    for start in range(0, len(tx_hashes), batch_size):
        pending = list(tx_hashes[start:start + batch_size])
        attempt = 0
        while pending:
            payload = [
                {"jsonrpc": "2.0", "id": j, "method": method, "params": [h]}
                for j, h in enumerate(pending)
            ]
            r = requests.post(ETH_RPC_URL, json=payload, timeout=timeout)
            r.raise_for_status()
            body = r.json()

            batch_error = None
            if isinstance(body, list):
                items = body
            elif isinstance(body, dict) and "result" in body:
                # Some servers answer a one-element batch with a bare object.
                items = [body]
            else:
                # One error object for the whole batch (or an unexpected body).
                items = []
                batch_error = body.get("error", body) if isinstance(body, dict) else body

            settled = set()  # ids that need no further attempt
            for item in items:
                if not isinstance(item, dict):
                    continue
                res = item.get("result")
                if isinstance(res, dict) and res:
                    collected.append(res)
                    settled.add(item.get("id"))
                elif "error" not in item:
                    settled.add(item.get("id"))

            failed = [h for j, h in enumerate(pending) if j not in settled]
            if not failed:
                break
            if attempt >= max_retries:
                if batch_error is not None:
                    raise RuntimeError(f"{method} batch request failed: {batch_error}")
                print(
                    f"  warning: {len(failed)} {method} call(s) still failing "
                    f"after {max_retries} retries; skipping them"
                )
                break
            attempt += 1
            time.sleep(pause * (2 ** attempt))
            pending = failed
        # Small fixed pause between batches to stay gentle on the endpoint.
        time.sleep(pause)
    return collected


def fetch_receipts_batched(tx_hashes: List[str]) -> Dict[str, Dict[str, int]]:
    """Fetch gas data from ``eth_getTransactionReceipt`` for each hash.

    Returns:
        Mapping of lower-cased transaction hash to
        ``{"gasUsed": int, "effectiveGasPrice": int}``. The price is taken from
        the receipt's ``effectiveGasPrice``, falling back to ``gasPrice``.
        Hashes without a usable receipt are absent from the mapping.
    """
    out: Dict[str, Dict[str, int]] = {}
    for res in _rpc_batch_results("eth_getTransactionReceipt", tx_hashes):
        txh = res.get("transactionHash")
        if not txh:
            continue
        gas_used_hex = res.get("gasUsed")
        egp_hex = res.get("effectiveGasPrice") or res.get("gasPrice")
        if gas_used_hex and egp_hex:
            # JSON-RPC quantities are hex strings.
            out[txh.lower()] = {
                "gasUsed": int(gas_used_hex, 16),
                "effectiveGasPrice": int(egp_hex, 16),
            }
    return out


def fetch_txs_batched(tx_hashes: List[str]) -> Dict[str, Dict[str, str]]:
    """Fetch ``to`` and ``from`` via ``eth_getTransactionByHash`` for each hash.

    Returns:
        Mapping of lower-cased transaction hash to ``{"to": str, "from": str}``
        with both addresses lower-cased; a null/missing address becomes ``""``.
        Hashes for which no transaction object came back are absent.
    """
    out: Dict[str, Dict[str, str]] = {}
    for res in _rpc_batch_results("eth_getTransactionByHash", tx_hashes):
        th = res.get("hash")
        if not th:
            continue
        out[th.lower()] = {
            "to": (res.get("to") or "").lower(),
            "from": (res.get("from") or "").lower(),
        }
    return out

def coingecko_range_ethusd(ts_min: int, ts_max: int) -> List[Tuple[int, float]]:
    """Fetch ETH/USD price points from CoinGecko's market_chart/range endpoint.

    The requested window is widened by 600 seconds on each side of
    ``[ts_min, ts_max]``.

    Returns:
        ``(unix_seconds, price)`` tuples built from the response's ``prices``
        list, whose timestamps are divided by 1000 (floor). An absent or empty
        ``prices`` list gives ``[]``.

    Raises:
        requests.HTTPError: on a non-2xx HTTP status.
    """
    padding = 600
    query = {"vs_currency": "usd", "from": ts_min - padding, "to": ts_max + padding}
    reply = requests.get(
        "https://api.coingecko.com/api/v3/coins/ethereum/market_chart/range",
        params=query,
        timeout=60,
    )
    reply.raise_for_status()
    points = reply.json().get("prices") or []

    series = []
    for millis, price in points:
        series.append((int(millis // 1000), float(price)))
    return series

def map_nearest_price(timestamps: pd.Series, price_series: List[Tuple[int, float]]) -> pd.Series:
    """Give every timestamp the price of the closest point in ``price_series``.

    ``price_series`` holds ``(unix_seconds, price)`` pairs and is searched with
    bisection, so it is expected to be sorted by time. Timestamps before the
    first point take the first price, those after the last point take the last
    price, and an exact tie between two neighbours resolves to the earlier one.

    Returns:
        A float64 Series aligned to ``timestamps.index``; if ``price_series`` is
        empty, a Series of ``None`` with the same index.
    """
    import bisect

    if not price_series:
        return pd.Series([None] * len(timestamps), index=timestamps.index)

    times, quotes = (list(col) for col in zip(*price_series))
    n_points = len(times)

    def nearest(t: int) -> float:
        pos = bisect.bisect_left(times, t)
        if pos == 0:
            return quotes[0]
        if pos >= n_points:
            return quotes[-1]
        gap_before = abs(times[pos - 1] - t)
        gap_after = abs(times[pos] - t)
        return quotes[pos - 1] if gap_before <= gap_after else quotes[pos]

    picked = [nearest(t) for t in timestamps.astype(int).tolist()]
    return pd.Series(picked, index=timestamps.index, dtype="float64")


In [14]:

def download_swaps_with_receipts(pool_id: str, days: int = 1, out_dir: str = ".") -> str:
    """Download the last ``days`` days of swaps for one pool and save them as CSV.

    Steps:
      1. Page through the subgraph (``SWAPS_Q``) for the window ending now; for
         windows longer than 14 days the range is split into 7-day chunks.
      2. Fetch the receipt and transaction object of every distinct tx hash.
         ``gasUsed`` is replaced by the receipt value, and ``tx_to`` /
         ``tx_from`` / ``is_routed`` (tx sent to one of ``ROUTER_ADDRESSES``)
         are added.
      3. Fetch ETH/USD prices for the covered range and add
         ``gas_used_convertedUSD = gasUsed * effective gas price * ETH/USD / 1e18``.
      4. Write ``<pool>_<token0>_<token1>_<days>d.csv`` into ``out_dir``.

    Args:
        pool_id: Pool address (``0x`` + 40 hex chars); whitespace and case are normalised.
        days: Size of the look-back window in days.
        out_dir: Destination directory; created if it does not exist.

    Returns:
        Path of the CSV written. When the pool has no swaps at all, or none in
        the window, an empty CSV with a ``NO_SWAPS`` name is written instead.

    Raises:
        AssertionError: if ``pool_id`` is not a 42-character ``0x`` string.
        ValueError: if the window starts before the pool's earliest swap.
    """
    pool = pool_id.strip().lower()
    assert pool.startswith("0x") and len(pool) == 42, "pool_id must be 42-char hex addr."

    # Create the destination up front so a long download cannot fail at the
    # final write just because the directory is missing.
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    ts_max_req = int(dt.datetime.now(dt.timezone.utc).timestamp())
    ts_min_req = ts_max_req - int(days) * 24 * 60 * 60

    # check earliest
    earliest_ts = get_earliest_swap_ts(pool)
    if earliest_ts is None:
        out_path = os.path.join(out_dir, f"{pool}_NO_SWAPS.csv")
        pd.DataFrame().to_csv(out_path, index=False)
        print("No swaps for this pool. Saved empty CSV.")
        return out_path

    if ts_min_req < earliest_ts:
        # Timezone-aware conversion (utcfromtimestamp is deprecated); formatted
        # so the message text stays "YYYY-MM-DD HH:MM:SS".
        human_earliest = dt.datetime.fromtimestamp(
            earliest_ts, dt.timezone.utc
        ).strftime("%Y-%m-%d %H:%M:%S")
        raise ValueError(
            f"Requested {days} days goes before earliest swap ({human_earliest} UTC). "
            f"Reduce days or start at {human_earliest}."
        )

    # chunk decision: short windows are fetched in one go, longer ones are
    # split into contiguous half-open [start, end) slices, oldest first.
    MAX_SINGLE_DAYS = 14
    if days <= MAX_SINGLE_DAYS:
        chunks = [(ts_min_req, ts_max_req)]
    else:
        CHUNK_DAYS = 7
        chunks = []
        cur_end = ts_max_req
        lower_bound = ts_min_req
        while cur_end > lower_bound:
            cur_start = max(lower_bound, cur_end - CHUNK_DAYS*24*3600)
            chunks.append((cur_start, cur_end))
            cur_end = cur_start
        chunks = list(reversed(chunks))

    total_chunks = len(chunks)
    print(f"Will fetch {days} day(s) in {total_chunks} chunk(s).")

    all_dfs = []
    token0_symbol = None
    token1_symbol = None
    BATCH = 1000  # page size passed as `first`

    start_all = time.time()

    for idx, (chunk_start, chunk_end) in enumerate(chunks, start=1):
        chunk_t0 = time.time()
        print(f"\n[Chunk {idx}/{total_chunks}] {chunk_start} → {chunk_end}")
        skip = 0
        while True:
            data = run_gql(SWAPS_Q, {
                "pool": pool,
                "tsMin": int(chunk_start),
                "tsMax": int(chunk_end),
                "first": BATCH,
                "skip": skip
            })
            batch = data["swaps"]
            if not batch:
                break
            df_batch = flatten_swaps(batch)
            if len(df_batch):
                all_dfs.append(df_batch)
                if token0_symbol is None:
                    token0_symbol = batch[0]["token0"]["symbol"]
                    token1_symbol = batch[0]["token1"]["symbol"]
            skip += len(batch)
            print(f"  fetched {len(batch)} swaps (skip={skip})")
        chunk_t1 = time.time()

        # progress info: extrapolate from the average time per finished chunk
        elapsed_total = chunk_t1 - start_all
        avg_per_chunk = elapsed_total / idx
        remaining_chunks = total_chunks - idx
        est_remaining = remaining_chunks * avg_per_chunk
        print(f"  chunk time: {chunk_t1 - chunk_t0:.1f}s | remaining chunks: {remaining_chunks} | est remaining: {est_remaining:.1f}s")

        time.sleep(0.7)

    if not all_dfs:
        out_name = f"{pool}_NO_SWAPS_{int(days)}d.csv"
        out_path = os.path.join(out_dir, out_name)
        pd.DataFrame().to_csv(out_path, index=False)
        print(f"No swaps found in requested range. Saved empty CSV: {out_path}")
        return out_path

    df = pd.concat(all_dfs, ignore_index=True)

    # receipts + tx meta (one lookup per distinct transaction)
    tx_key = df["tx_hash"].astype(str).str.lower()
    tx_hashes = tx_key.unique().tolist()
    print(f"\nFetching receipts for {len(tx_hashes)} txs...")
    receipts = fetch_receipts_batched(tx_hashes)

    print("Fetching transaction metadata (to/from)...")
    tx_meta = fetch_txs_batched(tx_hashes)

    df["__gasUsed_receipt"] = tx_key.map(lambda h: receipts.get(h, {}).get("gasUsed"))
    df["__egp_wei"] = tx_key.map(lambda h: receipts.get(h, {}).get("effectiveGasPrice"))
    # The receipt value replaces whatever gasUsed the subgraph reported.
    df["gasUsed"] = df["__gasUsed_receipt"]

    df["tx_to"] = tx_key.map(lambda h: (tx_meta.get(h, {}) or {}).get("to"))
    df["tx_from"] = tx_key.map(lambda h: (tx_meta.get(h, {}) or {}).get("from"))
    df["is_routed"] = df["tx_to"].fillna("").str.lower().isin(ROUTER_ADDRESSES)

    print("Fetching historical ETH/USD for full range...")
    # Compare timestamps numerically: min()/max() on a string column would be
    # lexicographic.
    ts_numeric = pd.to_numeric(df["timestamp"])
    price_series = coingecko_range_ethusd(int(ts_numeric.min()), int(ts_numeric.max()))
    df["__ethusd_at_block"] = map_nearest_price(df["timestamp"], price_series)

    df["gas_used_convertedUSD"] = (
        df["gasUsed"].astype("float64")
        * df["__egp_wei"].astype("float64")
        * df["__ethusd_at_block"].astype("float64")
        / 1e18
    )

    df = df.drop(columns=["__gasUsed_receipt", "__egp_wei", "__ethusd_at_block"])

    if token0_symbol is None:
        token0_symbol = str(df.loc[0, "token0_symbol"])
        token1_symbol = str(df.loc[0, "token1_symbol"])

    def clean(x: str) -> str:
        """Keep alphanumerics, '-' and '_' so the symbol is safe in a file name."""
        # Test each character (the original compared the whole string `x`,
        # which silently dropped every '-' and '_').
        return "".join(ch for ch in str(x) if ch.isalnum() or ch in ("-", "_")).strip("_")

    fname = f"{pool}_{clean(token0_symbol)}_{clean(token1_symbol)}_{int(days)}d.csv"
    out_path = os.path.join(out_dir, fname)
    df.to_csv(out_path, index=False)

    total_time = time.time() - start_all
    print(f"\n✅ Saved {len(df):,} rows to {out_path}")
    print(f"Total time: {total_time:.1f}s")
    return out_path


In [15]:
first_pool = pools_df.iloc[0]["pool_address"]
out_csv = download_swaps_with_receipts(first_pool, days=2)
print("CSV saved to:", out_csv)

# When no swaps are found the downloader writes a CSV from an empty DataFrame;
# read_csv raises EmptyDataError on such a file, so fall back to an empty frame
# instead of failing the cell.
try:
    preview = pd.read_csv(out_csv)
except pd.errors.EmptyDataError:
    preview = pd.DataFrame()
display(preview.head())


Will fetch 2 day(s) in 1 chunk(s).

[Chunk 1/1] 1762468751 → 1762641551
  fetched 1000 swaps (skip=1000)
  fetched 1000 swaps (skip=2000)
  fetched 1000 swaps (skip=3000)
  fetched 1000 swaps (skip=4000)
  fetched 1000 swaps (skip=5000)
  fetched 1000 swaps (skip=6000)
  fetched 1000 swaps (skip=7000)
  fetched 1000 swaps (skip=8000)
  fetched 1000 swaps (skip=9000)
  fetched 1000 swaps (skip=10000)
  fetched 551 swaps (skip=10551)
  chunk time: 94.0s | remaining chunks: 0 | est remaining: 0.0s

Fetching receipts for 10230 txs...
Fetching transaction metadata (to/from)...
Fetching historical ETH/USD for full range...

✅ Saved 10,551 rows to ./0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640_USDC_WETH_2d.csv
Total time: 145.5s
CSV saved to: ./0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640_USDC_WETH_2d.csv


Unnamed: 0,swap_id,pool_id,fee_tier,tx_hash,block_number,timestamp,sender,recipient,origin,amount0,...,token0_decimals,token1_id,token1_symbol,token1_decimals,gasUsed,gasPrice,tx_to,tx_from,is_routed,gas_used_convertedUSD
0,0xce7a02b39536c3e5554b98df68956147bd6ab7785362...,0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640,500,0xce7a02b39536c3e5554b98df68956147bd6ab7785362...,23743276,1762468751,0x51c72848c68a965f66fa7a88855f9f7784502a7f,0x51c72848c68a965f66fa7a88855f9f7784502a7f,0xebedc8e9ff409b23dd251f87ccbffa8075f87255,23352.125875,...,6,0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2,WETH,18,151703.0,12278293096,,,False,6.173245
1,0x0c6c7580a2a522c16fe216105dd6ff41ba0723e44185...,0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640,500,0x0c6c7580a2a522c16fe216105dd6ff41ba0723e44185...,23743288,1762468895,0x66a9893cc07d91d95644aedd05d03f95e1dba8af,0x66a9893cc07d91d95644aedd05d03f95e1dba8af,0x5068a059f3d246eb88f32fa7c553bf48e13910e9,133.098117,...,6,0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2,WETH,18,264066.0,985759696,,,False,0.86271
2,0x12a8da9091d0cca50ef1d50376589c447eacede8cdc1...,0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640,500,0x12a8da9091d0cca50ef1d50376589c447eacede8cdc1...,23743289,1762468907,0xfbd4cdb413e45a52e2c8312f670e9ce67e794c37,0xfbd4cdb413e45a52e2c8312f670e9ce67e794c37,0x5b43453fce04b92e190f391a83136bfbecedefd1,-4508.648316,...,6,0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2,WETH,18,512418.0,836997744,,,False,1.421445
3,0x1a6b7697e5499e7febc6fb3ef6d3d4a8fb6e23e8657e...,0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640,500,0x1a6b7697e5499e7febc6fb3ef6d3d4a8fb6e23e8657e...,23743291,1762468931,0xfbd4cdb413e45a52e2c8312f670e9ce67e794c37,0xfbd4cdb413e45a52e2c8312f670e9ce67e794c37,0x5b43453fce04b92e190f391a83136bfbecedefd1,-31513.110497,...,6,0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2,WETH,18,3542882.0,8164980679,,,False,95.872306
4,0x3e3fdd88341e8a74f66e196af4bc283f70b98f1a7719...,0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640,500,0x3e3fdd88341e8a74f66e196af4bc283f70b98f1a7719...,23743294,1762468967,0xfbd4cdb413e45a52e2c8312f670e9ce67e794c37,0xfbd4cdb413e45a52e2c8312f670e9ce67e794c37,0x5b43453fce04b92e190f391a83136bfbecedefd1,-30915.30229,...,6,0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2,WETH,18,4044984.0,12277015817,,,False,164.585263
