In [1]:
from clickhouse_connect.datatypes.format import set_read_format
from dotenv import load_dotenv
import clickhouse_connect
import holoviews as hv
import os
import panel as pn
import polars as pl
import time
from holoviews import opts

pn.extension("plotly", template="material", sizing_mode="stretch_width")

# Polars display: show up to 200 characters per string cell and print floats at
# full precision.
pl.Config.set_fmt_str_lengths(200)
pl.Config.set_fmt_float("full")

# ClickHouse read-format overrides, applied in this order
# (https://clickhouse.com/docs/en/integrations/python#read-formats):
#   IPv*        -> "string": IPv4 and IPv6 values are returned as strings
#   FixedString -> "string": binary FixedString columns are returned as strings
#   Int*        -> "float" : avoids large-int overflow errors when the result is
#                            converted to a polars DataFrame (note: float64 is only
#                            exact for integers up to 2**53)
for _ch_type, _ch_format in (
    ("IPv*", "string"),
    ("FixedString", "string"),
    ("Int*", "float"),
):
    set_read_format(_ch_type, _ch_format)
del _ch_type, _ch_format

# Populate os.environ from a local .env file, then build the ClickHouse client from
# HOST / USERNAME / PASSWORD (a missing variable is passed through as None).
# NOTE(review): load_dotenv() does not override variables that are already set, and
# USERNAME is usually pre-set by the OS on Windows — confirm the intended ClickHouse
# user is actually picked up there.
load_dotenv()
client = clickhouse_connect.get_client(
    host=os.environ.get("HOST"),
    username=os.environ.get("USERNAME"),
    password=os.environ.get("PASSWORD"),
    secure=True,
)

### Data Query

In [2]:
def query_data(
    n_days: int = 7,
) -> "dict[str, pl.DataFrame]":
    """
    Fetch raw mempool and canonical blob-sidecar data from the Ethpandaops ClickHouse instance.

    Both queries cover the trailing ``n_days`` window and are NOT filtered by rollup or
    network; that filtering is done afterwards by ``process_data``.

    Args:
        n_days: Size of the look-back window in days. It is formatted directly into the
            SQL text, so it must be a real positive integer (bools are rejected).

    Returns:
        A dict with two polars DataFrames:
        ``"mempool_df"`` - mempool rows with ``type = 3`` (blob-carrying transactions), and
        ``"canonical_beacon_blob_sidecar_df"`` - rows from ``canonical_beacon_blob_sidecar``.

    Raises:
        TypeError: if ``n_days`` is not an integer.
        ValueError: if ``n_days`` is not positive.
    """
    import operator

    # n_days ends up inside the SQL string, so only accept a genuine positive integer
    # (operator.index accepts int-like values such as numpy ints but rejects str/float).
    if isinstance(n_days, bool):
        raise TypeError("n_days must be an int, got bool")
    try:
        n_days = operator.index(n_days)
    except TypeError:
        raise TypeError(f"n_days must be an int, got {type(n_days).__name__}") from None
    if n_days <= 0:
        raise ValueError(f"n_days must be positive, got {n_days}")

    # query mempool data
    mempool_query = f"""
    SELECT
        # MIN(event_date_time) AS earliest_event_date_time, -- not sure if I want this for now.
        event_date_time,
        type,
        blob_sidecars_size,
        blob_sidecars_empty_size,
        hash,
        to,
        from,
        blob_hashes,
        nonce,
        meta_network_name,
        length(blob_hashes) as blob_hashes_length,
        ROUND(100 - (blob_sidecars_empty_size / blob_sidecars_size) * 100, 2) AS fill_percentage,
        blob_gas,
        blob_gas_fee_cap,
        gas_price,
        gas_tip_cap,
        gas_fee_cap
    FROM mempool_transaction
    WHERE event_date_time > NOW() - INTERVAL '{n_days} DAYS'
    AND type = 3
    # GROUP BY hash, type, blob_sidecars_size, blob_sidecars_empty_size, blob_hashes, nonce, meta_network_name, blob_hashes_length, fill_percentage, to, from
    """

    # query canonical beacon block sidecar data
    canonical_beacon_blob_sidecar_query = f"""
    SELECT 
        slot,
        slot_start_date_time,
        block_root,
        kzg_commitment,
        meta_network_name,
        blob_index,
        versioned_hash,
        blob_size,
        blob_empty_size
    FROM canonical_beacon_blob_sidecar
    WHERE event_date_time > NOW() - INTERVAL '{n_days} DAYS'
    """

    # run both queries (module-level `client`) and convert pandas -> polars
    mempool_df = pl.from_pandas(client.query_df(mempool_query))
    canonical_beacon_blob_sidecar_df = pl.from_pandas(
        client.query_df(canonical_beacon_blob_sidecar_query)
    )

    return {
        "mempool_df": mempool_df,
        "canonical_beacon_blob_sidecar_df": canonical_beacon_blob_sidecar_df,
    }


