In [None]:
import warnings
warnings.filterwarnings('ignore')

import sys
import os
sys.path.append(os.path.abspath(".."))

import pandas as pd
import polars as pl
from datetime import datetime
import requests

from shared.poly_utils import get_markets, PLATFORM_WALLETS  # updated import


In [None]:
# Tunable analysis parameters
RECENCY_CUTOFF_STR = "2025-10-01"   # keep only traders whose latest trade is after this date
MIN_MARKETS_TRADED = 300            # minimum number of markets a trader must have traded
BIG_WIN_THRESH = 70.0               # percent change at/above which a market counts as a "big win"
MIN_BIG_WINS = 0                    # minimum big-win count required to qualify (0 disables the filter)

# Data root (relative to the analysis/ folder)
LOC = "../"

# Polars table rendering: 25 rows, all columns (-1), 1000-character wide tables.
# Each setter returns the Config class, so the calls chain; binding the result
# to `_` keeps the cell from echoing it.
_ = pl.Config.set_tbl_rows(25).set_tbl_cols(-1).set_tbl_width_chars(1000)


In [None]:
# Load market metadata from the partitioned parquet directory under LOC.
# `get_markets` is imported from shared.poly_utils; its return type is not visible here.
# Later cells index the result with [["id", "question"]] and pass it to a Polars join,
# so it is presumably a polars DataFrame with at least those two columns — TODO confirm.
markets_df = get_markets(LOC + "markets_partitioned")


In [None]:
# Load every processed trade file, then derive `last_price` in three eager steps:
#   1. make sure `timestamp` is a Datetime column,
#   2. take the most recent trade price within each (market_id, nonusdc_side) group,
#   3. snap near-certain prices to the extremes (> 0.98 -> 1.0, < 0.02 -> 0.0).
df = (
    pl.scan_parquet(LOC + "processed/trades/**/*.parquet")
    .collect(streaming=True)
    .with_columns(pl.col("timestamp").cast(pl.Datetime).alias("timestamp"))
    .with_columns(
        pl.col("price")
        .sort_by("timestamp")
        .last()
        .over(["market_id", "nonusdc_side"])
        .alias("last_price")
    )
    .with_columns(
        last_price=(
            pl.when(pl.col("last_price") > 0.98).then(pl.lit(1.0))
            .when(pl.col("last_price") < 0.02).then(pl.lit(0.0))
            .otherwise(pl.col("last_price"))
        )
    )
)


In [None]:
# %% ----------------------- Leaderboard pull -----------------------
def get_polymarket_leaderboard(top_n=500):
    """
    Fetch the all-time, PnL-ordered Polymarket leaderboard in pages of 50.

    Parameters
    ----------
    top_n : int
        Maximum number of leaderboard rows to return.

    Returns
    -------
    pandas.DataFrame
        One row per leaderboard entry, in the order the API returned them and
        never more than `top_n` rows. Columns are whatever the API sends;
        later cells in this notebook read 'user_id' and 'user_name'.

    Raises
    ------
    requests.HTTPError
        If any page request comes back with an error status.
    ValueError
        If a page's JSON payload is not a list of entries.
    """
    all_data = []
    limit = 50
    for offset in range(0, top_n, limit):
        url = f"https://data-api.polymarket.com/leaderboard?timePeriod=all&orderBy=PNL&limit={limit}&offset={offset}"
        res = requests.get(url, timeout=30)
        res.raise_for_status()
        batch = res.json()
        # list.extend() on a dict would silently append its keys as rows.
        if not isinstance(batch, list):
            raise ValueError(
                f"Unexpected leaderboard payload at offset {offset}: {type(batch).__name__}"
            )
        # An empty page means the leaderboard is exhausted; stop requesting.
        if not batch:
            break
        all_data.extend(batch)
    # Pages are fixed-size, so trim the overshoot when top_n is not a multiple of 50.
    return pd.DataFrame(all_data[:top_n])

