In [None]:
# ! pipenv run jupyter nbconvert --to python get_all_txs.ipynb
# !uv pip install dune_client

In [None]:
print('get all txs')
import sys
# Temporarily add the shared helper directory to sys.path so duneapi_utils resolves.
sys.path.append("../helper_functions")
import duneapi_utils as d
# NOTE: google_bq_utils imported later (before BQ write) to avoid stale OIDC token in CI
sys.path.pop()  # removes the last sys.path entry (the helper path, assuming the import left sys.path untouched)

import time
import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

import numpy as np
import pandas as pd
import os

In [None]:
# Column names for the transaction-count schema.
# NOTE(review): col_list is not referenced by any cell visible in this notebook.
col_list = [
    'dt',
    'blockchain',
    'name',
    'layer',
    'chain_id',
    'num_raw_txs',
    'num_success_txs',
    'num_qualified_txs',
    'source',
]

# Names handed to the Dune helper (`name=`) and to the BigQuery upload helper.
query_name = 'dune_all_txs'
query_gas_name = 'dune_all_gas'
query_fees_name = 'dune_all_fees'

# Passed to d.get_dune_data as num_hours_to_rerun (alternative value kept for reference: 4).
rerun_hrs = 0  # 4

In [None]:
# Run Config
# Each flag enables one stage when it equals 1; any other value (e.g. 0) skips
# that stage and leaves its DataFrame empty.
run_all_txs = 1      # chunked fees / transaction query
run_gas = 1          # chunked gas query
run_metrics = 1      # fee metrics query
run_chain_level = 1  # per-chain queries driven by chain_config

In [None]:
# Look-back window, expressed in days before NOW().
trailing_days = 28  # 45
ending_days = 0

# Chunking of the window for the fees and gas queries.
chunk_size = 3          # Process in 3-day chunks to avoid Dune timeout
parallel_chunks = True  # Run chunks in parallel for faster execution
max_workers = 3         # Number of parallel workers

# Optional chain filter; the sentinel 'none' means no filter is applied.
single_chain = 'none'

print(f'trailing days: {trailing_days}')
print(f'chunk size: {chunk_size}')
print(f'single chain: {single_chain}')

In [None]:
# Per-chain settings for the chain-level Dune query. Every active row has eight
# positional entries; the chain-level cell forwards each one as a text query
# parameter under the field name shown on the right:
#   [0] blockchain              -> 'blockchain'
#   [1] display name            -> 'display_name'
#   [2] transaction count expr  -> 'count_func'
#   [3] block count expr        -> 'count_block_func'
#   [4] transactions table      -> 'transactions_table'
#   [5] per-tx fee expr         -> 'tx_fee_func_internal'
#   [6] fee currency symbol     -> 'tx_fee_currency'
#   [7] chain id                -> 'chain_id_param'
# NOTE(review): entries 2, 3 and 5 look like SQL fragments substituted into the
# query text, and the string 'NULL' appears to stand for "no value" — confirm
# against the Dune query definition.
chain_config = [
    ### #blockchain, display_name, count_func, count_block_func_param, transactions_table_param, tx_fee_func_param, tx_fee_currency_param, chain_id_param
    ['bitcoin','Bitcoin','COUNT_IF(fee > 0)','COUNT(DISTINCT block_height)','transactions','fee','BTC','NULL'],
    ['near','Near','COUNT(distinct case when gas_price > 0 then tx_hash else null end)','COUNT(DISTINCT block_height)','actions','cast(NULL as double)','NULL','397'],
    ['aptos','Aptos','COUNT_IF(gas_used>0)','COUNT(DISTINCT block_height)','user_transactions','(gas_used*gas_unit_price/1e8)','APT','NULL'],
    # Disabled entry; note it does not follow the eight-field layout used by the active rows.
    # ['stellar','Stellar','COUNT_IF(fee_charged>0)','COUNT(DISTINCT block_height)','fee_charged','history_transactions'], --not date partitioned
    ['kaia','Kaia','COUNT_IF(gas_price>0)','COUNT(DISTINCT block_number)','transactions','cast(NULL as double)','NULL','8217'],
    ['ton','TON','COUNT_IF(compute_gas_fees>0)','COUNT(DISTINCT block_seqno)','transactions','cast(NULL as double)','NULL','NULL'],
    ['berachain','Berachain','COUNT_IF(gas_price>0)','COUNT(DISTINCT block_number)','transactions','cast(gas_price/1e9*gas_used/1e9 as double)','BERA','80094'],
    ['hyperevm','Hyperliquid L1','COUNT_IF(gas_price>0)','COUNT(DISTINCT block_number)','transactions','cast(gas_price/1e9*gas_used/1e9 as double)','HYPE','999'],
    ['sonic','Sonic','COUNT_IF(gas_price>0)','COUNT(DISTINCT block_number)','transactions','cast(gas_price/1e9*gas_used/1e9 as double)','S','146'],
]

