### Imports

In [None]:
from web3 import Web3
import requests
import json

from web3 import AsyncWeb3
from web3.exceptions import Web3RPCError
from web3.providers.persistent import WebSocketProvider
import asyncio
import threading
from time import sleep
from typing import List
import random
import os

### Funcs

In [None]:
# 1. Fetch a verified contract's ABI using the Etherscan API
def fetch_abi(address: str) -> list:
    """Download and parse the ABI of a contract from the Etherscan API.

    Args:
        address: Contract address sent as the ``address`` query parameter.

    Returns:
        The ABI decoded from the JSON string held in the response's
        ``result`` field.

    Raises:
        requests.HTTPError: If Etherscan answers with an error HTTP status.
        ValueError: If ``result`` is missing or is not a JSON document
            (Etherscan puts a plain error message there on failure).
    """
    # Prefer a key supplied through the environment; fall back to the key
    # that was previously hard-coded so existing setups keep working.
    ETHERSCAN_API_KEY = os.environ.get(
        "ETHERSCAN_API_KEY", "QGS1VSEPNEHZ8W4M686QCNM1Z6WGIT14Q1"
    )
    resp = requests.get(
        "https://api.etherscan.io/api",
        params={
            "module": "contract",
            "action": "getabi",
            "address": address,
            "apikey": ETHERSCAN_API_KEY,
        },
        # Without a timeout a stalled connection would hang the caller forever.
        timeout=30,
    )
    resp.raise_for_status()

    result = resp.json().get("result")
    try:
        return json.loads(result)
    except (TypeError, ValueError) as err:
        # Surface Etherscan's own message instead of a bare JSON decode error.
        raise ValueError(
            f"Etherscan returned no ABI for {address}: {result!r}"
        ) from err


### Initializations

In [None]:
# 0. set up RPC w3 connection
# Candidate keys for the Infura endpoint; only the one selected below is used
# by the code visible in this file.
# NOTE(review): these keys are committed in plain text — consider loading them
# from the environment instead.
api_keys = [
    "75db4a2f907d4525866a728681b3b458",
    "d3c123079991433a9117637ed32fc540",
    "056abf38c1fb4da8b3dcff03d9a32d3c",
    "9ef3857c438e4f3485bb10daf32fdfa9",
    "b461cd0decb4477396db1e04244bb1ee",
    "8c2befadd58e43329df6331f5b4fa609"
]

# The selected key is used for the HTTP provider below and for the WebSocket
# URL built in monitor_state.
api_key = api_keys[3]

infura_url = f"https://mainnet.infura.io/v3/{api_key}" # select mainnet or sepolia here
# Synchronous (blocking) HTTP client shared by the rest of the notebook.
w3 = Web3(Web3.HTTPProvider(infura_url))

### Keeping track & serializing all pairs

In [None]:

# Constants
# On-disk locations of the serialized pair state and of the shared pair ABI.
PAIR_STATES_PATH    = "/Volumes/Extreme SSD/arbot_data/pair_states.json"
PAIR_ABI_PATH = "/Volumes/Extreme SSD/arbot_data/pair_abi.json"
# First block scanned when no state file exists (load_pair_states starts the
# checkpoint at DEPLOY_BLOCK - 1); presumably the factory's deployment block.
DEPLOY_BLOCK = 10_008_355
# Pause, in seconds, taken before every get_logs request in safe_get_logs.
sleep_time: float = 1.0
# topic0 values (event-signature hashes) used to filter and classify logs.
pair_created_topic  = w3.keccak(text="PairCreated(address,address,address,uint256)")
sync_topic          = w3.keccak(text="Sync(uint112,uint112)")
factory_address_string = "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f"
factory_address = Web3.to_checksum_address(factory_address_string)
# NOTE: fetch_abi performs a network request every time this cell runs.
factory_abi     = fetch_abi(factory_address)
factory         = w3.eth.contract(address=factory_address, abi=factory_abi)
# The pair ABI is read from disk rather than fetched; it is decoded per pair
# address in _handle_synch.
with open(PAIR_ABI_PATH) as f:
    pair_abi = json.load(f)

# TODO: Will have to be scoped inside appropriate thread for safety
# Pair address -> record built by _handle_pair_creation / _handle_synch.
pair_states: dict[str, dict] = {}
# Checkpoint block: initial_pair_states_sync resumes at this value + 1.
pair_states_last_block = 0
# Held around every dump_pair_states call.
pair_states_lock    = asyncio.Lock()
# Set by monitor_state once the log subscription is open.
state_subscribed    = asyncio.Event()
# Set by initial_pair_states_sync once the historical backfill has finished.
initial_update      = asyncio.Event()


