In [None]:
import os
import pandas as pd
from dotenv import load_dotenv
import requests
from datetime import datetime, timedelta
from langchain_openai import AzureChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from concurrent.futures import ThreadPoolExecutor, as_completed

In [None]:
# Load variables from a local .env file (if present) into the process environment.
load_dotenv()

# Azure OpenAI deployment name used by the LLM summary step.
MODEL = "gpt-4o-mini"

# os.getenv() silently returns None for a missing variable and never raises
# KeyError, so use os.environ[...] to make the "fail fast" check below effective.
try:
    ETHERSCAN_API_KEY = os.environ["ETHERSCAN_API_KEY"]
    AZURE_API_KEY = os.environ["AZURE_API_KEY"]
    AZURE_ENDPOINT = os.environ["AZURE_ENDPOINT"]
    AZURE_API_VERSION = os.environ["AZURE_API_VERSION"]
    PROMPTLAYER_API_KEY = os.environ["PROMPTLAYER_API_KEY"]
except KeyError as e:
    raise KeyError(f"Environment variable {e} is not set. Please set it before running the script.") from e

In [None]:
def get_day_timestamps(now):
    """Return Unix timestamps for the window ending at ``now``.

    Args:
        now: A ``datetime`` marking the end of the window.

    Returns:
        A ``(start_timestamp, end_timestamp)`` tuple of integers, where the
        start is ``now`` minus ``timedelta(days = 1)`` and the end is ``now``
        itself, both truncated to whole seconds.
    """
    window_start = now - timedelta(days = 1)
    return int(window_start.timestamp()), int(now.timestamp())

def get_prompt(prompt_template_identifier = "80865", timeout = 30):
    """Fetch the system and user prompt texts of a PromptLayer template.

    Args:
        prompt_template_identifier: Identifier of the PromptLayer prompt
            template to request.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A ``(system_prompt, user_prompt)`` tuple of strings. When several
        messages share a role, the text of the last one is used.

    Raises:
        requests.RequestException: If the request fails, times out, or
            returns an error status.
        ValueError: If the response has no non-empty system or user prompt.
    """
    url = f"https://api.promptlayer.com/prompt-templates/{prompt_template_identifier}"
    headers = {
        "X-API-KEY": PROMPTLAYER_API_KEY,
        "Content-Type": "application/json"
    }

    # Without a timeout, requests can block forever on an unresponsive server.
    response = requests.post(url, headers = headers, timeout = timeout)
    response.raise_for_status()
    data = response.json()

    # Start from empty strings so a missing role reaches the ValueError below
    # instead of failing with UnboundLocalError.
    system_prompt = ""
    user_prompt = ""

    messages = data.get("prompt_template", {}).get("messages", [])
    for m in messages:
        role = m.get("role")
        if role not in ("system", "user"):
            continue

        # A message may carry an empty content list; treat that as "no text".
        content = m.get("content") or []
        text = content[0].get("text", "") if content else ""

        if role == "system":
            system_prompt = text
        else:
            user_prompt = text

    if not system_prompt or not user_prompt:
        raise ValueError("System or User prompt not found in the PromptLayer response.")

    return system_prompt, user_prompt

def get_block_number_by_timestamp(target_timestamp, api_key, timeout = 30):
    """Look up the block number closest before a Unix timestamp via Etherscan.

    Args:
        target_timestamp: Unix timestamp (seconds) to resolve to a block.
        api_key: Etherscan API key.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The block number as an ``int``.

    Raises:
        ValueError: If Etherscan answers with a status other than ``"1"``.
        Exception: If the HTTP request itself fails; the original
            ``requests`` error is attached as ``__cause__``.
    """
    url = (f"https://api.etherscan.io/api?module=block&action=getblocknobytime"
           f"&timestamp={target_timestamp}&closest=before&apikey={api_key}")

    try:
        # Without a timeout, a stalled connection would hang the whole pipeline.
        res = requests.get(url, timeout = timeout)
        res.raise_for_status()
        data = res.json()
    except requests.RequestException as e:
        # Chain the original error so the traceback keeps the root cause.
        raise Exception(f"API Request failed: {e}") from e

    # .get() keeps a malformed payload on the ValueError path rather than
    # surfacing as an unrelated KeyError.
    if data.get("status") == "1":
        return int(data["result"])

    raise ValueError(f"Error fetching block number: {data.get('message')}")

