In [4]:
import os
import math
import time
import json
from datetime import datetime, timezone, timedelta
from dateutil import parser as dtparser
from collections import defaultdict

import requests
import pandas as pd
from tqdm import tqdm

In [6]:

# =========================
# Configuration
# =========================
# Every setting is read from the environment so the script can be re-run
# without editing the source. Unset variables are None unless a default is given.

# Base URL of the beacon node REST API. A trailing "/" is stripped because every
# request path below starts with "/eth/...", and "host//eth/..." is rejected by
# some gateways.
BEACON_API = os.environ.get("BEACON_API", "https://YOUR-BEACON-ENDPOINT").rstrip("/")  # e.g. https://ethereum-beacon-api.publicnode.com
VALIDATOR_PUBKEY = os.environ.get("VALIDATOR_PUBKEY")  # 0x... (preferred)
VALIDATOR_INDEX = os.environ.get("VALIDATOR_INDEX")     # integer as string, fallback if no pubkey

# Range to pull (epochs). You can also set START_DATE/END_DATE to auto-convert to epochs.
# Explicit epochs win over dates; both ends of a pair must be set for it to be used.
START_EPOCH = os.environ.get("START_EPOCH")  # e.g. "225000"
END_EPOCH   = os.environ.get("END_EPOCH")    # e.g. "230000"

START_DATE = os.environ.get("START_DATE")  # e.g. "2024-01-01"
END_DATE   = os.environ.get("END_DATE")    # e.g. "2024-12-31"

# Aggregation granularity: 'D' for daily, 'W' for weekly
RESAMPLE_RULE = os.environ.get("RESAMPLE_RULE", "D")

# Path of the CSV file the aggregated table is written to.
OUTPUT_CSV = os.environ.get("OUTPUT_CSV", "eth_staking_apy.csv")

# One shared HTTP session so connections are reused across the many per-epoch calls.
SESSION = requests.Session()
SESSION.headers.update({"Accept": "application/json"})

# =========================
# Helper functions
# =========================

def get_json(url, params=None, retry=3, sleep=0.8):
    """
    GET ``url`` through the shared session and return the decoded JSON body.

    Args:
        url: Absolute URL to request.
        params: Optional query-string parameters.
        retry: Maximum number of attempts; values below 1 are treated as 1.
        sleep: Base back-off in seconds; attempt ``i`` (0-based) is followed by a
            pause of ``sleep * (i + 1)`` unless it was the last attempt.

    Returns:
        The parsed JSON document of the first HTTP 200 response.

    Raises:
        RuntimeError: If no attempt produced an HTTP 200 response. When the last
            failure was a transport error, it is chained as ``__cause__``.
    """
    # Always try at least once; a non-positive ``retry`` used to skip the loop
    # and crash on an unbound response variable.
    attempts = max(1, int(retry))
    failure = "no request attempted"
    cause = None
    for i in range(attempts):
        try:
            r = SESSION.get(url, params=params, timeout=30)
        except requests.RequestException as exc:
            # Timeouts / connection resets are as transient as a 5xx: retry them too.
            cause = exc
            failure = repr(exc)
        else:
            if r.status_code == 200:
                return r.json()
            cause = None
            failure = f"{r.status_code} {r.text[:200]}"
        # Back off only when another attempt will actually follow.
        if i < attempts - 1:
            time.sleep(sleep * (i + 1))
    raise RuntimeError(f"GET failed {url} -> {failure}") from cause

def resolve_validator_id():
    """
    Return the lowercase validator public key used for reward queries.

    A configured VALIDATOR_PUBKEY is returned directly. Otherwise VALIDATOR_INDEX
    is looked up against the head state of the beacon node and the pubkey of the
    returned validator record is used.

    Raises:
        ValueError: If neither VALIDATOR_PUBKEY nor VALIDATOR_INDEX is set.
    """
    if VALIDATOR_PUBKEY:
        return VALIDATOR_PUBKEY.lower()

    if VALIDATOR_INDEX:
        # Index -> pubkey lookup against the current (head) state.
        endpoint = f"{BEACON_API}/eth/v1/beacon/states/head/validators/{VALIDATOR_INDEX}"
        payload = get_json(endpoint)
        validator_record = payload["data"]["validator"]
        return validator_record["pubkey"].lower()

    raise ValueError("Set either VALIDATOR_PUBKEY or VALIDATOR_INDEX.")

