# Process Swap Events on the blockchain

In [16]:
import json
import bisect

from web3 import Web3
import pandas as pd
from hexbytes import HexBytes

from datetime import datetime

import matplotlib.pyplot as plt
import bisect
from typing import cast
# Import python dotenv
from dotenv import load_dotenv
import numpy as np
import os


from tqdm import tqdm, trange

# Import sqlalchemy
from sqlalchemy import create_engine

from sqlalchemy import String, Column, DateTime, Boolean, Integer
from sqlalchemy import Column, Integer, String, DateTime, Boolean
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine, select, func

from datetime import datetime


from datetime import datetime
from pydantic import BaseModel


load_dotenv(override=True)

True

In [17]:
# Web3 client talking to the JSON-RPC endpoint at http://localhost:8545 over HTTP.
w3 = Web3(Web3.HTTPProvider('http://localhost:8545'))

In [18]:
# Connection string for the PostgreSQL database, read from the environment.
postgres_uri = os.getenv("POSTGRESQL_URI_MP")

# Fail fast, naming the variable that is actually read. An explicit raise
# (unlike assert) is not stripped when Python runs with -O.
if not postgres_uri:
    raise RuntimeError("POSTGRESQL_URI_MP is not set in .env file")

engine = create_engine(postgres_uri)

In [19]:
# Read the contract ABIs (JSON) that are used to decode Swap logs.
with open('../abi/UniswapV3Pool.json', 'r') as v3_abi_file:
    uniswap_v3_pool_abi = json.loads(v3_abi_file.read())

with open('../abi/UniswapV2Pair.json', 'r') as v2_abi_file:
    uniswap_v2_pair_abi = json.loads(v2_abi_file.read())


## Define Swap Schemas

### Database Models

In [20]:
# Swaps V2 table
"""
Field name          Type        Mode
address            	STRING	    NULLABLE
block_timestamp    	TIMESTAMP	NULLABLE
block_number       	INTEGER	    NULLABLE
transaction_hash   	STRING	    NULLABLE
log_index          	INTEGER	    NULLABLE
sender             	STRING	    NULLABLE
amount0In          	STRING	    NULLABLE
amount1In          	STRING	    NULLABLE
amount0Out         	STRING	    NULLABLE
amount1Out         	STRING	    NULLABLE
to                 	STRING	    NULLABLE
""";

In [21]:
# Swap V3 table:
"""
block_timestamp     TIMESTAMP	NULLABLE                        
block_number        INTEGER	    NULLABLE                        
transaction_hash    STRING	    NULLABLE                            
log_index           INTEGER	    NULLABLE                    
sender              STRING	    NULLABLE                
recipient           STRING	    NULLABLE                    
amount0             STRING	    NULLABLE                
amount1             STRING	    NULLABLE                
sqrtPriceX96        STRING	    NULLABLE                        
liquidity           STRING	    NULLABLE                    
tick                STRING	    NULLABLE                
address             STRING	    NULLABLE                
from_address        STRING	    NULLABLE                        
to_address          STRING	    NULLABLE                    
transaction_index   INTEGER	    NULLABLE                               
""";

In [22]:
# Declarative base class; the ORM models below subclass it and register on Base.metadata.
Base = declarative_base()


class SwapV2(Base):
    """ORM model for the ``swaps_v2`` table.

    A row is identified by the composite primary key
    ``(block_number, transaction_index, log_index)``. Every column is
    required (``nullable=False``). The four swap amounts are declared as
    strings, not integers.
    """

    __tablename__ = "swaps_v2"

    # No surrogate id is used; the composite primary key below identifies a row.
    # id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    transaction_hash = Column(String, index=True, nullable=False)
    # `address` field of the decoded event (presumably the emitting pair contract — confirm).
    address = Column(String, nullable=False, index=True)
    block_timestamp = Column(DateTime, nullable=False)
    # Composite primary key: block_number + transaction_index + log_index.
    block_number = Column(Integer, nullable=False, index=True, primary_key=True)
    transaction_index = Column(Integer, nullable=False, primary_key=True)
    log_index = Column(Integer, nullable=False, primary_key=True)
    sender = Column(String, nullable=False, index=True)
    # Swap amounts, stored as strings.
    amount0_in = Column(String, nullable=False)
    amount1_in = Column(String, nullable=False)
    amount0_out = Column(String, nullable=False)
    amount1_out = Column(String, nullable=False)
    to = Column(String, nullable=False)
    # Whether the transaction hash was present in mempool_transactions when ingested.
    from_mempool = Column(Boolean, nullable=False)