def process_data(
    mempool_df: pl.DataFrame,
    canonical_beacon_blob_sidecar_df: pl.DataFrame,
    rollup: str = "0x5050F69a9786F081509234F1a7F4684b5E5b76C9",
    network: str = "mainnet",
    seconds_per_slot: int = 12,
) -> pl.DataFrame:
    """
    Aggregate one rollup's mempool blob submissions and left-join them to canonical sidecars.

    Steps:
      1. Keep mempool rows whose ``from`` equals ``rollup`` and whose
         ``meta_network_name`` equals ``network``.
      2. Collapse repeated sightings of the same transaction (same blob-hash list, tx
         ``hash``, ``from``, ``to`` and ``nonce``) into one row holding the first/last
         sighting time and the mean size, fill and fee columns. ``submission_count`` is
         the number of such rows sharing the same blob-hash list (i.e. resubmissions).
      3. Explode to one row per versioned hash and left-join the canonical sidecars of the
         same ``network`` on ``versioned_hash``; unmatched blobs keep null sidecar columns.
      4. Derive ``beacon_inclusion_time`` and ``num_block_inclusion``.

    Args:
        mempool_df: The ``"mempool_df"`` frame returned by ``query_data``.
        canonical_beacon_blob_sidecar_df: The ``"canonical_beacon_blob_sidecar_df"`` frame
            returned by ``query_data``.
        rollup: ``from`` address whose submissions are kept.
        network: ``meta_network_name`` kept on both the mempool and the sidecar side.
        seconds_per_slot: Slot length used to turn ``beacon_inclusion_time`` into a slot
            count (defaults to the previously hard-coded 12).

    Returns:
        De-duplicated rows, one per (mempool transaction, versioned hash), ordered by ``slot``.

    Raises:
        ValueError: if ``seconds_per_slot`` is not positive.
    """
    if seconds_per_slot <= 0:
        raise ValueError(f"seconds_per_slot must be positive, got {seconds_per_slot}")

    blob_mempool_table = (
        mempool_df.filter(pl.col("from") == rollup)
        .filter(pl.col("meta_network_name") == network)
        .rename({"blob_hashes": "versioned_hash"})
        .sort(by="event_date_time")
        .group_by(
            (
                # NOTE(review): presumably cast to categoricals so the hash list is a
                # cheap grouping key — confirm this is still required.
                pl.col("versioned_hash").cast(pl.List(pl.Categorical)),
                "hash",
                "from",
                "to",
                "nonce",
            )
        )
        .agg(
            [
                # min/max datetime
                pl.col("event_date_time").min().alias("event_date_time_min"),
                pl.col("event_date_time").max().alias("event_date_time_max"),
                # blob sidecars
                pl.col("blob_hashes_length").mean().alias("blob_hashes_length"),
                pl.col("blob_sidecars_size").mean().alias("blob_sidecars_size"),
                # blob utilization
                pl.col("fill_percentage").mean().alias("fill_percentage"),
                pl.col("blob_gas").mean(),
                pl.col("blob_gas_fee_cap").mean(),
                pl.col("gas_price").mean(),
                pl.col("gas_tip_cap").mean(),
                pl.col("gas_fee_cap").mean(),
            ]
        )
        .with_columns(
            # count number of times a versioned hash gets resubmitted under a new transaction hash
            pl.len().over("versioned_hash").alias("submission_count")
        )
        .sort(by="submission_count")
    )

    # Only join against sidecars from the same network as the mempool rows, and drop
    # blob_index, which is not used downstream.
    canonical_sidecar_df = canonical_beacon_blob_sidecar_df.filter(
        pl.col("meta_network_name") == network
    ).drop("blob_index")

    return (
        (
            # one row per versioned hash, cast back to plain strings to match the sidecars
            blob_mempool_table.explode("versioned_hash")
            .with_columns(pl.col("versioned_hash").cast(pl.String))
            .drop("event_date_time_max")
        )
        .join(canonical_sidecar_df, on="versioned_hash", how="left")
        .sort(by="slot")
        # unique() does not keep row order by default; maintain_order keeps the slot sort
        .unique(maintain_order=True)
        .with_columns(
            # NOTE(review): the /1000 assumes the datetime difference is a millisecond
            # Duration, which makes this value seconds — confirm the column time units.
            ((pl.col("slot_start_date_time") - pl.col("event_date_time_min")) / 1000)
            .alias("beacon_inclusion_time")
            .cast(pl.Float64),
        )
        .with_columns(
            # NOTE(review): abs() hides cases where the slot started before the tx was
            # first seen in the mempool — confirm that folding those in is intended.
            (pl.col("beacon_inclusion_time") / seconds_per_slot)
            .abs()
            .ceil()
            .alias("num_block_inclusion")
        )
    )

