In [None]:
import json
import logging
import os
import time
import traceback
import datetime
from datetime import timezone
import signal
import tempfile
import threading
from collections import defaultdict
from threading import Lock, Event as ThreadEvent
from concurrent.futures import ThreadPoolExecutor, as_completed
from decimal import Decimal

import duckdb
import numpy as np
import pandas as pd
import requests

import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

from dotenv import load_dotenv
from hexbytes import HexBytes
from requests.exceptions import HTTPError, ConnectionError

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)

from web3 import Web3
from web3.providers.rpc.utils import (
    ExceptionRetryConfiguration,
    REQUEST_RETRY_ALLOWLIST,
)
from web3.exceptions import Web3RPCError

print("✓ All imports loaded successfully")

# Configuration
load_dotenv()
pd.options.display.float_format = "{:20,.4f}".format

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    handlers=[logging.StreamHandler()],
)

ETHERSCAN_API_KEY_DICT = {
    "hearthquake": {
        "INFURA_URL": os.getenv("INFURA_URL_HEARTHQUAKE"),
        "ETHERSCAN_API_KEY": os.getenv("ETHERSCAN_API_KEY"),
    },
    "opensee": {
        "INFURA_URL": os.getenv("INFURA_URL_OPENSEE"),
        "ETHERSCAN_API_KEY": os.getenv("ETHERSCAN_API_KEY"),
    },
    "eco": {
        "INFURA_URL": os.getenv("INFURA_URL_ECO"),
        "ETHERSCAN_API_KEY": os.getenv("ETHERSCAN_API_KEY"),
    },
}

ETHERSCAN_API_KEY = ETHERSCAN_API_KEY_DICT["hearthquake"]["ETHERSCAN_API_KEY"]

STATE_FILE = "out/V3/V3_final_scan_state.json"
TOKEN_NAME_FILE = "out/V3/V3_token_name.json"
V3_EVENT_BY_CONTRACTS = "out/V3/uniswap_v3_pairs_events.json"
DB_PATH = "out/V3/uniswap_v3.duckdb"
ABI_CACHE_FOLDER = "ABI"

GLOBAL_DICT_TOKEN_SYMBOL = {}
if os.path.exists(TOKEN_NAME_FILE):
    with open(TOKEN_NAME_FILE, "r") as f:
        GLOBAL_DICT_TOKEN_SYMBOL = json.load(f)


class ProviderPool:
    def __init__(self, api_key_dict):
        self.providers = []
        self.provider_names = []
        self.index = 0
        self.lock = threading.Lock()

        for name, config in api_key_dict.items():
            provider = Web3(
                Web3.HTTPProvider(
                    endpoint_uri=config["INFURA_URL"],
                    request_kwargs={"timeout": 30},
                    exception_retry_configuration=ExceptionRetryConfiguration(
                        errors=(ConnectionError, HTTPError, TimeoutError),
                        retries=5,
                        backoff_factor=1,
                        method_allowlist=REQUEST_RETRY_ALLOWLIST,
                    ),
                )
            )
            if provider.is_connected():
                self.providers.append(provider)
                self.provider_names.append(name)
                logging.info(f"✓ Provider '{name}' connected")
            else:
                logging.warning(f"✗ Provider '{name}' failed to connect")

        if not self.providers:
            raise Exception("No providers connected!")

    def get_provider(self):
        with self.lock:
            provider = self.providers[self.index]
            name = self.provider_names[self.index]
            self.index = (self.index + 1) % len(self.providers)
            return provider, name


PROVIDER_POOL = ProviderPool(ETHERSCAN_API_KEY_DICT)
w3 = PROVIDER_POOL.get_provider()
assert w3.is_connected(), "Web3 provider connection failed"
print(f"✓ Connected to Ethereum. Latest block: {w3.eth.block_number:,}")

In [None]:
# --------------------
# Helper Function: Get ABI from Etherscan or Disk
# --------------------
def get_abi(contract_address, api_key):
    abi_folder = "ABI"
    if not os.path.exists(abi_folder):
        os.makedirs(abi_folder)

    filename = os.path.join(abi_folder, f"{contract_address}.json")

    if os.path.exists(filename):
        with open(filename, "r") as file:
            abi = json.load(file)
    else:
        abi = None  # ← INITIALIZE abi BEFORE try block
        try:
            url = f"https://api.etherscan.io/v2/api?chainid=1&module=contract&action=getabi&address={contract_address}&apikey={api_key}"
            response = requests.get(url)
            data = response.json()
            if data["status"] == "1":
                abi = json.loads(data["result"])
                with open(filename, "w") as file:
                    json.dump(abi, file)
        except Exception as e:
            logging.warning(f"Error fetching ABI for contract {contract_address}: {e}")

    return abi


def get_abi(contract_address, api_key=ETHERSCAN_API_KEY, abi_folder=ABI_CACHE_FOLDER):
    os.makedirs(abi_folder, exist_ok=True)

    filename = os.path.join(abi_folder, f"{contract_address}.json")

    if os.path.exists(filename):
        try:
            with open(filename, "r") as f:
                return json.load(f)
        except json.JSONDecodeError as e:
            logging.warning(
                f"Corrupted ABI cache for {contract_address}: {e}, re-fetching..."
            )

    try:
        url = f"https://api.etherscan.io/api?module=contract&action=getabi&address={contract_address}&apikey={api_key}"
        response = requests.get(url, timeout=10)
        response.raise_for_status()

        data = response.json()
        if data["status"] != "1":
            logging.warning(
                f"Etherscan error for {contract_address}: {data.get('result', 'Unknown')}"
            )
            return None

        abi = json.loads(data["result"])

        with open(filename, "w") as f:
            json.dump(abi, f, indent=2)

        return abi

    except requests.RequestException as e:
        logging.error(f"Network error fetching ABI for {contract_address}: {e}")
        return None
    except (json.JSONDecodeError, KeyError) as e:
        logging.error(f"Invalid response format for {contract_address}: {e}")
        return None


def get_contract(contract_address, provider=None, api_key=ETHERSCAN_API_KEY):
    if provider is None:
        provider, _ = PROVIDER_POOL.get_provider()

    contract_address = provider.to_checksum_address(contract_address)
    abi = get_abi(contract_address, api_key)

    if abi is None:
        raise ValueError(f"Could not retrieve ABI for {contract_address}")

    return provider.eth.contract(address=contract_address, abi=abi)


# -----------------------
# Helper: Convert event to dict
# -----------------------
def event_to_dict(event):
    d = dict(event)
    if "args" in d:
        d["args"] = dict(d["args"])
    if "transactionHash" in d:
        d["transactionHash"] = d["transactionHash"].hex()
    if "blockHash" in d:
        d["blockHash"] = d["blockHash"].hex()
    return d


class Web3JSONEncoder(json.JSONEncoder):
    def default(self, obj):
        # HexBytes → hex string
        if isinstance(obj, HexBytes):
            return obj.hex()
        # Peel off any other web3-specific types here as needed...
        return super().default(obj)


# -----------------------
# ETHERSCAN VERSION
# Used to find at which block 1 contract has been deployed
# Might be useful later, put it in JSON in the end
# -----------------------
def get_contract_creation_block_etherscan(
    contract_address: str, etherscan_api_key: str
) -> int:
    """
    Retrieves the contract creation block from Etherscan.
    Returns the block number as an integer.
    """
    url = (
        f"https://api.etherscan.io/api?module=contract&action=getcontractcreation"
        f"&contractaddresses={contract_address}&apikey={etherscan_api_key}"
    )
    response = requests.get(url)
    data = response.json()

    if data.get("status") == "1":
        results = data.get("result", [])
        if results and len(results) > 0:
            return int(results[0]["blockNumber"])
        else:
            raise Exception("No contract creation data found.")
    else:
        raise Exception(
            "Error fetching creation block: " + data.get("result", "Unknown error")
        )


# -----------------------
# Used to find at which block 1 contract has been deployed
# Might be useful later, put it in JSON in the end
# -----------------------
def get_contract_creation_block_custom(start_block=0, end_block=100000):

    def get_contract_deployments(start_block, end_block, max_workers=8):
        deployments = []

        def process_block(block_number):
            block = w3.eth.get_block(block_number, full_transactions=True)
            block_deployments = []
            for tx in block.transactions:
                if tx.to is None:
                    try:
                        receipt = w3.eth.get_transaction_receipt(tx.hash)
                        contract_address = receipt.contractAddress
                        if contract_address:
                            block_deployments.append(
                                {
                                    "block_number": block_number,
                                    "contract_address": contract_address,
                                }
                            )
                    except:
                        print(tx.hash)
            return block_deployments

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_block = {
                executor.submit(process_block, bn): bn
                for bn in range(start_block, end_block + 1)
            }
            for future in as_completed(future_to_block):
                block_deployments = future.result()
                deployments.extend(block_deployments)

        return deployments

    deployments = get_contract_deployments(start_block, end_block)

    # Save the results to a JSON file
    with open("contract_deployments.json", "w") as f:
        json.dump(deployments, f, indent=4)


# -- Step 2: Reconstruct an Event’s Signature --
def get_event_signature(event_name: str, abi: list) -> str:
    """
    Given an event name and an ABI, find the event definition and reconstruct its signature.
    For example, for event Transfer(address,address,uint256) this returns its keccak256 hash.
    """
    from eth_utils import keccak, encode_hex

    for item in abi:
        if item.get("type") == "event" and item.get("name") == event_name:
            # Build the signature string: "Transfer(address,address,uint256)"
            types = ",".join([inp["type"] for inp in item.get("inputs", [])])
            signature = f"{event_name}({types})"
            return encode_hex(keccak(text=signature))
    raise ValueError(f"Event {event_name} not found in ABI.")


def block_to_utc(block_number):
    """
    Convert a block number into its UTC timestamp.

    Parameters:
        w3 (Web3): A Web3 instance
        block_number (int): The block number

    Returns:
        datetime: The block timestamp in UTC
    """
    block = w3.eth.get_block(block_number)
    timestamp = block["timestamp"]
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()


def read_and_sort_jsonl(file_path):
    """
    Reads a JSONL file, each line being a JSON object with a field `blockNumber`,
    and returns a list of those objects sorted by blockNumber (ascending).
    """
    data = []
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError as e:
                # Handle bad JSON if needed, e.g., log or skip
                print(line)
                print(f"Skipping bad JSON line: {e}")
                continue
            # Optionally, you could check that 'blockNumber' exists, is int, etc.
            if "blockNumber" not in obj:
                print(f"Skipping line with no blockNumber: {obj}")
                continue
            data.append(obj)
    # Now sort by blockNumber ascending
    # If blockNumber in file is already int, fine; else convert
    sorted_data = sorted(data, key=lambda o: int(o["blockNumber"]))
    return sorted_data


def get_address_abi_contract(contract_address, etherscan_api_key=ETHERSCAN_API_KEY):
    address = w3.to_checksum_address(contract_address)
    contract_abi = get_abi(address, etherscan_api_key)
    contract = w3.eth.contract(address=contract_address, abi=contract_abi)

    return address, contract_abi, contract


# Find the amount of token depending on the contract at the very specific block_number
# but it use ETHERSCAN API (to go further: explorer the reconstruct from all the Transfer event but slow)
# Not super useful for the moment
def get_erc20_balance_at_block(user_address, token_address, block_number):
    """
    Query ERC-20 balance of an address at a specific block.

    user_address = "0xe2dFC8F41DB4169A24e7B44095b9E92E20Ed57eD"
    token_address = "0x514910771AF9Ca656af840dff83E8264EcF986CA"
    block_number = 23405236
    balance = get_erc20_balance_at_block(user_address, token_address, block_number)

    Parameters:
        user_address: string, account to check
        token_address: Web3 contract instance for the ERC-20 token
        block_number: int, historical block

    Returns:
        int: token balance
        None if contract is a proxy
    """
    token_address, token_abi, token_contract = get_address_abi_contract(token_address)
    user_address = w3.to_checksum_address(user_address)
    token_name = None
    token_symbol = None
    try:
        token_name = token_contract.functions.name().call()
        token_symbol = token_contract.functions.symbol().call()
    except Exception as e:
        print(f"Error {e}")
        print(f"{token_address}")
        return None
    balance = token_contract.functions.balanceOf(user_address).call(
        block_identifier=block_number
    )
    print(
        f"Address {user_address} had {w3.from_wei(balance, "ether")} of {token_symbol} at block {block_number}"
    )
    return balance


