# cryoQuery

In [1]:
# |default_exp cryo_query

In [7]:
# | export
import jupyter_black
import os
import polars as pl
import re
import cryo

from dataclasses import dataclass

pl.Config.set_fmt_str_lengths(200)
pl.Config.set_fmt_float("full")
jupyter_black.load()

In [2]:
cd ../

/home/evan/Documents/ethereum_block_explorer


  self.shell.db['dhist'] = compress_dhist(dhist)[-100:]


In [24]:
# | export
@dataclass
class cryoQuery:
    """
    `cryoQuery` is used to query data from cryo
    """

    raw_data_path: str = "data/raw"
    rpc: str = "https://eth.merkle.io"

    def _create_data_filepaths(self):
        """
        Creates folders for storing raw data from cryo.
        """
        if not os.path.exists(self.raw_data_path):
            os.makedirs(self.raw_data_path)
            print("Data folder created.")
        else:
            print("Data folder already exists.")

    def query_blocks_txs(
        self,
        n_error_threshold: int = 1,
        retry_threshold: int = 5,
        block_range: list[str] = ["latest"],
    ):
        """
        Fetches block and transaction data. Attempts retries up to 'retry_threshold'. If errors exceed 'n_error_threshold'.

        :param n_error_threshold: The number of allowed errors before retrying the query.
        :param retry_threshold: The maximum number of retries for the query.
        :param block_range: The range of blocks to query. Defaults to ["latest"].
        """
        self._create_data_filepaths()
        n_errored = n_error_threshold + 1
        retry_count = 0

        # make cryo query
        while retry_count < retry_threshold:
            print(f"Retry count: {retry_count}")
            retry_count += 1
            if n_error_threshold < n_errored:
                output: dict[str] = cryo.freeze(
                    "blocks_and_transactions",
                    blocks=block_range,
                    hex=True,
                    rpc=self.rpc,
                    no_verbose=False,  # this doesn't seem to have any effect
                    output_dir="data/raw",
                    subdirs=["datatype"],
                    include_columns=["n_rlp_bytes"],
                    exclude_columns=["input", "value"],
                    # compression=["zstd"],
                    compression=["lz4"],  # bug, can't use zstd in cryo 0.3.0
                )

                n_errored = output["n_errored"]
                print(f"Number of errors: {n_errored}")
            if n_errored == 0:
                print(f"{n_errored} == 0. Done!")
                break


