In [None]:
# STEP 0: Install required libraries
# pip install solders solana requests

from solders.pubkey import Pubkey
from solana.rpc.api import Client
import requests

def check_solana_token_security(token_address: str):
    """Analyzes a Solana token's smart contract for security risks"""
    
    # STEP 1: Connect to Solana network
    solana_client = Client("https://api.mainnet-beta.solana.com")
    
    # STEP 2: Convert string address to Pubkey object
    try:
        token_pubkey = Pubkey.from_string(token_address)
    except:
        return {"error": "Invalid token address"}
    
    # STEP 3: Get token's mint account info
    mint_account = solana_client.get_account_info(token_pubkey).value
    if not mint_account:
        return {"error": "Mint account not found"}
    
    # STEP 4: Validate account data length
    data = bytes(mint_account.data)
    if len(data) < 82:
        return {"error": "Invalid mint account data"}
    
    # STEP 5: Check Mint Authority (bytes 0-4: option, 4-36: pubkey)
    mint_authority_option = int.from_bytes(data[0:4], "little")
    has_mint = mint_authority_option == 1
    
    # STEP 6: Check Freeze Authority (bytes 46-50: option, 50-82: pubkey)
    freeze_authority_option = int.from_bytes(data[46:50], "little")
    has_freeze = freeze_authority_option == 1
    
    # STEP 7: Check program upgradeability
    program_id = mint_account.owner
    program_account = solana_client.get_account_info(program_id).value
    is_upgradable = bool(program_account and program_account.owner)
    
    # STEP 8: Verify Metaplex metadata
    metadata = get_metaplex_metadata(token_pubkey)
    
    return {
        "mint_authority_active": has_mint,
        "freeze_authority_active": has_freeze,
        "is_upgradable": is_upgradable,
        "metadata_verified": metadata.get("verified", False)
    }

def get_metaplex_metadata(mint_pubkey: Pubkey):
    """Checks Metaplex metadata for verification status"""
    url = f"https://api.mainnet.metaplex.com/v1/tokens/{mint_pubkey}"
    try:
        response = requests.get(url)
        return response.json() if response.status_code == 200 else {}
    except:
        return {}

# Example with BONK token
CA = "58jTRhEN5w9MeevzpwgvTDdTSme1Cnzs89BWAKYaMuKi"
results = check_solana_token_security(CA)
print(results)

{'mint_authority_active': False, 'freeze_authority_active': False, 'is_upgradable': True, 'metadata_verified': False}


In [3]:
from solana.rpc.api import Client
from solders.pubkey import Pubkey
import requests

def check_solana_liquidity(token_address: str):
    """Analyzes liquidity safety for a Solana token"""
    solana_client = Client("https://api.mainnet-beta.solana.com")
    
    try:
        token_pubkey = Pubkey.from_string(token_address)
    except:
        return {"error": "Invalid token address"}

    # Get liquidity pool info
    pool_info = get_pool_info(token_address)
    
    # Check LP token status
    lp_status = check_lp_token_status(pool_info.get('lp_mint', ''))
    
    # Analyze pool concentration
    concentration = analyze_pool_concentration(pool_info.get('id', ''))
    
    return {
        "lp_burned": lp_status['burned'],
        "liquidity_locked": lp_status['locked'],
        "pool_ownership": pool_info.get('program_id', 'Unknown'),
        "top5_holders": concentration['top5_percent'],
        "total_liquidity": pool_info.get('liquidity_usd', 0)
    }

def get_pool_info(token_address: str):
    """Find the most liquid pool for the token"""
    try:
        # First check Raydium
        raydium_response = requests.get(
            "https://api.raydium.io/v2/ammV3/ammPools"
        )
        raydium_pools = raydium_response.json()['data']
        
        # Then check Orca
        orca_response = requests.get(
            "https://api.mainnet.orca.so/v1/whirlpool/list?whitelisted=true"
        )
        orca_pools = orca_response.json()['whirlpools']
        
        # Combine pools from both DEXs
        all_pools = raydium_pools + orca_pools
        
        # Find pools with our token and sort by liquidity
        matched_pools = sorted(
            [p for p in all_pools 
             if token_address in [p['base_mint'], p['quote_mint']]],  # Fixed parenthesis
            key=lambda x: x.get('liquidity_usd', 0),
            reverse=True
        )
        
        return matched_pools[0] if matched_pools else {}
    except Exception as e:
        print(f"Error fetching pools: {str(e)}")
        return {}