def get_token_name_by_contract(
    token_address,
    TOKEN_NAME_FILE=TOKEN_NAME_FILE,
    proxy_address=None,
    global_cache=GLOBAL_DICT_TOKEN_SYMBOL,
):
    """
    Returns the token name for `token_address`, using a local JSON cache.
    If not in cache, will call get_token_name_by_contract (your ABI/Web3 function),
    store the result (or None) in the cache file, and return it.
    """
    # 1. Load cache
    cache = global_cache
    # if os.path.exists(TOKEN_NAME_FILE):
    #     try:
    #         with open(TOKEN_NAME_FILE, "r", encoding="utf-8") as f:
    #             cache = json.load(f)
    #     except Exception as e:
    #         # If file is corrupted, proceed with empty cache
    #         print(f"Warning: cannot read token name cache: {e}")

    # 2. Check cache
    if token_address in cache:
        return cache[token_address]

    # Not in cache → fetch from contract
    name = None
    symbol = None
    address = None
    try:
        if proxy_address:
            proxy_address, proxy_abi, proxy_contract = get_address_abi_contract(
                proxy_address
            )
            token_address = proxy_contract.functions.getToken(token_address).call()
        token_address, token_abi, token_contract = get_address_abi_contract(
            token_address
        )
        # call name
        name_raw = token_contract.functions.name().call()
        symbol_raw = token_contract.functions.symbol().call()
        address = token_contract.address
        # Convert raw to str if needed
        name = str(name_raw)
        if isinstance(name_raw, (bytes, bytearray)):
            name = name_raw.decode("utf-8", errors="ignore").rstrip("\x00")
        symbol = str(symbol_raw)
        if isinstance(symbol_raw, (bytes, bytearray)):
            symbol = symbol_raw.decode("utf-8", errors="ignore").rstrip("\x00")
    except Exception as e:
        print(f"Error fetching token name/symbol for {address}: {e}")
        if token_address:
            cache[token_address] = {
                "name": None,
                "symbol": None,
                "address": None,
            }
        try:
            dirn = os.path.dirname(TOKEN_NAME_FILE) or "."
            fd, tmp = tempfile.mkstemp(dir=dirn, text=True)
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(cache, f, indent=2, ensure_ascii=False)
            os.replace(tmp, TOKEN_NAME_FILE)
        except Exception as e:
            print(f"Warning: failed to save token cache: {e}")
        return {
            "name": None,
            "symbol": None,
            "address": None,
        }

    # Update cache
    cache[address] = {
        "name": name,
        "symbol": symbol,
        "address": address,
    }

    # Write back atomically (overwrite)
    try:
        dirn = os.path.dirname(TOKEN_NAME_FILE) or "."
        fd, tmp = tempfile.mkstemp(dir=dirn, text=True)
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(cache, f, indent=2, ensure_ascii=False)
        os.replace(tmp, TOKEN_NAME_FILE)
    except Exception as e:
        print(f"Warning: failed to save token cache: {e}")

    return cache[address]


def decode_topics(log):
    _, abi, contract = get_address_abi_contract(log["address"])
    # Try matching this log against the ABI events
    for item in abi:
        if item.get("type") == "event":
            event_signature = (
                f'{item["name"]}({",".join(i["type"] for i in item["inputs"])})'
            )
            event_hash = w3.keccak(text=event_signature).hex()

            if log["topics"][0].hex() == event_hash:
                # Found matching event
                decoded = contract.events[item["name"]]().process_log(log)
                return {
                    "event": item["name"],
                    "args": dict(decoded["args"]),
                }

    return {}  # no matching event in ABI


def release_list(a):
    del a[:]
    del a


def normalize_token_value(raw_value, decimals):
    if decimals == 18:
        return float(Web3.from_wei(raw_value, "ether"))
    else:
        return float(Decimal(raw_value) / Decimal(10**decimals))


def inspect_contract_abi(contract_address, provider=None):
    if provider is None:
        provider, _ = PROVIDER_POOL.get_provider()

    contract = get_contract(contract_address, provider)
    abi = contract.abi

    print(f"\n{'='*80}")
    print(f"CONTRACT: {contract_address}")
    print(f"{'='*80}")

    # Functions
    functions = [item for item in abi if item.get("type") == "function"]
    if functions:
        print(f"\n📋 FUNCTIONS ({len(functions)}):")
        for func in functions:
            name = func.get("name", "unnamed")
            inputs = ", ".join(
                [f"{i['type']} {i.get('name', '')}" for i in func.get("inputs", [])]
            )
            outputs = ", ".join([o["type"] for o in func.get("outputs", [])])
            state = func.get("stateMutability", "nonpayable")
            print(f"  • {name}({inputs}) → {outputs} [{state}]")

    # Events
    events = [item for item in abi if item.get("type") == "event"]
    if events:
        print(f"\n📢 EVENTS ({len(events)}):")
        for event in events:
            name = event.get("name", "unnamed")
            inputs = ", ".join(
                [f"{i['type']} {i.get('name', '')}" for i in event.get("inputs", [])]
            )
            print(f"  • {name}({inputs})")

    # Constructor
    constructor = [item for item in abi if item.get("type") == "constructor"]
    if constructor:
        print(f"\n🏗️  CONSTRUCTOR:")
        for c in constructor:
            inputs = ", ".join(
                [f"{i['type']} {i.get('name', '')}" for i in c.get("inputs", [])]
            )
            print(f"  • constructor({inputs})")

    print(f"\n{'='*80}\n")

    return abi

In [None]:
UNISWAP_V3_CONTRACT = "0x1F98431c8aD98523631AE4a59f267346ea31F984"
# Uniswap V3 Factory
factory_contract = get_contract(UNISWAP_V3_CONTRACT)
pair_count = factory_contract.functions.allPairsLength().call()

In [None]:
_thread_local = threading.local()


def get_thread_connection(db_path):
    if not hasattr(_thread_local, "conn"):
        _thread_local.conn = duckdb.connect(db_path)
    return _thread_local.conn


def setup_database(db_path=DB_PATH):
    conn = duckdb.connect(db_path)
    with open("./out/V3/database/schema.sql", "r") as f:
        schema_sql = f.read()
    conn.execute(schema_sql)
    conn.close()
    logging.info("✓ Database schema created successfully")


def batch_insert_events(events, db_path, worker_id="main"):
    if not events:
        return 0

    transfers = []
    swaps = []
    mints = []
    burns = []
    syncs = []
    approvals = []

    for e in events:
        event_type = e.get("event")
        args = e.get("args", {})

        if event_type == "Transfer":
            transfers.append(
                (
                    e["transactionHash"],
                    e["blockNumber"],
                    e.get("logIndex", 0),
                    e["address"],
                    args.get("from", ""),
                    args.get("to", ""),
                    int(args.get("value", 0)),
                )
            )

        elif event_type == "Swap":
            swaps.append(
                (
                    e["transactionHash"],
                    e["blockNumber"],
                    e.get("logIndex", 0),
                    e["address"],
                    args.get("sender", ""),
                    args.get("to", ""),
                    int(args.get("amount0In", 0)),
                    int(args.get("amount1In", 0)),
                    int(args.get("amount0Out", 0)),
                    int(args.get("amount1Out", 0)),
                )
            )

        elif event_type == "Mint":
            mints.append(
                (
                    e["transactionHash"],
                    e["blockNumber"],
                    e.get("logIndex", 0),
                    e["address"],
                    args.get("sender", ""),
                    int(args.get("amount0", 0)),
                    int(args.get("amount1", 0)),
                )
            )

        elif event_type == "Burn":
            burns.append(
                (
                    e["transactionHash"],
                    e["blockNumber"],
                    e.get("logIndex", 0),
                    e["address"],
                    args.get("sender", ""),
                    args.get("to", ""),
                    int(args.get("amount0", 0)),
                    int(args.get("amount1", 0)),
                )
            )

        elif event_type == "Sync":
            syncs.append(
                (
                    e["transactionHash"],
                    e["blockNumber"],
                    e.get("logIndex", 0),
                    e["address"],
                    int(args.get("reserve0", 0)),
                    int(args.get("reserve1", 0)),
                )
            )

        elif event_type == "Approval":
            approvals.append(
                (
                    e["transactionHash"],
                    e["blockNumber"],
                    e.get("logIndex", 0),
                    e["address"],
                    args.get("owner", ""),
                    args.get("spender", ""),
                    int(args.get("value", 0)),
                )
            )

    conn = get_thread_connection(db_path)

    conn.execute("BEGIN TRANSACTION")
    try:
        if transfers:
            conn.executemany(
                """
                INSERT INTO transfer (transaction_hash, block_number, log_index, pair_address, 
                                     from_address, to_address, value)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT (transaction_hash, log_index) DO NOTHING
            """,
                transfers,
            )

        if swaps:
            conn.executemany(
                """
                INSERT INTO swap (transaction_hash, block_number, log_index, pair_address,
                                 sender, to_address, amount0_in, amount1_in, amount0_out, amount1_out)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT (transaction_hash, log_index) DO NOTHING
            """,
                swaps,
            )

        if mints:
            conn.executemany(
                """
                INSERT INTO mint (transaction_hash, block_number, log_index, pair_address,
                                 sender, amount0, amount1)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT (transaction_hash, log_index) DO NOTHING
            """,
                mints,
            )

        if burns:
            conn.executemany(
                """
                INSERT INTO burn (transaction_hash, block_number, log_index, pair_address,
                                 sender, to_address, amount0, amount1)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT (transaction_hash, log_index) DO NOTHING
            """,
                burns,
            )

        if syncs:
            conn.executemany(
                """
                INSERT INTO sync (transaction_hash, block_number, log_index, pair_address,
                                 reserve0, reserve1)
                VALUES (?, ?, ?, ?, ?, ?)
                ON CONFLICT (transaction_hash, log_index) DO NOTHING
            """,
                syncs,
            )

        if approvals:
            conn.executemany(
                """
                INSERT INTO approval (transaction_hash, block_number, log_index, pair_address,
                                     owner, spender, value)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT (transaction_hash, log_index) DO NOTHING
            """,
                approvals,
            )

        conn.execute("COMMIT")

        total = (
            len(transfers)
            + len(swaps)
            + len(mints)
            + len(burns)
            + len(syncs)
            + len(approvals)
        )
        logging.info(
            f"[{worker_id}] Inserted {total} events (T:{len(transfers)} S:{len(swaps)} M:{len(mints)} B:{len(burns)} Sy:{len(syncs)} A:{len(approvals)})"
        )
        return total

    except Exception as e:
        conn.execute("ROLLBACK")
        logging.error(f"[{worker_id}] batch_insert_events failed: {e}", exc_info=True)
        raise


def mark_range_completed(start_block, end_block, db_path, worker_id="main"):
    conn = get_thread_connection(db_path)
    conn.execute(
        """
        INSERT INTO processing_state (start_block, end_block, status, worker_id, updated_at)
        VALUES (?, ?, 'completed', ?, NOW())
        ON CONFLICT (start_block, end_block) 
        DO UPDATE SET 
            status = 'completed', 
            worker_id = ?,
            updated_at = NOW()
    """,
        (start_block, end_block, worker_id, worker_id),
    )


def mark_range_processing(start_block, end_block, db_path, worker_id="main"):
    conn = get_thread_connection(db_path)
    conn.execute(
        """
        INSERT INTO processing_state (start_block, end_block, status, worker_id, updated_at)
        VALUES (?, ?, 'processing', ?, NOW())
        ON CONFLICT (start_block, end_block) 
        DO UPDATE SET 
            status = 'processing',
            worker_id = ?,
            updated_at = NOW()
    """,
        (start_block, end_block, worker_id, worker_id),
    )


def get_completed_ranges(db_path):
    conn = get_thread_connection(db_path)
    result = conn.execute(
        """
        SELECT start_block, end_block 
        FROM processing_state 
        WHERE status = 'completed'
    """
    ).fetchall()
    return set((r[0], r[1]) for r in result)