# | export
@dataclass
class cryoTransform:
    """
    cryoTransform extends the underlying transactions and blocks dataset with extra columns.
    """

    def extend_txs_blocks(
        self,
        txs: pl.LazyFrame,
        blocks: pl.LazyFrame,
        mempool: pl.LazyFrame | None = None,
    ) -> pl.LazyFrame:
        """
        Combines transaction, block, and optionally flashbots mempool data into a single LazyFrame.

        Uses pattern matching to handle the optional mempool data.
        If mempool data is provided, it is joined with the transactions and blocks data.
        If not, only transactions and blocks are joined.

        Preprocessing:
        - add block percentile and ticks
        - convert gas to gwei
        - convert bytes to kilobytes


        Parameters:
        - txs (pl.LazyFrame): LazyFrame containing transaction data.
        - blocks (pl.LazyFrame): LazyFrame containing block data.
        - mempool (pl.LazyFrame, optional): LazyFrame containing mempool data. Default is None.

        Returns:
        - pl.LazyFrame: A unified LazyFrame with enriched transaction data.
        """

        # Use pattern matching to handle the presence or absence of mempool data
        match mempool:
            case _ if isinstance(mempool, pl.LazyFrame):
                # Join transactions with blocks and mempool data if mempool is provided
                combined_df = txs.join(
                    blocks, on="block_number", how="left", suffix="_block"
                ).join(
                    mempool,
                    right_on="hash",
                    left_on="transaction_hash",
                    how="left",
                    suffix="_mempool",
                )

            case None:
                # Join only transactions with blocks if mempool is not provided
                combined_df = txs.join(
                    blocks, on="block_number", how="left", suffix="_block"
                )

        agg_df: pl.LazyFrame = combined_df.group_by("block_number").agg(
            [
                pl.col("transaction_index").max().alias("transaction_index_max"),
                pl.col("n_rlp_bytes").sum().alias("block_encoded_bytes"),
                pl.col("n_input_bytes").sum().alias("block_calldata_bytes"),
            ]
        )

        return (
            combined_df.join(agg_df, on="block_number", how="left")
            .with_columns(
                [
                    # Calculate the transaction gas cost
                    (pl.col("gas_used") * pl.col("gas_price") / 10**18).alias(
                        "tx_gas_cost"
                    ),
                    # Convert epoch timestamp to datetime
                    pl.from_epoch("timestamp").alias("block_datetime"),
                    # Calculate the gas price premium over the base fee per gas
                    (pl.col("gas_price") / pl.col("base_fee_per_gas")).alias(
                        "block_gas_premium"
                    ),
                ]
            )
            .with_columns(
                # Calculate the transaction index percentile within its block
                (
                    pl.col("transaction_index") / pl.col("transaction_index_max") * 100
                ).alias("blockspace_percentile")
            )
            .with_columns(
                # Round the block space percentile for easier interpretation
                (pl.col("blockspace_percentile").round()).alias(
                    "rounded_blockspace_percentile"
                )
            )
            # unit conversions
            .with_columns(
                # convert gas to gwei
                (pl.col("gas_price") / 10**9).alias("gas_price_gwei"),
                (pl.col("max_priority_fee_per_gas") / 10**9).alias(
                    "max_priority_fee_per_gas_gwei"
                ),
                (pl.col("max_fee_per_gas") / 10**9).alias("max_fee_per_gas_gwei"),
                (pl.col("base_fee_per_gas") / 10**9).alias("base_fee_per_gas_gwei"),
                # convert bytes to kilobytes
                (pl.col("block_encoded_bytes") / 10**3).alias("block_encoded_kbytes"),
                (pl.col("block_calldata_bytes") / 10**3).alias(
                    "block_calldata_kbytes"
                ),
            )
            .drop(
                "gas_price",
                "max_priority_fee_per_gas",
                "max_fee_per_gas",
                "base_fee_per_gas",
                "block_encoded_bytes",
                "block_calldata_bytes",
            )
            .fill_nan(0)  # Fill NaN values with 0
            .unique()  # Ensure all rows are unique
        )

    # 1. read the files in the raw data from block number partitions, sync them together, perform a transformation, and then save into a new folder.
    def read_filenames(self, directory) -> list:
        try:
            return sorted(
                [
                    f
                    for f in os.listdir(directory)
                    if os.path.isfile(os.path.join(directory, f))
                ]
            )
        except FileNotFoundError:
            return []

    def extract_block_batch_index(self, filename) -> list[tuple]:
        match = re.search(r"to_(\d+)", filename)
        return int(match.group(1)) if match else None

    def sync_filenames(self, directory_a: str, directory_b: str):
        # Read filenames from both directories
        transactions_filenames = self.read_filenames(directory_a)
        blocks_filenames = self.read_filenames(directory_b)

        # Extract numbers and create mappings
        transactions_mapping = {
            self.extract_block_batch_index(name): name
            for name in transactions_filenames
        }
        blocks_mapping = {
            self.extract_block_batch_index(name): name for name in blocks_filenames
        }

        # Find common keys
        common_keys = set(transactions_mapping.keys()).intersection(
            blocks_mapping.keys()
        )

        # Creating synced files dictionary
        synced_files = {
            directory_a: [transactions_mapping[key] for key in common_keys],
            directory_b: [blocks_mapping[key] for key in common_keys],
        }

        return synced_files

In [21]:
directory_a: str = "data/raw/transactions"
directory_b: str = "data/raw/blocks"

synced_files = sync_filenames(directory_a=directory_a, directory_b=directory_b)

In [23]:
synced_files

