In [1]:
pip install web3 requests

Collecting web3
  Downloading web3-7.13.0-py3-none-any.whl.metadata (5.6 kB)
Collecting eth-abi>=5.0.1 (from web3)
  Downloading eth_abi-5.2.0-py3-none-any.whl.metadata (3.8 kB)
Collecting eth-account>=0.13.6 (from web3)
  Downloading eth_account-0.13.7-py3-none-any.whl.metadata (3.7 kB)
Collecting eth-hash>=0.5.1 (from eth-hash[pycryptodome]>=0.5.1->web3)
  Downloading eth_hash-0.7.1-py3-none-any.whl.metadata (4.2 kB)
Collecting eth-typing>=5.0.0 (from web3)
  Downloading eth_typing-5.2.1-py3-none-any.whl.metadata (3.2 kB)
Collecting eth-utils>=5.0.0 (from web3)
  Downloading eth_utils-5.3.0-py3-none-any.whl.metadata (5.7 kB)
Collecting hexbytes>=1.2.0 (from web3)
  Downloading hexbytes-1.3.1-py3-none-any.whl.metadata (3.3 kB)
Collecting types-requests>=2.0.0 (from web3)
  Downloading types_requests-2.32.4.20250809-py3-none-any.whl.metadata (2.0 kB)
Collecting pyunormalize>=15.0.0 (from web3)
  Downloading pyunormalize-16.0.0-py3-none-any.whl.metadata (4.0 kB)
Collecting parsimonious<

In [8]:
!pip -q install eth-abi==4.2.1 web3==6.20.1 pandas==2.2.2

In [10]:
from dataclasses import dataclass
from typing import Dict, List, Any, Tuple, Optional
from web3 import Web3
import json, pandas as pd, datetime, math, os, io

# ---- Helpers ----
def keccak_sig(text: str) -> str:
    return Web3.keccak(text=text).hex()

def hex_to_int(x: str) -> int:
    if x is None or x == "0x" or x == "":
        return 0
    return int(x, 16)

def hex_addr(x: str) -> Optional[str]:
    if not x or x == "0x":
        return None
    return Web3.to_checksum_address("0x"+x[-40:])

def from_wei(value: int, decimals: int) -> float:
    if decimals <= 0: return float(value)
    return value / (10 ** decimals)

# ---- Token & protocol metadata (extend freely) ----
TOKENS: Dict[str, dict] = {
    # Ethereum bluechips
    "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48": {"symbol": "USDC", "decimals": 6,  "chain": "ethereum", "type": "token"},
    "0xdac17f958d2ee523a2206206994597c13d831ec7": {"symbol": "USDT", "decimals": 6,  "chain": "ethereum", "type": "token"},
    "0x2260fac5e5542a773aa44fbcfedf7c193bc2c599": {"symbol": "WBTC", "decimals": 8,  "chain": "ethereum", "type": "token"},
    # Uniswap V3
    "0xc36442b4a4522e871399cd717abdd847ab11fe88": {"symbol": "UNI-V3-NPM", "decimals": 0, "chain": "ethereum", "type": "dex", "name": "Uniswap V3 Positions NFT"},
    "0x3416cf6c708da44db2624d63ea0aaef7113527c6": {"symbol": "UNI-V3-POOL", "decimals": 0, "chain": "ethereum", "type": "dex", "name": "Uniswap V3 Pool (USDC/USDT 0.05%)"},
    "0x000000000022d473030f116ddee9f6b43ac78ba3": {"symbol": "PERMIT2", "decimals": 0, "chain": "ethereum", "type": "system", "name": "Uniswap Permit2"},
    # Aave V3 core
    "0x87870bca3f3fd6335c3f4ce8392d69350b4fa4e2": {"symbol": "AAVE-V3-POOL", "decimals": 0, "chain": "ethereum", "type": "lending", "name": "Aave V3 Pool"},
    "0x98c23e9d8f34fefb1b7bd6a91b7ff122f4e16f5c": {"symbol": "aUSDC", "decimals": 6,  "chain": "ethereum", "type": "lending", "name": "Aave aUSDC v3"},
    # Polygon native pseudo address (if your sample is Polygon):
    "0x0000000000000000000000000000000000001010": {"symbol": "MATIC", "decimals": 18, "chain": "polygon",  "type": "system", "name": "Polygon Native Token Pseudo-Address"},
}

