In [35]:
import requests
import os
import json
from dotenv import load_dotenv
import time
from typing import Dict, List, Optional
from datetime import datetime
import urllib.parse
import argparse
from typing import Tuple

In [36]:
load_dotenv()

True

In [37]:
# API Configuration
AIRTABLE_TOKEN = os.getenv('REACT_APP_AIRTABLE_TOKEN')
AIRTABLE_BASE_URL = 'https://api.airtable.com/v0/appZWDvjvDmVnOici'
TABLE_NAME = 'tblcXnFAf0IEvAQA6'
TARGET_VIEW_ID = 'viwF2Xc24CGNO7u5C'  # MEV Prefilter viwF2Xc24CGNO7u5C, Targeted: viwx6juMBenBuY6hs
GITHUB_TOKEN = os.getenv('GITHUB_TOKEN')

# Contract Explorer API Configuration (for fetching transaction data)
CONTRACT_API_BASE_URL = os.getenv('CONTRACT_API_BASE_URL', 'https://api.tenderly.co')

# HyperSync API Configuration
HYPERSYNC_API_URL = os.getenv('HYPERSYNC_API_URL')
HYPERSYNC_API_KEY = os.getenv('HYPERSYNC_API_KEY')

# API Headers
AIRTABLE_HEADERS = {
    'Authorization': f'Bearer {AIRTABLE_TOKEN}',
    'Content-Type': 'application/json'
}

GITHUB_HEADERS = {
    'Authorization': f'token {GITHUB_TOKEN}',
    'Accept': 'application/vnd.github.v3+json'
}

CONTRACT_API_HEADERS = {
    'Content-Type': 'application/json'
    # Add any additional headers needed for the contract API
}

HYPERSYNC_HEADERS = {
    'Content-Type': 'application/json',
    'Authorization': f'Bearer {HYPERSYNC_API_KEY}'
}

# List of GitHub repositories to exclude from search results
EXCLUDED_REPOS = [
    "HelayLiu/utils_download",
    "KeystoneHQ/Smart-Contract-Metadata-Registry",
    "tangtj/"
]

HYPERSYNC_CHAIN_CONFIG = {
    # Chain ID: {"url": "...", "supports_traces": True/False}
    1: {"url": "https://eth.hypersync.xyz", "supports_traces": True},          # Ethereum Mainnet
    10: {"url": "https://optimism.hypersync.xyz", "supports_traces": False},     # Optimism
    56: {"url": "https://bsc.hypersync.xyz", "supports_traces": False},         # Bsc (Update SUPPORTED_CHAINS if needed)
    100: {"url": "https://gnosis.hypersync.xyz", "supports_traces": False},      # Gnosis (Add to SUPPORTED_CHAINS if needed)
    137: {"url": "https://polygon.hypersync.xyz", "supports_traces": False},     # Polygon
    250: {"url": "https://fantom.hypersync.xyz", "supports_traces": False},     # Fantom (Add to SUPPORTED_CHAINS if needed)
    8453: {"url": "https://base.hypersync.xyz", "supports_traces": False},       # Base
    42161: {"url": "https://arbitrum.hypersync.xyz", "supports_traces": False},  # Arbitrum
    43114: {"url": "https://avalanche.hypersync.xyz", "supports_traces": False},# Avalanche (Add to SUPPORTED_CHAINS if needed)
    # Add other mainnets or testnets as required, checking trace support from the list
}

# List of supported blockchain chains and their IDs
SUPPORTED_CHAINS = {
    "base": 8453,
    "ethereum": 1,
    "optimism": 10,
    "arbitrum": 42161,
    "polygon": 137
}

# Check for missing configuration
if not all([AIRTABLE_TOKEN, GITHUB_TOKEN]):
    print("WARNING: Missing AIRTABLE_TOKEN or GITHUB_TOKEN environment variable!")
if CONTRACT_API_BASE_URL == 'https://api.example.com':
    print("WARNING: CONTRACT_API_BASE_URL is set to the placeholder. Please update it.")
if not HYPERSYNC_API_URL or not HYPERSYNC_API_KEY:
    print("WARNING: Missing HYPERSYNC_API_URL or HYPERSYNC_API_KEY environment variable!")

print("Configuration loaded.")

Configuration loaded.