# Pull the top 500 leaderboard entries once (10 HTTP requests of 50 rows each).
# NOTE: despite its name, `top_n` is the resulting pandas DataFrame, not a count.
top_n = get_polymarket_leaderboard(500)

In [None]:
# %% ----------------------- Per-trader metrics helpers -----------------------
def get_metrics(wallet_address: str) -> pl.DataFrame:
    """
    Build per-(market_id, side) metrics for a single wallet.

    Reads the module-level `df` (all trades, including the precomputed
    `last_price` column) and `markets_df` (market metadata with `id` and
    `question`). Only rows where `maker == wallet_address` are used.

    Parameters
    ----------
    wallet_address : str
        Wallet to match against the `maker` column of `df`.

    Returns
    -------
    pl.DataFrame
        One row per (market_id, side), sorted by those keys, with columns:
        market_id, side, trades, avg_buy_price, avg_sold_price_only,
        avg_sell_price, last_price, total_pnl_usd, last_trade_ts, buy_usd,
        sell_usd, buy_tokens, sell_tokens, buy_notional, sell_notional,
        question, pct_change.
        `pct_change` is null when there is no positive average buy price
        or no exit price to compare against. A wallet with no trades yields
        an empty frame with the same columns.
    """
    trader_df = df.filter(pl.col("maker") == wallet_address)
    if trader_df.height == 0:
        # Build the empty frame from a schema. `pl.Series([], dtype)` would bind the
        # list to `name` and the dtype to `values`, which Polars rejects.
        return pl.DataFrame(
            schema={
                "market_id": pl.Utf8,
                "side": pl.Utf8,
                "trades": pl.Int64,
                "avg_buy_price": pl.Float64,
                "avg_sold_price_only": pl.Float64,
                "avg_sell_price": pl.Float64,
                "last_price": pl.Float64,
                "total_pnl_usd": pl.Float64,
                "last_trade_ts": pl.Datetime,
                "buy_usd": pl.Float64,
                "sell_usd": pl.Float64,
                "buy_tokens": pl.Float64,
                "sell_tokens": pl.Float64,
                "buy_notional": pl.Float64,
                "sell_notional": pl.Float64,
                "question": pl.Utf8,
                "pct_change": pl.Float64,
            }
        )

    trader_df = trader_df.select(
        "timestamp", "market_id", "maker", "taker", "maker_direction",
        "nonusdc_side", "price", "token_amount", "usd_amount",
        "transactionHash", "last_price"
    ).rename({"maker_direction": "direction", "nonusdc_side": "side"})

    def _directional_sum(direction: str, value: pl.Expr) -> pl.Expr:
        # Sum `value` over rows whose direction matches; other rows contribute 0.
        return pl.when(pl.col("direction") == direction).then(value).otherwise(0.0).sum()

    notional = pl.col("price") * pl.col("token_amount")

    metrics_df = (
        trader_df
        .group_by(["market_id", "side"])
        .agg(
            # USD volumes
            _directional_sum("BUY", pl.col("usd_amount")).alias("buy_usd"),
            _directional_sum("SELL", pl.col("usd_amount")).alias("sell_usd"),

            # Token volumes
            _directional_sum("BUY", pl.col("token_amount")).alias("buy_tokens"),
            _directional_sum("SELL", pl.col("token_amount")).alias("sell_tokens"),

            # Notionals (price * tokens)
            _directional_sum("BUY", notional).alias("buy_notional"),
            _directional_sum("SELL", notional).alias("sell_notional"),

            pl.col("timestamp").max().alias("last_trade_ts"),
            pl.len().alias("trades"),
            # last_price was computed per (market_id, nonusdc_side), so it is
            # constant within each group; any row's value will do.
            pl.col("last_price").last().alias("last_price"),
        )
        # PnL pieces: realized cash flow plus remaining inventory marked at last_price
        .with_columns(
            (pl.col("sell_usd") - pl.col("buy_usd")).alias("cash_pnl_usd"),
            (pl.col("buy_tokens") - pl.col("sell_tokens")).alias("inventory_tokens"),
        )
        .with_columns(
            (pl.col("inventory_tokens") * pl.col("last_price")).alias("unrealized_usd"),
        )
        .with_columns(
            (pl.col("cash_pnl_usd") + pl.col("unrealized_usd")).alias("total_pnl_usd"),
        )
        # VWAPs
        .with_columns(
            pl.when(pl.col("buy_tokens") > 0)
              .then(pl.col("buy_notional") / pl.col("buy_tokens"))
              .otherwise(None)
              .alias("avg_buy_price"),
            pl.when(pl.col("sell_tokens") > 0)
              .then(pl.col("sell_notional") / pl.col("sell_tokens"))
              .otherwise(None)
              .alias("avg_sold_price_only"),
        )
        # Blended exit price if the remaining inventory were closed now at last_price
        .with_columns(
            (pl.col("sell_tokens") + pl.col("inventory_tokens")).alias("effective_exit_tokens"),
            (pl.col("sell_notional") + pl.col("inventory_tokens") * pl.col("last_price")).alias("effective_exit_notional"),
        )
        .with_columns(
            pl.when(pl.col("effective_exit_tokens") > 0)
              .then(pl.col("effective_exit_notional") / pl.col("effective_exit_tokens"))
              .otherwise(None)
              .alias("avg_sell_price")
        )
        .select(
            "market_id", "side", "trades",
            "avg_buy_price", "avg_sold_price_only", "avg_sell_price",
            "last_price", "total_pnl_usd",
            "last_trade_ts",
            "buy_usd", "sell_usd",
            "buy_tokens", "sell_tokens",
            "buy_notional", "sell_notional",
        )
        .sort(["market_id", "side"])
    )

    # Attach question text. Renaming the key up front lets us join on a single
    # shared column, so no right-hand key column is left over to drop afterwards.
    # De-duplicating guards against a repeated market id multiplying metric rows.
    questions = (
        markets_df
        .select(pl.col("id").alias("market_id"), "question")
        .unique(subset=["market_id"], keep="first")
    )
    metrics_df = metrics_df.join(questions, on="market_id", how="left")

    # Side-level percent change (for reference). Guard the denominator so a zero
    # average buy price yields null rather than inf/NaN.
    metrics_df = metrics_df.with_columns(
        pl.when(pl.col("avg_buy_price") > 0)
          .then(((pl.col("avg_sell_price") - pl.col("avg_buy_price")) / pl.col("avg_buy_price")) * 100)
          .otherwise(None)
          .alias("pct_change")
    )

    # remove orphaned market rows
    metrics_df = metrics_df.drop_nulls(subset=["market_id"])
    return metrics_df