### Load Data

In [3]:
# load data - dashboard will have network filter and "user" filter
# labeled blobs - https://dune.com/queries/3521610 , uses "from" address to classify blobs.
# base only has single blob hashes (reference hashes). However not all reference hashes appear to be unique - https://blobscan.com/blob/0x01d6ec33ab0e33abdde980fca2e8752266f73a51317292b0915e80708cd8cc40

# arbitrum - 0xC1b634853Cb333D3aD8663715b08f41A3Aec47cc
# base - 0x5050F69a9786F081509234F1a7F4684b5E5b76C9
# df = (
#     pl.scan_parquet("all_blob_analysis_3w.parquet")
#     .filter(pl.col("meta_network_name") == "mainnet")
#     .filter(pl.col("from") == "0x5050F69a9786F081509234F1a7F4684b5E5b76C9")  # base
# )  # base

# # this dataset only contains blob data where blob hash = versioned hash, not kzg hash
# canonical_sidecars = pl.scan_parquet("canonical_beacon_blob_sidecars.parquet").filter(
#     pl.col("meta_network_name") == "mainnet"
# )

In [4]:
# blob_mempool_table = (
#     df.rename({"blob_hashes": "versioned_hash"})
#     .sort(by="event_date_time")
#     .group_by(
#         (
#             pl.col("versioned_hash").cast(pl.List(pl.Categorical)),
#             "hash",
#             "from",
#             "to",
#             "nonce",
#         )
#     )
#     .agg(
#         [
#             # min/max datetime
#             pl.col("event_date_time").min().alias("event_date_time_min"),
#             pl.col("event_date_time").max().alias("event_date_time_max"),
#             # blob sidecars
#             pl.col("blob_hashes_length").mean().alias("blob_hashes_length"),
#             pl.col("blob_sidecars_size").mean().alias("blob_sidecars_size"),
#             # blob utilization
#             pl.col("fill_percentage").mean().alias("fill_percentage"),
#             pl.col("blob_gas").mean(),
#             pl.col("blob_gas_fee_cap").mean(),
#             pl.col("gas_price").mean(),
#             pl.col("gas_tip_cap").mean(),
#             pl.col("gas_fee_cap").mean(),
#         ]
#     )
#     .with_columns(
#         # count number of times a versioned hash gets resubmitted under a new transaction hash
#         pl.len()
#         .over("versioned_hash")
#         .alias("submission_count")
#     )
#     .sort(by="submission_count")
# ).collect(streaming=True)