In [6]:
class AirtableAPI:
    """Handle all Airtable API interactions."""

    @staticmethod
    def save_no_github_contracts(contract_address: str, record_id: str, origin_key: str = None):
        """Save contracts with no GitHub repositories to a JSON file."""
        filename = 'no_github_contracts.json'
        timestamp = datetime.now().isoformat()
        
        # Create the data structure for this contract
        contract_data = {
            'contract_address': contract_address,
            'record_id': record_id,
            'origin_key': origin_key,
            'timestamp': timestamp
        }
        
        try:
            # Load existing data if file exists
            existing_data = []
            if os.path.exists(filename):
                with open(filename, 'r') as f:
                    try:
                        existing_data = json.load(f)
                    except json.JSONDecodeError:
                        print(f"Warning: {filename} contains invalid JSON. Initializing as empty list.")
                        existing_data = []
            
            # Ensure existing_data is a list
            if not isinstance(existing_data, list):
                print(f"Warning: {filename} does not contain a list. Initializing as empty list.")
                existing_data = []
                
            # Add new contract data
            existing_data.append(contract_data)
            
            # Save updated data
            with open(filename, 'w') as f:
                json.dump(existing_data, f, indent=2)
                
            print(f"Saved contract {contract_address} to {filename}")
            
        except Exception as e:
            print(f"Error saving to {filename}: {str(e)}")

    @staticmethod
    def get_view_structure(view_id: str = TARGET_VIEW_ID) -> Dict:
        """Fetch basic information about a view to provide context."""
        url = f"{AIRTABLE_BASE_URL}/{TABLE_NAME}?view={view_id}"
        
        try:
            response = requests.get(url, headers=AIRTABLE_HEADERS, timeout=30)
            response.raise_for_status()
            data = response.json()
            
            # Extract basic view information
            records = data.get('records', [])
            
            # Get field names from the first record
            field_names = []
            if records and len(records) > 0:
                field_names = list(records[0].get('fields', {}).keys())
            
            # Create a simple structure summary
            view_structure = {
                'view_id': view_id,
                'total_records_in_response': len(records),
                'field_names': field_names,
                'sample_record': records[0].get('fields') if records else None
            }
            
            return view_structure
            
        except requests.exceptions.RequestException as e:
            print(f"Error fetching view structure: {str(e)}")
            return {'error': str(e)}

    @staticmethod
    def fetch_all_unprocessed_contracts() -> List[Dict]:
        """Fetch all unprocessed contracts from the target view regardless of origin_key."""
        all_records = []

        # Filter for records where repo_count field doesn't exist or is null
        filter_formula = "NOT({repo_count})"
        base_url = (
            f"{AIRTABLE_BASE_URL}/{TABLE_NAME}"
            f"?view={TARGET_VIEW_ID}"
            f"&filterByFormula={urllib.parse.quote(filter_formula)}"
            f"&fields[]=address&fields[]=origin_key&fields[]=repo_count&fields[]=chain"
        )

        offset = None
        page = 1

        while True:
            try:
                # Add offset for pagination if it exists
                url = f"{base_url}&offset={offset}" if offset else base_url
                print(f"Fetching page {page}: {url}")
                response = requests.get(url, headers=AIRTABLE_HEADERS, timeout=30)
                
                if response.status_code != 200:
                    print(f"Airtable API Error ({response.status_code}): {response.text}")
                response.raise_for_status()

                data = response.json()
                records = data.get('records', [])
                all_records.extend(records)

                print(f"Page {page}: Fetched {len(records)} records. Total: {len(all_records)}")

                # Check for more pages
                offset = data.get('offset')
                if not offset:
                    break

                page += 1
                time.sleep(0.2)  # Respect rate limits

            except requests.exceptions.RequestException as e:
                print(f"Error fetching page {page}: {str(e)}")
                break
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON from Airtable on page {page}: {str(e)}")
                print(f"Response text: {response.text}")
                break

        print(f"Total unprocessed records (NULL repo_count): {len(all_records)}")
        return all_records

    @staticmethod
    def update_record(record_id: str, github_found: bool, repo_count: int, 
                     transaction_data: Optional[Dict] = None) -> bool:
        """Update an Airtable record with GitHub search results and optional transaction data."""
        url = f"{AIRTABLE_BASE_URL}/{TABLE_NAME}/{record_id}"

        # Create the fields dictionary with github_found and repo_count
        fields = {
            "github_found": github_found,
            "repo_count": repo_count
        }
        
        # Add transaction data if provided
        if transaction_data:
            try:
                fields["transaction_data"] = json.dumps(transaction_data)
            except TypeError as e:
                print(f"Error serializing transaction_data for record {record_id}: {str(e)}")
                return False

        request_body = {
            "fields": fields,
            "typecast": True
        }

        max_retries = 3
        current_retry = 0

        while current_retry < max_retries:
            try:
                response = requests.patch(
                    url,
                    headers=AIRTABLE_HEADERS,
                    json=request_body,
                    verify=True,
                    timeout=30
                )
                
                if response.status_code != 200:
                    print(f"Airtable Update Error ({response.status_code}) for record {record_id}: {response.text}")
                response.raise_for_status()
                
                status = "FOUND" if github_found else "NOT FOUND"
                print(f"Updated record {record_id} - GitHub repositories: {status} (count: {repo_count})")
                if transaction_data:
                    print(f"Added transaction data for {record_id} with {len(transaction_data.get('transactions', []))} transactions")
                return True

            except requests.exceptions.RequestException as e:
                current_retry += 1
                print(f"Error updating record {record_id} (Attempt {current_retry}/{max_retries}): {str(e)}")

                if current_retry < max_retries:
                    time.sleep(2 ** current_retry)  # Exponential backoff
                    continue

                return False

print("AirtableAPI class defined.")

AirtableAPI class defined.


In [7]:
class GitHubAPI:
    """Handle all GitHub API interactions."""

    @staticmethod
    def search_contract(address: str) -> Tuple[bool, int]:
        """Search GitHub for a contract address and return whether any results were found."""
        url = "https://api.github.com/search/code"
        params = {
            "q": address,
            "per_page": 10  # Use larger page size to minimize pagination
        }

        max_retries = 3
        current_retry = 0

        while current_retry < max_retries:
            try:
                response = requests.get(
                    url,
                    headers=GITHUB_HEADERS,
                    params=params,
                    verify=True,
                    timeout=30
                )

                # Handle rate limiting
                if response.status_code == 403:
                    remaining = int(response.headers.get('X-RateLimit-Remaining', 0))
                    reset_time = int(response.headers.get('X-RateLimit-Reset', time.time()))
                    if remaining == 0:
                        sleep_time = max(1, reset_time - int(time.time())) + 5  # Add buffer
                        print(f"GitHub rate limit exceeded. Waiting {sleep_time} seconds...")
                        time.sleep(sleep_time)
                        continue

                if response.status_code != 200:
                    print(f"GitHub API Error ({response.status_code}): {response.text}")
                response.raise_for_status()
                search_results = response.json()
                
                # Filter out results from excluded repositories
                valid_items = []
                excluded_count = 0
                for item in search_results.get('items', []):
                    repo_full_name = item.get('repository', {}).get('full_name', '')
                    if not repo_full_name:
                        continue
                        
                    should_exclude = False
                    for excluded_repo in EXCLUDED_REPOS:
                        if excluded_repo.endswith('/'):  # This is a user/org prefix
                            if repo_full_name.startswith(excluded_repo):
                                should_exclude = True
                                break
                        elif repo_full_name == excluded_repo:  # Exact match
                            should_exclude = True
                            break
                    
                    if not should_exclude:
                        valid_items.append(item)
                    else:
                        excluded_count += 1
                
                valid_count = len(valid_items)
                total_api_count = search_results.get('total_count', 0)
                print(f"GitHub search for {address}: API found {total_api_count}, Valid found: {valid_count}, Excluded: {excluded_count}")
                
                return (valid_count > 0, valid_count)

            except requests.exceptions.RequestException as e:
                current_retry += 1
                print(f"Error searching GitHub (Attempt {current_retry}/{max_retries}): {str(e)}")

                if current_retry < max_retries:
                    time.sleep(2 ** current_retry)  # Exponential backoff
                    continue

                return (False, 0)
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON from GitHub: {str(e)}")
                print(f"Response text: {response.text}")
                return (False, 0)

print("GitHubAPI class defined.")

GitHubAPI class defined.