PROTOCOLS: Dict[str, dict] = {
    "0xc36442b4a4522e871399cd717abdd847ab11fe88": {"protocol": "Uniswap V3", "type": "dex"},
    "0x3416cf6c708da44db2624d63ea0aaef7113527c6": {"protocol": "Uniswap V3", "type": "dex"},
    "0x87870bca3f3fd6335c3f4ce8392d69350b4fa4e2": {"protocol": "Aave V3",   "type": "lending"},
    "0x98c23e9d8f34fefb1b7bd6a91b7ff122f4e16f5c": {"protocol": "Aave V3",   "type": "lending"},
    "0x000000000022d473030f116ddee9f6b43ac78ba3": {"protocol": "Permit2",  "type": "system"},
}

# ---- Build the (multi-protocol) event signature DB (Phase 2) ----
def build_signature_map():
    sigs = {
        # ERC-20 / ERC-721
        "Transfer(address,address,uint256)":                 ["Transfer",        ["address","address","uint256"]],
        "Approval(address,address,uint256)":                 ["Approval",        ["address","address","uint256"]],
        # Uniswap V2 Pair
        "Swap(address,uint256,uint256,uint256,uint256,address)":["SwapV2",       ["address","uint256","uint256","uint256","uint256","address"]],
        "Sync(uint112,uint112)":                             ["Sync",            ["uint112","uint112"]],
        "Mint(address,uint256,uint256)":                     ["MintV2",          ["address","uint256","uint256"]],
        "Burn(address,uint256,uint256,address)":             ["BurnV2",          ["address","uint256","uint256","address"]],
        # Uniswap V3 (Pool/PositionManager)
        "Swap(address,address,int256,int256,uint160,uint128,int24)":["SwapV3",  ["address","address","int256","int256","uint160","uint128","int24"]],
        "IncreaseLiquidity(address,uint256,uint128,uint256,uint256)":["IncreaseLiquidity",["address","uint256","uint128","uint256","uint256"]],
        "DecreaseLiquidity(address,uint256,uint128,uint256,uint256)":["DecreaseLiquidity",["address","uint256","uint128","uint256","uint256"]],
        "Collect(address,address,uint256,uint256)":          ["Collect",         ["address","address","uint256","uint256"]],
        # Aave V3 core
        "Supply(address,address,address,uint256,uint16)":    ["Supply",          ["address","address","address","uint256","uint16"]],
        "Withdraw(address,address,address,uint256)":         ["Withdraw",        ["address","address","address","uint256"]],
        "Borrow(address,address,address,uint256,uint256,uint16)":["Borrow",     ["address","address","address","uint256","uint256","uint16"]],
        "Repay(address,address,address,uint256,bool)":       ["Repay",           ["address","address","address","uint256","bool"]],
        "FlashLoan(address,address,address,uint256,uint256,uint16)":["FlashLoan",["address","address","address","uint256","uint256","uint16"]],
        "LiquidationCall(address,address,address,uint256,uint256,address,bool)":["LiquidationCall",["address","address","address","uint256","uint256","address","bool"]],
        # ERC-1155 (sometimes shows up)
        "TransferSingle(address,address,address,uint256,uint256)":["TransferSingle",["address","address","address","uint256","uint256"]],
        "TransferBatch(address,address,address,uint256[],uint256[])":["TransferBatch",["address","address","address","uint256[]","uint256[]"]],
        # WETH-like
        "Deposit(address,address,uint256)":                  ["Deposit",         ["address","address","uint256"]],
        "Withdrawal(address,address,uint256,uint256)":       ["Withdrawal",      ["address","address","uint256","uint256"]],
    }
    topic_to_sig = {}
    support_specs = {}
    for proto, (name, inputs) in sigs.items():
        t0 = keccak_sig(proto)
        topic_to_sig[t0] = name
        support_specs[name] = {"proto": proto, "inputs": inputs}
    return topic_to_sig, support_specs

TOPIC_TO_SIG, SUPPORTED_SPECS = build_signature_map()

In [11]:
# upload the sample.json
SAMPLE_PATH = "sample.json"   # change if you used files.upload()

raw = json.load(open(SAMPLE_PATH))
logs = raw["logs"] if isinstance(raw, dict) and "logs" in raw else raw

# optional focal wallet (improves human-readable strings)
WALLET = (raw.get("wallet") if isinstance(raw, dict) else None) or "0xBF0eCCD64bB1b5Ff949f55467E5BBE4376587c23"