def get_block_transactions(block_number, api_key, timeout = 30):
    """Fetch the full transaction objects of one block via Etherscan's proxy API.

    This is a best-effort call: any failure is reported with a printed
    warning and an empty list is returned so the caller can continue with
    the remaining blocks.

    Args:
        block_number: Block number as an ``int``; sent hex-encoded.
        api_key: Etherscan API key.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A list of transaction dicts, or ``[]`` when the block could not be
        fetched or the response carried no transactions.
    """
    url = (f"https://api.etherscan.io/api?module=proxy&action=eth_getBlockByNumber"
           f"&tag={hex(block_number)}&boolean=true&apikey={api_key}")

    try:
        # Without a timeout, one stalled request would pin a worker thread forever.
        res = requests.get(url, timeout = timeout)
        res.raise_for_status()
        data = res.json()
    except requests.RequestException as e:
        print(f"Warning: Could not fetch block {block_number}. Error: {e}")
        return []

    result = data.get('result') if isinstance(data, dict) else None

    if isinstance(result, dict):
        return result.get('transactions') or []

    # A non-dict result (e.g. an error message string) is not block data.
    # Testing `'transactions' in result` on a string would be a substring
    # check, so report the payload instead of silently dropping the block.
    if result:
        print(f"Warning: Unexpected response for block {block_number}: {result}")
    return []

def process_and_aggregate_transactions(transactions, target_date):
    """Convert raw transactions to a DataFrame and compute daily metrics.

    Args:
        transactions: List of transaction dicts with ``gas``, ``gasPrice``
            and ``value`` as hex strings plus ``from`` and ``to`` fields.
        target_date: Value stored in the ``metric_date`` column.

    Returns:
        A ``(df, metrics_df)`` tuple. ``df`` holds one row per transaction
        with the numeric columns converted and ``value`` divided by 1e18.
        ``metrics_df`` holds a single row of aggregates. Both are empty
        DataFrames when ``transactions`` is empty, so callers can always
        unpack two values.
    """
    if not transactions:
        # Same two-item shape as the normal path; returning a single frame
        # here made `df, metrics_df = ...` fail for days without data.
        return pd.DataFrame(), pd.DataFrame()

    def _hex_to_int(raw):
        # Non-string values pass through untouched; malformed hex becomes
        # None so pd.to_numeric(errors='coerce') can turn it into NaN.
        if not isinstance(raw, str):
            return raw
        try:
            return int(raw, 16)
        except ValueError:
            return None

    df = pd.DataFrame(transactions)

    numeric_cols = ['gas', 'gasPrice', 'value']
    for col in numeric_cols:
        df[col] = pd.to_numeric(df[col].apply(_hex_to_int), errors = 'coerce')

    # Scale `value` by 1e18 (presumably wei -> ETH; confirm against the data source).
    df['value'] = df['value'] / 1e18

    daily_metrics = {
        'metric_date': target_date,
        'total_transactions': len(df),
        # NOTE(review): this averages the `gas` field of each transaction;
        # confirm whether that is gas used or the gas limit.
        'average_gas_used': df['gas'].mean(),
        'total_eth_transferred': df['value'].sum(),
        # nunique() skips missing values, so rows without a `to` address
        # do not add a wallet.
        'unique_active_wallets': pd.concat([df['from'], df['to']]).nunique()
    }

    return df, pd.DataFrame([daily_metrics])

def generate_daily_summary(metrics, system_prompt, user_prompt):
    """Ask the Azure OpenAI chat model for a summary of the daily metrics.

    Args:
        metrics: Mapping that supplies the template variables
            ``metric_date``, ``total_transactions``, ``average_gas_used``,
            ``total_eth_transferred`` and ``unique_active_wallets``.
        system_prompt: Text used as the system message template.
        user_prompt: Text appended to the built-in human message template.

    Returns:
        The model's answer as a string, or ``None`` if invoking the chain
        raised an exception (the error is printed).
    """
    print("Generating LLM summary...")

    chat_model = AzureChatOpenAI(
        azure_deployment = MODEL,
        api_version = AZURE_API_VERSION,
        azure_endpoint = AZURE_ENDPOINT,
        api_key = AZURE_API_KEY,
        temperature = 0.2
    )

    # Human message: fixed metrics section followed by the caller's prompt.
    # The placeholders are filled by the prompt template, not by Python here.
    human_template = (
        "\n"
        "        Please provide a summary for the Ethereum network activity on {metric_date}.\n"
        "        \n"
        "        Key Metrics:\n"
        "        - Total Transactions: {total_transactions:,.0f}\n"
        "        - Average Gas Used per Transaction: {average_gas_used:,.0f}\n"
        "        - Total ETH Transferred: {total_eth_transferred:,.2f} ETH\n"
        "        - Unique Active Wallets: {unique_active_wallets:,.0f}\n"
        "        \n"
        "        "
    ) + user_prompt

    chat_prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", human_template),
    ])

    summary_chain = chat_prompt | chat_model | StrOutputParser()

    try:
        return summary_chain.invoke(metrics)
    except Exception as e:
        print(f"Error generating LLM summary: {e}")
        return None