In [8]:
class ContractAPI:
    """Handle API interactions with the contract explorer API."""
    
    @staticmethod
    def get_recent_transactions(contract_address: str, chain_id: int, limit: int = 5) -> List[Dict]:
        """Get the most recent transactions for a contract address."""
        url = f"{CONTRACT_API_BASE_URL}/api/v1/public-contract/{chain_id}/address/{contract_address}/explorer/transactions"
        params = {
            "limit": 5
        }
        
        max_retries = 3
        current_retry = 0
        
        while current_retry < max_retries:
            try:
                response = requests.get(
                    url,
                    headers=CONTRACT_API_HEADERS,
                    params=params,
                    verify=True,
                    timeout=30
                )
                
                if response.status_code != 200:
                    print(f"Contract API Error (Transactions, {response.status_code}) for {contract_address} on chain {chain_id}: {response.text}")
                response.raise_for_status()
                
                transactions = response.json()
                # Ensure the response is a list
                if not isinstance(transactions, list):
                    print(f"Warning: Unexpected response type for transactions (expected list, got {type(transactions)}). Data: {transactions}")
                    return []
                
                print(f"Retrieved {len(transactions)} recent transactions for contract {contract_address} on chain {chain_id}")
                return transactions
                
            except requests.exceptions.RequestException as e:
                current_retry += 1
                print(f"Error fetching transactions (Attempt {current_retry}/{max_retries}): {str(e)}")
                
                if current_retry < max_retries:
                    time.sleep(2 ** current_retry)  # Exponential backoff
                    continue
                
                return []
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON from Contract API (Transactions): {str(e)}")
                print(f"Response text: {response.text}")
                return []
    
    @staticmethod
    def get_transaction_trace(tx_hash: str, chain_id: int) -> Optional[Dict]:
        """Get the trace for a specific transaction."""
        url = f"{CONTRACT_API_BASE_URL}/api/v1/public-contract/{chain_id}/trace/{tx_hash}"
        
        max_retries = 3
        current_retry = 0
        
        while current_retry < max_retries:
            try:
                response = requests.get(
                    url,
                    headers=CONTRACT_API_HEADERS,
                    verify=True,
                    timeout=30
                )
                
                if response.status_code != 200:
                    print(f"Contract API Error (Trace, {response.status_code}) for tx {tx_hash} on chain {chain_id}: {response.text}")
                response.raise_for_status()
                
                trace_data = response.json()
                # Ensure the response is a dict
                if not isinstance(trace_data, dict):
                    print(f"Warning: Unexpected response type for trace (expected dict, got {type(trace_data)}). Data: {trace_data}")
                    return None
                
                print(f"Retrieved trace data for transaction {tx_hash} on chain {chain_id}")
                return trace_data
                
            except requests.exceptions.RequestException as e:
                current_retry += 1
                print(f"Error fetching trace data for {tx_hash} (Attempt {current_retry}/{max_retries}): {str(e)}")
                
                if current_retry < max_retries:
                    time.sleep(2 ** current_retry)  # Exponential backoff
                    continue
                
                return None
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON from Contract API (Trace): {str(e)}")
                print(f"Response text: {response.text}")
                return None
    
    @staticmethod
    def process_contract_additional_data(contract_address: str, chain: str) -> Optional[Dict]:
        """Process a contract by fetching recent transactions and their traces.
        Returns a dictionary containing 'transactions' and 'traces' lists, or None if processing fails.
        """
        result = {
            "transactions": [],
            "traces": []
        }
        
        # Get the chain ID from the chain name
        chain_id = SUPPORTED_CHAINS.get(str(chain).lower())
        if not chain_id:
            print(f"Unsupported or invalid chain: '{chain}'. Skipping additional API checks.")
            return None
        
        # Get recent transactions
        transactions = ContractAPI.get_recent_transactions(contract_address, chain_id)
        if not transactions:
            print(f"No transactions found or error fetching transactions for contract {contract_address} on chain {chain_id}")
            return None
        
        result["transactions"] = transactions
        
        # Get trace data for each transaction
        successful_traces = 0
        for tx in transactions:
            if isinstance(tx, dict) and 'hash' in tx:
                tx_hash = tx.get("hash")
                if tx_hash and isinstance(tx_hash, str):
                    trace = ContractAPI.get_transaction_trace(tx_hash, chain_id)
                    if trace:
                        result["traces"].append({
                            "tx_hash": tx_hash,
                            "trace_data": trace
                        })
                        successful_traces += 1
                    else:
                        print(f"Failed to retrieve trace for tx {tx_hash}")
                    # Add a small delay between API calls to avoid rate limiting
                    time.sleep(0.5)
                else:
                    print(f"Skipping trace fetch due to invalid or missing hash in transaction: {tx}")
            else:
                print(f"Skipping trace fetch due to invalid transaction format: {tx}")
                
        print(f"Successfully retrieved traces for {successful_traces}/{len(transactions)} transactions.")
        return result

print("ContractAPI class defined.")

ContractAPI class defined.


In [44]:
import requests
import time
import json
from typing import List, Dict, Optional, Tuple