In [None]:
# Full-window query parameters. The chunked fees/gas fetchers build their own
# per-chunk parameters; these are consumed by the fee-metrics and chain-level queries.
days_param = d.generate_query_parameter(
    input=trailing_days,
    field_name='trailing_days',
    dtype='number',
)
end_days_param = d.generate_query_parameter(
    input=ending_days,
    field_name='ending_days',
    dtype='number',
)
single_chain_param = d.generate_query_parameter(
    input=single_chain,
    field_name='single_chain',
    dtype='text',
)

In [None]:
def fetch_fees_chunk(days_start, days_end, single_chain):
    """Run the fees query on Dune for one slice of the look-back window.

    Both bounds are expressed in days before NOW(). The query's
    ``trailing_days`` parameter receives ``days_end`` (the older edge) and its
    ``ending_days`` parameter receives ``days_start`` (the more recent edge),
    so ``fetch_fees_chunk(22, 25, chain)`` requests data from 25 to 22 days ago.

    Args:
        days_start: More recent edge of the slice, in days before NOW().
        days_end: Older edge of the slice, in days before NOW().
        single_chain: Value sent as the ``single_chain`` text parameter.

    Returns:
        Whatever ``d.get_dune_data`` returns for query 4229341; callers in this
        notebook treat it as a pandas DataFrame.
    """
    chunk_params = [
        d.generate_query_parameter(input=days_end, field_name='trailing_days', dtype='number'),
        d.generate_query_parameter(input=days_start, field_name='ending_days', dtype='number'),
        d.generate_query_parameter(input=single_chain, field_name='single_chain', dtype='text'),
    ]

    return d.get_dune_data(
        query_id=4229341,
        name=query_name,
        path="outputs",
        performance="large",
        params=chunk_params,
        num_hours_to_rerun=rerun_hrs,
    )

# Fetch fee data for the whole look-back window in chunk_size-day slices and
# combine the slices into fees_df (an empty DataFrame when disabled or on failure).
if run_all_txs == 1:
    print('     dune runs - fees (chunked)')
    fees_dfs = []

    # A non-positive chunk size never shrinks the remaining window, so the
    # chunk builder below would loop forever; fail fast on bad configuration.
    if chunk_size <= 0:
        raise ValueError(f'chunk_size must be a positive number of days, got {chunk_size}')

    # Build (days_start, days_end) slices from the oldest edge (trailing_days)
    # towards the most recent edge (ending_days); the last slice is clamped.
    chunk_ranges = []
    current_end = trailing_days
    while current_end > ending_days:
        days_start = max(current_end - chunk_size, ending_days)
        chunk_ranges.append((days_start, current_end))
        current_end = days_start

    try:
        chunk_results = {}   # (days_start, days_end) -> non-empty chunk DataFrame
        failed_chunks = []   # slices whose fetch raised in parallel mode

        if parallel_chunks and len(chunk_ranges) > 1:
            print(f'     Running {len(chunk_ranges)} chunks in parallel with {max_workers} workers')
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = {
                    executor.submit(fetch_fees_chunk, start, end, single_chain): (start, end)
                    for start, end in chunk_ranges
                }
                for future in as_completed(futures):
                    start, end = futures[future]
                    # Best effort: one failing slice must not discard the others.
                    try:
                        chunk_df = future.result()
                        if not chunk_df.empty:
                            chunk_results[(start, end)] = chunk_df
                        print(f'     Completed chunk days {start}-{end}')
                    except Exception as e:
                        failed_chunks.append((start, end))
                        print(f'     Failed chunk days {start}-{end}: {e}')
        else:
            for days_start, days_end in chunk_ranges:
                print(f'     dune: day range {days_start}-{days_end}')
                chunk_df = fetch_fees_chunk(days_start, days_end, single_chain)
                if not chunk_df.empty:
                    chunk_results[(days_start, days_end)] = chunk_df

        if failed_chunks:
            # Make partial results visible in one place instead of only inline.
            print(f'     WARNING: {len(failed_chunks)} of {len(chunk_ranges)} fee chunks failed; '
                  f'data is incomplete for day ranges {sorted(failed_chunks)}')

        # Reassemble in chunk_ranges order rather than completion order, so the
        # keep='first' de-duplication below gives the same result in parallel
        # and sequential mode.
        fees_dfs = [chunk_results[key] for key in chunk_ranges if key in chunk_results]

        if fees_dfs:
            fees_df = pd.concat(fees_dfs, ignore_index=True)
            # Remove duplicates from overlapping chunk boundaries (BETWEEN is inclusive)
            dedupe_cols = ['blockchain', 'dt', 'tx_fee_currency']
            before_count = len(fees_df)
            fees_df = fees_df.drop_duplicates(subset=dedupe_cols, keep='first')
            after_count = len(fees_df)
            if before_count != after_count:
                print(f'     Removed {before_count - after_count} duplicate rows from chunk overlaps')
        else:
            fees_df = pd.DataFrame()
    except Exception as e:
        print(f"Dune API failed: {e}")
        fees_df = pd.DataFrame()