class SwapV3(Base):
    """ORM model for the ``swaps`` table, which holds the V3 swap rows.

    A row is identified by the composite primary key
    ``(block_number, transaction_index, log_index)``. Unlike ``SwapV2``,
    ``transaction_hash`` and ``from_mempool`` are nullable here. All numeric
    event values (amounts, price, liquidity, tick) are declared as strings.
    """

    __tablename__ = "swaps"

    # No surrogate id is used; the composite primary key below identifies a row.
    # id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    transaction_hash = Column(String, index=True)
    block_timestamp = Column(DateTime, nullable=False)
    # Composite primary key: block_number + transaction_index + log_index.
    block_number = Column(Integer, nullable=False, index=True, primary_key=True)
    transaction_index = Column(Integer, nullable=False, primary_key=True)
    log_index = Column(Integer, nullable=False, primary_key=True)
    sender = Column(String, nullable=False, index=True)
    recipient = Column(String, nullable=False, index=True)
    # Numeric event values, stored as strings.
    amount0 = Column(String, nullable=False)
    amount1 = Column(String, nullable=False)
    sqrtPriceX96 = Column(String, nullable=False)
    liquidity = Column(String, nullable=False)
    tick = Column(String, nullable=False)
    # `address` field of the decoded event (presumably the emitting pool contract — confirm).
    address = Column(String, nullable=False, index=True)
    # NOTE(review): the ingestion cell fills these two with the event's sender and
    # recipient; the names suggest the transaction's from/to — confirm which is intended.
    from_address = Column(String, nullable=False, index=True)
    to_address = Column(String, nullable=False, index=True)
    # Nullable here (required on SwapV2).
    from_mempool = Column(Boolean)


class MempoolTransaction(Base):
    """ORM model for the ``mempool_transactions`` table, keyed by transaction hash."""

    __tablename__ = "mempool_transactions"

    hash = Column(String, primary_key=True, index=True)
    # Time the transaction was first seen; required and indexed.
    # NOTE(review): intended to be a UTC timestamp with time zone, precision 3, but the
    # column is declared as plain DateTime (no timezone=True) — confirm against the
    # actual table definition.
    first_seen = Column(DateTime, nullable=False, index=True)

class TransactionSource(Base):
    """ORM model for the ``transaction_sources`` table: one boolean flag per transaction hash."""

    __tablename__ = "transaction_sources"

    hash = Column(String, primary_key=True, index=True)
    # Presumably True when the transaction was observed in the mempool — confirm with the writer of this table.
    mempool = Column(Boolean, nullable=False, index=True)

In [23]:
# Session factory bound to the PostgreSQL engine; call Session() to open a session.
Session = sessionmaker(bind=engine)

# Create the tables of all models registered on Base.metadata
# (by default create_all skips tables that already exist).
Base.metadata.create_all(engine)

### Data Validation Classes

In [24]:
# Sample decoded V2 Swap event; its keys match the fields declared by SwapDataV2/SwapArgsV2
# (plus transactionHash and blockHash, which those models do not declare).
even_v2_data = {
    "args": {
        "sender": "0xD2A52F45C74b358ABE1428bC43F0ce9dDf130780",
        "to": "0xB517850510997a34b4DdC8c3797B4F83fAd510c4",
        "amount0In": 2865737771099214,
        "amount1In": 0,
        "amount0Out": 0,
        "amount1Out": 20711683288458725,
    },
    "event": "Swap",
    "logIndex": 5,
    "transactionIndex": 0,
    "transactionHash": HexBytes(
        "0x65a44fa5806b9e3b6198c96fff88604612a597be45d3dabc86c3fe10359fe3ef"
    ),
    "address": "0x477514E2100DC0E94667f390e3117dc813AC2934",
    "blockHash": HexBytes(
        "0x34a8f45c8ccf010b35106e0e5c40954b3a03159d7de422be3e99ba3fa2a9e865"
    ),
    "blockNumber": 17495444,
};