def check_lp_token_status(lp_mint: str):
    """Checks LP token status with proper error handling"""
    if not lp_mint:
        return {'burned': False, 'locked': False}
    
    try:
        # Check LP token supply
        solana_client = Client("https://api.mainnet-beta.solana.com")
        lp_pubkey = Pubkey.from_string(lp_mint)
        account_info = solana_client.get_token_supply(lp_pubkey).value
        
        # Check lock status via Solscan with headers
        headers = {
            'User-Agent': 'Mozilla/5.0',
            'Accept': 'application/json'
        }
        lock_response = requests.get(
            f"https://api.solscan.io/v2/token/lock?token={lp_mint}",
            headers=headers
        )
        
        burned = int(account_info.value.amount) == 0
        locked = lock_response.json().get('data', {}).get('lockPositionList', []) != []
        
        return {'burned': burned, 'locked': locked}
    except Exception as e:
        print(f"LP check error: {str(e)}")
        return {'burned': False, 'locked': False}

def analyze_pool_concentration(pool_id: str):
    """Improved DexScreener integration"""
    if not pool_id:
        return {'top5_percent': 100}
    
    try:
        response = requests.get(
            f"https://io.dexscreener.com/dex/log/v2/pool/solana/{pool_id}"
        )
        data = response.json()
        
        return {
            'top5_percent': data.get('concentration', {}).get('top5', 100),
            'total_holders': data.get('holderCount', 1)
        }
    except Exception as e:
        print(f"Concentration error: {str(e)}")
        return {'top5_percent': 100}

# Bonk (Raydium)
print(check_solana_liquidity("DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263"))

# USDC (Orca)
print(check_solana_liquidity("EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"))

Error fetching pools: 'whirlpools'
{'lp_burned': False, 'liquidity_locked': False, 'pool_ownership': 'Unknown', 'top5_holders': 100, 'total_liquidity': 0}
Error fetching pools: 'whirlpools'
{'lp_burned': False, 'liquidity_locked': False, 'pool_ownership': 'Unknown', 'top5_holders': 100, 'total_liquidity': 0}


In [4]:
from solana.rpc.api import Client
from solders.pubkey import Pubkey
import requests
import time

def get_pool_info(token_address: str):
    """Find the most liquid pool for the token"""
    try:
        # Get Raydium pools
        raydium_response = requests.get(
            "https://api.raydium.io/v2/ammV3/ammPools",
            headers={'User-Agent': 'Mozilla/5.0'}
        )
        raydium_pools = raydium_response.json().get('data', [])
        
        # Get Orca pools using updated API
        orca_response = requests.get(
            "https://api.orca.so/all-pools",
            headers={'User-Agent': 'Mozilla/5.0'}
        )
        orca_pools = orca_response.json().get('pools', [])
        
        # Normalize pool structures
        all_pools = []
        for p in raydium_pools:
            all_pools.append({
                'base_mint': p['base_mint'],
                'quote_mint': p['quote_mint'],
                'lp_mint': p['lp_mint'],
                'liquidity_usd': p['liquidity'],
                'program_id': 'Raydium',
                'id': p['id']
            })
            
        for p in orca_pools:
            all_pools.append({
                'base_mint': p['tokenA']['mint'],
                'quote_mint': p['tokenB']['mint'],
                'lp_mint': p['lpTokenMint'],
                'liquidity_usd': p['liquidity'],
                'program_id': 'Orca',
                'id': p['address']
            })
        
        # Find matching pools
        matched_pools = [
            p for p in all_pools
            if token_address in [p['base_mint'], p['quote_mint']]
        ]
        
        # Sort by liquidity
        matched_pools.sort(key=lambda x: x.get('liquidity_usd', 0), reverse=True)
        
        return matched_pools[0] if matched_pools else {}
    
    except Exception as e:
        print(f"Pool fetch error: {str(e)}")
        return {}
    
# BONK Token
print(check_solana_liquidity("DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263"))

# USDC
print(check_solana_liquidity("EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"))

Pool fetch error: 'base_mint'
{'lp_burned': False, 'liquidity_locked': False, 'pool_ownership': 'Unknown', 'top5_holders': 100, 'total_liquidity': 0}
Pool fetch error: 'base_mint'
{'lp_burned': False, 'liquidity_locked': False, 'pool_ownership': 'Unknown', 'top5_holders': 100, 'total_liquidity': 0}