else:
    fees_df = pd.DataFrame()

In [None]:
def fetch_gas_chunk(days_start, days_end, single_chain):
    """Fetch a single chunk of gas data from Dune.

    For the date range query:
    - trailing_days = how far back from NOW() to start (e.g., 25 = start 25 days ago)
    - ending_days = how far back from NOW() to end (e.g., 22 = end 22 days ago)
    So chunk (22, 25) means: trailing_days=25, ending_days=22 → data from 25 to 22 days ago

    Args:
        days_start: More recent edge of the chunk, in days before NOW().
        days_end: Older edge of the chunk, in days before NOW().
        single_chain: Value sent as the ``single_chain`` text parameter.

    Returns:
        Whatever ``d.get_dune_data`` returns for query 4758295; callers in this
        notebook treat it as a pandas DataFrame.
    """
    days_param = d.generate_query_parameter(input=days_end, field_name='trailing_days', dtype='number')
    end_days_param = d.generate_query_parameter(input=days_start, field_name='ending_days', dtype='number')
    single_chain_param = d.generate_query_parameter(input=single_chain, field_name='single_chain', dtype='text')

    # Label the result with the gas name rather than the fees/txs name, so the
    # gas query is not stored or reported under the same name as the fees query.
    # NOTE(review): assumes get_dune_data uses `name` only as a label/output name — confirm.
    return d.get_dune_data(query_id = 4758295,  # https://dune.com/queries/4758295
        name = query_gas_name,
        path = "outputs",
        performance="large",
        params = [days_param, end_days_param, single_chain_param],
        num_hours_to_rerun=rerun_hrs
    )

# Fetch gas data for the whole look-back window in chunk_size-day slices and
# combine the slices into gas_df (an empty DataFrame when disabled or on failure).
if run_gas == 1:
    print('run dune - gas level (chunked)')
    gas_dfs = []

    # A non-positive chunk size never shrinks the remaining window, so the
    # chunk builder below would loop forever; fail fast on bad configuration.
    if chunk_size <= 0:
        raise ValueError(f'chunk_size must be a positive number of days, got {chunk_size}')

    # Build (days_start, days_end) slices from the oldest edge (trailing_days)
    # towards the most recent edge (ending_days); the last slice is clamped.
    chunk_ranges = []
    current_end = trailing_days
    while current_end > ending_days:
        days_start = max(current_end - chunk_size, ending_days)
        chunk_ranges.append((days_start, current_end))
        current_end = days_start

    try:
        chunk_results = {}   # (days_start, days_end) -> non-empty chunk DataFrame
        failed_chunks = []   # slices whose fetch raised in parallel mode

        if parallel_chunks and len(chunk_ranges) > 1:
            print(f'     Running {len(chunk_ranges)} gas chunks in parallel with {max_workers} workers')
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = {
                    executor.submit(fetch_gas_chunk, start, end, single_chain): (start, end)
                    for start, end in chunk_ranges
                }
                for future in as_completed(futures):
                    start, end = futures[future]
                    # Best effort: one failing slice must not discard the others.
                    try:
                        chunk_df = future.result()
                        if not chunk_df.empty:
                            chunk_results[(start, end)] = chunk_df
                        print(f'     Completed gas chunk days {start}-{end}')
                    except Exception as e:
                        failed_chunks.append((start, end))
                        print(f'     Failed gas chunk days {start}-{end}: {e}')
        else:
            for days_start, days_end in chunk_ranges:
                print(f'     dune gas: day range {days_start}-{days_end}')
                chunk_df = fetch_gas_chunk(days_start, days_end, single_chain)
                if not chunk_df.empty:
                    chunk_results[(days_start, days_end)] = chunk_df

        if failed_chunks:
            # Make partial results visible in one place instead of only inline.
            print(f'     WARNING: {len(failed_chunks)} of {len(chunk_ranges)} gas chunks failed; '
                  f'data is incomplete for day ranges {sorted(failed_chunks)}')

        # Reassemble in chunk_ranges order rather than completion order, so the
        # keep='first' de-duplication below gives the same result in parallel
        # and sequential mode.
        gas_dfs = [chunk_results[key] for key in chunk_ranges if key in chunk_results]

        if gas_dfs:
            gas_df = pd.concat(gas_dfs, ignore_index=True)
            # Remove duplicates from overlapping chunk boundaries (BETWEEN is inclusive)
            dedupe_cols = ['chain_id', 'dt']
            before_count = len(gas_df)
            gas_df = gas_df.drop_duplicates(subset=dedupe_cols, keep='first')
            after_count = len(gas_df)
            if before_count != after_count:
                print(f'     Removed {before_count - after_count} duplicate rows from chunk overlaps')
        else:
            gas_df = pd.DataFrame()
    except Exception as e:
        print(f"Dune gas API failed: {e}")
        gas_df = pd.DataFrame()