In [25]:
# Sample decoded V3 Swap event; its keys match the fields declared by SwapDataV3/SwapArgsV3
# (plus transactionHash and blockHash, which those models do not declare).
# NOTE(review): `recipient` has 45 hex digits after "0x" (the other addresses here have 40)
# and `transactionHash` has 65 (the other hashes here have 64) — these values look
# mistyped; confirm against the chain before relying on them.
even_v3_data = {
    "args": {
        "sender": "0x92F3f71CeF740ED5784874B8C70Ff87ECdF33588",
        "recipient": "0x92F3f74efb71CeF740ED5784874B8C70Ff87ECdF33588",
        "amount0": 14146668836465334409,
        "amount1": -26737897363,
        "sqrtPriceX96": 3445279794781067888930523,
        "liquidity": 607716593014265737873,
        "tick": -200872,
    },
    "event": "Swap",
    "logIndex": 22,
    "transactionIndex": 4,
    "transactionHash": HexBytes(
        "0x8e153cf3a5e292a3ad24e7f4bb4a4ae71ef1ec057f52735d13a0166ea03cb78f5"
    ),
    "address": "0x11b815efB8f581194ae79006d24E0d814B7697F6",
    "blockHash": HexBytes(
        "0x292f6bd15578754b33f5294b51cfad0b3bc6d9936a3365281926af00a11f38ee"
    ),
    "blockNumber": 17549265,
};

In [26]:
class SwapArgsV2(BaseModel):
    """Validated ``args`` payload of a decoded V2 ``Swap`` event.

    Addresses are kept as strings; the four amounts are integers.
    """

    sender: str
    to: str
    amount0In: int
    amount1In: int
    amount0Out: int
    amount1Out: int


class SwapDataV2(BaseModel):
    """Validated decoded V2 ``Swap`` event: the nested ``args`` plus log metadata.

    ``transactionHash`` and ``blockHash`` are not declared on this model.
    """

    args: SwapArgsV2
    logIndex: int
    transactionIndex: int
    event: str
    address: str
    blockNumber: int

In [27]:
class SwapArgsV3(BaseModel):
    """Validated ``args`` payload of a decoded V3 ``Swap`` event.

    Addresses are kept as strings; amounts, price, liquidity and tick are integers.
    """

    sender: str
    recipient: str
    amount0: int
    amount1: int
    sqrtPriceX96: int
    liquidity: int
    tick: int

class SwapDataV3(BaseModel):
    """Validated decoded V3 ``Swap`` event: the nested ``args`` plus log metadata.

    ``transactionHash`` and ``blockHash`` are not declared on this model.
    """

    args: SwapArgsV3
    # Declared once; the second, redundant `blockNumber` annotation was removed.
    # The field keeps the position it had in the field order.
    blockNumber: int
    event: str
    logIndex: int
    transactionIndex: int
    address: str

## Ingest data from the blockchain into the Swap V2 and V3 tables

- This should be changed to work directly on the Uniswap tables and just update the heads of those
- Next, I'll make a separate table to keep track of transaction_hash --> mempool_{true,false}



### Make functions to extract swap event data

In [28]:
def get_v2_swaps_events(tx_hash):
    """Return the Uniswap V2 ``Swap`` events found in a transaction's logs.

    Fetches the receipt for ``tx_hash`` and tries to decode every log with the
    V2 pair ABI. Logs that cannot be decoded as a V2 ``Swap`` are skipped.

    Args:
        tx_hash: Hash of the transaction whose receipt is inspected.

    Returns:
        A list with one decoded event record per matching log, in the order
        the logs appear in the receipt.
    """
    tx_receipt = w3.eth.get_transaction_receipt(tx_hash)

    swap_events = []

    for log in tx_receipt['logs']:
        contract = w3.eth.contract(abi=uniswap_v2_pair_abi, address=log['address'])

        # Only Swap events are decoded. A decode failure is how non-matching
        # logs are rejected, so it is skipped on purpose. Catch Exception
        # rather than using a bare except so that KeyboardInterrupt and
        # SystemExit can still stop a long-running ingest.
        try:
            event_data = contract.events.Swap().process_log(log)
        except Exception:
            continue

        swap_events.append(event_data)

    return swap_events

