### Only supports EVM chains for now.

In [11]:
from web3 import Web3
from dotenv import load_dotenv
import os
import json
import requests
import json
from datetime import datetime
# Load environment variables from .env file
# (this file reads ALCHEMY_ETHEREUM_RPC_URL, BITQUERY_CLIENT_ID and
# BITQUERY_CLIENT_SECRET via os.getenv)
load_dotenv()

# Holder addresses that are never reported as whales. The second entry is the
# zero address; the first is presumably a burn address (TODO confirm).
# Entries are lowercase and are matched by exact string comparison.
blacklisted_wallets = ["0xdead000000000000000042069420694206942069", "0x0000000000000000000000000000000000000000"]

# Shared web3 client talking to the HTTP JSON-RPC endpoint named by
# ALCHEMY_ETHEREUM_RPC_URL.
web3 = Web3(Web3.HTTPProvider(os.getenv('ALCHEMY_ETHEREUM_RPC_URL')))

# ERC-20 ABI used to build contract instances; the path is relative to the
# current working directory.
with open('abis/ERC20.json', 'r') as f:
    ERC20_ABI = json.load(f)

In [6]:
def is_contract_address(address: str) -> bool:
    """
    Tell whether an address has code deployed at it.

    Args:
        address: Ethereum address to inspect

    Returns:
        bool: True when code is present at the address; False when the
        address is invalid or holds no code (externally owned account)
    """
    # An invalid address can never be a contract
    if not web3.isAddress(address):
        return False

    # Look up whatever bytecode is stored at the checksummed address
    checksummed = web3.toChecksumAddress(address)
    bytecode = web3.eth.get_code(checksummed)

    # Empty bytes and the literal b'0x' both count as "no code"
    has_no_code = bytecode == b'' or bytecode == b'0x'
    return not has_no_code

In [20]:
def get_bitquery_access_token() -> str:
    """
    Obtain an OAuth2 access token for the BitQuery API.

    Performs a client-credentials grant against the BitQuery token endpoint
    using the BITQUERY_CLIENT_ID and BITQUERY_CLIENT_SECRET environment
    variables.

    Returns:
        str: The 'access_token' field of the token response

    Raises:
        requests.RequestException: On connection failure, timeout, or an
            error HTTP status from the token endpoint
        KeyError: If the response body has no 'access_token' field
    """
    url = "https://oauth2.bitquery.io/oauth2/token"

    # Passing a dict lets requests form-encode every value, so credentials
    # containing characters such as '&', '=' or '+' are transmitted intact
    # instead of corrupting the request body.
    payload = {
        'grant_type': 'client_credentials',
        'client_id': os.getenv("BITQUERY_CLIENT_ID"),
        'client_secret': os.getenv("BITQUERY_CLIENT_SECRET"),
        'scope': 'api',
    }

    headers = {'Content-Type': 'application/x-www-form-urlencoded'}

    # The timeout keeps a stalled connection from hanging the caller forever.
    response = requests.post(url, headers=headers, data=payload, timeout=30)
    # Surface rejected credentials as an HTTP error rather than a later,
    # less informative KeyError.
    response.raise_for_status()

    return response.json()['access_token']