# (0) Functions to read/write from disk

def load_pair_states():
    """Load the module-level pair state from PAIR_STATES_PATH.

    If the file does not exist, the state starts empty with the checkpoint
    set to the block just before DEPLOY_BLOCK. Keys missing from an existing
    file fall back to those same defaults.
    """
    global pair_states_last_block, pair_states

    # Nothing saved yet: start from scratch.
    if not os.path.isfile(PAIR_STATES_PATH):
        pair_states_last_block = DEPLOY_BLOCK - 1
        pair_states = {}
        return

    with open(PAIR_STATES_PATH) as fh:
        saved = json.load(fh)

    pair_states_last_block = saved.get("pair_states_last_block", DEPLOY_BLOCK - 1)
    pair_states = saved.get("pair_states", {})

async def dump_pair_states():
    """Write the checkpoint block and all pair records to PAIR_STATES_PATH.

    The JSON is first written to a sibling ".tmp" file which then replaces
    the real file, so the target is never left half-written.
    """
    scratch_path = PAIR_STATES_PATH + ".tmp"
    with open(scratch_path, "w") as out:
        json.dump(
            {
                "pair_states_last_block": pair_states_last_block,
                "pair_states": pair_states,
            },
            out,
        )
    # Swap the finished file into place.
    os.replace(scratch_path, PAIR_STATES_PATH)


In [None]:
# Event Handlers
def _handle_pair_creation(log) -> bool:
    global pair_states
    """ 
    Called back on each new PairCreated event 
    """
    
    # Ignore log if it did not come from our factory
    # TODO: in future accept from multiple factories

    if log["address"].lower() != factory_address_string.lower():
        return False
    
    decoded_log = factory.events.PairCreated().process_log(log)
    addr = decoded_log["args"]["pair"]
    if addr not in pair_states:
        pair_states[addr] = {
            "pairAddress": addr,
            "factory": log["address"],
            "token0": decoded_log["args"]["token0"],
            "token1": decoded_log["args"]["token1"],
            "blockNumber": log["blockNumber"]
        }
        return True
        # print(f"\tAppended pair {addr} (total now {len(pair_states)})")
    return False

def _handle_synch(log) -> bool:
    global pair_states, pair_abi
    """ 
    Called back on each Synch event 
    """
    addr = log["address"]
    if addr not in pair_states:
        return False # drop Sync for unknown pools
    decoded_log = w3.eth.contract(address=addr, abi=pair_abi).events.Sync().process_log(log)
    r0, r1 = decoded_log["args"]["reserve0"], decoded_log["args"]["reserve1"]
    blk = log["blockNumber"]

    # Only maintain very latest state in this batch
    if blk >= pair_states[addr]["blockNumber"]:
        pair_states[addr].update({
            "reserves": {
                "reserve0": str(r0),
                "reserve1": str(r1),
                "lastBlockTimestamp": blk,
            },
            "blockNumber": blk
        })
        return True
        # print(f"\tSync @ {blk} for {addr}: {r0}/{r1}")
    return False

def log_sort_key(L):
    """Return the (block, transaction index, log index) ordering key of a log."""
    return (L["blockNumber"], L["transactionIndex"], L["logIndex"])

async def safe_get_logs(args: dict):
    """Fetch logs for a filter, bisecting the block range when it is too large.

    Args:
        args: Filter parameters passed to ``w3.eth.get_logs``. Must contain
            integer "fromBlock" and "toBlock"; every other key (e.g.
            "topics") is forwarded unchanged, including to the sub-range
            requests.

    Returns:
        The logs for the range. When the range had to be split, the two
        halves are merged and ordered with ``log_sort_key``.

    Raises:
        Web3RPCError: For any RPC error other than the "more than 10000"
            result-limit error, or when that error occurs on a range that
            cannot be split any further (a single block).
    """
    from_blk = args["fromBlock"]
    to_blk = args["toBlock"]

    try:
        # controlling requests to the node
        await asyncio.sleep(sleep_time)
        # get_logs is a blocking HTTP call; run it in a worker thread so the
        # event loop (which also serves the WebSocket subscription) keeps
        # running while the node answers.
        return await asyncio.to_thread(w3.eth.get_logs, args)
    except Web3RPCError as e:
        # Only the result-limit error is recoverable, and only while the range
        # can still be halved; a single-block range would recurse forever.
        if "more than 10000" not in str(e) or from_blk >= to_blk:
            raise

    # Split the range in two and retry each half, keeping all other filter keys.
    mid = (from_blk + to_blk) // 2
    logs1 = await safe_get_logs({**args, "fromBlock": from_blk, "toBlock": mid})
    logs2 = await safe_get_logs({**args, "fromBlock": mid + 1, "toBlock": to_blk})
    return sorted(logs1 + logs2, key=log_sort_key)
         