def run_pipeline_for_date(run_date, api_key, system_prompt = None, user_prompt = None):
    """Run the full ETL pipeline for the window ending at ``run_date``.

    Steps: resolve the block range from Etherscan, download the
    transactions of every block in parallel, aggregate them into daily
    metrics, ask the LLM for a summary, and write the metrics to
    ``output/daily_metrics_<YYYYMMDD>.csv``.

    Args:
        run_date: ``datetime`` marking the end of the window to process.
        api_key: Etherscan API key used for every Etherscan request.
        system_prompt: Optional system prompt for the LLM summary.
        user_prompt: Optional user prompt for the LLM summary. When either
            prompt is omitted, both are fetched with ``get_prompt()``.

    Returns:
        A ``(df, metrics_df)`` tuple: the per-transaction DataFrame and the
        single-row metrics DataFrame (including ``llm_summary``). Both are
        empty DataFrames when the pipeline stops early, so callers can
        always unpack two values.
    """
    date_label = run_date.strftime('%Y-%m-%d')
    print(f"\n--- Starting On-Chain ETL Pipeline for {date_label} ---")

    start_timestamp, end_timestamp = get_day_timestamps(run_date)

    print("Step 1: Extracting data from Etherscan...")
    try:
        # Use the key passed by the caller rather than the module-level global.
        start_block = get_block_number_by_timestamp(start_timestamp, api_key)
        end_block = get_block_number_by_timestamp(end_timestamp, api_key)
    except Exception as e:
        print(f"Error getting block range: {e}. Skipping this date.")
        return pd.DataFrame(), pd.DataFrame()

    print(f"Processing blocks from {start_block} to {end_block}.")
    all_transactions = []
    block_numbers = range(start_block, end_block + 1)

    # call the API in parallel for each block
    with ThreadPoolExecutor(max_workers = 5) as executor:
        future_to_block = {executor.submit(get_block_transactions, b, api_key): b for b in block_numbers}

        for future in as_completed(future_to_block):
            block_number = future_to_block[future]
            try:
                transactions = future.result()
                if transactions:
                    all_transactions.extend(transactions)
            except Exception as e:
                print(f"Error processing block {block_number}: {e}")

    print("Step 2: Transforming and aggregating data...")
    # Stop before aggregating when nothing was downloaded; there is nothing
    # to summarise or save.
    if not all_transactions:
        print("Pipeline finished for this date with no data to save.")
        return pd.DataFrame(), pd.DataFrame()

    df, metrics_df = process_and_aggregate_transactions(all_transactions, run_date)

    if metrics_df.empty:
        print("Pipeline finished for this date with no data to save.")
        return pd.DataFrame(), pd.DataFrame()

    print("Step 3: Generating LLM Insights...")
    metrics_dict = metrics_df.to_dict('records')[0]
    llm_summary = None
    try:
        # Resolve the prompts here instead of relying on notebook globals
        # that only exist if some other cell happened to run first.
        if system_prompt is None or user_prompt is None:
            fetched_system_prompt, fetched_user_prompt = get_prompt()
            if system_prompt is None:
                system_prompt = fetched_system_prompt
            if user_prompt is None:
                user_prompt = fetched_user_prompt
    except Exception as e:
        # The summary is best-effort: keep the metrics even without prompts.
        print(f"Error fetching prompts: {e}")
    else:
        llm_summary = generate_daily_summary(metrics_dict, system_prompt, user_prompt)
    metrics_df['llm_summary'] = llm_summary

    print("Step 4: Loading data to Csv file...")
    output_dir = "output"
    os.makedirs(output_dir, exist_ok = True)
    filename = f"daily_metrics_{run_date.strftime('%Y%m%d')}.csv"
    output_path = os.path.join(output_dir, filename)
    metrics_df.to_csv(output_path, index = False)
    print(f"Successfully saved daily metrics and summary to: {output_path}")
    print(f"--- Pipeline Finished for {date_label} ---")

    return df, metrics_df

In [61]:
# The pipeline processes the window that ends at the moment this cell runs.
now = datetime.now()

print("=====================================================")
print("Starting Daily On-Chain Data Processing Job")
print("=====================================================")

# The pipeline may stop early without producing data; guard the unpacking so
# the cell does not crash with "cannot unpack non-iterable NoneType object".
pipeline_result = run_pipeline_for_date(now, ETHERSCAN_API_KEY)
df, metrics_df = pipeline_result if pipeline_result is not None else (None, None)

print("\n=====================================================")
print("All tasks for today completed.")
print("=====================================================")

Starting Daily On-Chain Data Processing Job

--- Starting On-Chain ETL Pipeline for 2025-08-10 ---
Step 1: Extracting data from Etherscan...
Processing blocks from 23103849 to 23111003.
Step 2: Transforming and aggregating data...
Step 3: Generating LLM Insights...
Generating LLM summary...
Step 4: Loading data to Csv file...
Successfully saved daily metrics and summary to: output/daily_metrics_20250810.csv
--- Pipeline Finished for 2025-08-10 ---

All tasks for today completed.