In [None]:
def combine_market_pct(metrics_df: pl.DataFrame) -> pl.DataFrame:
    """
    Collapse (market_id, side) rows into one row per market_id.

    The combined percent change is the buy-USD-weighted average of the
    side-level `pct_change`. Sides whose `pct_change` is null, NaN or
    infinite carry zero weight, so they cannot distort the average.
    Trades, PnL and USD volumes are summed; `last_trade_ts` is the maximum.

    Parameters
    ----------
    metrics_df : pl.DataFrame
        Side-level metrics with at least: market_id, question, trades,
        total_pnl_usd, buy_usd, sell_usd, last_trade_ts, pct_change.

    Returns
    -------
    pl.DataFrame
        Columns: market_id, question, trades, pct_change_combined,
        total_pnl_usd, buy_usd_total, sell_usd_total, last_trade_ts; sorted
        by market_id. `pct_change_combined` is null when no side contributes
        positive weight. An empty input yields an empty frame with these
        columns.
    """
    if metrics_df.height == 0:
        # Build the empty frame from a schema. `pl.Series([], dtype)` would bind the
        # list to `name` and the dtype to `values`, which Polars rejects.
        return pl.DataFrame(
            schema={
                "market_id": pl.Utf8,
                "question": pl.Utf8,
                "trades": pl.Int64,
                "pct_change_combined": pl.Float64,
                "total_pnl_usd": pl.Float64,
                "buy_usd_total": pl.Float64,
                "sell_usd_total": pl.Float64,
                "last_trade_ts": pl.Datetime,
            }
        )

    # True only for real, finite percentages (null -> False).
    pct_usable = pl.col("pct_change").is_finite().fill_null(False)

    return (
        metrics_df
        .with_columns(
            pl.when(pct_usable).then(pl.col("buy_usd")).otherwise(0.0).alias("weight_usd"),
            pl.when(pct_usable)
              .then(pl.col("pct_change") * pl.col("buy_usd"))
              .otherwise(0.0)
              .fill_null(0.0)
              .alias("num_pct_x_usd"),
        )
        .group_by("market_id")
        .agg(
            pl.col("question").first().alias("question"),
            pl.col("trades").sum().alias("trades"),
            pl.col("total_pnl_usd").sum().alias("total_pnl_usd"),
            pl.col("buy_usd").sum().alias("buy_usd_total"),
            pl.col("sell_usd").sum().alias("sell_usd_total"),
            pl.col("num_pct_x_usd").sum().alias("num_pct_x_usd"),
            pl.col("weight_usd").sum().alias("den_usd"),
            pl.col("last_trade_ts").max().alias("last_trade_ts"),
        )
        .with_columns(
            # Weighted mean = sum(pct * usd) / sum(usd); undefined without weight.
            pl.when(pl.col("den_usd") > 0)
              .then(pl.col("num_pct_x_usd") / pl.col("den_usd"))
              .otherwise(None)
              .alias("pct_change_combined")
        )
        .select(
            "market_id", "question", "trades",
            "pct_change_combined",
            "total_pnl_usd", "buy_usd_total", "sell_usd_total",
            "last_trade_ts",
        )
        .sort("market_id")
    )