def get_database_stats(db_path):
    conn = get_thread_connection(db_path)

    result = conn.execute(
        """
        SELECT 
            (SELECT COUNT(*) FROM transfer) as total_transfers,
            (SELECT COUNT(*) FROM swap) as total_swaps,
            (SELECT COUNT(*) FROM mint) as total_mints,
            (SELECT COUNT(*) FROM burn) as total_burns,
            (SELECT COUNT(*) FROM sync) as total_syncs,
            (SELECT COUNT(*) FROM approval) as total_approvals,
            (SELECT COUNT(*) FROM processing_state WHERE status = 'completed') as completed_ranges,
            (SELECT COUNT(*) FROM pair_metadata) as total_pairs,
            (SELECT COUNT(*) FROM block_metadata) as total_blocks
    """
    ).fetchone()

    return {
        "total_transfers": result[0],
        "total_swaps": result[1],
        "total_mints": result[2],
        "total_burns": result[3],
        "total_syncs": result[4],
        "total_approvals": result[5],
        "completed_ranges": result[6],
        "total_pairs": result[7],
        "total_blocks": result[8],
    }


def insert_pair_metadata(
    pair_address,
    token0_address,
    token1_address,
    db_path,
    token0_symbol=None,
    token1_symbol=None,
    token0_decimals=None,
    token1_decimals=None,
    created_block=None,
):
    conn = get_thread_connection(db_path)
    conn.execute(
        """
        INSERT INTO pair_metadata (pair_address, token0_address, token1_address, token0_symbol, 
                                  token1_symbol, token0_decimals, token1_decimals, created_block)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT (pair_address) 
        DO UPDATE SET 
            token0_symbol = COALESCE(EXCLUDED.token0_symbol, pair_metadata.token0_symbol),
            token1_symbol = COALESCE(EXCLUDED.token1_symbol, pair_metadata.token1_symbol),
            token0_decimals = COALESCE(EXCLUDED.token0_decimals, pair_metadata.token0_decimals),
            token1_decimals = COALESCE(EXCLUDED.token1_decimals, pair_metadata.token1_decimals),
            last_updated = NOW()
    """,
        (
            pair_address,
            token0_address,
            token1_address,
            token0_symbol,
            token1_symbol,
            token0_decimals,
            token1_decimals,
            created_block,
        ),
    )


def get_pair_metadata(pair_address, db_path):
    conn = get_thread_connection(db_path)
    result = conn.execute(
        """
        SELECT token0_address, token1_address, token0_symbol, token1_symbol, 
               token0_decimals, token1_decimals, created_block
        FROM pair_metadata
        WHERE pair_address = ?
    """,
        (pair_address,),
    ).fetchone()

    if result:
        return {
            "token0_address": result[0],
            "token1_address": result[1],
            "token0_symbol": result[2],
            "token1_symbol": result[3],
            "token0_decimals": result[4],
            "token1_decimals": result[5],
            "created_block": result[6],
        }
    return None


def batch_insert_block_metadata(blocks_data, db_path):
    if not blocks_data:
        return 0

    conn = get_thread_connection(db_path)
    conn.executemany(
        """
        INSERT INTO block_metadata (block_number, block_timestamp, block_hash)
        VALUES (?, ?, ?)
        ON CONFLICT (block_number) DO NOTHING
    """,
        blocks_data,
    )
    return len(blocks_data)


def normalize_values_for_pair(pair_address, db_path):
    metadata = get_pair_metadata(pair_address, db_path)
    if (
        not metadata
        or metadata["token0_decimals"] is None
        or metadata["token1_decimals"] is None
    ):
        logging.warning(f"Cannot normalize values for {pair_address}: missing decimals")
        return

    lp_decimals = 18
    token0_decimals = metadata["token0_decimals"]
    token1_decimals = metadata["token1_decimals"]

    conn = get_thread_connection(db_path)

    conn.execute(
        """
        UPDATE transfer 
        SET value_normalized = value::DECIMAL / POWER(10, ?)
        WHERE pair_address = ? AND value_normalized IS NULL
    """,
        (lp_decimals, pair_address),
    )

    conn.execute(
        """
        UPDATE mint 
        SET amount0_normalized = amount0::DECIMAL / POWER(10, ?),
            amount1_normalized = amount1::DECIMAL / POWER(10, ?)
        WHERE pair_address = ? AND amount0_normalized IS NULL
    """,
        (token0_decimals, token1_decimals, pair_address),
    )

    conn.execute(
        """
        UPDATE burn 
        SET amount0_normalized = amount0::DECIMAL / POWER(10, ?),
            amount1_normalized = amount1::DECIMAL / POWER(10, ?)
        WHERE pair_address = ? AND amount0_normalized IS NULL
    """,
        (token0_decimals, token1_decimals, pair_address),
    )

    conn.execute(
        """
        UPDATE swap 
        SET amount0_in_normalized = amount0_in::DECIMAL / POWER(10, ?),
            amount1_in_normalized = amount1_in::DECIMAL / POWER(10, ?),
            amount0_out_normalized = amount0_out::DECIMAL / POWER(10, ?),
            amount1_out_normalized = amount1_out::DECIMAL / POWER(10, ?)
        WHERE pair_address = ? AND amount0_in_normalized IS NULL
    """,
        (
            token0_decimals,
            token1_decimals,
            token0_decimals,
            token1_decimals,
            pair_address,
        ),
    )

    conn.execute(
        """
        UPDATE sync 
        SET reserve0_normalized = reserve0::DECIMAL / POWER(10, ?),
            reserve1_normalized = reserve1::DECIMAL / POWER(10, ?)
        WHERE pair_address = ? AND reserve0_normalized IS NULL
    """,
        (token0_decimals, token1_decimals, pair_address),
    )

    logging.info(f"✓ Normalized values for pair {pair_address}")

In [None]:
def fetch_uniswap_pair_metadata(
    pair_address, provider=None, retry_count=0, max_retries=3
):
    if provider is None:
        provider, _ = PROVIDER_POOL.get_provider()

    try:
        pair_contract = get_contract(pair_address, provider)
        token0_address = pair_contract.functions.token0().call()
        token1_address = pair_contract.functions.token1().call()

        token0_contract = get_contract(token0_address, provider)
        token1_contract = get_contract(token1_address, provider)

        return {
            "pair_address": pair_address,
            "token0_address": token0_address,
            "token1_address": token1_address,
            "token0_symbol": token0_contract.functions.symbol().call(),
            "token0_decimals": token0_contract.functions.decimals().call(),
            "token1_symbol": token1_contract.functions.symbol().call(),
            "token1_decimals": token1_contract.functions.decimals().call(),
        }

    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 429:
            if retry_count < max_retries:
                wait_time = 2**retry_count
                logging.warning(
                    f"Rate limit (429) for {pair_address[:10]}, waiting {wait_time}s..."
                )
                time.sleep(wait_time)
                provider, _ = PROVIDER_POOL.get_provider()
                return fetch_uniswap_pair_metadata(
                    pair_address, provider, retry_count + 1, max_retries
                )
            else:
                raise Exception(f"Max retries exceeded for {pair_address}")

        elif e.response.status_code == 402:
            raise Exception(
                f"Payment required (402) - Infura credits exhausted for {pair_address}"
            )

        else:
            logging.error(f"HTTP {e.response.status_code} for {pair_address}: {e}")
            raise

    except requests.exceptions.Timeout:
        if retry_count < max_retries:
            logging.warning(
                f"Timeout for {pair_address[:10]}, retrying with new provider..."
            )
            provider, _ = PROVIDER_POOL.get_provider()
            return fetch_uniswap_pair_metadata(
                pair_address, provider, retry_count + 1, max_retries
            )
        else:
            logging.error(f"Timeout after {max_retries} retries for {pair_address}")
            raise

    except requests.exceptions.ConnectionError as e:
        if retry_count < max_retries:
            logging.warning(f"Connection error for {pair_address[:10]}, retrying...")
            provider, _ = PROVIDER_POOL.get_provider()
            return fetch_uniswap_pair_metadata(
                pair_address, provider, retry_count + 1, max_retries
            )
        else:
            logging.error(
                f"Connection failed after {max_retries} retries for {pair_address}"
            )
            raise

    except (ValueError, KeyError) as e:
        logging.error(f"Contract call failed for {pair_address}: {e}")
        return None

    except Exception as e:
        logging.error(f"Unexpected error fetching metadata for {pair_address}: {e}")
        return None


def fetch_block_metadata(block_number, provider=None, retry_count=0, max_retries=3):
    if provider is None:
        provider, _ = PROVIDER_POOL.get_provider()

    try:
        block = provider.eth.get_block(block_number)
        return (block_number, block["timestamp"], block["hash"].hex())

    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 429:
            if retry_count < max_retries:
                wait_time = 2**retry_count
                logging.warning(
                    f"Rate limit (429) for block {block_number}, waiting {wait_time}s..."
                )
                time.sleep(wait_time)
                provider, _ = PROVIDER_POOL.get_provider()
                return fetch_block_metadata(
                    block_number, provider, retry_count + 1, max_retries
                )
            else:
                raise Exception(f"Max retries exceeded for block {block_number}")

        elif e.response.status_code == 402:
            raise Exception(f"Payment required (402) - Infura credits exhausted")

        else:
            logging.error(
                f"HTTP {e.response.status_code} for block {block_number}: {e}"
            )
            raise

    except requests.exceptions.Timeout:
        if retry_count < max_retries:
            logging.warning(f"Timeout for block {block_number}, retrying...")
            provider, _ = PROVIDER_POOL.get_provider()
            return fetch_block_metadata(
                block_number, provider, retry_count + 1, max_retries
            )
        else:
            logging.error(
                f"Timeout after {max_retries} retries for block {block_number}"
            )
            raise

    except requests.exceptions.ConnectionError as e:
        if retry_count < max_retries:
            logging.warning(f"Connection error for block {block_number}, retrying...")
            provider, _ = PROVIDER_POOL.get_provider()
            return fetch_block_metadata(
                block_number, provider, retry_count + 1, max_retries
            )
        else:
            logging.error(
                f"Connection failed after {max_retries} retries for block {block_number}"
            )
            raise

    except Exception as e:
        logging.error(f"Unexpected error fetching block {block_number}: {e}")
        return None


def fetch_and_store_uniswap_pair_metadata(
    pair_address, db_path, created_block=None, provider=None
):
    metadata = fetch_uniswap_pair_metadata(pair_address, provider)
    if metadata:
        insert_pair_metadata(
            pair_address=metadata["pair_address"],
            token0_address=metadata["token0_address"],
            token1_address=metadata["token1_address"],
            db_path=db_path,
            token0_symbol=metadata["token0_symbol"],
            token1_symbol=metadata["token1_symbol"],
            token0_decimals=metadata["token0_decimals"],
            token1_decimals=metadata["token1_decimals"],
            created_block=created_block,
        )
        return metadata
    return None


def fetch_and_store_block_metadata(
    block_numbers, db_path, provider=None, max_workers=8
):
    if provider is None:
        provider, provider_name = PROVIDER_POOL.get_provider()
    else:
        provider_name = "provided"

    blocks_data = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_block = {
            executor.submit(fetch_block_metadata, block_num, provider): block_num
            for block_num in block_numbers
        }

        for future in as_completed(future_to_block):
            block_num = future_to_block[future]
            try:
                block_data = future.result()
                if block_data:
                    blocks_data.append(block_data)
            except Exception as e:
                logging.error(f"Failed to fetch block {block_num}: {e}")

    if blocks_data:
        batch_insert_block_metadata(blocks_data, db_path)
        logging.info(
            f"✓ Stored metadata for {len(blocks_data)} blocks using {provider_name}"
        )

    return len(blocks_data)