# # prepare for join - explode blob hashes
# canonical_sidecar_df = canonical_sidecars.collect(streaming=True).drop("blob_index")

# canonical_blob_df = (
#     (
#         blob_mempool_table.explode("versioned_hash")
#         .with_columns(pl.col("versioned_hash").cast(pl.String))
#         .drop("event_date_time_max")
#     )
#     .join(canonical_sidecar_df, on="versioned_hash", how="left")
#     .sort(by="slot")
#     .unique()
#     .with_columns(
#         ((pl.col("slot_start_date_time") - pl.col("event_date_time_min")) / 1000)
#         .alias("beacon_inclusion_time")
#         .cast(pl.Float64),
#     )
#     .with_columns(
#         (pl.col("beacon_inclusion_time") / 12).abs().ceil().alias("num_block_inclusion")
#     )
# )

### KDE Bidding Charts

In [5]:
def _create_fee_cap_bivariate_chart(
    canonical_blob_df: pl.DataFrame,
    fee_cap_column: str,
    dimension_label: str,
    bandwidths: tuple = (0.1, 1),
) -> hv.NdLayout:
    """
    Shared builder for the "fee cap vs. slot inclusion" KDE charts.

    Args:
        canonical_blob_df: Output of ``process_data``.
        fee_cap_column: Fee column for the x axis. It is divided by 10**9 and labelled
            gwei (assumes the raw column is in wei — confirm).
        dimension_label: Name of the NdLayout key dimension that indexes the bandwidths.
        bandwidths: KDE bandwidths; one Bivariate panel is drawn per value.

    Returns:
        An NdLayout of filled Bivariate KDEs keyed by bandwidth.
    """
    # Rows whose mempool tx never matched a canonical sidecar (left join in process_data)
    # have a null num_block_inclusion; drop them explicitly rather than passing NaNs on.
    points = (
        canonical_blob_df.select(fee_cap_column, "num_block_inclusion")
        .drop_nulls()
        .with_columns(pl.col(fee_cap_column) / 10**9)
        .to_numpy()
    )

    return hv.NdLayout(
        {
            bw: hv.Bivariate(points).opts(
                axiswise=True,
                bandwidth=bw,
                width=500,
                height=400,
                xlabel=f"{fee_cap_column} (gwei)",
                ylabel="num block inclusion",
                colorbar=True,
                cmap="RdBu",
                filled=True,
                cut=0,
                tools=["hover"],
            )
            for bw in bandwidths
        },
        dimension_label,
    )


def create_max_gas_bivariate_chart(canonical_blob_df: pl.DataFrame) -> hv.NdLayout:
    """KDE of slot inclusion vs. ``gas_fee_cap`` (gwei) at bandwidths 0.1 and 1."""
    return _create_fee_cap_bivariate_chart(
        canonical_blob_df,
        "gas_fee_cap",
        "block inclusion vs max gas cap | Smoothness",
    )


def create_max_blob_gas_bivariate_chart(canonical_blob_df: pl.DataFrame) -> hv.NdLayout:
    """KDE of slot inclusion vs. ``blob_gas_fee_cap`` (gwei) at bandwidths 0.1 and 1."""
    return _create_fee_cap_bivariate_chart(
        canonical_blob_df,
        "blob_gas_fee_cap",
        "block inclusion vs max blob gas cap | Smoothness",
    )