def _safe_median(series: pl.Series):
    """Return the median of `series`, or None when the series has no elements."""
    if series.len() > 0:
        return series.median()
    return None


def compute_trader_metrics(wallet_address: str, big_win_thresh: "float | None" = None) -> dict:
    """
    Per-trader stats based on per-market outcomes (one row per market).

    Parameters
    ----------
    wallet_address : str
        Wallet passed through to `get_metrics`.
    big_win_thresh : float, optional
        Percent change at or above which a winning market counts as a
        "big win". Defaults to the module-level BIG_WIN_THRESH, looked up at
        call time so edits to the config cell take effect without redefining
        this function.

    Returns
    -------
    dict
        Keys: wallet, markets_traded, last_trade, win_rate,
        big_win_rate_overall, big_win_rate_among_wins, median_win_pct,
        median_loss_pct, median_win_usd, median_loss_usd, total_pnl_usd,
        big_wins_count.
        A market is a win when its total_pnl_usd is > 0 and a loss otherwise
        (break-even counts as a loss). Rates and medians are None when they
        are undefined (no markets, no wins, or no losses).
    """
    threshold = BIG_WIN_THRESH if big_win_thresh is None else big_win_thresh

    metrics_df = get_metrics(wallet_address)
    c_df = combine_market_pct(metrics_df).rename({"pct_change_combined": "pct_change"})

    n = c_df.height
    if n == 0:
        return {
            "wallet": wallet_address, "markets_traded": 0, "last_trade": None,
            "win_rate": None, "big_win_rate_overall": None, "big_win_rate_among_wins": None,
            "median_win_pct": None, "median_loss_pct": None,
            "median_win_usd": None, "median_loss_usd": None,
            "total_pnl_usd": 0.0, "big_wins_count": 0
        }

    wins = c_df.filter(pl.col("total_pnl_usd") > 0)
    losses = c_df.filter(pl.col("total_pnl_usd") <= 0)
    w = wins.height

    # Big wins are a subset of wins: profitable markets with a large pct move.
    bw = wins.filter(pl.col("pct_change") >= threshold).height

    return {
        "wallet": wallet_address,
        "markets_traded": n,
        "last_trade": c_df["last_trade_ts"].max(),
        # n > 0 is guaranteed by the early return above.
        "win_rate": w / n,
        "big_win_rate_overall": bw / n,
        "big_win_rate_among_wins": (bw / w) if w > 0 else None,
        # _safe_median already returns None for an empty series.
        "median_win_pct": _safe_median(wins["pct_change"]),
        "median_loss_pct": _safe_median(losses["pct_change"]),
        "median_win_usd": _safe_median(wins["total_pnl_usd"]),
        "median_loss_usd": _safe_median(losses["total_pnl_usd"]),
        "total_pnl_usd": float(c_df["total_pnl_usd"].sum()),
        "big_wins_count": bw,
    }