class HyperSyncAPI:
    """Handles API interactions with the Envio HyperSync API."""

    # --- Field Selections ---
    # Adjust these lists based on the exact data fields you need from each table

    TRANSACTION_FIELDS_FOR_RECENT = [
        "hash", "block_number", "transaction_index", "from", "to", "value",
        "gas_used", "status", "input", "type", "gas_price", "effective_gas_price"
        # Add other needed fields like max_fee_per_gas etc.
    ]
    BLOCK_FIELDS_FOR_RECENT = ["timestamp", "number"] # Need timestamp for sorting

    LOG_FIELDS = [
        "log_index", "transaction_index", "transaction_hash", "block_number",
        "address", "data",
        "topic0", "topic1", "topic2", "topic3"
    ]

    TRACE_FIELDS = [
        "transaction_hash", "trace_address", "block_number", "from", "to", "value",
        "gas", "gas_used", "input", "output", "type", "call_type", "error"
    ]

    # --- Static Methods ---

    @staticmethod
    def _send_query(chain_id: int, query_payload: Dict) -> Optional[Dict]:
        """Sends a query to the HyperSync API for the specific chain and handles basic errors/retries."""

        # Get chain-specific config
        chain_config = HYPERSYNC_CHAIN_CONFIG.get(chain_id)
        if not chain_config:
            print(f"Error: HyperSync configuration not found for chain ID {chain_id}.")
            return None
        
        # Ensure URL ends with /query
        base_url = chain_config["url"]
        url = base_url if base_url.endswith('/query') else f"{base_url}/query"

        # Check API key configuration
        if not url or not HYPERSYNC_HEADERS.get('Authorization') or 'None' in HYPERSYNC_HEADERS.get('Authorization', ''):
             print("Error: HyperSync API URL or Key not configured.")
             return None

        max_retries = 3
        current_retry = 0

        while current_retry < max_retries:
            try:
                print(f"Sending HyperSync Query to {url} (Attempt {current_retry + 1})")
                # print(f"Payload: {json.dumps(query_payload, indent=2)}") # Uncomment for detailed payload logging
                response = requests.post(
                    url,
                    headers=HYPERSYNC_HEADERS,
                    json=query_payload,
                    timeout=60 # Queries can take longer
                )

                # Handle Rate Limiting
                if response.status_code == 429:
                    print(f"HyperSync rate limit hit. Retrying after delay...")
                    time.sleep(5 * (current_retry + 1))
                    current_retry += 1
                    continue

                # Handle other errors
                if response.status_code != 200:
                     print(f"HyperSync API Error ({response.status_code}): {response.text}")
                response.raise_for_status() # Raises HTTPError for bad responses (4xx or 5xx)

                # Decode JSON
                data = response.json()
                print("HyperSync Query successful.")
                # print(f"HyperSync Response Snippet: {json.dumps(data, indent=2)[:500]}...") # Optional
                return data

            except requests.exceptions.RequestException as e:
                current_retry += 1
                print(f"Error sending HyperSync query (Attempt {current_retry}/{max_retries}): {str(e)}")
                if current_retry < max_retries:
                    time.sleep(2 ** current_retry) # Exponential backoff
                    continue
                return None # Failed after all retries
            except json.JSONDecodeError as e:
                 print(f"Error decoding JSON from HyperSync: {str(e)}")
                 print(f"Response text: {response.text}")
                 return None # Stop retrying on decode error

        return None # Fallback if loop finishes unexpectedly

    @staticmethod
    def get_archive_height(chain_id: int) -> Optional[int]:
        """Gets the current archive height from HyperSync for a given chain."""
        height = None
        try:
            # --- Correction Start ---
            # Get the chain configuration
            chain_config = HYPERSYNC_CHAIN_CONFIG.get(chain_id)
            if not chain_config:
                print(f"Error: No HyperSync configuration found for chain {chain_id}")
                return None
            base_url = chain_config.get("url")
            if not base_url:
                 print(f"Error: No URL found in config for chain {chain_id}")
                 return None
            # --- Correction End ---

            # Attempt to hit the /height endpoint first
            # Construct the /height URL correctly (remove /query if present)
            height_url = base_url.replace('/query', '') + '/height'
            print(f"Attempting to fetch height from: {height_url}")

            response = requests.get(
                height_url,
                headers=HYPERSYNC_HEADERS, # Use the class headers
                timeout=30
            )

            if response.status_code == 200:
                data = response.json()
                height_value = data.get('height')
                if height_value is not None:
                    height = int(height_value) # Store the height if found
                    print(f"HyperSync archive height for chain {chain_id} from /height endpoint: {height}")
                else:
                    print(f"Warning: /height endpoint for chain {chain_id} did not return 'height' field.")
            else:
                print(f"Warning: Failed to fetch from /height endpoint (HTTP {response.status_code}). Will attempt fallback.")

        except requests.exceptions.RequestException as e:
            print(f"Warning: RequestException trying /height endpoint for chain {chain_id}: {e}. Will attempt fallback.")
        except json.JSONDecodeError as e:
            print(f"Warning: JSONDecodeError trying /height endpoint for chain {chain_id}: {e}. Response: {response.text}. Will attempt fallback.")
        except Exception as e: # Catch other potential errors like int conversion
             print(f"Warning: Unexpected error trying /height endpoint for chain {chain_id}: {e}. Will attempt fallback.")


        # --- Fallback Logic ---
        # If height wasn't successfully retrieved from the /height endpoint, use the query method
        if height is None:
            print(f"Falling back to using /query endpoint to get archive height for chain {chain_id}...")
            query = {"field_selection": {}} # Empty query just to get metadata
            response_data = HyperSyncAPI._send_query(chain_id, query) # Use the query method
            if response_data:
                height_value = response_data.get("archive_height")
                if height_value is not None:
                     height = int(height_value)
                     print(f"HyperSync archive height for chain {chain_id} from /query fallback: {height}")


        # Final Check and Return
        if height is None:
             print(f"Could not retrieve archive height for chain {chain_id} via /height or /query fallback.")
             return None
        else:
             return height


    @staticmethod
    def get_recent_transactions(contract_address: str, chain_id: int, limit: int = 5, block_range: int = 1000) -> List[Dict]:
        """
        Get the most recent transactions TO a specific contract address using HyperSync.
        NOTE: Requires client-side sorting as HyperSync doesn't guarantee order beyond block range.
        """
        print(f"Fetching archive height for chain {chain_id} to determine query range...")
        archive_height = HyperSyncAPI.get_archive_height(chain_id)
        if archive_height is None:
            print("Could not determine archive height. Cannot fetch recent transactions.")
            return []

        from_block = max(0, archive_height - block_range)
        # Query up to and including the archive height
        to_block = archive_height + 1

        print(f"Querying HyperSync on chain {chain_id} for transactions to {contract_address} in blocks [{from_block}, {to_block - 1}]...")

        query = {
            "from_block": from_block,
            "to_block": to_block,
            "transactions": [{
                # Ensure address is lowercase for consistent matching
                "to": [contract_address.lower()]
            }],
            "field_selection": {
                "block": HyperSyncAPI.BLOCK_FIELDS_FOR_RECENT,
                "transaction": HyperSyncAPI.TRANSACTION_FIELDS_FOR_RECENT
            },
            "join_mode": "JoinAll" # Get associated block data (like timestamp)
        }

        response = HyperSyncAPI._send_query(chain_id, query)

        if not response or 'data' not in response:
            print("No data received from HyperSync or unexpected response format.")
            return []

        # --- Start Correction ---
        # Extract and combine block/transaction data efficiently
        all_txs = []
        # Create a lookup for block timestamps, ensuring items are dictionaries
        block_timestamps = {}
        for b in response['data'].get('blocks', []):
            # Check if b is a dictionary AND has a 'number' before accessing
            if isinstance(b, dict) and b.get('number') is not None:
                block_timestamps[b.get('number')] = b.get('timestamp')
            else:
                # Log if an unexpected item is found in the blocks list
                print(f"Warning: Unexpected item found in 'blocks' list: {type(b)} - {b}")
        # --- End Correction ---

        # The rest of the transaction processing loop remains the same
        for tx in response['data'].get('transactions', []):
            block_num = tx.get('block_number')
            if block_num is not None:
                tx_data = tx.copy() # Create a copy to modify
                tx_data['timestamp'] = block_timestamps.get(block_num) # Add timestamp from block lookup
                all_txs.append(tx_data)
            else:
                 print(f"Warning: Transaction missing block_number: {tx.get('hash')}")


        # Sort client-side: Primary: block number (desc), Secondary: tx index (desc)
        all_txs.sort(key=lambda x: (x.get('block_number', 0), x.get('transaction_index', 0)), reverse=True)

        print(f"Retrieved {len(all_txs)} transactions in range via HyperSync. Returning latest {limit}.")
        return all_txs[:limit]

    @staticmethod
    def get_transaction_logs(tx_hash: str, chain_id: int, block_number: int) -> Optional[List[Dict]]:
        """
        Get the logs for a specific transaction using HyperSync.
        Requires the block_number where the transaction occurred.
        Returns a list of log dictionaries (or None if API error, empty list if no logs found for TX).
        """
        print(f"Querying HyperSync on chain {chain_id} for logs of tx {tx_hash} in block {block_number}...")

        query = {
            "from_block": block_number,
            "to_block": block_number + 1, # Query only the specific block
            "include_all_blocks": False, # Don't need block data unless specifically requested below
            "field_selection": {
                 "log": HyperSyncAPI.LOG_FIELDS,
                 # "block": ["number", "timestamp"] # Optionally include block info if needed per log
            },
            "join_mode": "Default"
        }
        # Note: We fetch all logs for the block and filter client-side by transaction_hash.

        response = HyperSyncAPI._send_query(chain_id, query)

        # Check if response is valid and contains the 'logs' key in 'data'
        if not response or 'data' not in response or 'logs' not in response['data']:
            print(f"No log data received from HyperSync or unexpected format for block {block_number}.")
            # Distinguish between API error (None) and valid response with no logs (empty list is handled below)
            if response is None:
                return None # API error occurred
            else:
                # Response received, but no 'logs' array within 'data'
                print(f"Response received, but no 'logs' field found in data for block {block_number}.")
                return []


        # Filter logs client-side for the specific transaction hash
        tx_logs = []
        target_tx_hash_lower = tx_hash.lower()
        for log in response['data']['logs']:
            log_tx_hash = log.get('transaction_hash')
            # Ensure log_tx_hash is not None before lowercasing
            if log_tx_hash and log_tx_hash.lower() == target_tx_hash_lower:
                tx_logs.append(log)

        # Log outcome
        if not tx_logs:
            print(f"No logs found specifically matching transaction hash {tx_hash} within block {block_number}.")
        else:
            print(f"Retrieved {len(tx_logs)} logs for transaction {tx_hash} via HyperSync.")

        # Return empty list if no logs found for this specific TX, or the list if found
        return tx_logs


    @staticmethod
    def get_transaction_trace(tx_hash: str, chain_id: int, block_number: int) -> Optional[Dict]:
        """
        Get the trace for a specific transaction using HyperSync.
        Requires the block_number where the transaction occurred.
        Returns a dictionary containing trace information (or None if error/not found/unsupported).
        """
        # Check trace support for the chain first
        chain_config = HYPERSYNC_CHAIN_CONFIG.get(chain_id)
        if not chain_config:
             print(f"Error: HyperSync configuration not found for chain ID {chain_id}. Cannot fetch trace.")
             return None
        if not chain_config.get("supports_traces", False):
            # Don't print warning here, let caller handle logic
            # print(f"Trace fetch skipped: Chain ID {chain_id} does not support traces.")
            return None # Indicate traces are unavailable/unsupported

        print(f"Querying HyperSync on chain {chain_id} for traces of tx {tx_hash} in block {block_number}...")

        query = {
            "from_block": block_number,
            "to_block": block_number + 1, # Query only the specific block
            "include_all_blocks": False,
            "field_selection": {
                 "trace": HyperSyncAPI.TRACE_FIELDS,
            },
            "join_mode": "Default"
        }
        # Note: Fetching all traces for the block and filtering client-side by transaction_hash.

        response = HyperSyncAPI._send_query(chain_id, query)

        if not response or 'data' not in response or 'traces' not in response['data']:
            print(f"No trace data received from HyperSync or unexpected format for block {block_number}.")
            if response is None: return None # API error
            else: return None # Treat missing traces in response as "not found" for this specific request

        # Filter traces client-side for the specific transaction hash
        tx_traces = []
        target_tx_hash_lower = tx_hash.lower()
        for trace in response['data']['traces']:
            trace_tx_hash = trace.get('transaction_hash')
            if trace_tx_hash and trace_tx_hash.lower() == target_tx_hash_lower:
                tx_traces.append(trace)

        if not tx_traces:
            print(f"No traces found specifically matching transaction hash {tx_hash} within block {block_number}.")
            return None # Return None if no traces found for this TX

        print(f"Retrieved {len(tx_traces)} trace entries for transaction {tx_hash} via HyperSync.")
        # Return traces wrapped in a dictionary for consistency
        return {"traces": tx_traces}


    @staticmethod
    def process_contract_additional_data(contract_address: str, chain: str) -> Optional[Dict]:
        """
        Process a contract using HyperSync: fetch recent transactions, their logs,
        and optionally traces if supported by the network.
        Returns a dictionary containing 'transactions', 'logs', and potentially 'traces',
        or None if fetching recent transactions fails initially.
        """
        # Prepare result structure
        result = {
            "transactions": [],
            "logs": [], # Stores logs per transaction: {tx_hash: ..., log_data: [...]}
            "traces": [] # Stores traces per transaction: {tx_hash: ..., trace_data: {...}}
        }

        # Validate chain and get ID
        chain_id = SUPPORTED_CHAINS.get(str(chain).lower())
        if not chain_id:
            print(f"Unsupported or invalid chain name: '{chain}'. Skipping HyperSync API checks.")
            return None # Cannot proceed without a valid/supported chain ID

        # --- 1. Get recent transactions ---
        transactions = HyperSyncAPI.get_recent_transactions(contract_address, chain_id)
        # Critical Check: If we fail to get transactions, we cannot proceed.
        if not transactions: # Handles both None (error) and empty list (no txs found)
            print(f"No recent transactions found via HyperSync for contract {contract_address} on chain {chain_id}. Cannot fetch logs/traces.")
            return None # Indicate failure or lack of data to process further

        result["transactions"] = transactions
        print(f"Found {len(transactions)} recent transactions to process for logs/traces.")

        # Determine if traces should be fetched for this chain
        chain_config = HYPERSYNC_CHAIN_CONFIG.get(chain_id, {})
        fetch_traces = chain_config.get("supports_traces", False)
        if fetch_traces:
             print(f"Chain {chain_id} supports traces. Will attempt to fetch them.")
        else:
             print(f"Chain {chain_id} does not support traces. Skipping trace fetch.")

        # --- 2. Get logs (and optionally traces) for each transaction ---
        successful_logs_found = 0 # Count TXs for which we found >0 logs
        successful_traces_found = 0 # Count TXs for which we found >0 trace entries

        for tx in transactions:
            tx_hash = tx.get("hash")
            block_num = tx.get("block_number")

            if tx_hash and block_num is not None:
                # --- Fetch Logs ---
                log_data = HyperSyncAPI.get_transaction_logs(tx_hash, chain_id, block_num)
                # Check if log_data is None (API error) or a list (could be empty)
                if log_data is not None:
                    result["logs"].append({
                        "tx_hash": tx_hash,
                        "log_data": log_data # Append the list of logs (could be empty)
                    })
                    if log_data: # Increment count only if the list wasn't empty
                         successful_logs_found += 1
                else:
                    # Log API error if get_transaction_logs returned None
                    print(f"Failed to retrieve logs for tx {tx_hash} due to API error or timeout.")

                # --- Optionally Fetch Traces ---
                if fetch_traces:
                    trace_data = HyperSyncAPI.get_transaction_trace(tx_hash, chain_id, block_num)
                    # Check if trace_data is None (error/unsupported/not found) or a dict
                    if trace_data: # Contains {"traces": [...]}
                        result["traces"].append({
                            "tx_hash": tx_hash,
                            "trace_data": trace_data
                        })
                        successful_traces_found += 1
                    # else: No need to print failure again, get_transaction_trace already did

                # --- Delay ---
                # Small delay to avoid overwhelming the API when processing multiple transactions
                time.sleep(0.3) # Adjusted delay slightly
            else:
                print(f"Skipping log/trace fetch for a transaction due to missing hash or block_number: {tx}")

        # --- Final Logging ---
        print(f"Finished processing additional data: Found logs for {successful_logs_found}/{len(transactions)} transactions.")
        if fetch_traces:
            print(f"Finished processing additional data: Found traces for {successful_traces_found}/{len(transactions)} transactions.")

        # Return the result dictionary (even if logs/traces arrays are empty for some TXs)
        return result