def create_beacon_block_inclusion_chart(
    canonical_blob_df: pl.DataFrame,
    rolling_window: int = 50,
    target_slots: int = 2,
) -> hv.NdOverlay:
    """
    Line chart of slots-to-inclusion over time, with a rolling mean and a flat target line.

    Args:
        canonical_blob_df: Output of ``process_data``; must contain
            ``slot_start_date_time`` and ``num_block_inclusion``.
        rolling_window: Number of rows averaged by the rolling mean (previously fixed at 50).
        target_slots: y value of the horizontal reference line (previously fixed at 2).

    Returns:
        The overlay of line curves (one per plotted column) produced by ``DataFrame.plot.line``.
    """
    # Labels double as column names and legend entries; the defaults reproduce the
    # original "slot inclusion 50tx average" / "2 slot inclusion target".
    average_label = f"slot inclusion {rolling_window}tx average"
    target_label = f"{target_slots} slot inclusion target"

    # NOTE(review): relies on polars' DataFrame.plot being the hvplot accessor; newer
    # polars releases switched .plot to Altair — confirm the pinned polars version.
    return (
        canonical_blob_df.sort(by="slot_start_date_time")
        .with_columns(
            pl.col("num_block_inclusion").rolling_mean(rolling_window).alias(average_label),
            pl.lit(target_slots).alias(target_label),
        )
        .rename({"num_block_inclusion": "slot inclusion"})
        .plot.line(
            x="slot_start_date_time",
            y=["slot inclusion", average_label, target_label],
            color=["blue", "red", "black"],
            ylabel="Beacon Slot Inclusion Count",
            xlabel="Date",
            title="Beacon Slot Inclusion Time",
            # TODO - color by resubmission count
            width=800,
            height=400,
        )
    )

### Submission Count (WIP)

In [7]:
# canonical_blob_df.group_by("submission_count").agg((pl.len() / 6).alias("count")).sort(
#     by="count"
# )

In [8]:
# canonical_blob_df.group_by("nonce", "versioned_hash").agg(
#     pl.col("submission_count").max(),
#     ((pl.col("gas_tip_cap").max()) - (pl.col("gas_tip_cap").min())).alias(
#         "gas_tip_diff"
#     ),
#     pl.col("num_block_inclusion").mean(),
#     pl.col("gas_tip_cap").min().alias("min_gas_tip_cap"),
#     pl.col("gas_tip_cap").max().alias("max_gas_tip_cap"),
#     pl.col("gas_fee_cap").min().alias("min_gas_fee_cap"),
#     pl.col("gas_fee_cap").max().alias("max_gas_fee_cap"),
#     pl.col("blob_gas_fee_cap").min().alias("min_blob_gas_fee_cap"),
#     pl.col("blob_gas_fee_cap").max().alias("max_blob_gas_fee_cap"),
# ).sort(by="submission_count", descending=True).head(10)

In [9]:
# # make an aggregate stats table
# (
#     # min/max bidding values per nonce
#     canonical_blob_df.group_by("nonce", "versioned_hash")
#     .agg(
#         pl.col("submission_count").max(),
#         ((pl.col("gas_tip_cap").max()) - (pl.col("gas_tip_cap").min())).alias(
#             "gas_tip_diff"
#         ),
#         pl.col("num_block_inclusion").mean(),
#         pl.col("gas_tip_cap").min().alias("min_gas_tip_cap"),
#         pl.col("gas_tip_cap").max().alias("max_gas_tip_cap"),
#         pl.col("gas_fee_cap").min().alias("min_gas_fee_cap"),
#         pl.col("gas_fee_cap").max().alias("max_gas_fee_cap"),
#         pl.col("blob_gas_fee_cap").min().alias("min_blob_gas_fee_cap"),
#         pl.col("blob_gas_fee_cap").max().alias("max_blob_gas_fee_cap"),
#     )
#     # mean bidding statistics
#     .group_by("submission_count")
#     .agg(
#         pl.len().alias("agg_count"),
#         pl.col("num_block_inclusion").mean().round(3).alias("block inclusion mean"),
#         ((pl.col("max_gas_fee_cap") - pl.col("min_gas_fee_cap")) / 10**9)
#         .mean()
#         .round(3)
#         .alias("gas_diff_mean"),
#         # ((pl.col('max_blob_gas_fee_cap') - pl.col('min_blob_gas_fee_cap')) / 10**9).mean().round(3).alias('blob_gas_diff_mean'),
#         # ((pl.col('max_gas_tip_cap') - pl.col('min_gas_tip_cap')) / 10**9).mean().round(3).alias('gas_tip_diff_mean')
#         # (pl.col("gas_fee_cap") / 10**9).mean().round(3).alias("gas mean (gwei)"),
#         # (pl.col("blob_gas_fee_cap") / 10**9)
#         # .mean()
#         # .round(3)
#         # .alias("blob mean (gwei)"),
#         # (pl.col("gas_tip_cap") / 10**9)
#         # .mean()
#         # .round(3)
#         # .alias("priority tip mean (gwei)"),
#         # # difference between and mean
#         # (pl.col("gas_tip_cap") - pl.col("gas_tip_cap"))
#         # .mean()
#         # .round(8)
#         # .alias("gas_tip_diff")
#     )
#     .sort(by="submission_count")
# )

