In [1]:
# This notebook selects 20 protocols per chain, including their protocol contract addresses.
# The data it produces can be used to calculate contextual precision using https://github.com/Olugbenga2000/forta-attack-detector-analysis
# Simply run this notebook, which produces a CSV file. Then run forta-attack-detector-analysis, which uses the CSV file produced here to output all alerts generated by the attack detector in the 60 days prior.
# Assuming these protocols were not attacked, any alert found is likely a false positive (FP); this needs to be manually confirmed.

In [2]:
import requests
import pandas as pd
from random import random
from dotenv import load_dotenv
from urllib.parse import quote
from time import sleep
from hexbytes import HexBytes
import traceback
from web3 import Web3
import json
import rlp
from json.decoder import JSONDecodeError
from datetime import datetime, timezone
from ratelimiter import RateLimiter
import os
# Load RPC endpoints, API tokens and cookies from a local .env file into the environment.
load_dotenv()

# One web3 client per supported chain; each RPC endpoint URL is read from the environment.
# (The former bare `<client>.manager.request_blocking` expressions were removed: they only
# looked up a bound method without calling it and leaked its repr as cell output.)
eth_rpc_endpoint_address = os.environ.get('MAINNET_INFURA_RPC_ENDPOINT')
eth_w3 = Web3(Web3.HTTPProvider(eth_rpc_endpoint_address))

poly_rpc_endpoint_address = os.environ.get('POLYGON_RPC_ENDPOINT')
poly_w3 = Web3(Web3.HTTPProvider(poly_rpc_endpoint_address))

bsc_rpc_endpoint_address = os.environ.get('BSC_RPC_ENDPOINT')
bsc_w3 = Web3(Web3.HTTPProvider(bsc_rpc_endpoint_address))

<bound method RequestManager.request_blocking of <web3.manager.RequestManager object at 0x7fb931828880>>

In [3]:

def is_contract(w3, address) -> bool:
    """
    Tell whether ``address`` holds contract code on the chain behind ``w3``.

    A missing address (``None``) is reported as a contract without querying the node.
    :return: is_contract: bool
    """
    if address is not None:
        checksum_address = Web3.toChecksumAddress(address)
        deployed_code = w3.eth.get_code(checksum_address)
        # an address without deployed code yields empty bytes
        return deployed_code != HexBytes('0x')
    return True

def _fetch_etherscan_result(url: str, max_attempts: int = 10, retry_delay: float = 1.0) -> list:
    """
    Fetch one Etherscan-style API URL and return the list stored under its "result" key.

    Failed attempts (network error, non-200 status, undecodable JSON, throttling message)
    are retried at most ``max_attempts`` times, pausing ``retry_delay`` seconds in between,
    so a persistently failing endpoint can no longer spin forever.
    :return: the "result" list, or an empty list when no usable result was obtained
    """
    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.get(url, timeout=30)
            if response.status_code == 200:
                payload = json.loads(response.content)
                result = payload.get("result") if isinstance(payload, dict) else None
                if isinstance(result, list):
                    return result
                if not result:
                    return []
                # A non-list result is an API-level message rather than data.
                print("RESULT:", result)
                # NOTE(review): throttling is expected to be reported as a message such as
                # "Max rate limit reached"; only that case is retried - confirm against the API docs.
                if "rate limit" not in str(result).lower():
                    return []
            else:
                print(f"Error {response.status_code} {response.content}")
        except JSONDecodeError as e:
            print(f"Error {e} {response.content}")
        except requests.RequestException as e:
            # Only the exception type is printed: the full message embeds the URL, which contains the API key.
            print(f"Error {type(e).__name__}")
        if attempt < max_attempts:
            sleep(retry_delay)
    return []


def _etherscan_results_frame(urls: list, columns: list) -> pd.DataFrame:
    """
    Query every URL and stack all returned records below an empty frame with ``columns``.

    URLs that yield no usable result are skipped.
    """
    frames = [pd.DataFrame(columns=columns)]
    for url in urls:
        result = _fetch_etherscan_result(url)
        if len(result) == 0:
            continue
        try:
            frames.append(pd.DataFrame(data=result))
        except Exception as e:
            # best effort: an unexpected payload shape must not abort the remaining addresses
            print("RESULT:", result, e)
    # concatenate once instead of growing the frame inside the loop
    return pd.concat(frames) if len(frames) > 1 else frames[0]


