In [1]:
from pathlib import Path
import sqlite3
from contextlib import closing
from time import time
import pandas as pd
import csv

sql_file = "all_txs.sql"

In [2]:
# Takes the provided CSV file and places it into a table called table_name in the sqlite
# database specified by sql_file. Creates the sqlite database file if it does not exist,
# and drops and creates a new table if one already exists with the same name.
def csv_to_sql(csv_file, sql_file, table_name):
    # Creates the sqlite file, if it does not already exist
    Path(sql_file).touch(exist_ok=True)
    
    # Read the CSV into a pandas dataframe. Since minerRevenue can be too large to fit
    # as an integer, convert it to a string.
    df = pd.read_csv(csv_file, converters={'minerRevenue': str})
    column_names = tuple(df.columns)
    
    # Some values are too large to store as integers, so save them as text, if they exist
    dtypes = {k: 'INTEGER' for k in column_names}
    if 'hash' in column_names:
        dtypes['hash'] = 'TEXT'
    if 'miner' in column_names:
        dtypes['miner'] = 'TEXT'
    if 'minerRevenue' in column_names:
        dtypes['minerRevenue'] = 'TEXT'
    
    # Create a connection and cursor for the sqlite file
    with closing(sqlite3.connect(sql_file)) as conn: # auto-closes the connection
        with conn, closing(conn.cursor()) as cur:    # auto-commits connection and auto-closes cursor
            # Drop the current table, if it exists
            cur.execute("DROP TABLE IF EXISTS " + table_name)

            # Create the table, with appropriate attributes
            cur.execute("CREATE TABLE " + table_name + " " + str(column_names))

            # Put the pandas dataframe into the newly created table
            df.to_sql(table_name, conn, if_exists='append', index=False, dtype=dtypes)

In [3]:
# Executes the provided query using the sqlite database in the file specified by sql_file.
# Automatically commits changes and closes the connection and cursor when done.
def query(query, sql_file=sql_file):
    # Create a connection and cursor for the sqlite file
    with closing(sqlite3.connect(sql_file)) as conn: # auto-closes the connection
        with conn, closing(conn.cursor()) as cur:    # auto-commits connection and auto-closes cursor
            cur.execute(query)
            return cur.fetchall()

In [4]:
# Finds the actual gas used for each transaction, if it exists, and places it into the table for later reference.
def create_gas_used_table(gas_table_name, pending_txs_name, txs_name, confirmed_pending_name):
    query(f"DROP TABLE IF EXISTS {gas_table_name}")
    query(  f'''CREATE TABLE {gas_table_name} AS
                WITH stage AS (
                    SELECT hash, gasLimit
                    FROM {pending_txs_name}
                    GROUP BY hash),
                confirmed_stage AS (
                    SELECT hash, gasUsed
                    FROM {confirmed_pending_name}
                    GROUP BY hash)
                SELECT stage.hash, MIN(IFNULL({txs_name}.gasUsed, stage.gasLimit),
                                       IFNULL(confirmed_stage.gasUsed, stage.gasLimit)) AS minGasUsed
                FROM stage
                LEFT JOIN {txs_name} ON stage.hash = {txs_name}.hash
                LEFT JOIN confirmed_stage ON stage.hash = confirmed_stage.hash''') 

In [15]:
# Updates the pending transactions table with information on gas used and prices paid
# For each block, keeps only one copy of each transaction if there are multiple with the same hash.
# Also cleans up the pending transactions, by examining transactions from the same sender with the same nonce
# and keeping only one of them. The transaction to be kept is chosen based on the following criteria, where
# we move on to the next criteria only if all preceding ones are identical:
# 1. maxPriorityFeePerGas: max priority fee being offered (highest first, nulls put last)
# 2. maxFeePerGas: max total fee being offered (highest first, nulls put last)
# 3. gasPrice: amount of gas offered (highest first, nulls put last)
# 4. hash: Finally, if all else is equal somehow, arbitrarily break ties using the highest-value of hash
def update_pending_txs(pending_txs_name, blocks_name, gas_table_name, updated_name=None):
    # Make a new table
    if updated_name is None:
        temp_table = pending_txs_name + '_temp'
    else:
        temp_table = updated_name
    query(f"DROP TABLE IF EXISTS {temp_table}")
    query(f'''CREATE TABLE {temp_table} AS
              WITH stage AS (
                  SELECT *,
                      IFNULL({pending_txs_name}.gasPrice - baseFeePerGas,
                             MIN(maxPriorityFeePerGas, maxFeePerGas - baseFeePerGas)) AS netGasPrice
                  FROM {pending_txs_name}
                  LEFT JOIN {blocks_name}
                      ON {pending_txs_name}.lastBlock = {blocks_name}.blockNumber
                  LEFT JOIN {gas_table_name} 
                      ON {pending_txs_name}.hash = {gas_table_name}.hash
                  WHERE NOT {pending_txs_name}.category = "queued"),
              stage_row_counts AS (
                  SELECT *, ROW_NUMBER() OVER (PARTITION BY hash, lastBlock) AS row_num_hash,
                      ROW_NUMBER() OVER (PARTITION BY sender, nonce, lastBlock ORDER BY netGasPrice DESC) AS row_num_price
                  FROM stage
                  WHERE stage.netGasPrice > 0)
              SELECT stage_row_counts.hash, stage_row_counts.lastBlock, sender, nonce, minGasUsed AS gasUsed, gasLimit, netGasPrice as gasPrice
              FROM stage_row_counts
              WHERE row_num_hash = 1
              AND row_num_price = 1''')
    
    # Replace the old one, if we didn't want to make a new separate table
    if updated_name is None:
        query(f"DROP TABLE {pending_txs_name}")
        query(f"ALTER TABLE {temp_table} RENAME TO {pending_txs_name}")