async def pair_states_log_range(from_blk: int, to_blk: int) -> int:
    """Bring ``pair_states`` up to date for one block range and save it.

    Fetches all PairCreated and Sync logs in ``[from_blk, to_blk]``, registers
    the newly created pairs first, then applies only the last Sync seen for
    each address, moves the checkpoint to ``to_blk`` and writes the state to
    disk.

    Args:
        from_blk: First block of the range (inclusive).
        to_blk: Last block of the range (inclusive).

    Returns:
        The number of pairs added while processing the range.
    """
    global pair_states_last_block

    query = {
        "fromBlock": from_blk,
        "toBlock":   to_blk,
        "topics":    [[pair_created_topic, sync_topic]],
    }
    fetched = await safe_get_logs(query)

    print(f"Fetched {len(fetched)} logs")

    # Chain order: (block, tx, log index)
    fetched.sort(key=log_sort_key)

    # 1. Split into creation logs and the newest Sync per emitting address
    created = []
    newest_sync = {}
    for entry in fetched:
        signature = entry["topics"][0]
        if signature == pair_created_topic:
            created.append(entry)
            continue
        if signature != sync_topic:
            continue

        emitter = entry["address"]
        position = log_sort_key(entry)
        seen = newest_sync.get(emitter)
        if seen is not None and position <= seen[0]:
            continue
        newest_sync[emitter] = (position, entry)

    # 2. Pairs must exist before their Sync can be applied
    new_pairs = sum(_handle_pair_creation(entry) for entry in created)

    # 3. Exactly one Sync per address
    for _, entry in newest_sync.values():
        _handle_synch(entry)

    # 4. Advance the checkpoint and persist
    pair_states_last_block = to_blk
    async with pair_states_lock:
        await dump_pair_states()

    print(f"Processed blocks {from_blk}-{to_blk}: +{new_pairs} pools, total {len(pair_states)}")
    return new_pairs


async def initial_pair_states_sync(chunk_size: int = 50_000):
    """Backfill ``pair_states`` from the saved checkpoint to the current head.

    Loads the saved state, waits until the live subscription is open, then
    processes the missing blocks in windows of ``chunk_size`` blocks and
    finally signals ``initial_update``.

    Args:
        chunk_size: Number of blocks requested per window.
    """
    load_pair_states()
    # Do not read the head before the live subscription has been opened.
    await state_subscribed.wait()
    head = w3.eth.block_number

    first_missing = pair_states_last_block + 1
    for window_start in range(first_missing, head + 1, chunk_size):
        # The last window is clipped to the head block.
        window_end = min(window_start + chunk_size, head + 1) - 1
        await pair_states_log_range(window_start, window_end)

    initial_update.set()


async def monitor_state():
    """Subscribe to live PairCreated/Sync logs and keep ``pair_states`` current.

    Opens a WebSocket subscription, signals ``state_subscribed``, and then
    handles each incoming log once the historical backfill has signalled
    ``initial_update``. After every log the state is written to disk.

    The checkpoint ``pair_states_last_block`` is the block a restart resumes
    *after* (initial_pair_states_sync starts at checkpoint + 1). It is
    therefore only advanced to the block *before* the one the current log
    belongs to, since more logs of that block may still arrive, and it is
    never moved backwards by older logs that were buffered during backfill.
    """
    global pair_states_last_block

    async with AsyncWeb3(WebSocketProvider(f"wss://mainnet.infura.io/ws/v3/{api_key}")) as w3s:
        await w3s.eth.subscribe("logs", {
            "topics": [[pair_created_topic, sync_topic]]
        })
        state_subscribed.set()

        async for msg in w3s.socket.process_subscriptions():
            # Hold live logs back until the backfill has caught up.
            await initial_update.wait()
            log = msg["result"]
            # NOTE(review): logs flagged as removed (chain reorganisation)
            # are not treated specially here — confirm whether that matters.

            topic_sig = log["topics"][0]
            if topic_sig == pair_created_topic:
                print("Live Pair Created Event: ")
                _handle_pair_creation(log)
            elif topic_sig == sync_topic:
                print("Live Synch Event: ")
                _handle_synch(log)

            # The log's own block may be only partially processed, so it must
            # be re-scanned after a restart; the handlers tolerate replays.
            pair_states_last_block = max(
                pair_states_last_block, log["blockNumber"] - 1
            )
            async with pair_states_lock:
                await dump_pair_states()