In [29]:
def get_v3_swaps_events(tx_hash):
    """Return the Uniswap V3 ``Swap`` events found in a transaction's logs.

    Fetches the receipt for ``tx_hash`` and tries to decode every log with the
    V3 pool ABI. Logs that cannot be decoded as a V3 ``Swap`` are skipped.

    Args:
        tx_hash: Hash of the transaction whose receipt is inspected.

    Returns:
        A list with one decoded event record per matching log, in the order
        the logs appear in the receipt.
    """
    tx_receipt = w3.eth.get_transaction_receipt(tx_hash)

    swap_events = []

    for log in tx_receipt['logs']:
        contract = w3.eth.contract(abi=uniswap_v3_pool_abi, address=log['address'])

        # Only Swap events are decoded. A decode failure is how non-matching
        # logs are rejected, so it is skipped on purpose. Catch Exception
        # rather than using a bare except so that KeyboardInterrupt and
        # SystemExit can still stop a long-running ingest.
        try:
            event_data = contract.events.Swap().process_log(log)
        except Exception:
            continue

        swap_events.append(event_data)

    return swap_events

### Figure out what blocks to extract

In [30]:
# Latest block number reported by the node; used below as the upper end of the
# block range to search and process.
current_block_number = w3.eth.block_number

print(f'Current block number: {current_block_number:,}')

ConnectionError: HTTPConnectionPool(host='localhost', port=8545): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x16c5b3050>: Failed to establish a new connection: [Errno 61] Connection refused'))

In [16]:
# Find the smallest `first_seen` in the `mempool_transactions` table
stmt = select(func.min(MempoolTransaction.first_seen))

with engine.connect() as connection:
    result = connection.execute(stmt)
    min_timestamp: datetime = cast(datetime, result.scalar())

# MIN() over an empty table yields None; fail with a clear message instead of
# an AttributeError on `.timestamp()` below.
if min_timestamp is None:
    raise RuntimeError(
        "mempool_transactions is empty; cannot determine the first block to process"
    )

# Binary-search for the first block whose timestamp is strictly greater than the
# min timestamp. The searched sequence is 0..current_block_number-1, so the
# insertion index returned by bisect_right is itself that block number.
# The range is passed directly (not wrapped in list(...)) because bisect only
# needs len() and indexing; this avoids materialising one int per block.
block_number = bisect.bisect_right(
    range(current_block_number),
    min_timestamp.timestamp(),
    key=lambda x: w3.eth.get_block(x).get('timestamp', 0),
)

print(f'Min timestamp: {min_timestamp}, block number: {block_number:,}')

Min timestamp: 2023-06-24 22:00:37.327000+00:00, block number: 17,552,205


In [17]:
# Candidate blocks: every block number from `block_number` (found above) through
# `current_block_number`, both ends included.
set_of_block_numbers = set(range(block_number, current_block_number + 1))

print(f'Total number of blocks to query: {len(set_of_block_numbers):,}')

Total number of blocks to query: 37,270


In [18]:
from typing import Set, Union


def get_blocks_to_process(
    swap_table: Union[type[SwapV2], type[SwapV3]], block_numbers: Set[int]
) -> Set[int]:
    """Return the candidate blocks that have no row in ``swap_table`` yet.

    Args:
        swap_table: ORM model class (``SwapV2`` or ``SwapV3``) whose table is checked.
        block_numbers: Candidate block numbers.

    Returns:
        A new set holding every member of ``block_numbers`` that does not occur
        in ``swap_table.block_number``. The input set is not modified.
    """
    if not block_numbers:
        return set()

    # Rows before the first candidate cannot affect the result, so bound the query.
    # The bound is taken from the argument (not from a notebook global, which the
    # ingest loop overwrites) and is inclusive so the first candidate is checked too.
    stmt = (
        select(swap_table.block_number)
        .where(swap_table.block_number >= min(block_numbers))
        .distinct()
    )

    # Read the rows while the connection is still open.
    with engine.connect() as connection:
        already_stored = {int(n) for n in connection.execute(stmt).scalars()}

    return block_numbers - already_stored

# A block is processed when it is missing from the V2 table or from the V3 table;
# the union of both results is sorted in ascending order.
block_numbers = sorted(
    get_blocks_to_process(SwapV2, set_of_block_numbers).union(
        get_blocks_to_process(SwapV3, set_of_block_numbers)
    )
)

# Preview of the first ten blocks, then the total count.
print(block_numbers[0:10])

print(f'Number of blocks to process: {len(block_numbers):,}')