# --- End of Class Definition ---

# Optional: Add a print statement after the class definition in your notebook cell
print("HyperSyncAPI class defined with log fetching and network awareness.")


HyperSyncAPI class defined with log fetching and network awareness.


In [17]:
def process_all_contracts():
    """Process all unprocessed contracts from the target view."""
    processed_count = 0
    updated_count = 0
    failed_update_count = 0
    github_not_found_count = 0
    additional_api_checked_count = 0
    
    try:
        # Fetch all unprocessed records directly from the view
        print(f"Fetching unprocessed records from Airtable view {TARGET_VIEW_ID}...")
        records = AirtableAPI.fetch_all_unprocessed_contracts()

        if not records:
            print(f"No unprocessed records found in view {TARGET_VIEW_ID}.")
            return
            
        total_records = len(records)
        print(f"Found {total_records} unprocessed records to process.")

        # Process each record
        for index, record in enumerate(records, 1):
            processed_count += 1
            record_id = record.get('id')
            fields = record.get('fields', {})
            contract_address = fields.get('address')
            origin_key = fields.get('origin_key')
            # Default to 'base' chain if missing
            chain = fields.get('chain') or 'base'

            if not record_id:
                print(f"Skipping record {index}/{total_records} - missing record ID")
                continue
                
            if not contract_address:
                print(f"Skipping record {record_id} ({index}/{total_records}) - no contract address")
                continue

            print(f"--- Processing {index}/{total_records}: Record ID {record_id}, Address {contract_address}, Chain {chain} ---")

            # Search GitHub
            github_found, repo_count = GitHubAPI.search_contract(contract_address)

            # If no GitHub repositories found, try the additional API
            transaction_data = None
            if not github_found:
                github_not_found_count += 1
                print(f"No GitHub repositories found for {contract_address}, checking Contract API...")
                additional_api_checked_count += 1
                transaction_data = ContractAPI.process_contract_additional_data(contract_address, chain)
                
                # Save to our tracking file regardless of transaction data success
                AirtableAPI.save_no_github_contracts(contract_address, record_id, origin_key)
            
            # Update Airtable with github_found, repo_count, and transaction data if available
            update_successful = AirtableAPI.update_record(record_id, github_found, repo_count, transaction_data)
            if update_successful:
                updated_count += 1
            else:
                failed_update_count += 1
                print(f"Failed to update Airtable record {record_id}")

            # Rate limiting delay between processing records
            print(f"--- Finished processing record {record_id}. Sleeping... ---")
            time.sleep(2)

        print(f"\n=== Processing Summary ===")
        print(f"Total records processed: {processed_count}")
        print(f"Airtable records updated successfully: {updated_count}")
        print(f"Airtable records failed to update: {failed_update_count}")
        print(f"Contracts without GitHub presence: {github_not_found_count}")
        print(f"Contracts checked via Contract API: {additional_api_checked_count}")
        print(f"=========================")

    except Exception as e:
        print(f"An unexpected error occurred during the process: {str(e)}")

