In [None]:
import json
import sys
from pathlib import Path

In [None]:
import pandas as pd
from sqlalchemy import create_engine, text

In [None]:
from web3 import Web3

In [None]:
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', None)

In [None]:
ROOT = Path.cwd().parents[1]
sys.path.append(str(ROOT))

from indexer_engine.app.config import settings

In [None]:
SYNC_DB_URL = settings.sync_database_url
engine = create_engine(SYNC_DB_URL, echo=False, future=True)

In [None]:
query = text("""
    SELECT
        chain_id,
        block_number,
        transaction_hash,
        transaction_index,
        log_index,
        tx_from,
        tx_to,
        tx_value,
        tx_type,
        tx_status,
        tx_gas_used,
        tx_cumulative_gas_used,
        tx_effective_gas_price,
        address,
        topic0,
        topic1,
        topic2,
        topic3,
        data
    FROM staging.evm_event_logs
    WHERE chain_id = :chain_id
    -- przykładowy filtr: konkretny kontrakt
      AND address = :contract_address
    ORDER BY block_number, log_index
    LIMIT 10_000
""")

In [None]:
chain_id = 1
contract_address = bytes.fromhex("000000000004444C5DC75CB358380D2E3DE08A90")  # bez '0x'

with engine.begin() as conn:
    df = pd.read_sql(
        query,
        conn,
        params={
            "chain_id": chain_id,
            "contract_address": contract_address,
        },
    )

df.head()

In [None]:
def evm_address_to_bytes(addr: str) -> bytes:
    addr = addr.strip()

    if addr.startswith("0x"):
        addr = addr[2:]

    if len(addr) != 40:
        raise ValueError("Invalid address length")

    return bytes.fromhex(addr)

In [None]:
ABI_PATH = Path("./PoolManager.json")

with ABI_PATH.open() as f:
    abi = json.load(f)

events = [item for item in abi if item.get("type") == "event"]

In [None]:
def event_signature(evt: dict) -> str:
    name = evt["name"]
    types = [inp["type"] for inp in evt["inputs"]]
    return f"{name}({','.join(types)})"

mapping = {}

for evt in events:
    sig = event_signature(evt)
    topic0 = Web3.keccak(text=sig)  # bytes(32)
    mapping[topic0] = {
        "event_name": evt["name"],
        "event_signature": sig,
    }

len(mapping)


In [None]:
mapping


In [None]:
def topic0_to_event_name(value) -> str | None:
    if value is None:
        return None
    # jeżeli to memoryview, konwertujemy do bytes
    if isinstance(value, memoryview):
        value = value.tobytes()
    info = mapping.get(value)
    return info["event_name"] if info else None

def topic0_to_signature(value) -> str | None:
    if value is None:
        return None
    if isinstance(value, memoryview):
        value = value.tobytes()
    info = mapping.get(value)
    return info["event_signature"] if info else None


In [None]:
df["event_name"] = df["topic0"].apply(topic0_to_event_name)
df["event_signature"] = df["topic0"].apply(topic0_to_signature)

df[["topic0", "event_name", "event_signature"]].head()


In [None]:
df.columns

In [None]:
"""
bez zmian, skopiowac ze staking:
chain_id
block_number
block_timestamp
transaction_hash
transaction_index
log_index
tx_gas_used
tx_effective_gas_price
tx_value
tx_from_address
tx_to_address
---
zmiana nazwy kolumn:
address -> contract_address
---
nowe kolumny:
event_name
event_signature
"""