else:
    gas_df = pd.DataFrame()

In [None]:
# Chains already returned by the chunked fees query; the chain-level cell skips
# these. The column check covers the fallback case where the fees fetch failed
# and fees_df is an empty, column-less DataFrame (indexing it would raise KeyError).
# No .sample() preview here: inside an `if` its result is never displayed, and it
# raises ValueError whenever the frame has fewer rows than requested.
if run_all_txs == 1 and 'blockchain' in fees_df.columns:
    unique_blockchains = fees_df['blockchain'].unique().tolist()
else:
    unique_blockchains = []

In [None]:
# Fetch fee metrics for the full window in one query. This query only receives
# the two day-window parameters, so the single-chain filter is applied client-side.
if run_metrics == 1:
    print('run dune - fees metrics')
    fee_metrics_df = d.get_dune_data(query_id = 4902405, #https://dune.com/queries/4902405
        name = "dune fees metrics",
        path = "outputs",
        performance="large",
        params = [days_param, end_days_param],
        num_hours_to_rerun=rerun_hrs
    )
    if single_chain != 'none':
        # .copy() gives an independent frame, so later column assignments on
        # fee_metrics_df do not trigger pandas' SettingWithCopyWarning.
        fee_metrics_df = fee_metrics_df[fee_metrics_df['blockchain'] == single_chain].copy()
else:
    fee_metrics_df = pd.DataFrame()

In [None]:
# Default the chain-level aggregate to an empty frame so the name is always defined.
chain_df_agg = pd.DataFrame()

In [None]:
# Run the parameterised chain-level query once per chain_config row that the
# chunked fees query did not already return, and stack the results.
if run_chain_level == 1:
    print('run dune - chain level')
    chain_dfs = []
    for row in chain_config:
        blockchain = row[0]
        print(blockchain)
        # Chains already present in the fees results are not queried again.
        if blockchain in unique_blockchains:
            continue

        # Row layout: blockchain, display_name, count_func, count_block_func,
        # transactions_table, tx_fee_func, tx_fee_currency, chain_id
        blockchain_param = d.generate_query_parameter(input=blockchain,field_name='blockchain',dtype='text')
        display_name_param = d.generate_query_parameter(input=row[1],field_name='display_name',dtype='text')
        count_func_param = d.generate_query_parameter(input=row[2],field_name='count_func',dtype='text')
        count_block_func_param = d.generate_query_parameter(input=row[3],field_name='count_block_func',dtype='text')
        transactions_table_param = d.generate_query_parameter(input=row[4],field_name='transactions_table',dtype='text')
        tx_fee_func_param = d.generate_query_parameter(input=row[5],field_name='tx_fee_func_internal',dtype='text')
        tx_fee_currency_param = d.generate_query_parameter(input=row[6],field_name='tx_fee_currency',dtype='text')
        chain_id_param = d.generate_query_parameter(input=row[7],field_name='chain_id_param',dtype='text')

        chain_df = d.get_dune_data(query_id = 4230061, #https://dune.com/queries/4230061
            name = query_name + '_by_chain',
            path = "outputs",
            performance="large",
            params = [
                    days_param,end_days_param,blockchain_param,display_name_param,count_func_param,count_block_func_param
                    ,transactions_table_param,tx_fee_func_param,tx_fee_currency_param,chain_id_param
                    ],
            num_hours_to_rerun=rerun_hrs
        )
        chain_dfs.append(chain_df)

    # pd.concat raises ValueError("No objects to concatenate") on an empty list,
    # which happens when every configured chain was skipped above.
    chain_df_agg = pd.concat(chain_dfs) if chain_dfs else pd.DataFrame()
