In [24]:
import requests

rpc = "https://radial-dimensional-rain.quiknode.pro/0837597cecb58fa23a77868c4623b1ae033cb0a4"
etherscan_api_key = ""

def get_latest_block_number():
    payload = {
        "jsonrpc": "2.0",
        "method": "eth_blockNumber",
        "params": [],
        "id": 1
    }
    response = requests.post(rpc, json=payload).json()
    return int(response["result"], 16)  # Hex to decimal

print("Latest block:", get_latest_block_number())

def get_contract_abi(contract_address):
    """Get contract ABI from Etherscan API"""
    url = f"https://api.etherscan.io/api?module=contract&action=getabi&address={contract_address}&apikey={etherscan_api_key}"
    
    try:
        response = requests.get(url)
        response.raise_for_status()
        return response.json()["result"]
    except Exception as e:
        print(f"Error getting ABI: {e}")
        return None

print(get_contract_abi("0xBB9bc244D798123fDe783fCc1C72d3Bb8C189413"))

def get_tx_by_address(address):
    url = f"https://api.etherscan.io/api?module=account&action=txlist&address={address}&startblock=0&endblock=99999999&page=1&offset=10&sort=asc&apikey={etherscan_api_key}"
    return requests.get(url).json()["result"]

print(get_tx_by_address("0xc5102fE9359FD9a28f877a67E36B0F050d81a3CC"))


JSONDecodeError: Expecting value: line 1 column 1 (char 0)

In [None]:
def get_block_by_number(block_number):
    payload = {
        "jsonrpc": "2.0",
        "method": "eth_getBlockByNumber",
        "params": [hex(block_number), True],  # Hex string, include transactions
        "id": 1
    }
    response = requests.post(rpc, json=payload).json()
    return response["result"]

block_data = get_block_by_number(get_latest_block_number())
for item in block_data:
    print(item)
    print(block_data[item])
    

In [28]:
import csv

def save_transactions_to_csv(block_number, filename="transactions.csv"):
    block = get_block_by_number(block_number)
    with open(filename, "a", newline="") as f:
        writer = csv.writer(f)
        for tx in block["transactions"]:
            writer.writerow([
                tx["hash"],
                int(tx["gasPrice"], 16),
                int(tx["value"], 16),
                tx["from"]
            ])

save_transactions_to_csv(get_latest_block_number())

In [32]:
import csv

def rpc_request(method, params):
    payload = {
        "jsonrpc": "2.0",
        "method": method,
        "params": params,
        "id": 1
    }
    response = requests.post(rpc, json=payload).json()
    return response["result"]

def save_transactions_to_csv(block_number, filename="transactions.csv"):
    block = get_block_by_number(block_number)
    with open(filename, "a", newline="") as f:
        writer = csv.writer(f)
        for tx in block["transactions"]:
            # Get transaction receipt for status and gas used
            receipt = rpc_request("eth_getTransactionReceipt", [tx["hash"]])
            
            # Transaction Type Detection
            tx_type = "transfer"
            input_data = tx.get("input", "0x")
            contract_interaction = False
            
            if tx["to"] is None:
                tx_type = "contract_creation"
            elif input_data != "0x":
                tx_type = "contract_call"
                contract_interaction = True

            # ERC20/721 Transfer Detection (check logs)
            is_token_transfer = any(
                log["topics"][0] == "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
                for log in receipt.get("logs", [])
            )

            # Write extended data
            writer.writerow([
                # Basic Info
                block_number,
                int(block["timestamp"], 16),  # Unix timestamp
                tx["hash"],
                tx["transactionIndex"],
                
                # Transaction Type
                tx_type,
                "token_transfer" if is_token_transfer else "",
                
                # Address Info
                tx.get("from", ""),
                tx.get("to", ""),
                receipt.get("contractAddress", ""),  # For contract creations
                
                # Value & Gas
                int(tx["value"], 16),
                int(tx["gasPrice"], 16),
                int(receipt.get("gasUsed", "0x0"), 16),
                
                # Contract Interaction Details
                contract_interaction,
                len(input_data),  # Input data size
                input_data[:10],  # Partial input (function selector)
                
                # Status
                "success" if int(receipt.get("status", "0x0"), 16) == 1 else "failed"
            ])

save_transactions_to_csv(get_latest_block_number()-10)