def get_genesis():
    """Fetch the genesis time from the beacon node and return it as an aware UTC datetime."""
    payload = get_json(f"{BEACON_API}/eth/v1/beacon/genesis")
    # The node reports genesis_time as Unix seconds; int() also accepts a string.
    unix_seconds = int(payload["data"]["genesis_time"])
    return datetime.fromtimestamp(unix_seconds, tz=timezone.utc)

def slot_time():
    """
    Read the chain timing constants from the node's spec endpoint.

    Returns:
        A ``(seconds_per_slot, slots_per_epoch)`` tuple of ints.
    """
    constants = get_json(f"{BEACON_API}/eth/v1/config/spec")["data"]
    return int(constants["SECONDS_PER_SLOT"]), int(constants["SLOTS_PER_EPOCH"])

def date_to_epoch(genesis_ts, slots_per_epoch, seconds_per_slot, when_utc: datetime):
    """
    Map a datetime to the epoch that contains it.

    The elapsed time since ``genesis_ts`` is floored to a slot and the slot is
    floored to an epoch; instants before genesis map to epoch 0.
    ``when_utc`` and ``genesis_ts`` must be mutually subtractable datetimes.
    """
    elapsed_seconds = (when_utc - genesis_ts).total_seconds()
    slot_number = int(elapsed_seconds // seconds_per_slot)
    if slot_number < 0:
        # Anything earlier than genesis is clamped to the first slot.
        slot_number = 0
    return slot_number // slots_per_epoch

def epoch_to_time(genesis_ts, slots_per_epoch, seconds_per_slot, epoch: int):
    """Return the start time of ``epoch``: genesis plus the duration of all earlier slots."""
    first_slot = epoch * slots_per_epoch
    offset = timedelta(seconds=first_slot * seconds_per_slot)
    return genesis_ts + offset

def fetch_attestation_rewards(pubkey, epoch):
    """
    Return the validator's net attestation reward (gwei) for one epoch.

    Uses the standard Beacon API call
    ``POST /eth/v1/beacon/rewards/attestations/{epoch}`` whose JSON body is the
    list of validator ids (public keys or indices) to report on. The response's
    ``data.total_rewards`` list holds one entry per requested validator with the
    reward components (head / target / source / inclusion_delay / inactivity)
    encoded as decimal strings; penalties are negative.

    Args:
        pubkey: 0x-prefixed validator public key.
        epoch: Epoch number to query.

    Returns:
        Sum of all reward components in gwei (may be negative).

    Raises:
        RuntimeError: If the node does not answer with HTTP 200 after 3 attempts.
    """
    url = f"{BEACON_API}/eth/v1/beacon/rewards/attestations/{epoch}"

    # Same retry/back-off policy as the GET helper, but this endpoint needs POST.
    attempts = 3
    for attempt in range(attempts):
        resp = SESSION.post(url, json=[pubkey], timeout=30)
        if resp.status_code == 200:
            break
        if attempt < attempts - 1:
            time.sleep(0.8 * (attempt + 1))
    else:
        raise RuntimeError(f"POST failed {url} -> {resp.status_code} {resp.text[:200]}")

    data = resp.json().get("data") or {}
    if isinstance(data, dict):
        entries = data.get("total_rewards") or []
    else:
        # Tolerate a provider that returns the per-validator list directly.
        entries = data

    total = 0
    for item in entries:
        if not isinstance(item, dict):
            continue
        for key, value in item.items():
            # Identifier fields are not rewards.
            if key in ("validator_index", "validator"):
                continue
            try:
                # The API encodes integers as strings ("2000", "-1500").
                total += int(value)
            except (TypeError, ValueError, OverflowError):
                # Non-numeric or non-finite field: not a reward component.
                continue
    return total  # gwei

def fetch_proposer_rewards(pubkey, epoch):
    """
    Return the consensus-layer block-proposal reward (gwei) earned in ``epoch``.

    Best effort, in two steps against the standard Beacon API:

    1. ``GET /eth/v1/validator/duties/proposer/{epoch}`` lists the proposer
       (``pubkey``, ``slot``) of every slot in the epoch.
    2. For each slot assigned to ``pubkey``,
       ``GET /eth/v1/beacon/rewards/blocks/{slot}`` returns the block reward;
       only its ``total`` field is used because it already is the sum of the
       other components.

    The block-rewards endpoint is addressed by block id (slot), not by epoch, so
    the epoch number must not be passed to it directly.

    Args:
        pubkey: 0x-prefixed validator public key.
        epoch: Epoch number to query.

    Returns:
        Total proposer reward in gwei; 0 when the validator had no proposal in
        the epoch, the proposal was missed, or the node cannot serve the data.
    """
    wanted = pubkey.lower()
    try:
        duties = get_json(f"{BEACON_API}/eth/v1/validator/duties/proposer/{epoch}")
    except Exception:
        # Deliberate best effort: not every provider serves (historical) duties.
        return 0

    assignments = duties.get("data")
    if not isinstance(assignments, list):
        return 0

    total = 0
    for duty in assignments:
        if not isinstance(duty, dict):
            continue
        if str(duty.get("pubkey", "")).lower() != wanted:
            continue
        slot = duty.get("slot")
        try:
            reward = get_json(f"{BEACON_API}/eth/v1/beacon/rewards/blocks/{slot}")
            total += int(reward["data"]["total"])
        except Exception:
            # Missed slot (no block to report on) or endpoint unavailable.
            continue
    return total

def effective_balance_gwei():
    """
    Return the effective balance, in gwei, used as the denominator for returns.

    This is a fixed assumption of 32 ETH rather than a value read from the
    chain. For a per-epoch historical balance, swap in a function that queries
    states/{epoch}/validators/{id}.
    """
    gwei_per_eth = 10 ** 9
    return 32 * gwei_per_eth  # gwei

def iterate_epochs(start_epoch, end_epoch):
    """Yield every epoch number from ``start_epoch`` through ``end_epoch``, both inclusive."""
    yield from range(start_epoch, end_epoch + 1)

def build_epoch_range(genesis_ts, seconds_per_slot, slots_per_epoch):
    """
    Decide which inclusive epoch range to process.

    Precedence:
      1. START_EPOCH and END_EPOCH, when both are set.
      2. START_DATE and END_DATE, when both are set, converted to epochs.
      3. Otherwise the 90 days ending now.

    Args:
        genesis_ts: Timezone-aware genesis datetime.
        seconds_per_slot: Slot duration in seconds.
        slots_per_epoch: Number of slots per epoch.

    Returns:
        ``(start_epoch, end_epoch)`` as ints.
    """
    if START_EPOCH and END_EPOCH:
        return int(START_EPOCH), int(END_EPOCH)

    if START_DATE and END_DATE:
        window_start = _parse_as_utc(START_DATE)
        window_end = _parse_as_utc(END_DATE)
    else:
        # Default: last 90 days
        window_end = datetime.now(timezone.utc)
        window_start = window_end - timedelta(days=90)

    return (
        date_to_epoch(genesis_ts, slots_per_epoch, seconds_per_slot, window_start),
        date_to_epoch(genesis_ts, slots_per_epoch, seconds_per_slot, window_end),
    )


def _parse_as_utc(text):
    """Parse a date/time string; values without an explicit offset are taken as UTC."""
    parsed = dtparser.parse(text)
    if parsed.tzinfo is None:
        # astimezone() on a naive datetime assumes the machine's local zone, which
        # would make the selected epochs depend on where the script runs.
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)