@RateLimiter(max_calls=1, period=1)
def getDeployerTx(addresses: list, etherscan_token:str, etherscan_host:str) -> pd.DataFrame:
    """
    Look up the creator and creation transaction of each contract address.

    Uses the explorer's ``getcontractcreation`` action, one request per address.
    :param addresses: contract addresses to look up
    :param etherscan_token: API key for the explorer
    :param etherscan_host: explorer host, e.g. 'etherscan.io' (the API is reached at api.<host>)
    :return: DataFrame with at least the columns contractAddress, contractCreator, txHash;
             empty when nothing was found
    """
    columns = ['contractAddress', 'contractCreator', 'txHash']
    if len(addresses) == 0:
        return pd.DataFrame(columns=columns)

    urls = [
        f"https://api.{etherscan_host}/api?module=contract&action=getcontractcreation&contractaddresses={address}&apikey={etherscan_token}"
        for address in addresses
    ]
    return _etherscan_results_frame(urls, columns)


@RateLimiter(max_calls=1, period=1)
def getEtherScanTransactionsByAddress(addresses: list, etherscan_token:str, etherscan_host:str, firstBlockNumber: int = 0) -> pd.DataFrame:
    """
    Fetch the transaction list of every address via the explorer's ``txlist`` action.

    Each request asks for a single page of at most 10000 transactions in ascending order,
    starting at ``firstBlockNumber``. The results of ALL addresses are combined; previously
    the function returned right after the first address and could return None.
    :param addresses: account addresses to query
    :param etherscan_token: API key for the explorer
    :param etherscan_host: explorer host, e.g. 'etherscan.io' (the API is reached at api.<host>)
    :param firstBlockNumber: first block to include
    :return: DataFrame of transactions (empty DataFrame when ``addresses`` is empty)
    """
    if os.environ.get('ETHERSCAN_TOKEN') is None:
        load_dotenv()

    if len(addresses) == 0:
        return pd.DataFrame()

    columns = ['blockNumber', 'timeStamp', 'hash', 'nonce', 'blockHash',
               'transactionIndex', 'from', 'to', 'value', 'gas', 'gasPrice', 'isError',
               'txreceipt_status', 'input', 'contractAddress', 'cumulativeGasUsed',
               'gasUsed', 'confirmations', 'type', 'traceId', 'errCode']

    urls = [
        f"https://api.{etherscan_host}/api?module=account&action=txlist&address={address}&startblock={firstBlockNumber}&endblock=99999999&page=1&offset=10000&sort=asc&apikey={etherscan_token}"
        for address in addresses
    ]
    return _etherscan_results_frame(urls, columns)
    

def calc_contract_address(address, nonce) -> str:
    """
    Derive the address of a contract created by ``address`` with the given ``nonce``.

    The sender bytes and nonce are RLP-encoded and hashed with keccak; the last
    20 bytes of the hash form the contract address.
    :return: contract address: str
    """
    # drop the leading two characters (the hex prefix) before decoding
    sender_bytes = bytes.fromhex(address[2:].lower())
    encoded = rlp.encode([sender_bytes, nonce])
    digest = Web3.keccak(encoded)
    return Web3.toChecksumAddress(digest[-20:])


def get_other_contract_creations(protocol_name:str, deployer_address:str,  etherscan_token:str, etherscan_host:str, firstBlockNumber: int = 0) -> set:
    """
    Collect the addresses of contracts created by ``deployer_address``.

    The deployer's transaction list is fetched and every transaction with an empty
    'to' field sent by the deployer is treated as a contract creation; the resulting
    contract address is calculated from the sender and the transaction nonce.
    :param protocol_name: only used to prefix the log output
    :param deployer_address: address whose transactions are inspected
    :param etherscan_token: API key for the explorer
    :param etherscan_host: explorer host, e.g. 'etherscan.io'
    :param firstBlockNumber: first block to include
    :return: set of lower-cased contract addresses (empty when no transactions were found)
    """
    contracts = set()
    df = getEtherScanTransactionsByAddress([deployer_address], etherscan_token, etherscan_host, firstBlockNumber)
    # The lookup may come back without data (None or a frame lacking the expected columns).
    if df is None or df.empty or 'to' not in df.columns:
        return contracts

    creations = df[(df['to'] == "")]
    deployer_lower = deployer_address.lower()
    for _, row in creations.iterrows():
        if row["from"].lower() == deployer_lower:
            nonce = int(row['nonce'])
            contract_addr = calc_contract_address(row['from'], nonce).lower()
            print(f"{protocol_name}: Found contract {contract_addr} {row['hash']} {nonce} {row['hash']}")
            contracts.add(contract_addr)
    return contracts


