# Xatu Public Contributors Analysis

This notebook fetches and analyzes public contributor data from ClickHouse and generates JSON files for the frontend.

In [28]:
import os
from datetime import datetime, timedelta
import pandas as pd
from pathlib import Path
from lib import Lab

# Initialize lab
# The first argument is this notebook's name; it also appears as the logger
# name in the cell output. The second is the path handed to Lab as its config
# file.
lab = Lab('xatu-public-contributors', '../config.yaml')
lab.setup()
# NOTE(review): presumably this must run before get_pandaops_clickhouse_client()
# below — confirm in lib.Lab.
lab.setup_pandaops_clickhouse()
log = lab.log

# Get notebook specific config
# NOTE(review): later cells call lab.get_notebook_config() again rather than
# reading this variable.
notebook_config = lab.get_notebook_config()

# Data writer handle obtained from Lab; the cells in this section write their
# JSON through lab.write_json instead.
writer = lab.get_data_writer()

# Client used by the query cells below via .execute(query, params).
pandaops_clickhouse_client = lab.get_pandaops_clickhouse_client()

log.info("Good to go!")

2025-01-09 15:45:52,390 - xatu-public-contributors - INFO - Good to go!


In [29]:
# Convert the generic notebook config into this notebook's typed view. As the
# repr below shows, it carries `time_windows`, `data_dir` and `networks`, which
# the query cells iterate over.
xatu_public_contributors_config = lab.get_notebook_config().as_xatu_public_contributors()
# Bare expression so the notebook displays the resolved configuration.
xatu_public_contributors_config


XatuPublicContributors(time_windows=[TimeWindow(file='last_30_days', step='1d', label='Last 30d', range='-30d'), TimeWindow(file='last_1_day', step='1h', label='Last 1d', range='-1d'), TimeWindow(file='last_90_days', step='3d', label='Last 90d', range='-90d')], data_dir='../data/xatu-public-contributors', networks=['mainnet', 'sepolia', 'holesky'])

In [40]:
from sqlalchemy import text
from datetime import datetime, timezone

# Build summary.json: per-network node counts for the last hour, broken down
# by country, continent, city and consensus implementation.
#
# The query returns one row per distinct
# (network, country, continent, city, client name, implementation) combination
# observed in the window; the trailing count(*) column is not used below.
query = text("""
    SELECT
        meta_network_name,
        meta_client_geo_country as country,
        meta_client_geo_continent_code as continent,
        meta_client_geo_city as city,
        meta_client_name,
        meta_consensus_implementation,
        count(*) as count
    FROM beacon_api_eth_v1_events_block FINAL
    WHERE
        slot_start_date_time BETWEEN toDateTime(:start_date) AND toDateTime(:end_date)
        AND meta_network_name IN (:networks)
        AND meta_client_name != ''
        AND meta_client_name IS NOT NULL
    GROUP BY meta_network_name, country, continent, city, meta_client_name, meta_consensus_implementation
""")

# Get last 1h window
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(hours=1)

# Format dates without microseconds for Clickhouse
start_str = start_date.strftime('%Y-%m-%d %H:%M:%S')
end_str = end_date.strftime('%Y-%m-%d %H:%M:%S')

log.info("Fetching data for last 1h")

result = pandaops_clickhouse_client.execute(
    query,
    {
        "start_date": start_str,
        "end_date": end_str,
        "networks": xatu_public_contributors_config.networks
    }
)
rows = result.fetchall()

if len(rows) == 0:
    # The window queried above is one hour, so say so in the warning.
    log.warning("No data found for last 1h")
else:
    # Build summary data per network
    summary = {
        "updated_at": int(datetime.now(timezone.utc).timestamp()),
        "networks": {}
    }

    # Pre-create every configured network so it is present in the output even
    # when it returned no rows.
    for network in xatu_public_contributors_config.networks:
        summary["networks"][network] = {
            "total_nodes": 0,
            "total_public_nodes": 0,
            "countries": {},
            "continents": {},
            "cities": {},
            "consensus_implementations": {}
        }

    # Rows are unique per (network, geo, client, implementation), not per
    # client. A client whose geo or implementation changed inside the window
    # therefore shows up in several rows. Remember which client names have
    # already been counted for each bucket so every node is counted once.
    counted = set()

    for row in rows:
        network, country, continent, city, client_name, consensus_impl, _ = row
        network_summary = summary["networks"][network]
        is_public = not client_name.startswith('ethpandaops')

        # Add to network totals (once per distinct client name)
        network_marker = (network, client_name)
        if network_marker not in counted:
            counted.add(network_marker)
            network_summary["total_nodes"] += 1
            if is_public:
                network_summary["total_public_nodes"] += 1

        # Add to each per-dimension breakdown (once per client name and value)
        breakdowns = (
            ("countries", country),
            ("continents", continent),
            ("cities", city),
            ("consensus_implementations", consensus_impl),
        )
        for dimension, value in breakdowns:
            dimension_marker = (network, dimension, value, client_name)
            if dimension_marker in counted:
                continue
            counted.add(dimension_marker)

            entry = network_summary[dimension].setdefault(
                value, {"total_nodes": 0, "public_nodes": 0}
            )
            entry["total_nodes"] += 1
            if is_public:
                entry["public_nodes"] += 1

    # Write summary to file
    lab.write_json("summary.json", summary)