print("process_all_contracts function defined.")

process_all_contracts function defined.


In [6]:
# Test 1: Check Airtable View Structure
if AIRTABLE_TOKEN:
    print(f"Checking structure of view: {TARGET_VIEW_ID}")
    view_info = AirtableAPI.get_view_structure()
    print(json.dumps(view_info, indent=2))
else:
    print("Skipping Airtable view structure check (missing AIRTABLE_TOKEN).")

Checking structure of view: viwF2Xc24CGNO7u5C
{
  "view_id": "viwF2Xc24CGNO7u5C",
  "total_records": 48,
  "field_names": [
    "address",
    "gas_eth",
    "origin_key",
    "avg_daa",
    "contract_name",
    "txcount",
    "Blockexplorer",
    "Github Search",
    "Google",
    "avg txcost",
    "Dedaub",
    "rel_cost",
    "avg_success",
    "is_proxy",
    "day_range",
    "Tenderly",
    "block_explorer_url"
  ],
  "sample_record": {
    "address": "0x83885CaB02a0b906836225442Fa17DcFE9cBf797",
    "gas_eth": 32.28007307967115,
    "origin_key": [
      "recII4EbzHDPNgVY1"
    ],
    "avg_daa": 7,
    "contract_name": "StrategyExecutor",
    "txcount": 332519,
    "Blockexplorer": "https://arbiscan.io/address/0x83885CaB02a0b906836225442Fa17DcFE9cBf797",
    "Github Search": "https://github.com/search?q=0x83885CaB02a0b906836225442Fa17DcFE9cBf797&type=code",
    "Google": "https://www.google.com/search?q=0x83885CaB02a0b906836225442Fa17DcFE9cBf797",
    "avg txcost": 9.707737927658

In [7]:
# Test 2: Fetch Unprocessed Contracts from Airtable
if AIRTABLE_TOKEN:
    print("Fetching a batch of unprocessed contracts...")
    unprocessed_records = AirtableAPI.fetch_all_unprocessed_contracts()
    if unprocessed_records:
        print(f"Fetched {len(unprocessed_records)} records. First record:")
        print(json.dumps(unprocessed_records[0], indent=2))
    else:
        print("No unprocessed records found or error fetching.")
else:
    print("Skipping Airtable fetch test (missing AIRTABLE_TOKEN).")

Fetching a batch of unprocessed contracts...
Page 1: Fetched 48 records. Total: 48
Total unprocessed records (NULL repo_count): 48
Fetched 48 records. First record:
{
  "id": "recwOBsxuN87BDpXV",
  "createdTime": "2025-04-24T13:05:14.000Z",
  "fields": {
    "address": "0x83885CaB02a0b906836225442Fa17DcFE9cBf797",
    "origin_key": [
      "recII4EbzHDPNgVY1"
    ]
  }
}


In [29]:
# Test 3: Search GitHub for a Specific Contract Address
test_address = "0x347cee5cC8C6FB4872123B40B799A8750f0E7EA2" # Example address
if GITHUB_TOKEN:
    print(f"Searching GitHub for address: {test_address}")
    found, count = GitHubAPI.search_contract(test_address)
    print(f"GitHub Search Result: Found={found}, Valid Count={count}")
else:
    print("Skipping GitHub search test (missing GITHUB_TOKEN).")

Searching GitHub for address: 0x347cee5cC8C6FB4872123B40B799A8750f0E7EA2
GitHub search for 0x347cee5cC8C6FB4872123B40B799A8750f0E7EA2: API found 1, Valid found: 0, Excluded: 1
GitHub Search Result: Found=False, Valid Count=0


In [30]:
# Test 4: Get Recent Transactions from Contract API
test_tx_address = "0xA8C70657eBd7C9005A3Caa062B0e01308BE9c422" # Example address
test_tx_chain = 'base' # Example chain 

if CONTRACT_API_BASE_URL != 'https://api.example.co':
    print(f"Fetching recent transactions for {test_tx_address} on chain {test_tx_chain}...")
    chain_id = SUPPORTED_CHAINS.get(test_tx_chain.lower())
    if chain_id:
        transactions = ContractAPI.get_recent_transactions(test_tx_address, chain_id)
        if transactions:
            print(f"Found {len(transactions)} transactions. First transaction:")
            print(json.dumps(transactions[0], indent=2))
        else:
            print("No transactions found or error fetching.")
    else:
        print(f"Chain '{test_tx_chain}' is not supported.")
else:
    print("Skipping Contract API transaction test (CONTRACT_API_BASE_URL not set).")

Fetching recent transactions for 0xA8C70657eBd7C9005A3Caa062B0e01308BE9c422 on chain base...
Contract API Error (Transactions, 404) for 0xA8C70657eBd7C9005A3Caa062B0e01308BE9c422 on chain 8453: {"error":{"id":"c35b2531-5234-42b4-ad72-9be9e20a17fd","slug":"resource_not_found","message":"Contract not found","data":{"rid":"rid:contract"}}}
Error fetching transactions (Attempt 1/3): 404 Client Error: Not Found for url: https://api.tenderly.co/api/v1/public-contract/8453/address/0xA8C70657eBd7C9005A3Caa062B0e01308BE9c422/explorer/transactions?limit=5
Contract API Error (Transactions, 404) for 0xA8C70657eBd7C9005A3Caa062B0e01308BE9c422 on chain 8453: {"error":{"id":"a3904fd7-44cc-4a07-b2bb-b09f16dccc51","slug":"resource_not_found","message":"Contract not found","data":{"rid":"rid:contract"}}}
Error fetching transactions (Attempt 2/3): 404 Client Error: Not Found for url: https://api.tenderly.co/api/v1/public-contract/8453/address/0xA8C70657eBd7C9005A3Caa062B0e01308BE9c422/explorer/transactio

In [46]:
# Test 5: Get Transaction Trace from Contract API
test_trace_tx_hash = "0x94056f84cf1f9895560df8ced4ee712b36a1b46c6dde51a94be60b2cd0937736" # Example hash
test_trace_chain = 'base' # Example chain

if CONTRACT_API_BASE_URL != 'https://api.example.com':
    print(f"Fetching trace for transaction {test_trace_tx_hash} on chain {test_trace_chain}...")
    chain_id = SUPPORTED_CHAINS.get(test_trace_chain.lower())
    if chain_id:
        trace_data = ContractAPI.get_transaction_trace(test_trace_tx_hash, chain_id)
        if trace_data:
            print(f"Found trace data (showing specific keys):")
            # Show method
            if 'method' in trace_data:
                print(f"Method: {trace_data['method']}")
            # Extract and show unique event names from logs
            if 'logs' in trace_data:
                event_names = set()
                for log in trace_data['logs']:
                    if 'name' in log:
                        event_names.add(log['name'])
                print(f"Event Names: {list(event_names)}")
            # Fallback if keys don't exist
            if 'method' not in trace_data and 'logs' not in trace_data:
                print(f"Requested keys 'method' and 'logs' not found in trace data.")
                print(f"Available keys: {list(trace_data.keys())}")
        else:
            print("No trace data found or error fetching.")
    else:
        print(f"Chain '{test_trace_chain}' is not supported.")
else:
    print("Skipping Contract API trace test (CONTRACT_API_BASE_URL not set).")

Fetching trace for transaction 0x94056f84cf1f9895560df8ced4ee712b36a1b46c6dde51a94be60b2cd0937736 on chain base...
Retrieved trace data for transaction 0x94056f84cf1f9895560df8ced4ee712b36a1b46c6dde51a94be60b2cd0937736 on chain 8453
Found trace data (showing specific keys):
Method: fallback


TypeError: 'NoneType' object is not iterable

In [None]:
# Test 6: Process Additional Data for a Specific Contract
test_proc_address = "0x347cee5cc8c6fb4872123b40b799a8750f0e7ea2" # Example address
test_proc_chain = 'base' # Example chain

if CONTRACT_API_BASE_URL != 'https://api.example.com':
    print(f"Processing additional data for {test_proc_address} on chain {test_proc_chain}...")
    additional_data = ContractAPI.process_contract_additional_data(test_proc_address, test_proc_chain)
    if additional_data:
        print(f"Processed data: Found {len(additional_data.get('transactions', []))} transactions and {len(additional_data.get('traces', []))} traces.")
        # print(json.dumps(additional_data, indent=2)) # Uncomment to print full data
    else:
        print("No additional data processed (check logs for reasons).")
else:
    print("Skipping Contract API processing test (CONTRACT_API_BASE_URL not set).")

In [32]:
# === Debug Cell: Check HyperSync Environment Variables ===

print(f"HYPERSYNC_API_URL: {os.getenv('HYPERSYNC_API_URL')}")
print(f"HYPERSYNC_API_KEY: {'*' * 5 + os.getenv('HYPERSYNC_API_KEY')[-4:] if os.getenv('HYPERSYNC_API_KEY') else None}") # Print partial key for verification

# You can also check the values stored in the variables used by the code:
print(f"\nVariable HYPERSYNC_API_URL: {HYPERSYNC_API_URL}")
print(f"Variable HYPERSYNC_API_KEY: {'*' * 5 + HYPERSYNC_API_KEY[-4:] if HYPERSYNC_API_KEY else None}")

# Check the condition components:
print(f"\nCondition Check:")
print(f"  HYPERSYNC_API_URL truthy? {bool(HYPERSYNC_API_URL)}")
print(f"  HYPERSYNC_API_KEY truthy? {bool(HYPERSYNC_API_KEY)}")
print(f"  HYPERSYNC_API_URL != placeholder? {HYPERSYNC_API_URL != 'https://api.example.com'}")

HYPERSYNC_API_URL: https://base.hypersync.xyz
HYPERSYNC_API_KEY: *****f3ae

Variable HYPERSYNC_API_URL: https://base.hypersync.xyz
Variable HYPERSYNC_API_KEY: *****f3ae

Condition Check:
  HYPERSYNC_API_URL truthy? True
  HYPERSYNC_API_KEY truthy? True
  HYPERSYNC_API_URL != placeholder? True


In [45]:
# === Test Cell: Process Additional Data for a Specific Contract via HyperSync ===

# --- Input ---
test_contract_address = "0xA8C70657eBd7C9005A3Caa062B0e01308BE9c422" # Replace with your desired test address
test_chain_name = 'base' # Replace with the desired chain name (e.g., 'ethereum', 'polygon')

# --- Execution ---
print(f"--- Starting HyperSync Test for Contract: {test_contract_address} on Chain: {test_chain_name} ---")

# Check if HyperSync is configured
hypersync_configured = HYPERSYNC_API_URL and HYPERSYNC_API_KEY and HYPERSYNC_API_URL != 'https://api.example.com'

if hypersync_configured:
    # Call the main processing function for the specific contract
    additional_data = HyperSyncAPI.process_contract_additional_data(test_contract_address, test_chain_name)

    # --- Output ---
    if additional_data is not None:
        print("\n--- HyperSync Processing Results ---")
        
        # Print summary counts
        num_txs = len(additional_data.get('transactions', []))
        logs_per_tx = additional_data.get('logs', [])
        traces_per_tx = additional_data.get('traces', [])
        num_logs_total = sum(len(item.get('log_data', [])) for item in logs_per_tx)
        num_trace_entries_total = sum(len(item.get('trace_data', {}).get('traces', [])) for item in traces_per_tx)
        
        print(f"Fetched Transactions: {num_txs}")
        print(f"Total Logs Found Across Transactions: {num_logs_total}")
        print(f"Total Trace Entries Found Across Transactions: {num_trace_entries_total}")
        
        # Optionally print detailed data (can be very verbose)
        print("\n--- Details (Sample) ---")
        if additional_data['transactions']:
             print("\nSample Transaction:")
             print(json.dumps(additional_data['transactions'][0], indent=2))
        else:
             print("\nNo transactions found.")

        if logs_per_tx:
             # Find the first transaction that actually had logs
             first_tx_with_logs = next((item for item in logs_per_tx if item.get('log_data')), None)
             if first_tx_with_logs and first_tx_with_logs['log_data']:
                   print(f"\nSample Log (from tx {first_tx_with_logs['tx_hash'][:10]}...):")
                   print(json.dumps(first_tx_with_logs['log_data'][0], indent=2))
             else:
                   print("\nNo logs found for any fetched transaction.")
        else:
             print("\nLog data array is empty (check logs for fetch status).")


        if traces_per_tx:
             # Find the first transaction that actually had trace data
             first_tx_with_traces = next((item for item in traces_per_tx if item.get('trace_data', {}).get('traces')), None)
             if first_tx_with_traces:
                  print(f"\nSample Trace Entry (from tx {first_tx_with_traces['tx_hash'][:10]}...):")
                  print(json.dumps(first_tx_with_traces['trace_data']['traces'][0], indent=2))
             else:
                  print("\nNo traces found for any fetched transaction (or network doesn't support traces).")
        else:
             print("\nTrace data array is empty (check logs for fetch status or network support).")

        # Uncomment below to print the entire structure (can be very large)
        # print("\n--- Full Result Structure ---")
        # print(json.dumps(additional_data, indent=2))

    else:
        print("\n--- HyperSync Processing Failed ---")
        print("Failed to fetch initial transaction data or encountered an unrecoverable error.")
        print("Check previous logs for details (e.g., unsupported chain, API errors).")

else:
    print("\n--- HyperSync Test Skipped ---")
    print("HyperSync API URL or Key is not configured. Please set HYPERSYNC_API_URL and HYPERSYNC_API_KEY in your environment or notebook.")

print(f"\n--- Finished HyperSync Test for Contract: {test_contract_address} ---")

--- Starting HyperSync Test for Contract: 0xA8C70657eBd7C9005A3Caa062B0e01308BE9c422 on Chain: base ---
Fetching archive height for chain 8453 to determine query range...
Attempting to fetch height from: https://base.hypersync.xyz/height
HyperSync archive height for chain 8453 from /height endpoint: 29530340
Querying HyperSync on chain 8453 for transactions to 0xA8C70657eBd7C9005A3Caa062B0e01308BE9c422 in blocks [29529340, 29530340]...
Sending HyperSync Query to https://base.hypersync.xyz/query (Attempt 1)
HyperSync Query successful.


AttributeError: 'list' object has no attribute 'get'

In [None]:
# Run the full process
# Warning: This will modify your Airtable data for unprocessed records.

run_full = False # Set to True to execute

if run_full:
    if AIRTABLE_TOKEN and GITHUB_TOKEN and CONTRACT_API_BASE_URL != 'https://api.example.com':
        print("Starting full contract processing...")
        process_all_contracts()
        print("Full contract processing finished.")
    else:
        print("Cannot run full process. Check:")
        if not AIRTABLE_TOKEN: print("- Missing AIRTABLE_TOKEN")
        if not GITHUB_TOKEN: print("- Missing GITHUB_TOKEN")
        if CONTRACT_API_BASE_URL == 'https://api.example.com': print("- CONTRACT_API_BASE_URL is not set")
else:
    print("Full process run skipped (run_full is False). Set run_full = True in the cell above to execute.")