In [4]:
import cryo
import polars as pl
import json

In [5]:
LIQUIDATION_EVENT_SIG = "event LiquidationCall(address indexed collateralAsset, address indexed debtAsset, address indexed user, uint256 debtToCover, uint256 liquidatedCollateralAmount, address liquidator, bool receiveAToken);"
LIQUIDATION_EVENT_HASH = "0xe413a321e8681d831f4dbccbca790d2952b56f977908e45be37335533e005286"
AAVE_V3_POOL = "0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2"

def convert_to_type(data, type, data_offset = 0):
    ''' Convert raw data to a specific type
    data: the raw data (bytes)
    type: the type of the data
    data_offset: the offset of the data in the raw data'''
    
    if type == 'string':
        offset = int(data[data_offset+12: data_offset + 32].hex(), 16)
        length = int(data[offset:offset + 32].hex(), 16)
        return data[offset + 32: offset + 32 + length].decode('utf-8')
    elif '[]' in type:
        # TODO
        raise ValueError("Array not implemented yet")
    elif len(type) > 4 and type[:4] == 'uint':
        return float(int(data[data_offset+22: data_offset+32].hex(), 16))
    elif type == 'address':
        return '0x' + data[12+data_offset:32+data_offset].hex()
    elif type == 'bool':
        return int(data[data_offset: 32 + data_offset], 16) > 0
    elif len(type) > 5 and type[:5] == 'bytes':
        bytes_left = 32 - int(type[5:])
        return '0x' + data[data_offset+bytes_left: data_offset+32].hex()
    else:
        raise ValueError("Type not handled yet")
    
def parse_raw_events(raw_events: pl.DataFrame, signature: str) -> pl.DataFrame:
    '''
    Parse raw events from a DataFrame, this dataframe should contain the following columns:
    - topic0 to topic3: the topics of the event
    - data: the data of the event
    The signature is used to determine the type of the event, it should look like this:
    "EventName(uint256 indexed arg1, uint256 indexed arg2, T arg3, ...)"

    The function works with basic types (string, uint, address, bool, bytes) but not yet (TODO) with arrays or structs.
    '''
    
    # Parse the signature
    first_parenthesis = signature.find('(')
    last_parenthesis = signature.rfind(')')

    print(f'Parsing Event name: {signature[:first_parenthesis]}')

    types_str = signature[first_parenthesis + 1:last_parenthesis]
    types = types_str.split(',')
    indexed_types, data_types = [], []

    for t in types:
        parts = t.strip().split(' ')
        if 'indexed' in parts:
            indexed_types.append((parts[0], parts[-1]))
        else:
            data_types.append((parts[0], parts[-1]))

    dfs = []
    for index, (type, name) in enumerate(data_types):
        data_events_cur_df = raw_events.select([
            pl.col("data").map_elements(lambda x: convert_to_type(x, type, 32*index)).alias(name)
        ] + [pl.col("block_number"), pl.col("transaction_index", "log_index")])
        dfs.append(data_events_cur_df)

    data_events_df = raw_events.select([
        pl.col("data").map_elements(lambda x: convert_to_type(x, type, 32*index)).alias(name)
        for index, (type, name) in enumerate(data_types)
    ] + [pl.col("block_number"), pl.col("transaction_index", "log_index")])
    indexed_events_df = raw_events.select([
        pl.col(f"topic{1+index}").map_elements(lambda x: convert_to_type(x, type)).alias(name)
        for index, (type, name) in enumerate(indexed_types)
    ] + [pl.col("block_number"), pl.col("transaction_index", "log_index")])

    print(f'Indexed types: {indexed_types}')
    print(f'Data types: {data_types}')
    

    res_df = indexed_events_df
    for df in dfs:
        res_df = res_df.join(df, on=["block_number", "transaction_index", "log_index"], how="inner")

    return res_df

# Liquidation events

In [6]:
# Collect the raw events with specific cryo params
params = {
    "datatype": "events",
    "blocks": ["16M:"],
    "inner_request_size": 10000,
    "contract": [AAVE_V3_POOL],
    "topic0": [LIQUIDATION_EVENT_HASH]
}

# Collect the raw events with cryo
liquidations_raw = cryo.collect(
    **params
)

liquidations_raw.head(5)

# Parse the raw events to a DataFrame with a specific event signature
liquidations = parse_raw_events(liquidations_raw, LIQUIDATION_EVENT_SIG)

liquidations.head(5)

Parsing Event name: event LiquidationCall


thread 'polars-0' panicked at py-polars/src/map/series.rs:213:19:
python function failed ValueError: invalid literal for int() with base 16: b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
--- PyO3 is resuming a panic after fetching a PanicException from Python. ---
Python stack trace below:


PanicException: python function failed ValueError: invalid literal for int() with base 16: b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'

PanicException: python function failed ValueError: invalid literal for int() with base 16: b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'