In [None]:
# Entrypoint: this thread runs indefinitely keeping pair_states up to date
async def main():
    """Run the historical backfill and the live subscription side by side."""
    backfill = initial_pair_states_sync(chunk_size=100_000)
    live_feed = monitor_state()
    await asyncio.gather(backfill, live_feed)

# Top-level await: valid only where an event loop is already running (e.g. a
# notebook cell); a plain script would need asyncio.run(main()) instead.
await main()

In [None]:
# Junk-cleaning procedure after bug discovery
"""

load_pair_states()

with open("/Volumes/Extreme SSD/arbot_data/pairaddr.json") as f:
    pairaddr = json.load(f)

pairaddr_dict = {}
for pair in pairaddr["pairs"]:
    pairaddr_dict[pair] = pair

# Only keep the items that are amongst legitimate addresses
new_pair_states = {}
for pair in pair_states.keys():
    if pair in pairaddr_dict:
        new_pair_states[pair] = pair_states[pair]

# Rewrite the global pair_states dict by clearing then updating
pair_states.clear()             
pair_states.update(new_pair_states)

await dump_pair_states()

"""

In [None]:
"""

attempts chunk size 5000
breaks at 
Fetched 7765 logs
Processed blocks 10213355-10218354: +16 pools, total 651

attempts chunk size 3000
Fetched 6860 logs
Processed blocks 10293355-10296354: +13 pools, total 1278

attempts chunk size 2000
Fetched 5589 logs
Processed blocks 10352355-10354354: +6 pools, total 1566

"""

In [None]:
# Testing
def test_rand_pair():
    """Check one random index of pair_addresses against the factory's allPairs."""
    idx = random.randint(0, len(pair_addresses) - 1)
    on_chain = factory.functions.allPairs(idx).call()
    assert on_chain == pair_addresses[idx]
    print(f"Assertion passed for pair # {idx}")

# Spot-check one random pair per second; this loop never exits on its own and
# stops only on a failed assertion or a manual interrupt.
while True:
    test_rand_pair()
    sleep(1)

In [None]:
# Up to date? True when the local list has as many entries as the factory
# reports on-chain.
# NOTE(review): pair_addresses is not defined in the cells shown here —
# confirm where it comes from.
len(pair_addresses) == factory.functions.allPairsLength().call()

In [None]:
# Compare one recent entry (index = on-chain pair count minus ten) between
# the factory and the local list.
curr_num = factory.functions.allPairsLength().call() - 10
factory.functions.allPairs(curr_num).call() == pair_addresses[curr_num]

In [None]:
# Do all pair contracts use the same abi? (looks like they do, hence save unique abi under this pair_abi object)
# -1 selects the last entry of pair_addresses.
pair_num = -1

pair_address = Web3.to_checksum_address(pair_addresses[pair_num])
# Rebinds the module-level pair_abi with the ABI fetched for that pair.
pair_abi     = fetch_abi(pair_address)

In [None]:
# Save the pair ABI to PAIR_ABI_PATH, the file the constants cell loads
# pair_abi from.
with open(PAIR_ABI_PATH, "w") as f:
    json.dump(pair_abi, f)

In [None]:
""" The proposed shape for each pair


{
  "pairAddress": "0x…",            // contract address
  "factory":    "0x…",
  "token0":     "0x…",
  "token1":     "0x…",
  "reserves": {
    "reserve0":           "123456789012345678",   // as string to avoid JS precision loss
    "reserve1":           "987654321098765432",
    "blockTimestampLast": 1717065600
  },
  "priceCumulatives": {
    "price0CumulativeLast": "123456789012345",    // price1/price0 * elapsed time
    "price1CumulativeLast": "543210987654321"
  },
  "kLast":       "121932631137021795113",         // reserve0 * reserve1 at last liquidity event
  "totalSupply": "1000000000000000000"            // LP token total supply
}




Monitor single "Sync" event?


"""