2025-01-09 18:05:04,455 - xatu-public-contributors - INFO - Fetching data for last 1h


In [36]:
from sqlalchemy import text
from datetime import datetime, timezone

# Public (non-ethpandaops) node counts per country over time, written as one
# JSON file per network and time window.
#
# Each result row is (bucket start as unix seconds, country, network, number of
# distinct client names). The bucket start is returned with toUnixTimestamp so
# that sub-day steps (e.g. the 1h step of the last_1_day window) keep their own
# timestamp instead of being collapsed onto a calendar date, and so the value
# does not depend on the timezone of the machine running the notebook.
query = text("""
    WITH time_slots AS (
        SELECT
            toStartOfInterval(slot_start_date_time, INTERVAL :step_seconds second) as time_slot,
            meta_client_geo_country as country,
            meta_network_name,
            count(distinct meta_client_name) AS total
        FROM beacon_api_eth_v1_events_block FINAL
        WHERE
            slot_start_date_time BETWEEN toDateTime(:start_date) AND toDateTime(:end_date)
            AND meta_client_name NOT LIKE 'ethpandaops%'
            AND meta_network_name IN (:networks)
            AND meta_client_name != ''
            AND meta_client_name IS NOT NULL
        GROUP BY time_slot, country, meta_network_name
    )
    SELECT
        toUnixTimestamp(time_slot) as time,
        country,
        meta_network_name,
        total
    FROM time_slots
""")

# Raw result rows per window file name, kept for inspection in later cells.
countries_by_window = {}

for window in xatu_public_contributors_config.time_windows:
    start_date, end_date = window.get_time_range(datetime.now(timezone.utc))
    step_seconds = window.get_step_seconds()

    # Format dates without microseconds for Clickhouse
    start_str = start_date.strftime('%Y-%m-%d %H:%M:%S')
    end_str = end_date.strftime('%Y-%m-%d %H:%M:%S')

    total_timesteps = (end_date - start_date).total_seconds() / step_seconds

    log.info(f"Fetching data for {window.file}, total timesteps: {total_timesteps}")

    result = pandaops_clickhouse_client.execute(
        query,
        {
            "start_date": start_str,
            "end_date": end_str,
            "networks": xatu_public_contributors_config.networks,
            "step_seconds": step_seconds
        }
    )
    countries = result.fetchall()

    if len(countries) == 0:
        log.warning(f"No countries found for time window {window.file}")
        continue

    countries_by_window[window.file] = countries
    log.info(f"Found {len(countries)} countries for time window {window.file}")

    # Group by network and write separate files
    for network in xatu_public_contributors_config.networks:
        # Bucket timestamp -> {"time": ..., "countries": [...]}
        merged = {}
        for slot_ts, country, row_network, total in countries:
            if row_network != network:
                continue
            timestamp = int(slot_ts)
            entry = merged.setdefault(timestamp, {"time": timestamp, "countries": []})
            entry["countries"].append({"name": country, "value": total})

        if not merged:
            continue

        # The query has no ORDER BY, so sort here to emit a chronological series.
        final_data = [merged[timestamp] for timestamp in sorted(merged)]

        # Write to single file per time window and network
        lab.write_json(f"countries/{network}/{window.file}.json", final_data)


2025-01-09 16:15:32,052 - xatu-public-contributors - INFO - Fetching data for last_30_days, total timesteps: 30.0


DatabaseException: Orig exception: Code: 47. DB::Exception: Missing columns: 'meta_client_geo_continent' while processing query: 'SELECT toStartOfInterval(slot_start_date_time, toIntervalSecond(86400)) AS time_slot, meta_client_geo_country AS country, meta_client_geo_continent AS continent, meta_client_geo_city AS city, meta_network_name, uniqExact(meta_client_name) AS total FROM beacon_api_eth_v1_events_block FINAL WHERE ((slot_start_date_time >= toDateTime('2024-12-10 06:15:32')) AND (slot_start_date_time <= toDateTime('2025-01-09 06:15:32'))) AND (meta_client_name NOT LIKE 'ethpandaops%') AND (meta_network_name IN ['mainnet', 'sepolia', 'holesky']) AND (meta_client_name != '') AND (meta_client_name IS NOT NULL) GROUP BY time_slot, country, continent, city, meta_network_name', required columns: 'meta_client_geo_country' 'meta_network_name' 'meta_client_name' 'meta_client_geo_continent' 'slot_start_date_time' 'meta_client_geo_city', maybe you meant: 'meta_client_geo_country', 'meta_network_name', 'meta_client_name', 'meta_client_geo_continent_code', 'slot_start_date_time' or 'meta_client_geo_city'. (UNKNOWN_IDENTIFIER) (version 24.2.3.70 (official build))