print("Total logs loaded:", len(logs))
print("Wallet focus:", WALLET)
# Show one example
print(json.dumps(logs[0], indent=2)[:800])


Total logs loaded: 43
Wallet focus: 0xBF0eCCD64bB1b5Ff949f55467E5BBE4376587c23
{
  "removed": false,
  "logIndex": 82,
  "transactionIndex": 513,
  "transactionHash": "0x9ee0c523f0b9f856cb1d75dc62075be0d0e6d83ffaf3cd1627e6e49c3e734f7a",
  "blockHash": "0x9786b55baa529eceadfe57d855d11c6d54aba94bc440a2a4b823880332ef2d94",
  "blockNumber": 19003465,
  "address": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
  "data": "0x000000000000000000000000000000000000000000000000000000002cb41780",
  "topics": [
    "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
    "0x000000000000000000000000bf0eccd64bb1b5ff949f55467e5bbe4376587c23",
    "0x00000000000000000000000016786ffbd087684b0c09d6e66f91c71c7d722365",
    null
  ]
}


In [12]:
def classify_protocol(addr: str) -> Tuple[str, str]:
    a = addr.lower()
    if a in PROTOCOLS:
        info = PROTOCOLS[a]
        return info.get("protocol","Unknown"), info.get("type","unknown")
    if a in TOKENS:
        typ = TOKENS[a].get("type","token")
        # Tokens/system pseudo-addrs are tagged as such
        return TOKENS[a].get("symbol","Token"), typ
    # Heuristics upgraded later by event type (e.g., SwapV3 => DEX)
    return "Unknown","unknown"

# Build address → protocol table
addr_rows = {}
for lg in logs:
    a = lg.get("address","").lower()
    proto, ptype = classify_protocol(a)
    if a not in addr_rows: addr_rows[a] = {"address": a, "protocol": proto, "type": ptype}

addr_df = pd.DataFrame(addr_rows.values()).sort_values(["type","protocol","address"]).reset_index(drop=True)
addr_df.head(10)


Unnamed: 0,address,protocol,type
0,0xc36442b4a4522e871399cd717abdd847ab11fe88,Uniswap V3,dex
1,0x87870bca3f3fd6335c3f4ce8392d69350b4fa4e2,Aave V3,lending
2,0x98c23e9d8f34fefb1b7bd6a91b7ff122f4e16f5c,Aave V3,lending
3,0x000000000022d473030f116ddee9f6b43ac78ba3,Permit2,system
4,0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48,USDC,token
5,0xdac17f958d2ee523a2206206994597c13d831ec7,USDT,token
6,0x2260fac5e5542a773aa44fbcfedf7c193bc2c599,WBTC,token
7,0x0a992d191deec32afe36203ad87d7d289a738f81,Unknown,unknown
8,0x39053d51b77dc0d36036fc1fcc8cb819df8ef37a,Unknown,unknown
9,0x4e502ab1bb313b3c1311eb0d11b31a6b62988b86,Unknown,unknown


In [13]:
# Extract all topic0 values and map to names (with fallback)
topic_count = {}
sig_seen = {}
for lg in logs:
    topics = lg.get("topics") or []
    if not topics: continue
    t0 = topics[0]
    topic_count[t0] = topic_count.get(t0, 0) + 1
    sig_seen[t0] = TOPIC_TO_SIG.get(t0, "Unknown")

event_signature_map = {t0: sig_seen[t0] for t0 in sorted(sig_seen)}
print("Unique signatures seen:", len(event_signature_map))

# Save
os.makedirs("out", exist_ok=True)
json.dump(event_signature_map, open("out/event_signature_map.json","w"), indent=2)
json.dump(SUPPORTED_SPECS,   open("out/supported_signatures.json","w"), indent=2)

list(event_signature_map.items())[:8]


Unique signatures seen: 16


[('0x00058a56ea94653cdf4f152d227ace22d4c00ad99e2a43f58cb7d9e3feb295f2',
  'Unknown'),
 ('0x1b2a7ff080b8cb6ff436ce0372e399692bbfb6d4ae5766fd8d58a7b8cc6142e6',
  'Unknown'),
 ('0x25b428dfde728ccfaddad7e29e4ac23c24ed7fd1a6e3e3f91894a9a073f5dfff',
  'Unknown'),
 ('0x2b627736bca15cd5381dcf80b0bf11fd197d01a037c52b927a881a10fb73ba61',
  'Unknown'),
 ('0x2db5ddd0b42bdbca0d69ea16f234a870a485854ae0d91f16643d6f317d8b8994',
  'Unknown'),
 ('0x3115d1449a7b732c986cba18244e897a450f61e1bb8d589cd2e69e6c8924f9f7',
  'Unknown'),
 ('0x44c58d81365b66dd4b1a7f36c25aa97b8c71c361ee4937adc1a00000227db5dd',
  'Unknown'),
 ('0x458f5fa412d0f69b08dd84872b0215675cc67bc1d5b6fd93300a1c3878b86196',
  'Unknown')]