# # this is a per blob basis

### Assemble Dashboard

In [10]:
# Pull the last 7 days of raw data from ClickHouse once; the cells below only filter
# and reshape this result in memory.
data_dict = query_data(n_days=7)
# -> {"mempool_df": ..., "canonical_beacon_blob_sidecar_df": ...}

In [11]:
# Joined mempool/sidecar table for the address noted as "base" earlier, on mainnet.
canonical_blob_df = process_data(
    mempool_df=data_dict["mempool_df"],
    canonical_beacon_blob_sidecar_df=data_dict["canonical_beacon_blob_sidecar_df"],
    rollup="0x5050F69a9786F081509234F1a7F4684b5E5b76C9",
    network="mainnet",
)

In [12]:
# Wrap each HoloViews chart in an explicit Panel pane. update_dashboard swaps charts by
# assigning to `<name>.object`, which only re-renders when the name refers to a Panel
# pane; on a bare HoloViews object that assignment just sets an unused attribute.
beacon_block_inclusion_chart_panel = pn.pane.HoloViews(
    create_beacon_block_inclusion_chart(canonical_blob_df)
)
max_blob_gas_bivariate_panel = pn.pane.HoloViews(
    create_max_blob_gas_bivariate_chart(canonical_blob_df)
)
max_gas_bivariate_panel = pn.pane.HoloViews(
    create_max_gas_bivariate_chart(canonical_blob_df)
)

In [13]:
# Static Base/mainnet dashboard: intro text, the inclusion-time series, then the two
# fee-cap KDE layouts. pn.pane.Markdown dedents its text by default, so the indented
# triple-quoted strings render as normal markdown.
dash = pn.Column(
    pn.pane.Markdown(
        """
        # Blob Inclusion Dashboard
        This dashboard measures blob inclusion from the mempool to the canonical beacon block for Base.
        """
    ),
    pn.Column(
        pn.pane.Markdown(
            """
        This time series shows how long it takes for a blob transaction to be included,
        from when it is first seen in the mempool to when it shows up in a canonical beacon block sidecar.

        While many blobs are included on the beacon chain within 1-2 slots, a non-trivial number of
        outliers take 5-10 slots or more before being included.
        """
        ),
        beacon_block_inclusion_chart_panel,
    ),
    pn.Column(
        pn.pane.Markdown(
            """

        The two charts below show kernel density estimates (KDEs) of slot inclusion against the maximum fee (in gwei)
        the blob producer is willing to pay: first for regular gas (`gas_fee_cap`), then for blob gas (`blob_gas_fee_cap`).
        A KDE is a non-parametric way to estimate the probability density function of a random variable
        by placing a smoothing kernel on every observed sample.
        Each chart is drawn at two bandwidths (smoothing factors), 0.1 and 1. The smaller bandwidth follows the data
        closely and shows finer detail but is noisier; the larger bandwidth gives a smoother estimate in which each
        sample, including outliers, is spread over a wider area.
        """
        ),
        max_gas_bivariate_panel,
        max_blob_gas_bivariate_panel,
    ),
)