{'data/raw/transactions': ['ethereum__transactions__18132000_to_18132999.parquet',
  'ethereum__transactions__18089000_to_18089999.parquet',
  'ethereum__transactions__18345000_to_18345999.parquet',
  'ethereum__transactions__18046000_to_18046999.parquet',
  'ethereum__transactions__18302000_to_18302999.parquet',
  'ethereum__transactions__18259000_to_18259999.parquet',
  'ethereum__transactions__18216000_to_18216999.parquet',
  'ethereum__transactions__18173000_to_18173999.parquet',
  'ethereum__transactions__18429000_to_18429999.parquet',
  'ethereum__transactions__18130000_to_18130999.parquet',
  'ethereum__transactions__18087000_to_18087999.parquet',
  'ethereum__transactions__18044000_to_18044999.parquet',
  'ethereum__transactions__18300000_to_18300999.parquet',
  'ethereum__transactions__18257000_to_18257999.parquet',
  'ethereum__transactions__18214000_to_18214999.parquet',
  'ethereum__transactions__18171000_to_18171999.parquet',
  'ethereum__transactions__18427000_to_18427999

In [5]:
cryo_query = cryoQuery()

In [6]:
cryo_query.query_blocks_txs(
    block_range=["18030000:18950000"]  # September - Mid Januaryish Data
)  # Internal note - 18753440 is roughly when the ordinal spam started

Data folder already exists.
Retry count: 0
[1;37mcryo parameters[0m
[38;2;0;225;0m───────────────[0m
[38;2;0;225;0m- [0m[1;37mversion[0m[38;2;0;225;0m: [0m[38;2;170;170;170m0.3.2[0m
[38;2;0;225;0m- [0m[1;37mdata[0m[38;2;0;225;0m: [0m[38;2;170;170;170m[0m
    [38;2;0;225;0m- [0m[1;37mdatatypes[0m[38;2;0;225;0m: [0m[38;2;170;170;170mblocks, transactions[0m
    [38;2;0;225;0m- [0m[1;37mblocks[0m[38;2;0;225;0m: [0m[38;2;170;170;170mn=920,000 min=18,030,000 max=18,949,999 align=no reorg_buffer=0[0m
[38;2;0;225;0m- [0m[1;37msource[0m[38;2;0;225;0m: [0m[38;2;170;170;170m[0m
    [38;2;0;225;0m- [0m[1;37mnetwork[0m[38;2;0;225;0m: [0m[38;2;170;170;170methereum[0m
    [38;2;0;225;0m- [0m[1;37mrpc url[0m[38;2;0;225;0m: [0m[38;2;170;170;170mhttps://eth.merkle.io[0m
    [38;2;0;225;0m- [0m[1;37mmax requests per second[0m[38;2;0;225;0m: [0m[38;2;170;170;170munlimited[0m
    [38;2;0;225;0m- [0m[1;37mmax concurrent requests[0m[38;

In [None]:
test_df = pl.scan_parquet("data/raw/transactions/*.parquet")

In [None]:
test_df.schema

# Schema Notes from cryo:
# - n_input_bytes: length of call data
# - n_input_zero_bytes: number of zero bytes in call data
# - n_input_nonzero_bytes: number of nonzero bytes in call data
# - n_rlp_bytes: full encoded length of tx

OrderedDict([('block_number', UInt32),
             ('transaction_index', UInt64),
             ('transaction_hash', String),
             ('nonce', UInt64),
             ('from_address', String),
             ('to_address', String),
             ('gas_limit', UInt64),
             ('gas_used', UInt64),
             ('gas_price', UInt64),
             ('transaction_type', UInt32),
             ('max_priority_fee_per_gas', UInt64),
             ('max_fee_per_gas', UInt64),
             ('success', Boolean),
             ('n_input_bytes', UInt32),
             ('n_input_zero_bytes', UInt32),
             ('n_input_nonzero_bytes', UInt32),
             ('n_rlp_bytes', UInt32),
             ('chain_id', UInt64)])

In [None]:
test_df.fetch(streaming=True)

block_number,transaction_index,transaction_hash,nonce,from_address,to_address,gas_limit,gas_used,gas_price,transaction_type,max_priority_fee_per_gas,max_fee_per_gas,success,n_input_bytes,n_input_zero_bytes,n_input_nonzero_bytes,n_rlp_bytes,chain_id
u32,u64,str,u64,str,str,u64,u64,u64,u32,u64,u64,bool,u32,u32,u32,u32,u64
19013374,0,"""0xb82b99735efe2d32ee4119053db2358e0f0c7872dd25da542a7a1842faec9cd1""",1921177,"""0xae2fc483527b8ef99eb5d9b44875f005ba1fae13""","""0x6b75d8af000000e20b7a7ddf000ba900b4009a80""",219370,153559,24053431365,2,4,24053431365,true,61,1,60,1168,1
19013374,1,"""0x14350e25dfc83210fd4ab0f372ca601f3f49e2360f6fa94094a66423ea002f32""",85,"""0x67c7af25b27e2c347ae730d4bc7a2e8a410728d4""","""0x3fc91a3afd70395cd496c647d5a6cc9d4b2b7fad""",161738,126984,24063432925,2,10001560,30203228244,true,516,444,72,629,1
19013374,2,"""0x7a2b9f443b2d3e6832ac53cdd3b3eacded8dd9b26faee18c2c951a21a9e1809b""",1921178,"""0xae2fc483527b8ef99eb5d9b44875f005ba1fae13""","""0x6b75d8af000000e20b7a7ddf000ba900b4009a80""",189975,132983,1479009435541,2,1479009435537,1479009435541,true,61,1,60,1043,1
19013374,3,"""0xc52ccdffe318d581f220aac1d2e6c83ebd07167fbf2067a9dc4211c321c904e9""",8278,"""0x97b2bb85d797add22dabda43eda8570091ed4b03""","""0x525145f821d8d2abb494c454e9445e14c817f7cb""",873562,318208,24053431365,2,0,36080147047,true,1218,748,470,1332,1
19013374,4,"""0xbd40de93e22c6cfa5d4b927245f3acc1ebf9f99043865833bc3e9af14696a369""",587,"""0x8c732f7d121713209453bf4b5082d61878a7b869""","""0x249b53c173362ffe175a587427615aff21059651""",433052,426787,35239433316,0,,,true,516,382,134,624,1
19013374,5,"""0x0e9dde7b62e6e3afd8f4133e756cc3a2a7003862b80b7c15f4b0e23b38fa13a7""",0,"""0x839f02e5bcad8fac3b043287e0c6e0ca2034dcdd""","""0x41292153e7f5e78c3b7382d59e742b92461cbc70""",21000,21000,90000000000,2,90000000000,90000000000,true,0,0,0,119,1
19013374,6,"""0x3e0c4f5f3c5fedda64bdabf2d07416b7397afcc55d0e73fafda911fd1bed1e40""",685,"""0xe32e3bd2ec560513cab4757564ca50d13caf9f7f""","""0x72ce9c846789fdb6fc1f34ac4ad25dd9ef7031ef""",275703,173462,29000000000,2,29000000000,29000000000,true,324,257,67,448,1
19013374,7,"""0x1de7c7806d8a50c02be20ea8428c2d4b48042f636a0ad9930dd0c3478a58675e""",826,"""0xfe03932ded338dabb5626455be60543a4f2c3797""","""0x14778860e937f509e651192a90589de711fb88a9""",71055,56039,38013608616,0,,,true,68,35,33,174,1
19013374,8,"""0x0162267780a0ff79595e22d17e047a4be6017e1ce71b33fdd7cccd95490ed2bb""",10,"""0x65fb99239760f4412f4dac7f0fc378cbb1649b6c""","""0xa6b71e26c5e0845f74c812102ca7114b6a896ab2""",455329,446439,25553431365,2,1500000000,28000000000,true,772,531,241,886,1
19013374,9,"""0x1339a1c6567029458e9d0d78a40963d8094df0338d7893b48f57a48ec9a8126c""",238,"""0x907ae230e3e6c15831ab52db1abb684bd2b5c49f""","""0x16d104009964e694761c0bf09d7be49b7e3c26fd""",426073,307390,26053431365,2,2000000000,36578880969,true,1220,951,269,1335,1