# =========================
# Main: pull, aggregate, APY
# =========================
def main():
    """
    Pull per-epoch consensus-layer rewards, aggregate them and estimate APY.

    Steps:
      1. Resolve the validator, genesis time, chain timing and epoch range.
      2. Fetch attestation and proposer rewards for every epoch (best effort).
      3. Resample per-epoch returns to RESAMPLE_RULE buckets.
      4. Annualize each bucket and write the table to OUTPUT_CSV.

    Raises:
        ValueError: If the resolved epoch range is empty (end before start).
    """
    pubkey = resolve_validator_id()
    genesis_ts = get_genesis()
    seconds_per_slot, slots_per_epoch = slot_time()
    start_epoch, end_epoch = build_epoch_range(genesis_ts, seconds_per_slot, slots_per_epoch)
    if end_epoch < start_epoch:
        # Without this guard the empty frame below fails with an opaque KeyError.
        raise ValueError(f"Empty epoch range: start epoch {start_epoch} is after end epoch {end_epoch}.")

    rows = []
    failed_epochs = []
    eb_gwei = effective_balance_gwei()

    for epoch in tqdm(iterate_epochs(start_epoch, end_epoch), total=(end_epoch - start_epoch + 1), desc="Epochs"):
        # Consensus rewards: attestations
        try:
            att_gwei = fetch_attestation_rewards(pubkey, epoch)
        except Exception:
            # Some providers may throttle; keep going, but remember the gap so the
            # user knows the result is biased low.
            att_gwei = 0
            failed_epochs.append(epoch)

        # Proposer rewards (from beacon if available; otherwise 0)
        try:
            prop_gwei = fetch_proposer_rewards(pubkey, epoch)
        except Exception:
            prop_gwei = 0

        total_gwei = att_gwei + prop_gwei
        # Epoch timestamp (start)
        ts = epoch_to_time(genesis_ts, slots_per_epoch, seconds_per_slot, epoch)

        # Per-epoch return (consensus-layer only) on effective balance
        # Note: execution-layer tips/MEV are *not* included here.
        epoch_ret = (total_gwei / eb_gwei) if eb_gwei > 0 else 0.0
        rows.append({"epoch": epoch, "timestamp": ts, "rewards_gwei": total_gwei, "epoch_return": epoch_ret})

    if failed_epochs:
        print(
            f"Warning: attestation rewards could not be fetched for {len(failed_epochs)} "
            f"of {len(rows)} epochs; they were counted as 0 gwei."
        )

    df = pd.DataFrame(rows).set_index("timestamp").sort_index()

    # Aggregate to chosen frequency
    agg = df.resample(RESAMPLE_RULE).agg(
        rewards_gwei=("rewards_gwei", "sum"),
        epochs=("epoch", "count"),
        period_return=("epoch_return", "sum"),  # small-return approximation
    )

    # Geometric (compounded) return per bucket: prod(1 + epoch_return) - 1
    geo_returns = (
        df["epoch_return"]
        .groupby(pd.Grouper(freq=RESAMPLE_RULE))
        .apply(lambda s: float((1.0 + s).prod() - 1.0))
        .rename("period_return_geo")
    )
    agg = agg.join(geo_returns)

    # Convert rewards gwei -> ETH
    agg["rewards_eth"] = agg["rewards_gwei"] / 1e9

    # Annualize from the number of epochs each bucket actually contains:
    #   APY = (1 + r_bucket) ** (epochs_per_year / epochs_in_bucket) - 1
    # A full day holds 86400 / (seconds_per_slot * slots_per_epoch) epochs, so a
    # complete daily bucket still gets exponent 365, while partial first/last
    # buckets and any other RESAMPLE_RULE are scaled correctly instead of being
    # treated as a full day or week.
    seconds_per_epoch = seconds_per_slot * slots_per_epoch
    epochs_per_year = 365 * 24 * 3600 / seconds_per_epoch
    annualization = epochs_per_year / agg["epochs"]

    agg["period_APY_geo"] = (1.0 + agg["period_return_geo"]).pow(annualization) - 1.0
    agg["period_APY_simple"] = (1.0 + agg["period_return"]).pow(annualization) - 1.0

    # Save & print tail
    agg.to_csv(OUTPUT_CSV, index=True)
    print(f"Saved: {OUTPUT_CSV}")
    print(agg.tail(10))

# Run the pipeline only when this module is executed directly, not when it is imported.
if __name__ == "__main__":
    main()


ValueError: Set either VALIDATOR_PUBKEY or VALIDATOR_INDEX.