In [32]:
from sqlalchemy import text
from datetime import datetime, timezone

# Public (non-ethpandaops) node counts per contributor username over time,
# written as one JSON file per network and time window.
#
# The username is the second-to-last '/'-separated segment of meta_client_name
# (that is what the extractAll pattern captures). Each result row is
# (bucket start as unix seconds, username, network, number of distinct client
# names). The bucket start is returned with toUnixTimestamp so that sub-day
# steps (e.g. the 1h step of the last_1_day window) keep their own timestamp
# instead of being collapsed onto a calendar date, and so the value does not
# depend on the timezone of the machine running the notebook.
query = text("""
    WITH time_slots AS (
        SELECT
            toStartOfInterval(slot_start_date_time, INTERVAL :step_seconds second) as time_slot,
            extractAll(meta_client_name, '/([^/]+)/[^/]+$')[1] as username,
            meta_network_name,
            count(distinct meta_client_name) AS node_count
        FROM beacon_api_eth_v1_events_block FINAL
        WHERE
            slot_start_date_time BETWEEN toDateTime(:start_date) AND toDateTime(:end_date)
            AND meta_client_name NOT LIKE 'ethpandaops%'
            AND meta_network_name IN (:networks)
            AND meta_client_name != ''
            AND meta_client_name IS NOT NULL
        GROUP BY time_slot, username, meta_network_name
    )
    SELECT
        toUnixTimestamp(time_slot) as time,
        username,
        meta_network_name,
        node_count
    FROM time_slots
""")

# Raw result rows per window file name, kept for inspection in later cells.
users_by_window = {}

for window in xatu_public_contributors_config.time_windows:
    start_date, end_date = window.get_time_range(datetime.now(timezone.utc))
    step_seconds = window.get_step_seconds()

    # Format dates without microseconds for Clickhouse
    start_str = start_date.strftime('%Y-%m-%d %H:%M:%S')
    end_str = end_date.strftime('%Y-%m-%d %H:%M:%S')

    total_timesteps = (end_date - start_date).total_seconds() / step_seconds

    log.info(f"Fetching data for {window.file}, total timesteps: {total_timesteps}")

    result = pandaops_clickhouse_client.execute(
        query,
        {
            "start_date": start_str,
            "end_date": end_str,
            "networks": xatu_public_contributors_config.networks,
            "step_seconds": step_seconds
        }
    )
    users = result.fetchall()

    if len(users) == 0:
        log.warning(f"No users found for time window {window.file}")
        continue

    users_by_window[window.file] = users
    log.info(f"Found {len(users)} user entries for time window {window.file}")

    # Group by network and write separate files
    for network in xatu_public_contributors_config.networks:
        # Bucket timestamp -> {"time": ..., "users": [...]}
        merged = {}
        for slot_ts, username, row_network, node_count in users:
            if row_network != network:
                continue
            timestamp = int(slot_ts)
            entry = merged.setdefault(timestamp, {"time": timestamp, "users": []})
            entry["users"].append({"name": username, "nodes": node_count})

        if not merged:
            continue

        # The query has no ORDER BY, so sort here to emit a chronological series.
        final_data = [merged[timestamp] for timestamp in sorted(merged)]

        # Write to single file per time window and network
        lab.write_json(f"users/{network}/{window.file}.json", final_data)


2025-01-09 15:45:54,529 - xatu-public-contributors - INFO - Fetching data for last_30_days, total timesteps: 30.0
2025-01-09 15:45:55,145 - xatu-public-contributors - INFO - Found 573 user entries for time window last_30_days
2025-01-09 15:45:55,150 - xatu-public-contributors - INFO - Fetching data for last_1_day, total timesteps: 24.0
2025-01-09 15:45:55,614 - xatu-public-contributors - INFO - Found 475 user entries for time window last_1_day
2025-01-09 15:45:55,619 - xatu-public-contributors - INFO - Fetching data for last_90_days, total timesteps: 30.0
2025-01-09 15:45:57,982 - xatu-public-contributors - INFO - Found 446 user entries for time window last_90_days