[17552205, 17552206, 17552227, 17552230, 17552264, 17552265, 17552300, 17552333, 17552365, 17552384]
Number of blocks to process: 26,135


### Run the extraction for V2 and V3

In [None]:
assert False, "Has this been updated to the new schema?"


def _flush_swaps(session, swaps):
    """Merge every pending swap row into ``session`` and commit them together."""
    for pending_swap in swaps:
        session.merge(pending_swap)
    session.commit()


# Walk every block in `block_numbers`, decode the V2 and V3 Swap events of each
# transaction, and upsert them in batches. Rows are merged by primary key, so
# reprocessing a block overwrites its rows instead of duplicating them.
session = Session()
swaps_to_insert = []

try:
    it = tqdm(block_numbers)
    for block_number in it:
        block = w3.eth.get_block(block_number)

        if 'transactions' not in block:
            with open('./errors_v2.txt', 'a') as f:
                f.write(f'Block {block_number} has no transactions\n')
            continue

        # NOTE(review): fromtimestamp() without a tz argument yields a naive datetime
        # in the machine's local time zone — confirm that matches what the
        # block_timestamp columns are expected to hold.
        block_timestamp = datetime.fromtimestamp(block.get('timestamp', 0))
        it.set_description(f'Processing block {block_number:_} / {current_block_number:_} ({block_timestamp:%Y-%m-%d %H:%M})')

        for transaction in block['transactions']:

            # Get transaction hash
            tx_hash = w3.to_hex(transaction).lower() # type: ignore

            # Check if this transaction is in the mempool_transactions in PostgreSQL database
            from_mempool = session.get(MempoolTransaction, tx_hash) is not None

            # Get swaps V2 and V3 from transaction
            swapsv2 = get_v2_swaps_events(tx_hash)
            swapsv3 = get_v3_swaps_events(tx_hash)

            for swap in swapsv2:
                swap = SwapDataV2(**swap)

                # Amounts are converted to str to match the String columns,
                # the same way the V3 branch below does.
                swap_to_insert = SwapV2(
                    transaction_hash=tx_hash,
                    address=swap.address,
                    block_timestamp=block_timestamp,
                    block_number=block_number,
                    transaction_index=swap.transactionIndex,
                    log_index=swap.logIndex,
                    sender=swap.args.sender,
                    amount0_in=str(swap.args.amount0In),
                    amount1_in=str(swap.args.amount1In),
                    amount0_out=str(swap.args.amount0Out),
                    amount1_out=str(swap.args.amount1Out),
                    to=swap.args.to,
                    from_mempool=from_mempool,
                )

                swaps_to_insert.append(swap_to_insert)
                it.set_postfix({"swaps_to_insert": len(swaps_to_insert)})

            for swap in swapsv3:
                swap = SwapDataV3(**swap)

                # NOTE(review): from_address/to_address are filled with the event's
                # sender/recipient; confirm they are not meant to be the
                # transaction's from/to.
                swap_to_insert = SwapV3(
                    transaction_hash=tx_hash,
                    block_timestamp=block_timestamp,
                    block_number=block_number,
                    log_index=swap.logIndex,
                    sender=swap.args.sender,
                    recipient=swap.args.recipient,
                    amount0=str(swap.args.amount0),
                    amount1=str(swap.args.amount1),
                    sqrtPriceX96=str(swap.args.sqrtPriceX96),
                    liquidity=str(swap.args.liquidity),
                    tick=str(swap.args.tick),
                    address=swap.address,
                    from_address=swap.args.sender,
                    to_address=swap.args.recipient,
                    transaction_index=swap.transactionIndex,
                    from_mempool=from_mempool,
                )

                swaps_to_insert.append(swap_to_insert)
                it.set_postfix({"swaps_to_insert": len(swaps_to_insert)})

        # Checkpoint if we get more than 100 swaps
        if len(swaps_to_insert) > 100:
            _flush_swaps(session, swaps_to_insert)
            swaps_to_insert = []

    # Commit whatever is still buffered after the last block; without this the
    # final (up to 100) swaps would be dropped.
    if swaps_to_insert:
        _flush_swaps(session, swaps_to_insert)
        swaps_to_insert = []
finally:
    # Release the session even when a block fails part-way through.
    session.close()


## Insert mempool_{true,false} into the database