In [21]:
# phase 3
@dataclass
class DecodedEvent:
    tx: str
    block: int
    logIndex: int
    contract: str
    protocol: str
    protocolType: str
    eventName: str
    eventSignature: str
    decoded: Dict[str, Any]
    human: Dict[str, Any]

def decode_erc20_like(topics: List[str], data_hex: str, contract: str, wallet_focus: Optional[str]):
    from_addr = hex_addr(topics[1]) if len(topics) > 1 else None
    to_addr   = hex_addr(topics[2]) if len(topics) > 2 else None
    value     = hex_to_int(data_hex)
    meta = TOKENS.get(contract.lower(), {"symbol":"TOKEN", "decimals":18})
    return (
        {"from": from_addr, "to": to_addr, "value": str(value)},
        {
            "from": from_addr and (from_addr[:6]+"..."+from_addr[-4:]),
            "to":   to_addr   and (to_addr[:6]+"..."+to_addr[-4:]),
            "amount": f"{from_wei(value, meta.get('decimals',18)):.6f}",
            "token":  meta.get("symbol","TOKEN"),
            "involvesWallet": wallet_focus in (from_addr, to_addr) if wallet_focus else False
        }
    )

def decode_aave_core(name: str, topics: List[str], data_hex: str, contract: str, wallet_focus: Optional[str]):
    user = hex_addr(topics[2]) if len(topics) > 2 else (hex_addr(topics[1]) if len(topics) > 1 else None)
    # Non-indexed amount is first 32 bytes in data for several Aave core events
    amount = hex_to_int(data_hex[:66]) if data_hex and len(data_hex) >= 66 else hex_to_int(data_hex)
    meta = TOKENS.get(contract.lower(), {"symbol":"", "decimals":18})
    return (
        {"user": user, "amount": str(amount)},
        {
            "action": f"{name} on Aave V3",
            "user": user and (user[:6]+"..."+user[-4:]),
            "amount": f"{from_wei(amount, meta.get('decimals',18)):.6f}",
            "token":  meta.get("symbol","")
        }
    )

def build_human_action(event_name: str, contract: str, decoded: Dict[str,Any], protocol: str):
    if event_name in ("Transfer","Approval"):
        token = TOKENS.get(contract.lower(), {}).get("symbol","TOKEN")
        pretty = {"action": f"{event_name}", "token": token}
        pretty.update({k:v for k,v in decoded.items() if k in ("from","to","value")})
        return pretty
    return {"action": f"{event_name} on {protocol}"}

def decode_all(logs: List[dict], wallet_focus: Optional[str]=None):
    decoded: List[DecodedEvent] = []
    protocol_usage: Dict[str, Dict[str,int]] = {}
    event_counts: Dict[str, int] = {}
    seen_sig_dump = {}

    for lg in logs:
        addr = lg.get("address")
        topics = lg.get("topics") or []
        data   = lg.get("data","0x")
        tx     = lg.get("transactionHash")
        block  = lg.get("blockNumber")
        idx    = lg.get("logIndex")
        t0     = topics[0] if topics else None

        proto, ptype = classify_protocol(addr)
        name = TOPIC_TO_SIG.get(t0, "Unknown")

        # Heuristic upgrade of protocol classification from event
        if proto == "Unknown":
            if name in ("SwapV2","SwapV3","IncreaseLiquidity","DecreaseLiquidity","Collect"):
                proto, ptype = "Uniswap", "dex"
            elif name in ("Supply","Withdraw","Borrow","Repay","FlashLoan","LiquidationCall"):
                proto, ptype = "Aave", "lending"
            elif addr.lower() in TOKENS:
                proto, ptype = TOKENS[addr.lower()].get("symbol","Token"), TOKENS[addr.lower()].get("type","token")

        # Decode
        d_dec, d_hum = {}, {}
        if name in ("Transfer","Approval"):
            d_dec, d_hum = decode_erc20_like(topics, data, addr, wallet_focus)
        elif name in ("Supply","Withdraw","Borrow","Repay","FlashLoan","LiquidationCall"):
            d_dec, d_hum = decode_aave_core(name, topics, data, addr, wallet_focus)
        else:
            d_dec = {"topics": topics, "data": data}
            d_hum = build_human_action(name, addr, d_dec, proto)

        # Tallies
        protocol_usage.setdefault(ptype, {}).setdefault(proto, 0)
        protocol_usage[ptype][proto] += 1
        event_counts[name] = event_counts.get(name, 0) + 1
        seen_sig_dump[t0] = name

        decoded.append(DecodedEvent(
            tx=tx, block=block, logIndex=idx, contract=addr,
            protocol=proto, protocolType=ptype, eventName=name,
            eventSignature=t0, decoded=d_dec, human=d_hum
        ))
    return decoded, protocol_usage, event_counts, seen_sig_dump