def collect_missing_pair_metadata(db_path, batch_size=50, provider=None, max_workers=4):
    conn = get_thread_connection(db_path)

    all_pairs = conn.execute(
        """
        SELECT DISTINCT pair_address FROM transfer
    """
    ).fetchall()
    all_pairs = [r[0] for r in all_pairs]

    existing_pairs = conn.execute(
        """
        SELECT pair_address FROM pair_metadata
        WHERE token0_decimals IS NOT NULL AND token1_decimals IS NOT NULL
    """
    ).fetchall()
    existing_pairs = set(r[0] for r in existing_pairs)

    missing_pairs = [p for p in all_pairs if p not in existing_pairs]

    if not missing_pairs:
        logging.info("✓ All pairs already have metadata")
        return

    logging.info(
        f"Found {len(missing_pairs)} pairs missing metadata out of {len(all_pairs)} total"
    )

    for i in range(0, len(missing_pairs), batch_size):
        batch = missing_pairs[i : i + batch_size]
        logging.info(
            f"Processing batch {i // batch_size + 1}/{(len(missing_pairs) + batch_size - 1) // batch_size}"
        )

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_pair = {
                executor.submit(
                    fetch_and_store_uniswap_pair_metadata, pair, db_path, None, provider
                ): pair
                for pair in batch
            }

            for future in as_completed(future_to_pair):
                pair = future_to_pair[future]
                try:
                    metadata = future.result()
                    if metadata:
                        logging.info(
                            f"✓ {metadata['token0_symbol']}/{metadata['token1_symbol']} - {pair[:10]}..."
                        )
                    else:
                        logging.warning(f"✗ Failed: {pair[:10]}...")
                except Exception as e:
                    logging.error(f"✗ Error for {pair[:10]}: {e}")

    logging.info("✓ Metadata collection complete")


def normalize_missing_pairs(db_path, max_workers=4):
    conn = get_thread_connection(db_path)

    pairs_with_metadata = conn.execute(
        """
        SELECT pair_address, token0_decimals, token1_decimals
        FROM pair_metadata
        WHERE token0_decimals IS NOT NULL AND token1_decimals IS NOT NULL
    """
    ).fetchall()

    if not pairs_with_metadata:
        logging.warning(
            "No pairs with metadata found - run collect_missing_pair_metadata() first"
        )
        return

    pairs_to_normalize = []
    for pair_address, token0_dec, token1_dec in pairs_with_metadata:
        needs_norm = conn.execute(
            """
            SELECT COUNT(*) FROM transfer
            WHERE pair_address = ? AND value_normalized IS NULL
            LIMIT 1
        """,
            (pair_address,),
        ).fetchone()[0]

        if needs_norm > 0:
            pairs_to_normalize.append(pair_address)

    if not pairs_to_normalize:
        logging.info("✓ All pairs already normalized")
        return

    logging.info(f"Normalizing {len(pairs_to_normalize)} pairs...")

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_pair = {
            executor.submit(normalize_values_for_pair, pair, db_path): pair
            for pair in pairs_to_normalize
        }

        completed = 0
        for future in as_completed(future_to_pair):
            pair = future_to_pair[future]
            try:
                future.result()
                completed += 1
                if completed % 10 == 0 or completed == len(pairs_to_normalize):
                    logging.info(
                        f"Progress: {completed}/{len(pairs_to_normalize)} pairs normalized"
                    )
            except Exception as e:
                logging.error(f"Failed to normalize {pair[:10]}: {e}")

    logging.info("✓ Normalization complete")


def populate_block_metadata_for_range(
    start_block, end_block, db_path, batch_size=1000, provider=None, max_workers=8
):
    conn = get_thread_connection(db_path)

    existing_blocks = conn.execute(
        """
        SELECT block_number FROM block_metadata
        WHERE block_number BETWEEN ? AND ?
    """,
        (start_block, end_block),
    ).fetchall()
    existing_blocks = {b[0] for b in existing_blocks}

    all_blocks = set(range(start_block, end_block + 1))
    missing_blocks = sorted(all_blocks - existing_blocks)

    if not missing_blocks:
        logging.info("✓ All blocks already have metadata")
        return

    logging.info(f"Need to fetch {len(missing_blocks)} block timestamps")

    for i in range(0, len(missing_blocks), batch_size):
        batch = missing_blocks[i : i + batch_size]
        fetch_and_store_block_metadata(batch, db_path, provider, max_workers)
        logging.info(
            f"Progress: {min(i + batch_size, len(missing_blocks))}/{len(missing_blocks)} blocks processed"
        )



In [None]:
# Cell 3: Enhanced Blockchain Scanning Functions with Metadata Collection


def fetch_logs_for_range(
    start_block, end_block, addresses, worker_id="main", retry_count=0, max_retries=5
):
    provider, provider_name = get_provider()

    try:
        params = {
            "fromBlock": start_block,
            "toBlock": end_block,
            "address": addresses,
        }

        logs = provider.eth.get_logs(params)

        transactions = []
        for log in logs:
            transaction = {
                "transactionHash": provider.to_hex(log["transactionHash"]),
                "blockNumber": log["blockNumber"],
                "logIndex": log.get("logIndex", 0),
                "address": log["address"],
                "data": provider.to_hex(log["data"]),
            }

            topics = decode_topics(log)
            transaction.update(topics)

            if log.get("topics") and len(log["topics"]) > 0:
                transaction["eventSignature"] = provider.to_hex(log["topics"][0])
            else:
                transaction["eventSignature"] = ""

            transactions.append(transaction)

        logging.info(
            f"[{worker_id}] [{provider_name}] Fetched {len(transactions)} events from blocks [{start_block:,}, {end_block:,}]"
        )
        return transactions

    except HTTPError as e:
        if e.response.status_code == 413:
            # Response too large - need to split the range
            logging.warning(
                f"[{worker_id}] [{provider_name}] Response too large (413) for range [{start_block:,}, {end_block:,}] - will split"
            )
            raise Web3RPCError("Response payload too large - splitting range")
        
        elif e.response.status_code == 429:
            if retry_count < max_retries:
                wait_time = 2**retry_count
                logging.warning(
                    f"[{worker_id}] [{provider_name}] Rate limit hit, waiting {wait_time}s..."
                )
                time.sleep(wait_time)
                return fetch_logs_for_range(
                    start_block,
                    end_block,
                    addresses,
                    worker_id,
                    retry_count + 1,
                    max_retries,
                )
            else:
                logging.error(f"[{worker_id}] Max retries reached")
                raise
        
        elif e.response.status_code == 402:
            logging.critical(f"[{worker_id}] Payment required (402)")
            raise
        
        else:
            logging.error(f"[{worker_id}] HTTP error {e.response.status_code}: {e}")
            raise

    except Web3RPCError as e:
        if "more than 10000 results" in str(e) or "-32005" in str(e) or "Response payload too large" in str(e):
            raise
        else:
            logging.error(f"[{worker_id}] Web3 RPC error: {e}")
            raise


def collect_block_metadata_for_range(start_block, end_block, worker_id="main"):
    with _connection_lock:
        conn = duckdb.connect(_db_path)
        try:
            existing_blocks = set(
                conn.execute(
                    """
                SELECT block_number FROM block_metadata
                WHERE block_number BETWEEN ? AND ?
            """,
                    (start_block, end_block),
                ).fetchall()
            )
            existing_blocks = {b[0] for b in existing_blocks}
        finally:
            conn.close()

    missing_blocks = [
        b for b in range(start_block, end_block + 1) if b not in existing_blocks
    ]

    if not missing_blocks:
        return 0

    provider, provider_name = get_provider()
    blocks_data = []

    for block_num in missing_blocks:
        try:
            block = provider.eth.get_block(block_num)
            blocks_data.append((block_num, block["timestamp"], block["hash"].hex()))
        except Exception as e:
            logging.warning(f"[{worker_id}] Failed to fetch block {block_num}: {e}")

    if blocks_data:
        batch_insert_block_metadata(blocks_data)
        logging.debug(f"[{worker_id}] Stored metadata for {len(blocks_data)} blocks")

    return len(blocks_data)


def collect_pair_metadata_from_events(events, worker_id="main"):
    unique_pairs = set(e["address"] for e in events if "address" in e)

    for pair_address in unique_pairs:
        existing = get_pair_metadata(pair_address)
        if existing is None:
            try:
                metadata = fetch_and_store_pair_metadata(pair_address)
                if metadata:
                    logging.debug(
                        f"[{worker_id}] Stored metadata for {metadata['token0_symbol']}/{metadata['token1_symbol']}"
                    )
            except Exception as e:
                logging.warning(
                    f"[{worker_id}] Failed to fetch metadata for {pair_address}: {e}"
                )


def process_block_range(
    start_block, end_block, addresses, worker_id="main", collect_metadata=False
):
    if is_shutdown_requested():
        logging.info(
            f"[{worker_id}] Shutdown requested - skipping range [{start_block:,}, {end_block:,}]"
        )
        return 0

    if (start_block, end_block) in get_completed_ranges():
        logging.debug(
            f"[{worker_id}] Skipping already processed range [{start_block:,}, {end_block:,}]"
        )
        return 0

    mark_range_processing(start_block, end_block, worker_id)

    try:
        events = fetch_logs_for_range(start_block, end_block, addresses, worker_id)

        if is_shutdown_requested():
            logging.warning(
                f"[{worker_id}] Shutdown requested after fetch - saving {len(events)} events before stopping"
            )

        batch_insert_events(events, worker_id)

        if collect_metadata and events:
            try:
                collect_block_metadata_for_range(start_block, end_block, worker_id)
                collect_pair_metadata_from_events(events, worker_id)
            except Exception as e:
                logging.warning(
                    f"[{worker_id}] Metadata collection failed (non-fatal): {e}"
                )

        mark_range_completed(start_block, end_block, worker_id)

        logging.debug(
            f"[{worker_id}] ✓ Processed [{start_block:,}, {end_block:,}] - {len(events)} events"
        )
        return len(events)

    except (Web3RPCError, HTTPError) as e:
        # Check if we need to split (too many results OR response too large)
        if (
            "more than 10000 results" in str(e)
            or "-32005" in str(e)
            or "Response payload too large" in str(e)
            or (hasattr(e, "response") and e.response.status_code == 413)
        ):

            mid = (start_block + end_block) // 2

            if mid == start_block:
                logging.error(
                    f"[{worker_id}] Cannot split range [{start_block:,}, {end_block:,}] further - skipping"
                )
                mark_range_completed(start_block, end_block, worker_id)
                return 0

            logging.info(
                f"[{worker_id}] Splitting [{start_block:,}, {end_block:,}] at {mid:,} (reason: {type(e).__name__})"
            )

            count1 = process_block_range(
                start_block, mid, addresses, worker_id, collect_metadata
            )

            if is_shutdown_requested():
                logging.warning(
                    f"[{worker_id}] Shutdown requested - skipping second half of split"
                )
                return count1

            count2 = process_block_range(
                mid + 1, end_block, addresses, worker_id, collect_metadata
            )

            return count1 + count2
        else:
            logging.error(
                f"[{worker_id}] Failed to process [{start_block:,}, {end_block:,}]: {e}"
            )
            return 0

    except Exception as e:
        logging.error(f"[{worker_id}] Unexpected error: {e}")
        logging.error(traceback.format_exc())
        return 0


def generate_block_ranges(start_block, end_block, chunk_size):
    completed = get_completed_ranges()

    ranges = []
    current = start_block

    while current <= end_block:
        end = min(current + chunk_size - 1, end_block)

        if (current, end) not in completed:
            ranges.append((current, end))

        current = end + 1

    return ranges


def scan_blockchain(
    addresses,
    start_block,
    end_block,
    chunk_size=10000,
    max_workers=3,
    collect_metadata=False,
):
    ranges = generate_block_ranges(start_block, end_block, chunk_size)

    if not ranges:
        logging.info("No ranges to process - all already completed!")
        return

    total_ranges = len(ranges)
    logging.info(f"Processing {total_ranges} block ranges with {max_workers} workers")

    total_events = 0
    completed_ranges = 0
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_range = {
            executor.submit(
                process_block_range,
                start,
                end,
                addresses,
                f"worker-{i % max_workers}",
                collect_metadata,
            ): (start, end, i)
            for i, (start, end) in enumerate(ranges)
        }

        try:
            for future in as_completed(future_to_range):
                if is_shutdown_requested():
                    logging.warning(
                        "Shutdown requested - waiting for active tasks to complete..."
                    )
                    break

                start, end, idx = future_to_range[future]

                try:
                    event_count = future.result()
                    total_events += event_count
                    completed_ranges += 1

                    progress = (completed_ranges / total_ranges) * 100
                    elapsed = time.time() - start_time
                    rate = completed_ranges / elapsed if elapsed > 0 else 0
                    eta_seconds = (
                        (total_ranges - completed_ranges) / rate if rate > 0 else 0
                    )
                    eta_str = f"{int(eta_seconds // 60)}m {int(eta_seconds % 60)}s"

                    logging.info(
                        f"Progress: {completed_ranges}/{total_ranges} ({progress:.1f}%) | "
                        f"Events: {total_events:,} | "
                        f"Rate: {rate:.1f} ranges/s | "
                        f"ETA: {eta_str}"
                    )

                except Exception as e:
                    logging.error(f"Range [{start:,}, {end:,}] failed: {e}")

            if is_shutdown_requested():
                logging.warning(
                    "Waiting for active workers to finish their current ranges..."
                )
                executor.shutdown(wait=True, cancel_futures=True)
                logging.info(
                    f"✓ Graceful shutdown complete. Processed {completed_ranges}/{total_ranges} ranges."
                )

        except KeyboardInterrupt:
            logging.warning(
                "\n⚠️  Additional Ctrl+C detected - forcing immediate shutdown..."
            )
            executor.shutdown(wait=False, cancel_futures=True)
            raise

    elapsed_total = time.time() - start_time
    logging.info(f"\n{'='*60}")
    logging.info(f"Scan completed!")
    logging.info(f"Total events fetched: {total_events:,}")
    logging.info(f"Ranges processed: {completed_ranges}/{total_ranges}")
    logging.info(f"Total time: {int(elapsed_total // 60)}m {int(elapsed_total % 60)}s")
    logging.info(f"{'='*60}\n")