In [None]:
# %% ----------------------- Run over leaderboard & rank -----------------------
# Columns produced per trader: the keys of compute_trader_metrics' dict plus user_name.
# Passing them explicitly keeps the frame well-formed even when the leaderboard is empty.
stat_columns = [
    "wallet", "markets_traded", "last_trade",
    "win_rate", "big_win_rate_overall", "big_win_rate_among_wins",
    "median_win_pct", "median_loss_pct",
    "median_win_usd", "median_loss_usd",
    "total_pnl_usd", "big_wins_count",
    "user_name",
]
# Stats that may be None for a trader; forced to float so an all-None (object)
# column cannot break the percentage arithmetic or the sort below.
float_columns = [
    "win_rate", "big_win_rate_overall", "big_win_rate_among_wins",
    "median_win_pct", "median_loss_pct",
    "median_win_usd", "median_loss_usd",
    "total_pnl_usd",
]

rows = []
for r in top_n.to_dict("records"):
    stats = compute_trader_metrics(wallet_address=r["user_id"])
    stats["user_name"] = r.get("user_name", None)
    print(stats)

    rows.append(stats)

rank_df = pd.DataFrame(rows, columns=stat_columns)
rank_df = rank_df.astype({col: "float64" for col in float_columns})

# Normalize & filter
rank_df["last_trade"] = pd.to_datetime(rank_df["last_trade"])
recency_cutoff = pd.to_datetime(RECENCY_CUTOFF_STR)

# Core filters: enough breadth, recently active, and a defined win rate
rank_df = rank_df[
    (rank_df["markets_traded"] >= MIN_MARKETS_TRADED) &
    (rank_df["last_trade"] > recency_cutoff) &
    (rank_df["win_rate"].notna())
].copy()

# Optional: require at least N big wins
if MIN_BIG_WINS > 0:
    rank_df = rank_df[rank_df["big_wins_count"] >= MIN_BIG_WINS].copy()

# Convenient percent display columns
rank_df["win_rate_pct"] = (rank_df["win_rate"] * 100).round(2)
rank_df["big_win_rate_overall_pct"] = (rank_df["big_win_rate_overall"] * 100).round(2)
rank_df["big_win_rate_among_wins_pct"] = (rank_df["big_win_rate_among_wins"] * 100).round(2)

# Sort to emphasize consistent big winners (then typical size, then $ impact)
rank_df = rank_df.sort_values(
    by=[
        "big_win_rate_among_wins",  # consistency of landing big wins when they do win
        "median_win_pct",           # typical magnitude of wins (pct)
        "total_pnl_usd"             # economic impact
    ],
    ascending=[False, False, False]
)

# Output: Top 50 table
top50_cols = [
    "user_name", "wallet", "markets_traded", "last_trade",
    "win_rate_pct", "big_win_rate_overall_pct", "big_win_rate_among_wins_pct",
    "median_win_pct", "median_loss_pct",
    "median_win_usd", "median_loss_usd",
    "big_wins_count",
    "total_pnl_usd"
]
# Chain instead of inplace=True so top50 is a fresh, cleanly indexed frame.
top50 = rank_df[top50_cols].head(50).reset_index(drop=True)