In [13]:
def analyze_failed_txs(block_number):
    block = get_block_by_number(block_number)
    failed = 0
    for tx_hash in block["transactions"]:
        receipt = requests.post(rpc, json={
            "jsonrpc": "2.0",
            "method": "eth_getTransactionReceipt",
            "params": [tx_hash],
            "id": 1
        }).json()["result"]
        
        if receipt and receipt["status"] == "0x0":
            failed += 1
    return failed / len(block["transactions"]) * 100  # Failure rate

def get_contract_storage(contract_address, position):
    return requests.post(rpc, json={
        "jsonrpc": "2.0",
        "method": "eth_getStorageAt",
        "params": [contract_address, hex(position), "latest"],
        "id": 1
    }).json()["result"]

def get_uniswap_swaps(pair_address):
    return requests.post(rpc, json={
        "jsonrpc": "2.0",
        "method": "eth_getLogs",
        "params": [{
            "fromBlock": "0x0",
            "address": pair_address,
            "topics": ["0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"]
        }],
        "id": 1
    }).json()["result"]


def gas_forecast():
    history = requests.post(rpc, json={
        "jsonrpc": "2.0",
        "method": "eth_feeHistory",
        "params": ["0x4", "latest", [25, 50, 75]],
        "id": 1
    }).json()["result"]
    
    return {
        "base_fee_trend": [int(x, 16) for x in history["baseFeePerGas"]],
        "priority_percentiles": {
            "25": [int(x[0], 16) for x in history["reward"]],
            "50": [int(x[1], 16) for x in history["reward"]],
            "75": [int(x[2], 16) for x in history["reward"]]
        }
    }

In [1]:
graph init \
  --protocol ethereum \
  --contract-name MyErc20Contract \
  --from-contract  \
  --network base \
  --index-events \
  myusername/test-subgraph \
  test-subgraph

Usage: python create_subgraph.py <SUBGRAPH_SLUG> <CONTRACT_ADDRESS> [NETWORK]
Example: python create_subgraph.py noel/test 0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913 base


SystemExit: 1

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [37]:
import subprocess

def init_subgraph_streaming(subgraph_name, directory, contract_address, network="mainnet", contract_name="USDC"):

    cmd = [
        "graph", "init",
        "--protocol", "ethereum",
        "--contract-name", contract_name,
        "--from-contract", contract_address,
        "--network", network,
        "--index-events",
        subgraph_name,
        directory
    ]

    # Start the process and redirect both stdout and stderr to the same stream.
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)

    # Read and print output line by line as it is produced.
    while True:
        line = process.stdout.readline()
        if not line and process.poll() is not None:
            break
        if line:
            print(line.strip())

    return_code = process.wait()
    auth = ["graph", "auth", "19411d8c1971a3d6d465ec2fe6b4a6d8"]
    subprocess.run(auth)
    build = ["npm", "run", "deploy"]
    result = subprocess.run(build, cwd=directory,
                            input="0.2.0\n",  # Automatically provide the version label.
                            text=True, capture_output=True)

    # Optionally, print the deploy command's output.
    print(result.stdout)
    if result.stderr:
        print("Deploy stderr:", result.stderr)

    return return_code

# Example usage:
if __name__ == "__main__":
    subgraph_name = "baseallswaps"
    directory = "./theGraph/baseallswaps"
    contract_address = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"  # USDC contract on Ethereum mainnet

    ret_code = init_subgraph_streaming(subgraph_name, directory, contract_address)
    if ret_code == 0:
        print("Subgraph initialized successfully.")
    else:
        print("Subgraph initialization encountered an error, return code:", ret_code)


- Create subgraph scaffold
[90mGenerate subgraph[39m
- Create subgraph scaffold
[90mWrite subgraph to directory[39m
- Create subgraph scaffold
[32m✔[39m Create subgraph scaffold
- Initialize networks config
[32m✔[39m Initialize networks config
- Initialize subgraph repository
[31m✖[39m Failed to initialize subgraph repository: Command failed: git commit -m "Initialize subgraph"

Error: Command failed: git commit -m "Initialize subgraph"

Code: 1
[?25l[32mDeploy key set for https://api.studio.thegraph.com/deploy/[39m
[?25h
> deploy
> graph deploy --node https://api.studio.thegraph.com/deploy/ baseallswaps

[?25l[?25l[36m?[39m [1mWhich version label to use? (e.g. "v0.0.1")[22m [2m›[22m [46m[30m[30m [39m[30m[39m[49m[1D[?25h[?25h
await execute({ dir: import.meta.url });
^




Subgraph initialization encountered an error, return code: 1