In [14]:
# dash.show()
# Mark the static dashboard for `panel serve`; the commented-out dash.show() above is
# the alternative for opening it in a browser straight from the kernel.
dash.servable()

Launching server at http://localhost:46761


<panel.io.server.Server at 0x7faf58fecc50>

Gtk-Message: 16:40:37.669: Failed to load module "canberra-gtk-module"
Gtk-Message: 16:40:37.669: Failed to load module "canberra-gtk-module"


Opening in existing browser session.


### Make Dashboard Interactive (WIP)

In [None]:
def update_dashboard(event):
    """
    Button callback: re-run ``process_data`` for the rollup/network typed into the widgets
    and swap the resulting charts into the dashboard.

    No new ClickHouse query is issued. The ``data_dict`` fetched at load time is
    re-filtered, so results are limited to that originally queried window.

    NOTE(review): charts are swapped by assigning ``.object`` on the three ``*_panel``
    globals. That only re-renders if those globals are Panel panes (e.g.
    ``pn.pane.HoloViews``), not bare HoloViews objects — confirm how they are created.

    Args:
        event: Click event supplied by ``Button.on_click``; unused.
    """
    # perf_counter is monotonic, so the measured duration is unaffected by wall-clock
    # adjustments (unlike time.time()).
    start_time = time.perf_counter()
    timer_display.value = "Query is refreshing..."

    try:
        # Perform the data processing/filtering
        data = process_data(
            data_dict["mempool_df"],
            data_dict["canonical_beacon_blob_sidecar_df"],
            rollup=rollup_input.value,
            network=network_input.value,
        )

        # Update the chart placeholders with the new charts
        beacon_block_inclusion_chart_panel.object = create_beacon_block_inclusion_chart(
            data
        )
        max_blob_gas_bivariate_panel.object = create_max_blob_gas_bivariate_chart(data)
        max_gas_bivariate_panel.object = create_max_gas_bivariate_chart(data)

        # Stop the timer and update the display
        elapsed_time = time.perf_counter() - start_time
        timer_display.value = f"Query completed in {elapsed_time:.2f} seconds."
    except Exception as e:
        # Deliberately broad: report any failure in the status widget instead of
        # letting the callback raise inside the Panel server.
        timer_display.value = f"Error during query: {e}"

In [None]:
# Free-text filters for the rollup ("from") address and network; the defaults match
# the static Base / mainnet dashboard.
rollup_input = pn.widgets.TextInput(
    name="Rollup Address",
    value="0x5050F69a9786F081509234F1a7F4684b5E5b76C9",
)
network_input = pn.widgets.TextInput(name="Network", value="mainnet")

# Status line that update_dashboard writes progress, timing and errors into.
timer_display = pn.widgets.StaticText(name="Timer", value="Ready to query")

# Every click re-runs update_dashboard with the current widget values.
update_button = pn.widgets.Button(name="Query Data", button_type="primary")
update_button.on_click(update_dashboard)

# Layout: header text, controls row, status row, then the three charts.
interactive_dash = pn.Column(
    *(
        pn.pane.Markdown(text)
        for text in (
            "# Blob Inclusion Dashboard",
            "This dashboard measures blob inclusion from the mempool to the canonical beacon block.",
        )
    ),
    pn.Row(rollup_input, network_input, update_button),
    pn.Row(timer_display),
    beacon_block_inclusion_chart_panel,
    max_blob_gas_bivariate_panel,
    max_gas_bivariate_panel,
)

In [None]:
# Serve the dashboard or display it in a notebook
# interactive_dash.show()

# arb - 0xC1b634853Cb333D3aD8663715b08f41A3Aec47cc
# base - 0x5050F69a9786F081509234F1a7F4684b5E5b76C9

Launching server at http://localhost:36813


<panel.io.server.Server at 0x7d72c2a468d0>

Gtk-Message: 16:22:30.520: Failed to load module "canberra-gtk-module"
Gtk-Message: 16:22:30.521: Failed to load module "canberra-gtk-module"


Opening in existing browser session.