decoded, protocol_usage, event_counts, sig_dump = decode_all(logs, wallet_focus=WALLET)

len(decoded), list(event_counts.items())[:8], {k: list(v.items())[:3] for k,v in protocol_usage.items()}

(43,
 [('Unknown', 43)],
 {'token': [('USDC', 13), ('USDT', 5), ('WBTC', 5)],
  'unknown': [('Unknown', 10)],
  'dex': [('Uniswap V3', 1)],
  'lending': [('Aave V3', 8)],
  'system': [('Permit2', 1)]})

In [20]:
# Phase 4
def to_output(decoded, protocol_usage, event_counts):
    events_by_protocol = {"lending": [], "dex": [], "system": [], "token": [], "unknown": []}
    for ev in decoded:
        bucket = ev.protocolType if ev.protocolType in events_by_protocol else "unknown"
        events_by_protocol[bucket].append({
            "transactionHash": ev.tx,
            "blockNumber": ev.block,
            "logIndex": ev.logIndex,
            "protocol": ev.protocol,
            "protocolType": ev.protocolType,
            "contractAddress": ev.contract,
            "eventName": ev.eventName,
            "eventSignature": ev.eventSignature,
            "decodedData": ev.decoded,
            "humanReadable": ev.human,
        })
    summary = {
        "totalLogsProcessed": len(decoded),
        "totalEventsDecoded": sum(1 for e in decoded if e.eventName != "Unknown"),
        "protocolsIdentified": protocol_usage,
        "eventTypeDistribution": event_counts,
        "processingTimestamp": datetime.datetime.utcnow().isoformat() + "Z"
    }
    return {"summary": summary, "eventsByProtocol": events_by_protocol}

os.makedirs("out", exist_ok=True)
json.dump({a:v for a,v in protocol_usage.items()}, open("out/protocol_usage.json","w"), indent=2)
json.dump(sig_dump, open("out/event_signature_map.json","w"), indent=2)
# also persist the address map (Phase 1 deliverable)
addr_df.to_csv("out/address_protocol_mapping.csv", index=False)

output = to_output(decoded, protocol_usage, event_counts)
json.dump(output, open("out/decoded_output.json","w"), indent=2)

print(json.dumps(output["summary"], indent=2))

{
  "totalLogsProcessed": 43,
  "totalEventsDecoded": 0,
  "protocolsIdentified": {
    "token": {
      "USDC": 13,
      "USDT": 5,
      "WBTC": 5
    },
    "unknown": {
      "Unknown": 10
    },
    "dex": {
      "Uniswap V3": 1
    },
    "lending": {
      "Aave V3": 8
    },
    "system": {
      "Permit2": 1
    }
  },
  "eventTypeDistribution": {
    "Unknown": 43
  },
  "processingTimestamp": "2025-08-21T14:34:30.476900Z"
}


  "processingTimestamp": datetime.datetime.utcnow().isoformat() + "Z"


In [18]:
# Sanity checks to ensure nothing crashes on malformed entries
def _smoketest_first_entry():
    lg = logs[0].copy()
    lg["topics"] = []
    res = decode_all([lg], wallet_focus=WALLET)
    assert res[0] and isinstance(res[1], dict)

_smoketest_first_entry()
print("Smoke test OK")

# Expected: ERC-20 Transfer topic is recognized
transfer_topic = keccak_sig("Transfer(address,address,uint256)")
assert TOPIC_TO_SIG[transfer_topic] == "Transfer"
print("Signature DB OK")

Smoke test OK
Signature DB OK