In [4]:
# Download the full protocol list from DefiLlama.
response = requests.get('https://api.llama.fi/protocols', timeout=60)
# Fail loudly on an HTTP error instead of trying to parse an error page as JSON.
response.raise_for_status()
protocols = response.json()
len(protocols)



3711

In [5]:
# This cell picks protocols randomly from the list of protocols, chain by chain.
# For each picked protocol it queries the block explorer's search for addresses based on the protocol name.
# If no addresses are found, the protocol is recorded but not counted as selected;
# this biases the selection towards popular/well-known protocols.

PROTOCOLS_PER_CHAIN = 20      # need at least this many protocols with addresses per chain
SAMPLING_PROBABILITY = 0.01   # chance of picking an unprocessed candidate on each pass over the list
SEARCH_DELAY_SECONDS = 10     # pause before every explorer search request
RESULT_COLUMNS = ['protocol', 'chain', 'addresses']

# chain name as it appears in the protocol list -> (explorer host, web3 client, cookie env var, API token env var)
CHAIN_SETTINGS = {
    'Ethereum': ('etherscan.io', eth_w3, 'ETHERSCAN_COOKIE', 'ETHERSCAN_TOKEN'),
    'Binance': ('bscscan.com', bsc_w3, 'BSCSCAN_COOKIE', 'BSCSCAN_TOKEN'),
    'Polygon': ('polygonscan.com', poly_w3, 'POLYGONSCAN_COOKIE', 'POLYGONSCAN_TOKEN'),
}

user_agent = {'User-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'}


def _strip_chain_prefix(address: str) -> str:
    """Return the part after ':' of a '<prefix>:<address>' string, or the address unchanged."""
    return address.split(":")[1] if ":" in address else address


def _evaluate_protocol(protocol: dict, host: str, cookie_str, token) -> tuple:
    """
    Gather the contract addresses of one protocol on one chain.

    Sources: the address listed in the protocol entry, the explorer's name search, and
    all contracts created by the deployers of those addresses.
    :return: (addresses_cell, has_addresses) where addresses_cell is the value for the
             'addresses' column: the comma-joined addresses, 'None' when nothing was found,
             or the HTTP status code when the explorer search failed
    """
    protocol_name = protocol['name']
    address_list = []

    listed_address = protocol.get('address')
    if listed_address:
        listed_address = _strip_chain_prefix(listed_address)
        print(f"{protocol_name}: contract from defillama added: {listed_address}")
        address_list.append(listed_address)

    url = f'https://{host}/searchHandler?term={quote(protocol_name)}&filterby=0'
    headers = dict(user_agent)
    if cookie_str:
        # The env var holds a raw cookie string, so it is sent as the Cookie header itself
        # (passing it via `cookies=` would wrap it into a single cookie named "Cookie").
        headers['Cookie'] = cookie_str
    response = requests.get(url, headers=headers, timeout=30)
    if response.status_code != 200:
        print(f"{protocol_name}: failed fetching addresses from etherscan {response.status_code}")
        return str(response.status_code), False

    for entry in json.loads(response.text):
        address_list.append(_strip_chain_prefix(entry['address']))
    print(f"{protocol_name}: fetched {len(address_list)} addresses from etherscan")
    if len(address_list) == 0:
        return 'None', False

    # Remember which known contracts each deployer created so the log names the right ones.
    df_deployer = getDeployerTx(address_list, token, host)
    contracts_by_deployer = {}
    for _, row in df_deployer.iterrows():
        contracts_by_deployer.setdefault(row['contractCreator'], []).append(row['contractAddress'])

    deployer_contracts = set()
    for deployer, deployed in contracts_by_deployer.items():
        contracts = get_other_contract_creations(protocol_name, deployer, token, host)
        print(f"{protocol_name}: fetched {len(contracts)} addresses for deployer {deployer} of contract {', '.join(deployed)}.")
        deployer_contracts.update(contracts)

    address_list.extend(deployer_contracts)
    print(f"{protocol_name}: fetched total {len(address_list)} addresses")
    return ','.join(address_list), True