In [6]:
# Names of tables
pending_txs_name = "pending_txs"
pending_raw_name = "pending_txs_raw"
gas_table_name = "pending_txs_gas"
txs_name = "txs"
confirmed_pending_name = "confirmed_pending_txs"
blocks_name = "blocks"

In [7]:
# Load all CSVs and put them into sqlite tables in the same database, stored in the file sql_file
csv_to_sql("confirmed_txs.csv", sql_file, txs_name)
csv_to_sql("pending_txs_erigon.csv", sql_file, pending_raw_name)
csv_to_sql("confirmed_mempool_txs_erigon.csv", sql_file, confirmed_pending_name)
csv_to_sql("blocks.csv", sql_file, blocks_name)

In [11]:
# Get all values of gas used
create_gas_used_table(gas_table_name, pending_raw_name, txs_name, confirmed_pending_name)

In [16]:
# Update the pending transaction table with actual gas prices used (if available) and gas prices paid,
# and then remove unnecessary columns.
update_pending_txs(pending_raw_name, blocks_name, gas_table_name, pending_txs_name)

In [30]:
res = query('SELECT 1.0*SUM(gasUsed < gasLimit)/COUNT(hash) FROM pending_txs GROUP BY lastBlock')
res

[(0.8876755070202809,),
 (0.6047197640117994,),
 (0.5625,),
 (0.654054054054054,),
 (0.6305084745762712,),
 (0.5673469387755102,),
 (0.6437768240343348,),
 (0.7304216867469879,),
 (0.48490945674044267,),
 (0.7171052631578947,),
 (0.5952380952380952,),
 (0.6388888888888888,),
 (0.4117647058823529,),
 (0.3867924528301887,),
 (0.5851851851851851,),
 (0.5978260869565217,),
 (0.49382716049382713,),
 (0.574585635359116,),
 (0.5324675324675324,),
 (0.4444444444444444,),
 (0.6121076233183856,),
 (0.4751958224543081,),
 (0.6081081081081081,),
 (0.475,),
 (0.5068493150684932,),
 (0.5333333333333333,),
 (0.4891304347826087,),
 (0.5742574257425742,),
 (0.4679144385026738,),
 (0.5263157894736842,),
 (0.6509635974304069,),
 (0.5662650602409639,),
 (0.5388888888888889,),
 (0.4431279620853081,),
 (0.6568457538994801,),
 (0.5800865800865801,),
 (0.5072463768115942,),
 (0.464,),
 (0.6681818181818182,),
 (0.43661971830985913,),
 (0.47560975609756095,),
 (0.4857142857142857,),
 (0.6063829787234043,),
 (0.

In [None]:
res = query('SELECT lastBlock, COUNT(hash) FROM pending_txs GROUP BY lastBlock')
res

In [None]:
res = query( '''SELECT lastBlock, COUNT(hash)
                FROM pending_txs_raw
                LEFT JOIN blocks
                    ON lastBlock=blockNumber
                WHERE gasPrice IS NULL
                GROUP BY lastBlock''')
res

In [None]:
res = query( '''SELECT lastBlock, COUNT(hash), baseFeePerGas
                FROM pending_txs_raw
                LEFT JOIN blocks
                    ON lastBlock=blockNumber
                WHERE gasPrice IS NULL
                AND baseFeePerGas >= maxFeePerGas
                GROUP BY lastBlock''')
res