def post_process_normalization():
    logging.info("Starting post-processing normalization...")

    pairs = get_all_unique_pairs_from_db()
    logging.info(f"Found {len(pairs)} unique pairs")

    for idx, pair_address in enumerate(pairs):
        logging.info(f"[{idx+1}/{len(pairs)}] Processing {pair_address}")

        metadata = get_pair_metadata(pair_address)
        if metadata is None:
            logging.info(f"  Fetching metadata...")
            metadata = fetch_and_store_pair_metadata(pair_address)

        if metadata and metadata["token0_decimals"] is not None:
            logging.info(
                f"  Normalizing values for {metadata.get('token0_symbol', '?')}/{metadata.get('token1_symbol', '?')}..."
            )
            normalize_values_for_pair(pair_address)
        else:
            logging.warning(f"  Cannot normalize - missing decimals")

    logging.info("✓ Post-processing complete")


print("✓ Enhanced scanning functions loaded")

In [None]:
# Cell 4: Main Scanning Function with Batch Support

def scan_blockchain_to_duckdb(
    event_file=V2_EVENT_BY_CONTRACTS,
    db_path=DB_PATH,
    start_block=10000001,
    end_block=20000000,
    chunk_size=10000,
    max_workers=3,
    token_filter=None,
):

    logging.info("=" * 60)
    logging.info("BLOCKCHAIN SCANNER STARTING")
    logging.info("=" * 60)

    logging.info(f"Loading addresses from {event_file}...")
    with open(event_file, "r") as f:
        all_pairs = json.load(f)

    all_addresses = [Web3.to_checksum_address(addr) for addr in all_pairs.keys()]

    if token_filter:
        filter_checksummed = [Web3.to_checksum_address(addr) for addr in token_filter]
        addresses = [addr for addr in all_addresses if addr in filter_checksummed]
        logging.info(
            f"Filtered to {len(addresses)} addresses from {len(all_addresses)} total"
        )
    else:
        addresses = all_addresses
        logging.info(f"Using all {len(addresses)} addresses")

    logging.info("Setting up Web3 connection pool...")
    setup_web3_pool(ETHERSCAN_API_KEY_DICT)

    logging.info(f"Setting up database at {db_path}...")
    setup_database(db_path)

    stats = get_database_stats()
    logging.info("\nCurrent database stats:")
    logging.info(f"  Transfers: {stats['total_transfers']:,}")
    logging.info(f"  Swaps: {stats['total_swaps']:,}")
    logging.info(f"  Mints: {stats['total_mints']:,}")
    logging.info(f"  Burns: {stats['total_burns']:,}")
    logging.info(f"  Syncs: {stats['total_syncs']:,}")
    logging.info(f"  Approvals: {stats['total_approvals']:,}")
    logging.info(f"  Pairs with metadata: {stats['total_pairs']:,}")
    logging.info(f"  Completed ranges: {stats['completed_ranges']}")

    logging.info(f"\nStarting scan:")
    logging.info(f"  Block range: {start_block:,} to {end_block:,}")
    logging.info(f"  Chunk size: {chunk_size:,}")
    logging.info(f"  Workers: {max_workers}")
    logging.info(f"  Addresses: {len(addresses)}")
    logging.info("=" * 60 + "\n")

    try:
        scan_blockchain(
            addresses, start_block, end_block, chunk_size, max_workers, False
        )

        final_stats = get_database_stats()
        logging.info("\n" + "=" * 60)
        logging.info("SCAN COMPLETED!")
        logging.info("=" * 60)
        logging.info(f"  Transfers: {final_stats['total_transfers']:,}")
        logging.info(f"  Swaps: {final_stats['total_swaps']:,}")
        logging.info(f"  Mints: {final_stats['total_mints']:,}")
        logging.info(f"  Burns: {final_stats['total_burns']:,}")
        logging.info(f"  Syncs: {final_stats['total_syncs']:,}")
        logging.info(f"  Approvals: {final_stats['total_approvals']:,}")
        logging.info("=" * 60)

    except KeyboardInterrupt:
        logging.info("\n\nInterrupted by user - progress saved to database")
        logging.info("You can restart to continue from where it left off")
    except Exception as e:
        logging.error(f"Fatal error: {e}", exc_info=True)


def scan_all_pairs_in_batches(
    event_file=V2_EVENT_BY_CONTRACTS,
    db_path=DB_PATH,
    start_block=10000001,
    end_block=20000000,
    chunk_size=10000,
    max_workers=3,
    batch_size=100,
):
    """
    Scan all pairs in the event file in batches.
    Safer for large numbers of pairs.
    """

    logging.info("=" * 60)
    logging.info("BATCH SCANNER STARTING")
    logging.info("=" * 60)

    with open(event_file, "r") as f:
        all_pairs = json.load(f)

    all_addresses = list(all_pairs.keys())
    total_pairs = len(all_addresses)
    total_batches = (total_pairs + batch_size - 1) // batch_size

    logging.info(f"Total pairs to scan: {total_pairs}")
    logging.info(f"Batch size: {batch_size} pairs per batch")
    logging.info(f"Total batches: {total_batches}")
    logging.info(f"Block range: {start_block:,} to {end_block:,}")
    logging.info("=" * 60 + "\n")

    for i in range(0, total_pairs, batch_size):
        batch = all_addresses[i : i + batch_size]
        batch_num = i // batch_size + 1

        logging.info(f"\n{'='*60}")
        logging.info(f"BATCH {batch_num}/{total_batches}")
        logging.info(f"Pairs in this batch: {len(batch)}")
        logging.info(f"{'='*60}\n")

        try:
            scan_blockchain_to_duckdb(
                event_file=event_file,
                db_path=db_path,
                start_block=start_block,
                end_block=end_block,
                chunk_size=chunk_size,
                max_workers=max_workers,
                token_filter=batch,
            )
        except KeyboardInterrupt:
            logging.warning(f"\nInterrupted at batch {batch_num}/{total_batches}")
            logging.info(
                "Progress saved - you can resume by adjusting batch parameters"
            )
            raise
        except Exception as e:
            logging.error(f"Batch {batch_num} failed: {e}")
            logging.info(f"Continuing with next batch...")
            continue

    logging.info("\n" + "=" * 60)
    logging.info("ALL BATCHES COMPLETE")
    logging.info("=" * 60)

    final_stats = get_database_stats()
    logging.info(f"  Total Transfers: {final_stats['total_transfers']:,}")
    logging.info(f"  Total Swaps: {final_stats['total_swaps']:,}")
    logging.info(f"  Total Mints: {final_stats['total_mints']:,}")
    logging.info(f"  Total Burns: {final_stats['total_burns']:,}")
    logging.info("=" * 60)

    logging.info("\nCollecting metadata for all pairs...")
    collect_missing_pair_metadata()

    logging.info("\nNormalizing values...")
    normalize_missing_pairs()

    logging.info("\n✓ Complete pipeline finished")


def query_database(db_path="out/V2/uniswap_v2_events.duckdb"):

    conn = duckdb.connect(db_path, read_only=True)

    try:
        print("\n" + "=" * 60)
        print("DATABASE QUERIES")
        print("=" * 60)

        print("\n1. Event counts by type:")
        print(
            f"  Transfers: {conn.execute('SELECT COUNT(*) FROM transfer').fetchone()[0]:,}"
        )
        print(f"  Swaps: {conn.execute('SELECT COUNT(*) FROM swap').fetchone()[0]:,}")
        print(f"  Mints: {conn.execute('SELECT COUNT(*) FROM mint').fetchone()[0]:,}")
        print(f"  Burns: {conn.execute('SELECT COUNT(*) FROM burn').fetchone()[0]:,}")
        print(f"  Syncs: {conn.execute('SELECT COUNT(*) FROM sync').fetchone()[0]:,}")
        print(
            f"  Approvals: {conn.execute('SELECT COUNT(*) FROM approval').fetchone()[0]:,}"
        )

        print("\n2. Pair metadata coverage:")
        result = conn.execute(
            """
            SELECT 
                COUNT(DISTINCT t.pair_address) as total_pairs,
                COUNT(DISTINCT pm.pair_address) as pairs_with_metadata,
                COUNT(DISTINCT CASE WHEN pm.token0_decimals IS NOT NULL THEN pm.pair_address END) as pairs_with_decimals
            FROM (SELECT DISTINCT pair_address FROM transfer) t
            LEFT JOIN pair_metadata pm ON t.pair_address = pm.pair_address
        """
        ).fetchone()
        print(f"  Total pairs: {result[0]:,}")
        print(f"  With metadata: {result[1]:,}")
        print(f"  With decimals: {result[2]:,}")

        print("\n3. Most active pairs (by swaps):")
        result = conn.execute(
            """
            SELECT 
                s.pair_address,
                COALESCE(pm.token0_symbol || '/' || pm.token1_symbol, 'Unknown') as pair_name,
                COUNT(*) as swap_count
            FROM swap s
            LEFT JOIN pair_metadata pm ON s.pair_address = pm.pair_address
            GROUP BY s.pair_address, pair_name
            ORDER BY swap_count DESC
            LIMIT 10
        """
        ).fetchdf()
        print(result)

        return result

    finally:
        conn.close()


def get_pair_info(pair_address, db_path="out/V2/uniswap_v2_events.duckdb"):
    pair_address = Web3.to_checksum_address(pair_address)

    conn = duckdb.connect(db_path, read_only=True)

    try:
        print("\n" + "=" * 60)
        print(f"PAIR INFORMATION: {pair_address}")
        print("=" * 60)

        metadata = conn.execute(
            """
            SELECT token0_address, token1_address, token0_symbol, token1_symbol,
                   token0_decimals, token1_decimals, created_block
            FROM pair_metadata
            WHERE pair_address = ?
        """,
            (pair_address,),
        ).fetchone()

        if metadata:
            print("\nMetadata:")
            print(f"  Pair: {metadata[2] or '?'}/{metadata[3] or '?'}")
            print(f"  Token0: {metadata[0]} ({metadata[4] or '?'} decimals)")
            print(f"  Token1: {metadata[1]} ({metadata[5] or '?'} decimals)")
            if metadata[6]:
                print(f"  Created at block: {metadata[6]:,}")
        else:
            print("\n⚠️  No metadata found for this pair")

        print("\nEvent counts:")
        transfers = conn.execute(
            "SELECT COUNT(*) FROM transfer WHERE pair_address = ?", (pair_address,)
        ).fetchone()[0]
        swaps = conn.execute(
            "SELECT COUNT(*) FROM swap WHERE pair_address = ?", (pair_address,)
        ).fetchone()[0]
        mints = conn.execute(
            "SELECT COUNT(*) FROM mint WHERE pair_address = ?", (pair_address,)
        ).fetchone()[0]
        burns = conn.execute(
            "SELECT COUNT(*) FROM burn WHERE pair_address = ?", (pair_address,)
        ).fetchone()[0]

        print(f"  Transfers: {transfers:,}")
        print(f"  Swaps: {swaps:,}")
        print(f"  Mints: {mints:,}")
        print(f"  Burns: {burns:,}")

        latest_sync = conn.execute(
            """
            SELECT reserve0_normalized, reserve1_normalized, block_number
            FROM sync
            WHERE pair_address = ?
            ORDER BY block_number DESC
            LIMIT 1
        """,
            (pair_address,),
        ).fetchone()

        if latest_sync and latest_sync[0] is not None:
            print(f"\nLatest reserves (block {latest_sync[2]:,}):")
            print(f"  Reserve0: {latest_sync[0]:,.6f}")
            print(f"  Reserve1: {latest_sync[1]:,.6f}")

        print("=" * 60)

    finally:
        conn.close()