result_rows = []
try:
    for chain, (host, w3, cookie_env, token_env) in CHAIN_SETTINGS.items():
        cookie_str = os.getenv(cookie_env)
        token = os.getenv(token_env)

        candidates = [
            protocol for protocol in protocols
            if chain in (protocol.get('chains') or []) and protocol.get('category') not in ['CEX']
        ]
        candidate_names = {protocol['name'] for protocol in candidates}

        processed = set()
        selected = set()
        # Stop once enough protocols are selected OR every candidate has been tried;
        # without the second condition a chain with too few usable protocols loops forever.
        while len(selected) < PROTOCOLS_PER_CHAIN and len(processed) < len(candidate_names):
            for protocol in candidates:
                if len(selected) >= PROTOCOLS_PER_CHAIN:
                    break
                protocol_name = protocol['name']
                if protocol_name in processed or random() >= SAMPLING_PROBABILITY:
                    continue
                processed.add(protocol_name)

                sleep(SEARCH_DELAY_SECONDS)
                print(f"{protocol_name}: evaluating")
                try:
                    addresses_cell, has_addresses = _evaluate_protocol(protocol, host, cookie_str, token)
                except Exception as e:
                    # best effort: a failing protocol is logged and skipped, not counted as selected
                    print(f"{protocol_name}\t{chain}\t{e}: {traceback.format_exc()}")
                    continue

                result_rows.append([protocol_name, chain, addresses_cell])
                if has_addresses:
                    selected.add(protocol_name)

        if len(selected) < PROTOCOLS_PER_CHAIN:
            print(f"{chain}: only {len(selected)} of {PROTOCOLS_PER_CHAIN} protocols with addresses found")
finally:
    # Built in `finally` so partial results survive an interrupted run.
    df_results = pd.DataFrame(result_rows, columns=RESULT_COLUMNS)
                        


Rainbow Bridge: evaluating
Rainbow Bridge: fetched 3 addresses from etherscan
Rainbow Bridge: Found contract 0x0ddae29ff6bed2db780d1d1073f79eb125910aa5 0xebe8e19a6fdf80e481b41e9c1de0385d3f72aea2fd2c8c8c35447ad976423fa6 0 0xebe8e19a6fdf80e481b41e9c1de0385d3f72aea2fd2c8c8c35447ad976423fa6
Rainbow Bridge: Found contract 0x23ddd3e3692d1861ed57ede224608875809e127f 0x3bc94b4cd0c9423126310a33e71a4d5922620e97f82806786167f03ac068a31e 1 0x3bc94b4cd0c9423126310a33e71a4d5922620e97f82806786167f03ac068a31e
Rainbow Bridge: fetched 2 addresses for deployer 0xcc7f3dffa25147f60b59504f0f745c57af2e365b of contract 0x3497b57fe49e90a783cc7b1d62dbabf560785744.
Rainbow Bridge: fetched total 5 addresses
Badger DAO: evaluating
Badger DAO: contract from defillama added: 0x3472A5A71965499acd81997a54BBA8D852C6E53d
Badger DAO: fetched 3 addresses from etherscan
Badger DAO: fetched 0 addresses for deployer 0xcdab3acc1ad3870a93bb72377092b67e290d76f3 of contract 0x30a9c1d258f6c2d23005e6450e72bdd42c541105.
Badger DAO: 

In [6]:
# Rename columns so they are compatible with https://github.com/Olugbenga2000/forta-attack-detector-analysis
df_results = df_results.rename(columns={
    'protocol': 'Attack Name',
    'chain': 'Network',
    'addresses': 'ProtocolContracts',
})
# Replace the chain names (Binance -> BSC, Ethereum -> Mainnet). The result is assigned back to the
# column: calling replace(inplace=True) on the column selection is a chained in-place mutation, which
# is deprecated and has no effect on the frame under pandas Copy-on-Write.
# NOTE(review): 'Mainnet  ' carries two trailing spaces - confirm this is what the analysis tool expects.
df_results['Network'] = df_results['Network'].replace({'Binance': 'BSC', 'Ethereum': 'Mainnet  '})
# The sampled protocols are assumed not to be attacked, so the attacker is the zero address.
df_results["Attacker"] = "0x0000000000000000000000000000000000000000"

df_results.to_csv('202312_precision.csv', index=False)