# BitQuery GraphQL requests below are authorized with the bearer token obtained above
def get_token_holders(token_address: str, limit: int = 10) -> "list[dict] | None":
    """
    Get top token holders for a given token address using BitQuery API

    Args:
        token_address: Ethereum token contract address
        limit: Number of top holders to return (default 10)

    Returns:
        A list of {'balance': float, 'address': str} dicts, in the order the
        API returns them (the query asks for descending balance), or None if
        the request fails or the response cannot be parsed.

    Raises:
        Any exception raised by get_bitquery_access_token(); authentication
        happens before the best-effort section below.
    """
    # Local import: only `datetime` itself is imported at file level.
    from datetime import timezone

    access_token = get_bitquery_access_token()
    url = "https://streaming.bitquery.io/graphql"
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f"Bearer {access_token}"
    }

    # Block timestamps are UTC-based, so ask for the UTC calendar date rather
    # than the local one (which can be a day ahead or behind).
    query_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")

    # json.dumps() emits the address as a quoted, escaped string literal, so a
    # stray quote in the argument cannot change the structure of the query.
    payload = json.dumps({
        "query": """
        {
      EVM(network: eth, dataset: archive) {
        TokenHolders(
          date: "%s"
          tokenSmartContract: %s
          limit: {count: %d}
          orderBy: {descendingByField: "Balance_Amount"}
        ) {
          Balance {
            Amount
          }
          Holder {
            Address
          }
        }
      }
    }
        """ % (query_date, json.dumps(token_address), limit)
    })

    try:
        response = requests.post(url, headers=headers, data=payload, timeout=30)
        response.raise_for_status()
        body = response.json()

        # Report a missing 'data' member with the server's own payload
        # instead of an opaque "NoneType is not subscriptable" message.
        if not isinstance(body, dict) or body.get('data') is None:
            raise ValueError(f"BitQuery response has no data: {body!r}")

        holders = body['data']['EVM']['TokenHolders']
        return [{'balance': float(h['Balance']['Amount']), 'address': h['Holder']['Address']} for h in holders]
    except (requests.RequestException, KeyError, TypeError, ValueError) as e:
        # Best effort: report the problem and signal failure with None.
        print(f"An error occurred: {str(e)}")
        return None
    

def assert_token_address(token_address: str):
    """
    Validate that the given value is an Ethereum address.

    Args:
        token_address: Address string to validate

    Raises:
        AssertionError: If web3.isAddress() rejects the value
    """
    # Raise explicitly instead of using an `assert` statement: asserts are
    # stripped when Python runs with -O, which would silently disable this
    # check. The exception type is kept so existing callers are unaffected.
    if not web3.isAddress(token_address):
        raise AssertionError("Invalid token address")
    

def get_recent_transfers(token_address: str, wallet_address: str, days: int = 7):
    """
    Get the transfers of a wallet in the last n days

    Args:
        token_address: The token contract address
        wallet_address: The wallet address to get transfers for
        days: Number of days to look back (default 7); converted to a block
            range assuming 12.07 seconds per block

    Returns:
        list: One dict per Transfer event with keys 'blockNumber',
        'transactionHash' (hex string), 'from', 'to', 'value' (raw integer
        amount, not scaled by the token's decimals) and 'type' ('out' when
        the wallet is the sender, otherwise 'in'), sorted by block number
        with the most recent first. An empty list is returned if fetching
        the events fails.

    Raises:
        AssertionError: If either address is not a valid address
    """
    assert_token_address(token_address)
    assert_token_address(wallet_address)

    # web3.py expects checksummed addresses when building a contract
    # instance; normalise so valid lowercase input works too.
    token_address = web3.toChecksumAddress(token_address)
    wallet_address = web3.toChecksumAddress(wallet_address)

    # Create contract instance
    token_contract = web3.eth.contract(address=token_address, abi=ERC20_ABI)

    # Get current block
    current_block = web3.eth.block_number

    # Calculate block from n days ago (assuming 12.07 seconds per block)
    blocks_per_day = 24 * 60 * 60 // 12.07
    # Clamp at block 0 so a very large `days` cannot yield a negative block.
    from_block = max(0, current_block - int(blocks_per_day * days))

    try:
        # Get transfers FROM the wallet
        outgoing_filter = token_contract.events.Transfer.createFilter(
            fromBlock=from_block,
            toBlock='latest',
            argument_filters={'from': wallet_address}
        )
        outgoing_events = outgoing_filter.get_all_entries()

        # Get transfers TO the wallet
        incoming_filter = token_contract.events.Transfer.createFilter(
            fromBlock=from_block,
            toBlock='latest',
            argument_filters={'to': wallet_address}
        )
        incoming_events = incoming_filter.get_all_entries()

        # Combine and format transfers
        wallet_lower = wallet_address.lower()
        seen_logs = set()
        all_transfers = []

        for event in outgoing_events + incoming_events:
            # A transfer from the wallet to itself matches both filters;
            # (transaction hash, log index) identifies a log uniquely, so use
            # it to keep only one copy.
            log_key = (event['transactionHash'], event['logIndex'])
            if log_key in seen_logs:
                continue
            seen_logs.add(log_key)

            args = event['args']
            all_transfers.append({
                'blockNumber': event['blockNumber'],
                'transactionHash': event['transactionHash'].hex(),
                'from': args['from'],
                'to': args['to'],
                'value': args['value'],
                'type': 'out' if args['from'].lower() == wallet_lower else 'in'
            })

        # Sort by block number (most recent first)
        all_transfers.sort(key=lambda x: x['blockNumber'], reverse=True)

        return all_transfers

    except Exception as e:
        # Best effort: a failed lookup is reported and treated as "no transfers".
        print(f"Error fetching transfers: {str(e)}")
        return []