else:
    chain_df_agg = pd.DataFrame()

In [None]:
# Debug output: column index of the chunked fees frame.
print(fees_df.columns)
# print(chain_df_agg.columns)

In [None]:
# Stack the chunked fees rows with the chain-level rows (when any were fetched).
if not chain_df_agg.empty:
    dune_df = pd.concat([fees_df,chain_df_agg])
else:
    dune_df = fees_df.copy()

# Append fee-metrics rows for chains that appear only in fee_metrics_df.
# The column checks cover the fallback frames created when a fetch is disabled
# or fails: those are column-less, and indexing 'blockchain' on them raises KeyError.
if run_metrics == 1 and 'blockchain' in fee_metrics_df.columns:
    if 'blockchain' in dune_df.columns:
        already_covered = fee_metrics_df['blockchain'].isin(dune_df['blockchain'])
        unique_fee_metrics_df = fee_metrics_df[~already_covered]
    else:
        # Nothing collected so far, so every metrics chain is "metrics only".
        unique_fee_metrics_df = fee_metrics_df
    dune_df = pd.concat([dune_df, unique_fee_metrics_df])

In [None]:
# Notebook display of the combined frame (a bare expression; no effect when run as a script).
dune_df

In [None]:
# Normalise the combined frame: tag the source, parse dates, drop the current
# (incomplete) UTC day, and coerce id/count columns.
if not dune_df.empty:
    dune_df['source'] = 'dune'
    # Strip the literal ' UTC' suffix before parsing, then drop any timezone info.
    dune_df['dt'] = dune_df['dt'].str.replace(' UTC', '', regex=False)
    dune_df['dt'] = pd.to_datetime(dune_df['dt'], format='mixed').dt.tz_localize(None)
    # Keep only days strictly before today (UTC). .copy() makes the filtered
    # result an independent frame, so the assignments below do not trigger
    # pandas' SettingWithCopyWarning.
    dune_df = dune_df[dune_df['dt'].dt.date < datetime.datetime.now(datetime.timezone.utc).date()].copy()
    # chain_id as a string, without the trailing '.0' that float-typed ids produce.
    dune_df['chain_id'] = dune_df['chain_id'].astype(str).str.replace(r'\.0$', '', regex=True)
    # Missing counts become 0; nullable Int64 keeps the columns integer-typed.
    dune_df['num_blocks'] = dune_df['num_blocks'].fillna(0).astype('Int64')
    dune_df['num_txs'] = dune_df['num_txs'].fillna(0).astype('Int64')

In [None]:
# Normalise the gas frame: tag the source, parse dates, drop the current
# (incomplete) UTC day, and stringify chain ids.
if not gas_df.empty:
    gas_df['source'] = 'dune'
    gas_df['dt'] = pd.to_datetime(gas_df['dt']).dt.tz_localize(None)
    # Keep only days strictly before today (UTC). .copy() makes the filtered
    # result an independent frame, so the assignment below does not trigger
    # pandas' SettingWithCopyWarning.
    gas_df = gas_df[gas_df['dt'].dt.date < datetime.datetime.now(datetime.timezone.utc).date()].copy()
    # chain_id as a string, without the trailing '.0' that float-typed ids produce.
    gas_df['chain_id'] = gas_df['chain_id'].astype(str).str.replace(r'\.0$', '', regex=True)

In [None]:
# Verify that all elements are strings.
# Guarded like the gas / fee-metrics checks: an empty fallback frame has no
# 'chain_id' column, so the unguarded lookup would raise KeyError instead of
# letting the notebook continue to the (also guarded) upload step.
if not dune_df.empty:
    assert dune_df['chain_id'].apply(type).eq(str).all(), "Not all elements are strings"
# print(dune_df['chain_id'].dtype)

In [None]:
# Normalise the fee-metrics frame: timezone-free timestamps and string chain ids.
# NOTE(review): unlike the dune_df / gas_df cleanup, this neither strips a
# trailing '.0' from chain_id nor drops the current UTC day — confirm that is intended.
if not fee_metrics_df.empty:
    fee_metrics_df = fee_metrics_df.assign(
        dt=pd.to_datetime(fee_metrics_df['dt']).dt.tz_localize(None),
        chain_id=fee_metrics_df['chain_id'].astype(str),
    )