print("✓ Enhanced main functions loaded")

In [None]:
token_filter = [
    "0xB4e16d0168e52d35CaCD2c6185b44281Ec28C9Dc",
    "0x3139Ffc91B99aa94DA8A2dc13f1fC36F9BDc98eE",
    "0x12EDE161c702D1494612d19f05992f43aa6A26FB",
    "0xA478c2975Ab1Ea89e8196811F51A7B7Ade33eB11",
    "0x07F068ca326a469Fc1d87d85d448990C8cBa7dF9",
    "0xAE461cA67B15dc8dc81CE7615e0320dA1A9aB8D5",
    "0xCe407CD7b95B39d3B4d53065E711e713dd5C5999",
    "0x33C2d48Bc95FB7D0199C5C693e7a9F527145a9Af",
]
START_BLOCK = 10000000
END_BLOCK = 10500000

In [None]:
token_filter = [
    "0xB4e16d0168e52d35CaCD2c6185b44281Ec28C9Dc",
    "0x3139Ffc91B99aa94DA8A2dc13f1fC36F9BDc98eE",
    "0x12EDE161c702D1494612d19f05992f43aa6A26FB",
    "0xA478c2975Ab1Ea89e8196811F51A7B7Ade33eB11",
    "0x07F068ca326a469Fc1d87d85d448990C8cBa7dF9",
    "0xAE461cA67B15dc8dc81CE7615e0320dA1A9aB8D5",
    "0xCe407CD7b95B39d3B4d53065E711e713dd5C5999",
    "0x33C2d48Bc95FB7D0199C5C693e7a9F527145a9Af",
]


START_BLOCK = 6000000
END_BLOCK = 10500000
#END_BLOCK = w3.eth.block_number
# Usage Example 2: Scan ALL pairs in batches (RECOMMENDED)
try:
    scan_all_pairs_in_batches(
        start_block=START_BLOCK,
        end_block=END_BLOCK,
        chunk_size=10000,
        max_workers=18,
        batch_size=50,  # 50 pairs at a time
    )   
except KeyboardInterrupt:
    logging.warning("Shutdown requested by user")
    # Clean shutdown logic
except Exception as e:
    logging.error(f"Fatal error: {e}")
    raise  

# Usage Example 3: Continue metadata collection after scanning
collect_missing_pair_metadata()
normalize_missing_pairs()