In [21]:
def monitor_whale_wallets(token_address: str, days: int = 7):
    """
    Monitor whale wallets for a given token, including their balances and recent transfers

    A whale is a top-25 holder (as reported by get_token_holders) that owns
    more than 1% of the total supply, is not a contract and is not listed in
    blacklisted_wallets.

    Args:
        token_address: The token contract address
        days: Number of days to look back for transfers

    Returns:
        dict: Structured data with keys 'token_info', 'analysis_timeframe'
        and 'whales' (sorted by balance, largest first). 'whales' is empty
        when the holder lookup fails.

    Raises:
        AssertionError: If token_address is not a valid address
    """
    assert_token_address(token_address)

    # web3.py expects a checksummed address for contract instances; normalise
    # so valid lowercase input works too.
    token_contract = web3.eth.contract(
        address=web3.toChecksumAddress(token_address),
        abi=ERC20_ABI
    )

    # Get token metadata
    decimals = token_contract.functions.decimals().call()
    unit = 10 ** decimals  # raw integer amount -> whole tokens
    total_supply = token_contract.functions.totalSupply().call() / unit
    token_symbol = token_contract.functions.symbol().call()

    # Define whale threshold (>1% of supply)
    MIN_WHALE_HOLDINGS = total_supply * 0.01

    # get_token_holders() returns None on failure; treat that as "no holders"
    # instead of crashing in the loop below.
    holders = get_token_holders(token_address, 25) or []

    whales_data = {
        "token_info": {
            "address": token_address,
            "symbol": token_symbol,
            "total_supply": total_supply,
            "whale_threshold": MIN_WHALE_HOLDINGS
        },
        "analysis_timeframe": f"Last {days} days",
        "whales": []
    }

    # Compare addresses case-insensitively so a checksummed and a lowercase
    # spelling of the same wallet are treated alike.
    blacklist = {wallet.lower() for wallet in blacklisted_wallets}

    # Filter and process whale data
    for holder in holders:
        address = holder['address']
        balance = holder['balance']

        # Cheap local checks first; is_contract_address() costs an RPC call.
        if balance <= MIN_WHALE_HOLDINGS:
            continue
        if address.lower() in blacklist:
            continue
        if is_contract_address(address):
            continue

        # Get recent transfers for this whale
        recent_transfers = get_recent_transfers(token_address, address, days)

        # Calculate transfer statistics
        outgoing_transfers = [t for t in recent_transfers if t['type'] == 'out']
        incoming_transfers = [t for t in recent_transfers if t['type'] == 'in']

        total_outgoing = sum(t['value'] for t in outgoing_transfers) / unit
        total_incoming = sum(t['value'] for t in incoming_transfers) / unit

        whales_data["whales"].append({
            "address": address,
            "current_balance": balance,
            "percentage_of_supply": (balance / total_supply) * 100,
            "transfer_activity": {
                "total_transfers": len(recent_transfers),
                "outgoing_transfers": len(outgoing_transfers),
                "incoming_transfers": len(incoming_transfers),
                "total_outgoing_amount": total_outgoing,
                "total_incoming_amount": total_incoming,
                "net_flow": total_incoming - total_outgoing
            },
            "recent_transfers": [
                {
                    "block_number": transfer['blockNumber'],
                    "transaction_hash": transfer['transactionHash'],
                    "type": transfer['type'],
                    "counterparty": transfer['to'] if transfer['type'] == 'out' else transfer['from'],
                    "amount": transfer['value'] / unit,
                    "direction": "outgoing" if transfer['type'] == 'out' else "incoming"
                }
                for transfer in recent_transfers
            ]
        })

    # Sort whales by balance
    whales_data["whales"].sort(key=lambda x: x["current_balance"], reverse=True)

    return whales_data