In [None]:
# Sanity check: every chain_id in the non-empty gas and fee-metrics frames is a str.
for _checked_df in (gas_df, fee_metrics_df):
    if _checked_df.empty:
        continue
    assert _checked_df['chain_id'].apply(type).eq(str).all(), "Not all elements are strings"

In [None]:
# dune_df.sample(5)
# Debug output: column dtypes of the combined frame after cleanup.
print(dune_df.dtypes)


In [None]:
# Debug output: (rows, columns) of the combined frame.
print(f"dune_df shape: {dune_df.shape}")

In [None]:
# Key columns that identify one dune_df row; used for the duplicate report and
# passed as unique_keys to the BigQuery upsert.
unique_cols = [
    'blockchain',
    'dt',
    'tx_fee_currency',
]

In [None]:
# Report rows that share the same unique_cols key (diagnostic output only).
# An empty fallback frame lacks the key columns; skip the checks then instead
# of raising KeyError, and report empty results.
if set(unique_cols).issubset(dune_df.columns):
    # Identify duplicate combinations
    duplicates = dune_df.duplicated(subset=unique_cols, keep=False)

    # View the duplicate rows
    duplicate_rows = dune_df[duplicates]

    # Get a count of duplicates for each combination.
    # dropna=False keeps keys containing nulls (e.g. a missing tx_fee_currency):
    # duplicated() treats nulls as equal, so without it such duplicates would
    # show up in duplicate_rows but be missing from the counts.
    duplicate_counts = dune_df.groupby(unique_cols, dropna=False).size().reset_index(name='count')
    duplicate_counts = duplicate_counts[duplicate_counts['count'] > 1]
else:
    duplicates = pd.Series(dtype=bool)
    duplicate_rows = dune_df
    duplicate_counts = pd.DataFrame(columns=unique_cols + ['count'])

# Display the duplicate rows
print(duplicate_rows)

print("\nDuplicate combination counts:")
print(duplicate_counts)

In [None]:
# Notebook display of the combined frame's column dtypes.
dune_df.dtypes

In [None]:
# Notebook display of the column dtypes again; dune_df is unchanged since the
# preceding cell, so this repeats its output.
dune_df.dtypes

In [None]:
# Coerce the count columns to plain integers: non-numeric values become NaN,
# NaN becomes 0, and the result is rounded before the int cast.
# Guarded on a non-empty frame, matching the other dune_df steps: the empty
# fallback frame has no count columns and would raise KeyError here.
if not dune_df.empty:
    for count_col in ('num_blocks', 'num_txs'):
        dune_df[count_col] = pd.to_numeric(dune_df[count_col], errors='coerce').fillna(0).round().astype(int)

In [None]:
# Check: print a random preview of up to five rows.
# The sample size is capped at the row count because DataFrame.sample raises
# ValueError when asked for more rows than exist (without replacement).
if not dune_df.empty:
    print(dune_df.sample(min(5, len(dune_df))))

In [None]:
# Import BQ utils here (right before use) to get fresh OIDC token in CI
# The helper directory is added to sys.path only for the duration of the import.
sys.path.append("../helper_functions")
import google_bq_utils as bqu
sys.path.pop()

# Prepare credentials before BQ operations (refreshes OIDC token if needed)
bqu.prepare_bq_credentials()

#BQ Upload
# Send the combined frame to the helper's append-and-upsert routine under
# query_name, keyed on unique_cols; skipped when there is nothing to write.
if not dune_df.empty:
    bqu.append_and_upsert_df_to_bq_table(dune_df, query_name, unique_keys = unique_cols)

In [None]:
# Upload the gas frame under query_gas_name, keyed on (chain_id, dt); skipped when empty.
if not gas_df.empty:
    bqu.append_and_upsert_df_to_bq_table(gas_df, query_gas_name, unique_keys = ['chain_id','dt'])

In [None]:
# fee_metrics_df[fee_metrics_df['blockchain']=='tron'].sort_values(by='dt',ascending=False)

In [None]:
# Upload the fee-metrics frame under query_fees_name, keyed on (blockchain, dt); skipped when empty.
if not fee_metrics_df.empty:
    bqu.append_and_upsert_df_to_bq_table(fee_metrics_df, query_fees_name, unique_keys = ['blockchain','dt'])