In [None]:
def get_logs_with_chunking(
    get_logs_fn,
    from_block,
    to_block,
    argument_filters=None,
    initial_chunk_size=10000,
    max_retries=5,
    base_delay=0.5,
):
    """
    Fetch logs with automatic chunking and rate limit handling.
    """

    if argument_filters is None:
        argument_filters = {}

    # Convert 'latest' to actual block number
    if to_block == "latest":
        to_block = w3.eth.block_number

    def fetch_with_retry(start, end, retries=0):
        """Fetch logs with exponential backoff on rate limit errors."""
        try:
            time.sleep(base_delay)

            logs = get_logs_fn(
                from_block=start, to_block=end, argument_filters=argument_filters
            )
            return logs

        except HTTPError as e:
            if "429" in str(e) or "Too Many Requests" in str(e):
                if retries < max_retries:
                    wait_time = base_delay * (2**retries)
                    print(
                        f"  ⚠ Rate limit hit, waiting {wait_time:.1f}s... (retry {retries + 1}/{max_retries})"
                    )
                    time.sleep(wait_time)
                    return fetch_with_retry(start, end, retries + 1)
                else:
                    print(f"  ✗ Max retries reached")
                    raise
            else:
                raise

    def fetch_range(start, end, chunk_size):
        """Recursive function to fetch a block range with dynamic chunking."""

        if end - start <= chunk_size:
            try:
                print(f"Fetching blocks {start} to {end} ({end - start + 1} blocks)...")
                logs = fetch_with_retry(start, end)
                print(f"  ✓ Got {len(logs)} logs")
                return logs

            except HTTPError as e:
                raise

            except Exception as e:
                error_str = str(e)

                if "-32005" in error_str or "more than 10000 results" in error_str:
                    print(f"  ⚠ Too many results, splitting...")

                    mid = (start + end) // 2

                    if mid == start:
                        print(f"  ⚠ Cannot split further")
                        try:
                            if hasattr(e, "args") and len(e.args) > 0:
                                error_data = e.args[0]
                                if (
                                    isinstance(error_data, dict)
                                    and "data" in error_data
                                ):
                                    suggested_to = int(
                                        error_data["data"].get("to", hex(end)), 16
                                    )
                                    if suggested_to < end:
                                        print(f"  Using RPC hint: {suggested_to}")
                                        return fetch_range(
                                            start, suggested_to, chunk_size // 2
                                        )
                        except:
                            pass

                        raise Exception(
                            f"Cannot split block range {start}-{end} further"
                        )

                    left_logs = fetch_range(start, mid, chunk_size // 2)
                    right_logs = fetch_range(mid + 1, end, chunk_size // 2)

                    return left_logs + right_logs
                else:
                    print(f"  ✗ Error: {e}")
                    raise

        else:
            print(f"Splitting range {start}-{end} into chunks of {chunk_size}...")
            current = start
            logs = []

            while current <= end:
                chunk_end = min(current + chunk_size - 1, end)
                chunk_logs = fetch_range(current, chunk_end, chunk_size)
                logs.extend(chunk_logs)
                current = chunk_end + 1

            return logs

    print(f"\n{'='*60}")
    print(f"Fetching logs from block {from_block} to {to_block}")
    print(f"{'='*60}")

    all_logs = fetch_range(from_block, to_block, initial_chunk_size)

    print(f"\n{'='*60}")
    print(f"✓ Complete! Total logs fetched: {len(all_logs)}")
    print(f"{'='*60}\n")

    return all_logs


# ============================================================
# MAIN CODE
# ============================================================

# 1. Get the factory contract
address, abi, contract = get_address_abi_contract(UNISWAP_V2_CONTRACT)

start_block = 0
end_block = "latest"

# 2. Fetch all PairCreated events
print("Fetching all PairCreated events from Uniswap V2 Factory...")
pair_created_logs = get_logs_with_chunking(
    get_logs_fn=contract.events.PairCreated().get_logs,
    from_block=start_block,
    to_block=end_block,
    argument_filters={},
    initial_chunk_size=10000,
    max_retries=5,
    base_delay=0.5,
)

print(f"\n{'='*60}")
print(f"Found {len(pair_created_logs)} pairs")
print(f"{'='*60}\n")

# 3. Get event names from one sample pair (all pairs have same interface)
if len(pair_created_logs) > 0:
    sample_pair_address = pair_created_logs[0].args.pair
    print(f"Getting event list from sample pair: {sample_pair_address}")

    pair_address, pair_abi, pair_contract = get_address_abi_contract(
        sample_pair_address
    )
    event_names = [ev.event_name for ev in pair_contract.events]

    print(f"All pairs have these events: {event_names}\n")

    # 4. Build the dictionary structure
    FULL_EVENT_BY_CONTRACTS = {}

    print(f"Building dictionary structure for {len(pair_created_logs)} pairs...")

    for idx, log in enumerate(pair_created_logs):
        pair_addr = log.args.pair

        # Create structure with empty dicts for each event
        FULL_EVENT_BY_CONTRACTS[pair_addr] = {event: {} for event in event_names}

        if (idx + 1) % 100 == 0:
            print(f"  Processed {idx + 1}/{len(pair_created_logs)} pairs...")

    print(f"  ✓ Completed all {len(pair_created_logs)} pairs\n")

    # 5. Save to disk
    output_file = "uniswap_v2_pairs_events.json"

    print(f"Saving to {output_file}...")
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(FULL_EVENT_BY_CONTRACTS, f, ensure_ascii=False, indent=4)

    print(f"✓ Saved successfully!")

    # 6. Print summary
    print(f"\n{'='*60}")
    print("SUMMARY")
    print(f"{'='*60}")
    print(f"Total pairs: {len(FULL_EVENT_BY_CONTRACTS)}")
    print(f"Events per pair: {len(event_names)}")
    print(f"Event types: {', '.join(event_names)}")
    print(f"Output file: {output_file}")
    print(f"{'='*60}\n")

    # Print first 3 pairs as sample
    print("Sample (first 3 pairs):")
    for idx, (pair_addr, events) in enumerate(
        list(FULL_EVENT_BY_CONTRACTS.items())[:3]
    ):
        print(f"\n{pair_addr}:")
        for event_name in events.keys():
            print(f"  - {event_name}: {{}}")

else:
    print("No pairs found!")

In [None]:
# Important code
# We look for the Genesis Uniswap factory, and we get all its events (Only 1 for the V1_factory: 'NewExchange', Only 1 for the V2_factory: PairCreated)
# Then we scan from 0 to latest block every NexEchange created from this Factory
# (We have the filter of events in case we are filtering events from contract that have multiple events to remove when we don't care)
address, abi, contract = get_address_abi_contract(
    UNISWAP_V2_CONTRACT
)  # Uniswap Genesis Factory
start_block = 0
end_block = 'latest'
# list all event names
event_names = [ev.event_name for ev in contract.events]
print(event_names)
# define which events you want and filters directly
events_to_scan = [
    contract.events.PairCreated().get_logs,
    #contract.events.Transfer().get_logs,
    #contract.events.Approval().get_logs,
]
L_LOGS = [] # IMPORTANT
for get_logs_fn in events_to_scan:
    logs = get_logs_fn(
        from_block=start_block,
        to_block=end_block,
        argument_filters={},  # or {"from": some_address}, {"to": [addr1, addr2]}
    )
    for log in logs:
        # print(log["transactionHash"].hex(), log["blockNumber"], log["event"])
        L_LOGS.append(log)

# Important code we use in combination with the events filter
# We created a list of Exchange created by the Uniswap V1 Factory Contract and we list all their Events
# We create the Dictionnary
# "exchange_address_1": {"event_1": {}, event_2: {}, event_3:{}}
# This dict fed with the code allow us to retrieve every transactions with the events(logs) of this exchange
# we can then sniff Liquidity out of it

FULL_EVENT_BY_CONTRACTS = {}  # IMPORTANT
for log in L_LOGS:
    add, abi, contract = get_address_abi_contract(log.args.exchange)
    event_names = [ev.event_name for ev in contract.events]
    FULL_EVENT_BY_CONTRACTS[add] = {event: {} for event in event_names}
    time.sleep(1)

print(len(FULL_EVENT_BY_CONTRACTS)) 

if not os.path.exists(V2_EVENT_BY_CONTRACTS):
    with open(V2_EVENT_BY_CONTRACTS, "w", encoding="utf-8") as f:
        json.dump(V2_EVENT_BY_CONTRACTS, f, ensure_ascii=False, indent=4)

In [None]:
# Cell: Query specific pair data for analysis


def load_pair_data_for_analysis(pair_address, block_start=None, block_end=None):
    """
    Load normalized transfer data for a specific pair.
    This is what you need for liquidity analysis.
    """
    pair_address = w3.to_checksum_address(pair_address)

    block_filter = ""
    if block_start and block_end:
        block_filter = f"AND block_number BETWEEN {block_start} AND {block_end}"
    elif block_start:
        block_filter = f"AND block_number >= {block_start}"
    elif block_end:
        block_filter = f"AND block_number <= {block_end}"

    conn = duckdb.connect(DB_PATH, read_only=True)

    # Get pair metadata
    metadata = conn.execute(
        """
        SELECT token0_symbol, token1_symbol, token0_decimals, token1_decimals
        FROM pair_metadata
        WHERE pair_address = ?
    """,
        (pair_address,),
    ).fetchone()

    if metadata:
        pair_name = f"{metadata[0]}/{metadata[1]}"
        print(f"Loading data for {pair_name} pair ({pair_address[:10]}...)")
    else:
        pair_name = "Unknown"
        print(f"⚠️  No metadata found for {pair_address}")

    # Load transfer events (this is what you need for liquidity tracking)
    df = conn.execute(
        f"""
        SELECT 
            block_number as block,
            pair_address as address,
            from_address,
            to_address,
            COALESCE(value_normalized, CAST(value AS DOUBLE) / 1e18) as value
        FROM transfer
        WHERE pair_address = ?
        {block_filter}
        ORDER BY block_number, log_index
    """,
        (pair_address,),
    ).fetchdf()

    conn.close()

    if df.empty:
        print("⚠️  No transfer events found")
        return df

    # Add metadata
    df["pair_name"] = pair_name
    df["address"] = df["address"].apply(w3.to_checksum_address)
    df["from_address"] = df["from_address"].apply(w3.to_checksum_address)
    df["to_address"] = df["to_address"].apply(w3.to_checksum_address)

    print(f"Loaded {len(df):,} transfer events")
    print(f"Block range: {df['block'].min():,} to {df['block'].max():,}")

    return df


def get_pair_summary_stats(pair_address):
    """
    Get summary statistics for a pair.
    """
    pair_address = w3.to_checksum_address(pair_address)

    conn = duckdb.connect(DB_PATH, read_only=True)

    stats = conn.execute(
        """
        SELECT 
            COUNT(DISTINCT t.transaction_hash) as total_transactions,
            COUNT(*) FILTER (WHERE t.from_address = '0x0000000000000000000000000000000000000000') as mints,
            COUNT(*) FILTER (WHERE t.to_address = '0x0000000000000000000000000000000000000000') as burns,
            COUNT(*) FILTER (WHERE t.from_address != '0x0000000000000000000000000000000000000000' 
                             AND t.to_address != '0x0000000000000000000000000000000000000000') as transfers,
            (SELECT COUNT(*) FROM swap WHERE pair_address = ?) as swaps,
            MIN(t.block_number) as first_block,
            MAX(t.block_number) as last_block
        FROM transfer t
        WHERE t.pair_address = ?
    """,
        (pair_address, pair_address),
    ).fetchone()

    metadata = conn.execute(
        """
        SELECT token0_symbol, token1_symbol
        FROM pair_metadata
        WHERE pair_address = ?
    """,
        (pair_address,),
    ).fetchone()

    conn.close()

    if metadata:
        print(f"\n{'='*60}")
        print(f"PAIR SUMMARY: {metadata[0]}/{metadata[1]}")
        print(f"{'='*60}")

    print(f"Total transactions: {stats[0]:,}")
    print(f"Mints (add liquidity): {stats[1]:,}")
    print(f"Burns (remove liquidity): {stats[2]:,}")
    print(f"Transfers (LP token): {stats[3]:,}")
    print(f"Swaps: {stats[4]:,}")
    print(f"Block range: {stats[5]:,} to {stats[6]:,}")
    print(f"{'='*60}\n")

    return stats


# Example usage:
df = load_pair_data_for_analysis(token_filter[0], START_BLOCK, END_BLOCK)
get_pair_summary_stats(token_filter[0])

In [None]:
df

In [None]:
# 1ST GRAPH, evolution of the UNISWAP v1 (UNI-V1) amount of token issued/burned (GLOBAL TOTAL over block)
# Important to compare the size of every pool but we need to link "value" to either $ or something relevant for comparison
# NEED: df
totals = (
    df.groupby(["block", "address"], as_index=False)["value"]
    .sum()
    .sort_values(["address", "block"])
)
totals["cum_value"] = totals.groupby("address")["value"].cumsum()

# # 2) fill missing blocks only inside each address' span (min..max), then cumulate
# totals = totals.groupby("address", group_keys=False).apply(
#     lambda g: (
#         g.set_index("block")
#         .reindex(range(g["block"].min(), g["block"].max() + 1), fill_value=0)
#         .rename_axis("block")
#         .reset_index()
#         .assign(address=g.name)
#     )
# )
# totals = (
#     totals[["block", "address", "value"]]
#     .sort_values(["address", "block"])
#     .reset_index(drop=True)
# )

pools_of_interest = [
    w3.to_checksum_address(token_filter[0]),
    w3.to_checksum_address(token_filter[1]),
    w3.to_checksum_address(token_filter[2]),
]

# pools_of_interest = ["add_1","add_2","add_3"]
cum_long_sub = totals[totals["address"].isin(pools_of_interest)]

fig = px.area(
    cum_long_sub,
    x="block",
    y="cum_value",
    color="address",
    line_group="address",
    title="Cumulative liquidity evolution per pool",
    labels={"cum_value": "Cumulative liquidity", "address": "Pool address"},
)

# Optionally, you can also do px.line instead of px.area if you prefer lines without fill
fig = px.line(cum_long_sub, x="block", y="cum_value", color="address",
              title="Cumulative liquidity per pool")
# You can also make it not stacked (i.e. overlayed) by doing:
# fig = px.area(
#     cum_long_sub,
#     x="block",
#     y="cum_value",
#     color="address",
#     line_group="address",
#     facet_col=None,
#     # maybe set `groupnorm=None` or other arguments
# )

fig.update_layout(legend_title="Pool address")
fig.show()

In [None]:
def build_block_filter(block_start=None, block_end=None):
    if block_start is not None and block_end is not None:
        return f"AND block_number BETWEEN {block_start} AND {block_end}"
    if block_start is not None:
        return f"AND block_number >= {block_start}"
    if block_end is not None:
        return f"AND block_number <= {block_end}"
    return ""


def calculate_pool_liquidity_python_optimized(
    db_path, pair_address, block_start=None, block_end=None
):
    pair_address = w3.to_checksum_address(pair_address)
    block_filter = build_block_filter(block_start, block_end)

    query = f"""
    SELECT 
        block_number AS block,
        from_address AS provider,
        -CAST(value AS DOUBLE) AS delta
    FROM transfer
    WHERE pair_address = '{pair_address}'
        AND from_address != '0x0000000000000000000000000000000000000000'
        {block_filter}
    
    UNION ALL
    
    SELECT 
        block_number AS block,
        to_address AS provider,
        CAST(value AS DOUBLE) AS delta
    FROM transfer
    WHERE pair_address = '{pair_address}'
        AND to_address != '0x0000000000000000000000000000000000000000'
        {block_filter}
    ORDER BY block, provider
    """

    with duckdb.connect(db_path, read_only=True) as conn:
        df = conn.execute(query).fetch_df()

    if df.empty:
        return pd.DataFrame()

    df_grouped = df.groupby(["block", "provider"], as_index=False)["delta"].sum()
    df_grouped = df_grouped.sort_values(["provider", "block"])
    df_grouped["cum_provider"] = df_grouped.groupby("provider")["delta"].cumsum()

    all_blocks = np.sort(df_grouped["block"].unique())
    all_providers = df_grouped["provider"].unique()

    provider_histories = []

    for provider in all_providers:
        provider_data = df_grouped[df_grouped["provider"] == provider].copy()
        first_block = provider_data["block"].min()

        provider_blocks = provider_data["block"].values
        provider_balances = provider_data["cum_provider"].values

        for block in all_blocks:
            if block >= first_block:
                idx = np.searchsorted(provider_blocks, block, side="right") - 1
                if idx >= 0:
                    balance = provider_balances[idx]
                    if balance > 1e-8:
                        provider_histories.append(
                            {
                                "block": block,
                                "provider": provider,
                                "cum_provider": balance,
                            }
                        )

    df_full = pd.DataFrame(provider_histories)

    if df_full.empty:
        return df_full

    pool_per_block = df_full.groupby("block", as_index=False)["cum_provider"].sum()
    pool_per_block.rename(columns={"cum_provider": "cum_pool"}, inplace=True)

    df_full = df_full.merge(pool_per_block, on="block", how="left")

    df_full["share_pct"] = np.where(
        df_full["cum_pool"].abs() < 1e-10,
        0.0,
        (df_full["cum_provider"] / df_full["cum_pool"] * 100),
    )
    df_full["share_pct"] = df_full["share_pct"].clip(0, 100)

    df_full = df_full[df_full["share_pct"] >= 0.1].copy()

    df_full["provider_label"] = df_full["provider"].apply(create_provider_label)

    return df_full.sort_values(["block", "provider"]).reset_index(drop=True)


def validate_liquidity_data(db_path, pair_address, block_start=None, block_end=None):
    pair_address = w3.to_checksum_address(pair_address)
    block_filter = build_block_filter(block_start, block_end)

    query = f"""
    WITH mints AS (
        SELECT 
            block_number,
            SUM(CAST(value AS DOUBLE)) AS minted
        FROM transfer
        WHERE pair_address = '{pair_address}'
            AND from_address = '0x0000000000000000000000000000000000000000'
            {block_filter}
        GROUP BY block_number
    ),
    burns AS (
        SELECT 
            block_number,
            SUM(CAST(value AS DOUBLE)) AS burned
        FROM transfer
        WHERE pair_address = '{pair_address}'
            AND to_address = '0x0000000000000000000000000000000000000000'
            {block_filter}
        GROUP BY block_number
    ),
    all_blocks AS (
        SELECT DISTINCT block_number FROM mints
        UNION
        SELECT DISTINCT block_number FROM burns
    ),
    supply_tracking AS (
        SELECT 
            ab.block_number,
            COALESCE(m.minted, 0) AS minted,
            COALESCE(b.burned, 0) AS burned,
            SUM(COALESCE(m.minted, 0) - COALESCE(b.burned, 0)) 
                OVER (ORDER BY ab.block_number) AS cumulative_supply
        FROM all_blocks ab
        LEFT JOIN mints m ON ab.block_number = m.block_number
        LEFT JOIN burns b ON ab.block_number = b.block_number
    )
    SELECT * FROM supply_tracking ORDER BY block_number
    """

    with duckdb.connect(db_path, read_only=True) as conn:
        validation_df = conn.execute(query).fetch_df()

    return validation_df


def print_liquidity_validation(db_path, pair_address, block_start=None, block_end=None):
    validation_df = validate_liquidity_data(
        db_path, pair_address, block_start, block_end
    )

    if validation_df.empty:
        print("⚠️  No liquidity events found")
        return

    print("\n" + "=" * 60)
    print("LIQUIDITY VALIDATION")
    print("=" * 60)

    total_minted = validation_df["minted"].sum()
    total_burned = validation_df["burned"].sum()
    final_supply = validation_df["cumulative_supply"].iloc[-1]

    print(f"Total LP Tokens Minted: {total_minted:,.6f}")
    print(f"Total LP Tokens Burned: {total_burned:,.6f}")
    print(f"Net Supply (Current): {final_supply:,.6f}")
    print(f"First Event Block: {validation_df['block_number'].min()}")
    print(f"Last Event Block: {validation_df['block_number'].max()}")
    print(f"Total Events: {len(validation_df)}")

    if final_supply < 1000:
        print(
            "⚠️  Warning: Supply seems low. First 1000 wei should be permanently locked."
        )

    print("=" * 60)


def get_mint_burn_summary(db_path, pair_address, block_start=None, block_end=None):
    pair_address = w3.to_checksum_address(pair_address)
    block_filter = build_block_filter(block_start, block_end)

    query = f"""
    WITH mint_events AS (
        SELECT 
            t.block_number,
            t.to_address AS provider,
            CAST(t.value AS DOUBLE) AS lp_tokens,
            m.amount0,
            m.amount1
        FROM transfer t
        LEFT JOIN mint m ON t.transaction_hash = m.transaction_hash 
            AND t.pair_address = m.pair_address
        WHERE t.pair_address = '{pair_address}'
            AND t.from_address = '0x0000000000000000000000000000000000000000'
            AND t.to_address != '0x0000000000000000000000000000000000000000'
            {block_filter.replace('block_number', 't.block_number') if block_filter else ''}
    ),
    burn_events AS (
        SELECT 
            t.block_number,
            t.from_address AS provider,
            CAST(t.value AS DOUBLE) AS lp_tokens,
            b.amount0,
            b.amount1
        FROM transfer t
        LEFT JOIN burn b ON t.transaction_hash = b.transaction_hash 
            AND t.pair_address = b.pair_address
        WHERE t.pair_address = '{pair_address}'
            AND t.to_address = '0x0000000000000000000000000000000000000000'
            AND t.from_address != '0x0000000000000000000000000000000000000000'
            {block_filter.replace('block_number', 't.block_number') if block_filter else ''}
    )
    SELECT 
        'MINT' AS event_type,
        COUNT(*) AS event_count,
        SUM(lp_tokens) AS total_lp_tokens,
        SUM(CAST(amount0 AS DOUBLE)) AS total_amount0,
        SUM(CAST(amount1 AS DOUBLE)) AS total_amount1
    FROM mint_events
    
    UNION ALL
    
    SELECT 
        'BURN' AS event_type,
        COUNT(*) AS event_count,
        SUM(lp_tokens) AS total_lp_tokens,
        SUM(CAST(amount0 AS DOUBLE)) AS total_amount0,
        SUM(CAST(amount1 AS DOUBLE)) AS total_amount1
    FROM burn_events
    """

    with duckdb.connect(db_path, read_only=True) as conn:
        summary_df = conn.execute(query).fetch_df()

    return summary_df


def print_mint_burn_summary(db_path, pair_address, block_start=None, block_end=None):
    summary_df = get_mint_burn_summary(db_path, pair_address, block_start, block_end)

    print("\n" + "=" * 60)
    print("MINT/BURN SUMMARY")
    print("=" * 60)

    for _, row in summary_df.iterrows():
        print(f"\n{row['event_type']} Events:")
        print(f"  Count: {int(row['event_count'])}")
        print(f"  Total LP Tokens: {row['total_lp_tokens']:,.6f}")
        if row["total_amount0"] is not None:
            print(f"  Total Token0: {row['total_amount0']:,.6f}")
        if row["total_amount1"] is not None:
            print(f"  Total Token1: {row['total_amount1']:,.6f}")

    print("=" * 60)


def analyze_pool_liquidity(
    db_path,
    pair_address,
    block_start=None,
    block_end=None,
    show_plots=True,
    show_validation=True,
):
    pair_address = w3.to_checksum_address(pair_address)

    if show_validation:
        print_liquidity_validation(db_path, pair_address, block_start, block_end)
        print_mint_burn_summary(db_path, pair_address, block_start, block_end)

    print("\nCalculating pool liquidity distribution...")
    liquidity_df = calculate_pool_liquidity_python_optimized(
        db_path=db_path,
        pair_address=pair_address,
        block_start=block_start,
        block_end=block_end,
    )

    if liquidity_df.empty:
        print("⚠️  No liquidity data found for this pair")
        return None, None

    print(f"Total rows in liquidity data: {len(liquidity_df)}")
    print(
        f"Block range: {liquidity_df['block'].min()} to {liquidity_df['block'].max()}"
    )
    print(f"Number of unique providers: {liquidity_df['provider'].nunique()}")

    if show_plots:
        print("\nGenerating percentage ownership chart...")
        fig_pct = plot_staircase_ownership(liquidity_df)
        fig_pct.show()

        print("Generating absolute liquidity chart...")
        fig_abs = plot_absolute_liquidity_staircase(liquidity_df)
        fig_abs.show()

        print("Generating concentration analysis...")
        fig_conc, concentration_metrics = plot_ownership_concentration(liquidity_df)
        fig_conc.show()

        print("\nGenerating current ownership snapshot (bubble chart)...")
        fig_bubble = plot_bubble_ownership_snapshot(liquidity_df)
        fig_bubble.show()
    else:
        _, concentration_metrics = plot_ownership_concentration(liquidity_df)

    print_liquidity_summary(liquidity_df)
    print_concentration_summary(concentration_metrics)

    return liquidity_df, concentration_metrics


def create_provider_label(address):
    checksum_addr = w3.to_checksum_address(address)
    short_addr = f"{checksum_addr[:6]}...{checksum_addr[-4:]}"
    return short_addr


def add_million_block_markers(fig, min_block, max_block):
    start = (min_block // 1_000_000) * 1_000_000
    end = (max_block // 1_000_000 + 1) * 1_000_000 + 1

    for million_block in range(start, end, 1_000_000):
        if min_block <= million_block <= max_block:
            fig.add_vline(
                x=million_block,
                line_width=2,
                line_dash="dash",
                line_color="black",
                opacity=0.4,
                annotation_text=f"{million_block / 1_000_000:.0f}M",
                annotation_position="top",
                annotation_font_size=12,
            )


def plot_staircase_ownership(df):
    fig = go.Figure()
    providers = sorted(df["provider_label"].unique())

    for provider in providers:
        provider_data = df[df["provider_label"] == provider].sort_values("block")

        fig.add_trace(
            go.Scatter(
                x=provider_data["block"],
                y=provider_data["share_pct"],
                name=provider,
                mode="lines",
                line=dict(width=0.5, shape="hv"),
                stackgroup="one",
                groupnorm="",
                hovertemplate="<b>%{fullData.name}</b><br>Block: %{x}<br>Share: %{y:.4f}%<extra></extra>",
            )
        )

    add_million_block_markers(fig, df["block"].min(), df["block"].max())

    fig.update_layout(
        title="Pool Ownership Distribution (Staircase View)",
        hovermode="x",
        yaxis_title="Ownership Share (%)",
        xaxis_title="Block Number",
        legend=dict(
            title="Provider",
            orientation="v",
            yanchor="top",
            y=1,
            xanchor="left",
            x=1.02,
        ),
        yaxis=dict(range=[0, 100]),
    )

    return fig


def plot_absolute_liquidity_staircase(df):
    fig = go.Figure()
    providers = sorted(df["provider_label"].unique())

    for provider in providers:
        provider_data = df[df["provider_label"] == provider].sort_values("block")

        fig.add_trace(
            go.Scatter(
                x=provider_data["block"],
                y=provider_data["cum_provider"],
                name=provider,
                mode="lines",
                line=dict(width=0.5, shape="hv"),
                stackgroup="one",
                hovertemplate="<b>%{fullData.name}</b><br>Block: %{x}<br>Amount: %{y:.6f}<extra></extra>",
            )
        )

    add_million_block_markers(fig, df["block"].min(), df["block"].max())

    fig.update_layout(
        title="Pool Liquidity by Provider (Absolute Values)",
        hovermode="x",
        yaxis_title="Liquidity Amount (Token Units)",
        xaxis_title="Block Number",
        legend=dict(
            title="Provider",
            orientation="v",
            yanchor="top",
            y=1,
            xanchor="left",
            x=1.02,
        ),
    )

    return fig


def calculate_hhi_metrics(df):
    share_clean = np.where(
        np.isinf(df["share_pct"]) | np.isnan(df["share_pct"]), 0, df["share_pct"]
    )
    df = df.assign(share_pct_clean=share_clean)

    hhi_agg = (
        df.groupby("block")
        .agg(
            hhi=("share_pct_clean", lambda x: (x**2).sum()),
            active_providers=("share_pct_clean", lambda x: (x > 0.01).sum()),
        )
        .reset_index()
    )

    return hhi_agg


def add_hhi_zones(fig):
    zones = [
        (0, 1500, "green", "Competitive"),
        (1500, 2500, "yellow", "Moderate"),
        (2500, 10000, "red", "Concentrated"),
    ]

    for y0, y1, color, label in zones:
        fig.add_hrect(
            y0=y0,
            y1=y1,
            fillcolor=color,
            opacity=0.1,
            annotation_text=label,
            secondary_y=False,
        )


def plot_ownership_concentration(df):
    hhi_df = calculate_hhi_metrics(df)

    fig = make_subplots(specs=[[{"secondary_y": True}]])

    fig.add_trace(
        go.Scatter(
            x=hhi_df["block"],
            y=hhi_df["hhi"],
            name="HHI (Concentration)",
            line=dict(color="#F46821", width=2),
        ),
        secondary_y=False,
    )

    fig.add_trace(
        go.Scatter(
            x=hhi_df["block"],
            y=hhi_df["active_providers"],
            name="Active Providers",
            line=dict(color="#29BEFD", width=2),
        ),
        secondary_y=True,
    )

    fig.update_layout(title="Pool Concentration Analysis", hovermode="x unified")
    fig.update_xaxes(title_text="Block Number")
    fig.update_yaxes(title_text="HHI Score", secondary_y=False)
    fig.update_yaxes(title_text="Number of Providers", secondary_y=True)

    add_hhi_zones(fig)

    return fig, hhi_df


def get_concentration_status(hhi):
    if hhi < 1500:
        return "✅ COMPETITIVE (Decentralized)"
    elif hhi < 2500:
        return "⚠️  MODERATE CONCENTRATION"
    else:
        return "🔴 HIGHLY CONCENTRATED"


def print_liquidity_summary(df):
    max_block = df["block"].max()

    summary = (
        df.groupby(["provider", "provider_label"])["cum_provider"]
        .last()
        .sort_values(ascending=False)
    )

    print("\n" + "=" * 60)
    print("LIQUIDITY SUMMARY")
    print("=" * 60)

    for (provider, label), amount in summary.items():
        provider_checksum = w3.to_checksum_address(provider)
        final_data = df[
            (df["provider"] == provider_checksum) & (df["block"] == max_block)
        ]

        if not final_data.empty:
            share = final_data["share_pct"].values[0]
            print(f"{label}: {amount:.6f} tokens ({share:.2f}% of pool)")
        else:
            print(f"{label}: {amount:.6f} tokens (exited)")


def print_concentration_summary(hhi_df):
    print("\n" + "=" * 60)
    print("CONCENTRATION METRICS")
    print("=" * 60)
    print(f"Average HHI: {hhi_df['hhi'].mean():.2f}")
    print(f"Current HHI: {hhi_df['hhi'].iloc[-1]:.2f}")
    print(f"Max providers at any block: {hhi_df['active_providers'].max()}")
    print(f"Current active providers: {hhi_df['active_providers'].iloc[-1]}")

    current_hhi = hhi_df["hhi"].iloc[-1]
    status = get_concentration_status(current_hhi)
    print(f"Pool status: {status}")
    print("=" * 60)


def plot_bubble_ownership_snapshot(df):
    latest_block = df["block"].max()
    latest_data = df[df["block"] == latest_block].copy()
    latest_data = latest_data.sort_values("share_pct", ascending=False)

    n_providers = len(latest_data)
    cols = int(np.ceil(np.sqrt(n_providers)))

    latest_data["x_pos"] = latest_data.index % cols
    latest_data["y_pos"] = latest_data.index // cols

    fig = go.Figure()

    fig.add_trace(
        go.Scatter(
            x=latest_data["x_pos"],
            y=latest_data["y_pos"],
            mode="markers",
            marker=dict(
                size=latest_data["share_pct"] * 15,
                sizemode="diameter",
                sizemin=30,
                color=latest_data["share_pct"],
                colorscale="RdYlGn_r",
                showscale=True,
                colorbar=dict(title="Share (%)", thickness=20, len=0.7),
                line=dict(color="darkgray", width=3),
                opacity=0.8,
            ),
            hovertemplate=(
                "<b>%{customdata[0]}</b><br>"
                "Share: %{customdata[1]:.4f}%<br>"
                "LP Tokens: %{customdata[2]:.6f}"
                "<extra></extra>"
            ),
            customdata=latest_data[
                ["provider_label", "share_pct", "cum_provider"]
            ].values,
        )
    )

    fig.update_layout(
        title=f"Pool Ownership Snapshot at Block {latest_block:,}",
        xaxis=dict(visible=False, range=[-0.5, cols - 0.5]),
        yaxis=dict(visible=False, scaleanchor="x", scaleratio=1),
        height=600,
        width=800,
        showlegend=False,
        hovermode="closest",
        plot_bgcolor="white",
    )

    return fig

In [None]:
# This is all you need for the liquidity graphs we built earlier
pair_address = token_filter[0]  # Your pair
df = load_pair_data_for_analysis(pair_address, START_BLOCK, END_BLOCK)

# Then use the existing analysis function
liquidity_df, concentration_metrics = analyze_pool_liquidity(
    db_path=DB_PATH,
    pair_address=pair_address,
    block_start=START_BLOCK,
    block_end=END_BLOCK,
    show_plots=True,
    show_validation=True,
)