# Example usage: analyse whales of the SHIB token over the last 2 days.
# NOTE: this runs when the cell/module executes and performs live network
# calls (JSON-RPC via web3 and the BitQuery HTTP API).
whale_analysis = monitor_whale_wallets("0x95aD61b0a150d79219dCF64E1E6Cc01f0B64C4cE", days=2)

In [22]:
whale_analysis

{'token_info': {'address': '0x95aD61b0a150d79219dCF64E1E6Cc01f0B64C4cE',
  'symbol': 'SHIB',
  'total_supply': 999982339482145.2,
  'whale_threshold': 9999823394821.453},
 'analysis_timeframe': 'Last 7 days',
 'whales': [{'address': '0x02e2201576fbbefb52812f2ee7f08eb4774b481e',
   'current_balance': 44800701939919.05,
   'percentage_of_supply': 4.480149315748887,
   'transfer_activity': {'total_transfers': 52,
    'outgoing_transfers': 3,
    'incoming_transfers': 49,
    'total_outgoing_amount': 306000000000.0,
    'total_incoming_amount': 248029272628.02435,
    'net_flow': -57970727371.97565},
   'recent_transfers': [{'block_number': 21844524,
     'transaction_hash': '0x145435e6757e1ade65e01d3c971a8f1d4aacfe5945fd18ac3184c91a489a1d28',
     'type': 'in',
     'counterparty': '0xcC8d320F26D621890542a15BF1AEd3C79F7213B7',
     'amount': 4749986378.731407,
     'direction': 'incoming'},
    {'block_number': 21844524,
     'transaction_hash': '0xc4639334500b32e86709c9ab20346efffb7ff176

In [18]:
transfers

[{'blockNumber': 21844524,
  'transactionHash': '0x145435e6757e1ade65e01d3c971a8f1d4aacfe5945fd18ac3184c91a489a1d28',
  'from': '0xcC8d320F26D621890542a15BF1AEd3C79F7213B7',
  'to': '0x02E2201576FBbeFb52812f2eE7F08eB4774B481e',
  'value': 4749986378731407340545440038,
  'type': 'in'},
 {'blockNumber': 21844524,
  'transactionHash': '0xc4639334500b32e86709c9ab20346efffb7ff176589bcb7d0a175050b254678d',
  'from': '0xbE40941eE469c9092e1739482b4bccd7B209c6C3',
  'to': '0x02E2201576FBbeFb52812f2eE7F08eB4774B481e',
  'value': 1492598595193736857284003015,
  'type': 'in'},
 {'blockNumber': 21844524,
  'transactionHash': '0xa9a5b40ef0bfbd44514b721399a32ce9f6b079ea9ffa29e8e87ba55041321bd2',
  'from': '0x2F88813395C4C0b86f6D74Bbaa813cDF8daE2642',
  'to': '0x02E2201576FBbeFb52812f2eE7F08eB4774B481e',
  'value': 5361903720758171632600583485,
  'type': 'in'},
 {'blockNumber': 21844523,
  'transactionHash': '0x3f51a5a3f6b792537c6d014ba2b5246b94a5a3843b1b0d4873586c2272d3c9d9',
  'from': '0x56f24bb3e12