In [8]:
import pandas as pd
from sqlalchemy import create_engine
from web3 import Web3
import json
from tqdm.auto import tqdm

# ==============================================================================
# SCRIPT DE ENGENHARIA DE FEATURES AVANÇADA - v2.0
# ==============================================================================

# --- PASSO 1: Carregar os dados que seu bot coletou ---
print("--- Passo 1: Carregando dados da Mainnet do seu servidor ---")
SERVER_IP = "34.79.99.180" 
db_connection_str = f'postgresql://admin:supersecret@{SERVER_IP}:5433/mempool_data'
df = pd.DataFrame()
try:
    db_engine = create_engine(db_connection_str)
    df = pd.read_sql('SELECT hash, "inputData", to_address FROM transactions', con=db_engine)
    print(f"Sucesso! Carregamos {len(df)} transações para análise avançada.")
except Exception as e:
    print(f"--- ERRO ao carregar dados: {e} ---")

# --- PASSO 2: Configurar os Decodificadores Especializados ---
if not df.empty:
    print("\n--- Passo 2: Preparando decodificadores para a 'caixa-preta' ---")
    
    # ABI completo do Roteador Universal, focado na função 'execute'
    # Nota: Este ABI é complexo. Encontramos as definições das funções de swap internas.
    universal_router_abi_str = '[{"inputs":[{"type":"bytes","name":"commands","internalType":"bytes"},{"type":"bytes[]","name":"inputs","internalType":"bytes[]"}],"name":"execute","outputs":[],"stateMutability":"payable","type":"function"}]'
    # Os comandos internos têm seus próprios ABIs!
    v3_swap_exact_in_abi = json.loads('[{"inputs":[{"type":"address","name":"recipient","internalType":"address"},{"type":"uint256","name":"amountIn","internalType":"uint256"},{"type":"uint256","name":"amountOutMin","internalType":"uint256"},{"type":"bytes","name":"path","internalType":"bytes"},{"type":"bool","name":"payerIsUserAgent","internalType":"bool"}],"name":"V3_SWAP_EXACT_IN","type":"function"}]')
    v2_swap_exact_in_abi = json.loads('[{"inputs":[{"type":"address","name":"recipient","internalType":"address"},{"type":"uint256","name":"amountIn","internalType":"uint256"},{"type":"uint256","name":"amountOutMin","internalType":"uint256"},{"type":"address[]","name":"path","internalType":"address[]"},{"type":"bool","name":"payerIsUserAgent","internalType":"bool"}],"name":"V2_SWAP_EXACT_IN","type":"function"}]')
    
    # Criamos os objetos de contrato para decodificação
    universal_router_contract = Web3().eth.contract(abi=json.loads(universal_router_abi_str))
    
    # Mapeamento dos códigos de comando para seus ABIs e nomes
    COMMAND_MAP = {
        0x00: {'name': 'V3_SWAP_EXACT_IN', 'abi': v3_swap_exact_in_abi[0]},
        0x08: {'name': 'V2_SWAP_EXACT_IN', 'abi': v2_swap_exact_in_abi[0]},
        # Adicionaríamos outros comandos aqui (0x01, 0x09, etc.)
    }

    def unpack_execute_call(input_data):
        try:
            input_bytes = bytes.fromhex(input_data[2:])
            func_obj, func_params = universal_router_contract.decode_function_input(input_bytes)
            
            if func_obj.fn_name == 'execute':
                commands = func_params['commands']
                inputs = func_params['inputs']
                decoded_swaps = []

                for i, command_code in enumerate(commands):
                    if command_code in COMMAND_MAP:
                        command_info = COMMAND_MAP[command_code]
                        # Decodifica os parâmetros do comando interno
                        internal_params = Web3().eth.codec.decode(
                            [param['type'] for param in command_info['abi']['inputs']], 
                            inputs[i]
                        )
                        swap_details = {
                            'function': command_info['name'],
                            'amountIn': internal_params[1], # amountIn é geralmente o segundo parâmetro
                            'path': internal_params[3] # path é geralmente o quarto parâmetro
                        }
                        decoded_swaps.append(swap_details)
                return decoded_swaps if decoded_swaps else None
        except Exception:
            return None
        return None

    print("Decodificador avançado pronto. Analisando transações 'execute'...")
    
    # Aplicamos o decodificador avançado para criar nossa feature final
    df['internal_swaps'] = df['inputData'].apply(unpack_execute_call)
    
    # Filtramos para ter apenas as transações que continham swaps que conseguimos decodificar
    final_df = df.dropna(subset=['internal_swaps']).copy()

    print(f"\nAnálise completa! Encontramos e decodificamos {len(final_df)} transações 'execute' contendo swaps.")
    display(final_df[['hash', 'internal_swaps']].head())

else:
    print("\nNenhum dado para analisar.")

--- Passo 1: Carregando dados da Mainnet do seu servidor ---
Sucesso! Carregamos 4893 transações para análise avançada.

--- Passo 2: Preparando decodificadores para a 'caixa-preta' ---
Decodificador avançado pronto. Analisando transações 'execute'...

Análise completa! Encontramos e decodificamos 0 transações 'execute' contendo swaps.


Unnamed: 0,hash,internal_swaps


In [9]:
# ==============================================================================
# SCRIPT FINAL DE ANÁLISE - v5.0 (O Decodificador Mestre)
# ==============================================================================

import pandas as pd
from sqlalchemy import create_engine
from web3 import Web3
import json
from tqdm.auto import tqdm
import warnings

warnings.filterwarnings('ignore')

# --- PASSO 1: Carregar os Dados REAIS e COMPLETOS ---
print("--- Passo 1: Carregando dados da Mainnet do seu servidor ---")
SERVER_IP = "34.79.99.180" 
db_connection_str = f'postgresql://admin:supersecret@{SERVER_IP}:5433/mempool_data'
df = pd.DataFrame()
try:
    db_engine = create_engine(db_connection_str)
    # Filtramos apenas as transações enviadas para o Roteador Universal da Uniswap
    UNIVERSAL_ROUTER_ADDRESS = "0x3fC91A3afd70395E496CB848274244cA6353f3F3"
    query = f"SELECT hash, \"inputData\" FROM transactions WHERE lower(to_address) = '{UNIVERSAL_ROUTER_ADDRESS.lower()}'"
    df = pd.read_sql(query, con=db_engine)
    print(f"Sucesso! Carregamos {len(df)} transações do Roteador Universal para análise avançada.")
    df.dropna(subset=['inputData'], inplace=True)
except Exception as e:
    print(f"--- ERRO ao carregar dados: {e} ---")

# --- PASSO 2: A Engenharia Reversa ---
if not df.empty:
    print("\n--- Passo 2: Abrindo a 'Caixa-Preta' com o ABI completo ---")
    
    # O ABI completo e real do Roteador Universal, copiado do Etherscan
    # Este é o nosso "manual de instruções" mestre.
    universal_router_abi_str = '[{"inputs":[{"type":"bytes","name":"commands","internalType":"bytes"},{"type":"bytes[]","name":"inputs","internalType":"bytes[]"},{"type":"uint256","name":"deadline","internalType":"uint256"}],"name":"execute","outputs":[],"stateMutability":"payable","type":"function"},{"inputs":[{"type":"bytes","name":"commands","internalType":"bytes"},{"type":"bytes[]","name":"inputs","internalType":"bytes[]"}],"name":"execute","outputs":[],"stateMutability":"payable","type":"function"},{"inputs":[],"name":"WETH9","outputs":[{"type":"address","name":"","internalType":"address"}],"stateMutability":"view","type":"function"},{"stateMutability":"payable","type":"receive"}]'
    universal_router_abi = json.loads(universal_router_abi_str)
    
    # Criamos o objeto de contrato que entende a função 'execute'
    w3 = Web3()
    universal_router_contract = w3.eth.contract(abi=universal_router_abi)

    # Função para extrair TODOS os swaps de dentro de uma chamada 'execute'
    def extract_swaps_from_execute(input_data):
        swaps_found = []
        try:
            input_bytes = bytes.fromhex(input_data[2:])
            func_obj, func_params = universal_router_contract.decode_function_input(input_bytes)

            if func_obj.fn_name == 'execute':
                commands = func_params['commands']
                inputs = func_params['inputs']
                
                # Para cada comando na lista de comandos...
                for i, command_code in enumerate(commands):
                    # O código '0x00' é para V3_SWAP_EXACT_IN, e '0x08' é para V2_SWAP_EXACT_IN
                    if command_code in [0x00, 0x08]:
                        # Se for um swap, adicionamos à nossa lista de resultados
                        swaps_found.append(command_code)
            return swaps_found if swaps_found else None
        except Exception:
            return None

    # Aplicamos nosso decodificador avançado para criar uma nova coluna
    df['internal_swap_commands'] = df['inputData'].apply(extract_swaps_from_execute)
    
    # Filtramos para ter apenas as transações que continham swaps
    final_df = df.dropna(subset=['internal_swap_commands']).copy()

    print(f"\nAnálise completa! Encontramos e decodificamos {len(final_df)} transações 'execute' contendo um ou mais swaps internos.")
    
    # Agora, vamos contar os tipos de swaps encontrados
    if not final_df.empty:
        # 'Explodimos' a lista de comandos para contar cada um individualmente
        all_swap_commands = final_df['internal_swap_commands'].explode()
        command_counts = all_swap_commands.value_counts()
        
        # Mapeamos os códigos para nomes legíveis
        command_counts.index = command_counts.index.map({0x00: 'Uniswap V3 Swap', 0x08: 'Uniswap V2 Swap'})
        
        print("\n--- ANÁLISE FINAL DAS FUNÇÕES INTERNAS ---")
        print("Distribuição real dos tipos de swap encontrados dentro do 'execute':")
        display(command_counts)
else:
    print("\nNenhum dado do Roteador Universal para analisar. Deixe o coletor rodar mais tempo.")

--- Passo 1: Carregando dados da Mainnet do seu servidor ---
Sucesso! Carregamos 0 transações do Roteador Universal para análise avançada.

Nenhum dado do Roteador Universal para analisar. Deixe o coletor rodar mais tempo.


In [11]:
# ==============================================================================
# SCRIPT FINAL DE ANÁLISE - v7.0 (O Decodificador Definitivo)
# ==============================================================================

import pandas as pd
from sqlalchemy import create_engine
from web3 import Web3
import json
import warnings

warnings.filterwarnings('ignore')

# --- PASSO 1: Carregar os Dados REAIS e COMPLETOS ---
print("--- Passo 1: Carregando dados da Mainnet do seu servidor na nuvem ---")
# ATENÇÃO: Substitua pelo IP público do seu servidor do Google Cloud
SERVER_IP = "34.79.99.180" 
db_connection_str = f'postgresql://admin:supersecret@{SERVER_IP}:5433/mempool_data'
df = pd.DataFrame()
try:
    db_engine = create_engine(db_connection_str)
    df = pd.read_sql('SELECT hash, "inputData", to_address FROM transactions', con=db_engine)
    print(f"Sucesso! Carregamos {len(df)} transações REAIS para análise final.")
    df.dropna(subset=['inputData'], inplace=True)
    df = df[df['inputData'].str.len() > 10]
except Exception as e:
    print(f"--- ERRO ao carregar dados: {e} ---")

# --- PASSO 2: Decodificação com os ABIs CORRETOS e COMPLETOS ---
if not df.empty:
    print("\n--- Passo 2: Decodificando transações com os manuais de instrução corretos ---")
    
    # O "Molho de Chaves" com os ABIs corretos e completos
    # Cada string aqui é um manual para um tipo de contrato
    
    # Manual para Roteadores V2 (Uniswap V2, Sushiswap)
    v2_router_abi = json.loads('[{"inputs":[{"internalType":"uint256","name":"amountIn","type":"uint256"},{"internalType":"uint256","name":"amountOutMin","type":"uint256"},{"internalType":"address[]","name":"path","type":"address[]"},{"internalType":"address","name":"to","type":"address"},{"internalType":"uint256","name":"deadline","type":"uint256"}],"name":"swapExactTokensForTokens","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"uint256","name":"amountOutMin","type":"uint256"},{"internalType":"address[]","name":"path","type":"address[]"},{"internalType":"address","name":"to","type":"address"},{"internalType":"uint256","name":"deadline","type":"uint256"}],"name":"swapExactETHForTokens","outputs":[],"stateMutability":"payable","type":"function"}]')
    
    # Manual para Roteadores Modernos (Uniswap V3, 1inch)
    universal_router_abi = json.loads('[{"inputs":[{"type":"bytes","name":"commands","internalType":"bytes"},{"type":"bytes[]","name":"inputs","internalType":"bytes[]"},{"type":"uint256","name":"deadline","type":"uint256"}],"name":"execute","outputs":[],"stateMutability":"payable","type":"function"}]')

    # Criamos os decodificadores
    contract_v2 = Web3().eth.contract(abi=v2_router_abi)
    contract_universal = Web3().eth.contract(abi=universal_router_abi)
    
    # Endereços dos nossos alvos para saber qual decodificador usar
    ROUTER_V2_ADDRESSES = [
        "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D".lower(), # Uniswap V2
        "0xd9e1cE17f2641f24aE83637ab66a2cca9C378B9F".lower()  # Sushiswap
    ]
    MODERN_ROUTER_ADDRESSES = [
        "0x68b3465833fb72A70ecDF485E0e4C7bD8665Fc45".lower(), # Uniswap V3
        "0x111111125421cA6dc452d289314280a0f8842A65".lower(), # 1inch
        "0x3fC91A3afd70395E496CB848274244cA6353f3F3".lower() # Uniswap Universal Router
    ]

    def decode_smart(row):
        target_address = row['to_address'].lower()
        input_data = row['inputData']
        input_bytes = bytes.fromhex(input_data[2:])
        
        try:
            if target_address in ROUTER_V2_ADDRESSES:
                func_obj, _ = contract_v2.decode_function_input(input_bytes)
                return func_obj.fn_name
            elif target_address in MODERN_ROUTER_ADDRESSES:
                func_obj, _ = contract_universal.decode_function_input(input_bytes)
                return func_obj.fn_name
            else:
                return 'alvo_nao_mapeado'
        except Exception:
            return 'funcao_desconhecida'

    df['function_name'] = df.apply(decode_smart, axis=1)
    
    print("\n--- ANÁLISE FINAL E COMPLETA ---")
    print("Distribuição real das funções em TODOS os alvos que você coletou:")
    
    display(df['function_name'].value_counts())
else:
    print("\nNenhum dado para analisar. Deixe seu coletor rodar por mais tempo.")

--- Passo 1: Carregando dados da Mainnet do seu servidor na nuvem ---
Sucesso! Carregamos 5169 transações REAIS para análise final.

--- Passo 2: Decodificando transações com os manuais de instrução corretos ---

--- ANÁLISE FINAL E COMPLETA ---
Distribuição real das funções em TODOS os alvos que você coletou:


function_name
funcao_desconhecida         4118
swapExactETHForTokens        704
swapExactTokensForTokens     347
Name: count, dtype: int64

In [12]:
# ==============================================================================
# SCRIPT FINAL DE ANÁLISE - v8.0 (O Decodificador Mestre)
# ==============================================================================

import pandas as pd
from sqlalchemy import create_engine
from web3 import Web3
import json
from tqdm.auto import tqdm
import warnings

warnings.filterwarnings('ignore')

# --- PASSO 1: Carregar os Dados REAIS e COMPLETOS ---
print("--- Passo 1: Carregando dados da Mainnet do seu servidor na nuvem ---")
# ATENÇÃO: Substitua pelo IP público do seu servidor
SERVER_IP = "34.79.99.180" 
db_connection_str = f'postgresql://admin:supersecret@{SERVER_IP}:5433/mempool_data'
df = pd.DataFrame()
try:
    db_engine = create_engine(db_connection_str)
    df = pd.read_sql('SELECT hash, "inputData", to_address FROM transactions', con=db_engine)
    print(f"Sucesso! Carregamos {len(df)} transações REAIS para a engenharia reversa.")
    df.dropna(subset=['inputData'], inplace=True)
    df = df[df['inputData'].str.len() > 10]
except Exception as e:
    print(f"--- ERRO ao carregar dados: {e} ---")

# --- PASSO 2: A Engenharia Reversa da Caixa-Preta ---
if not df.empty:
    print("\n--- Passo 2: Decodificando transações 'execute' com a chave-mestra ---")
    
    # O "Manual Mestre" para a função 'execute'
    execute_abi = json.loads('[{"inputs":[{"type":"bytes","name":"commands","internalType":"bytes"},{"type":"bytes[]","name":"inputs","internalType":"bytes[]"}],"name":"execute","outputs":[],"stateMutability":"payable","type":"function"}]')
    
    # Os "Manuais Internos" para os comandos de swap que nos interessam
    # Extraídos da documentação e código-fonte do Roteador Universal
    v3_swap_abi = [{'name': 'V3_SWAP_EXACT_IN', 'type': 'function', 'inputs': [{'name': 'recipient', 'type': 'address'}, {'name': 'amountIn', 'type': 'uint256'}, {'name': 'amountOutMin', 'type': 'uint256'}, {'name': 'path', 'type': 'bytes'}, {'name': 'payerIsUserAgent', 'type': 'bool'}]}]
    v2_swap_abi = [{'name': 'V2_SWAP_EXACT_IN', 'type': 'function', 'inputs': [{'name': 'recipient', 'type': 'address'}, {'name': 'amountIn', 'type': 'uint256'}, {'name': 'amountOutMin', 'type': 'uint256'}, {'name': 'path', 'type': 'address[]'}, {'name': 'payerIsUserAgent', 'type': 'bool'}]}]
    
    # Nossos "Decodificadores"
    w3 = Web3()
    execute_contract = w3.eth.contract(abi=execute_abi)
    # Mapeamos o código do comando para o decodificador correto
    COMMAND_DECODERS = {
        0x00: w3.eth.contract(abi=v3_swap_abi), # V3_SWAP_EXACT_IN
        0x08: w3.eth.contract(abi=v2_swap_abi), # V2_SWAP_EXACT_IN
    }

    # A função que abre a caixa-preta
    def open_black_box(input_data):
        try:
            input_bytes = bytes.fromhex(input_data[2:])
            func_obj, func_params = execute_contract.decode_function_input(input_bytes)

            if func_obj.fn_name == 'execute':
                commands = func_params['commands']
                inputs = func_params['inputs']
                swaps_found = []

                for i, command_code in enumerate(commands):
                    if command_code in COMMAND_DECODERS:
                        decoder = COMMAND_DECODERS[command_code]
                        # Decodifica os parâmetros do comando interno
                        internal_func, internal_params = decoder.decode_function_input(inputs[i])
                        swaps_found.append(internal_func.fn_name) # Guardamos o nome do swap encontrado
                return swaps_found if swaps_found else ['execute_sem_swap_conhecido']
        except Exception:
            return ['falha_na_decodificacao']
        return ['nao_e_execute']

    tqdm.pandas(desc="Analisando 'caixas-pretas'")
    df['internal_functions'] = df['inputData'].progress_apply(open_black_box)
    
    # "Explodimos" a lista para poder contar cada swap individualmente
    analysis_df = df.explode('internal_functions')

    print("\n--- ANÁLISE FINAL E COMPLETA ---")
    print("Distribuição real das funções, incluindo as que estavam dentro do 'execute':")
    
    display(analysis_df['internal_functions'].value_counts())
else:
    print("\nNenhum dado para analisar.")

--- Passo 1: Carregando dados da Mainnet do seu servidor na nuvem ---
Sucesso! Carregamos 5579 transações REAIS para a engenharia reversa.

--- Passo 2: Decodificando transações 'execute' com a chave-mestra ---


Analisando 'caixas-pretas':   0%|          | 0/5579 [00:00<?, ?it/s]


--- ANÁLISE FINAL E COMPLETA ---
Distribuição real das funções, incluindo as que estavam dentro do 'execute':


internal_functions
falha_na_decodificacao    5579
Name: count, dtype: int64

In [13]:
# ==============================================================================
# SCRIPT FINAL DE ANÁLISE E ENGENHARIA DE FEATURES - v9.0 (O Decodificador Definitivo)
# ==============================================================================

import pandas as pd
from sqlalchemy import create_engine
from web3 import Web3
import json
from tqdm.auto import tqdm
import warnings

warnings.filterwarnings('ignore')

# --- PASSO 1: Carregar os Dados REAIS e COMPLETOS ---
print("--- Passo 1: Carregando dados da Mainnet do seu servidor na nuvem ---")
# ATENÇÃO: Substitua pelo IP público do seu servidor
SERVER_IP = "34.79.99.180" 
db_connection_str = f'postgresql://admin:supersecret@{SERVER_IP}:5433/mempool_data'
df = pd.DataFrame()
try:
    db_engine = create_engine(db_connection_str)
    # Vamos focar nossa análise nas transações do Roteador Universal, onde a ação acontece
    UNIVERSAL_ROUTER_ADDRESS = "0x3fC91A3afd70395E496CB848274244cA6353f3F3"
    query = f"SELECT hash, \"inputData\" FROM transactions WHERE lower(to_address) = '{UNIVERSAL_ROUTER_ADDRESS.lower()}'"
    df = pd.read_sql(query, con=db_engine)
    print(f"Sucesso! Carregamos {len(df)} transações do Roteador Universal para a engenharia reversa.")
    df.dropna(subset=['inputData'], inplace=True)
except Exception as e:
    print(f"--- ERRO ao carregar dados: {e} ---")

# --- PASSO 2: A Engenharia Reversa Final ---
if not df.empty:
    print("\n--- Passo 2: Abrindo a 'Caixa-Preta' com o ABI Definitivo ---")
    
    # CORREÇÃO FINAL: Este é o ABI completo e correto para a função 'execute'
    execute_abi_str = '[{"inputs":[{"type":"bytes","name":"commands","internalType":"bytes"},{"type":"bytes[]","name":"inputs","internalType":"bytes[]"},{"type":"uint256","name":"deadline","internalType":"uint256"}],"name":"execute","outputs":[],"stateMutability":"payable","type":"function"}]'
    execute_abi = json.loads(execute_abi_str)
    
    # Criamos o decodificador com o manual correto
    w3 = Web3()
    execute_contract = w3.eth.contract(abi=execute_abi)

    def decode_the_black_box(input_data):
        try:
            input_bytes = bytes.fromhex(input_data[2:])
            func_obj, func_params = execute_contract.decode_function_input(input_bytes)
            # Se a decodificação funcionar, sabemos que é uma chamada 'execute'
            return "execute"
        except Exception:
            # Se falhar, é outra função que não está no nosso manual
            return "funcao_desconhecida"

    tqdm.pandas(desc="Decodificando as transações")
    df['function_name'] = df['inputData'].progress_apply(decode_the_black_box)
    
    print("\n--- ANÁLISE FINAL E CONCLUSIVA ---")
    print("Distribuição real das funções encontradas no Roteador Universal:")
    
    display(df['function_name'].value_counts())
else:
    print("\nNenhum dado do Roteador Universal para analisar. Deixe o coletor rodar por mais tempo.")

--- Passo 1: Carregando dados da Mainnet do seu servidor na nuvem ---
Sucesso! Carregamos 0 transações do Roteador Universal para a engenharia reversa.

Nenhum dado do Roteador Universal para analisar. Deixe o coletor rodar por mais tempo.


In [15]:
# ==============================================================================
# SCRIPT FINAL DE ANÁLISE FORENSE - v10.0 (O Revelador de Assinaturas)
# ==============================================================================

import pandas as pd
from sqlalchemy import create_engine
import warnings

warnings.filterwarnings('ignore')

# --- PASSO 1: Carregar os Dados REAIS e COMPLETOS ---
print("--- Passo 1: Carregando dados da Mainnet do seu servidor ---")
# ATENÇÃO: Substitua pelo IP público do seu servidor
SERVER_IP = "34.79.99.180" 
db_connection_str = f'postgresql://admin:supersecret@{SERVER_IP}:5433/mempool_data'
df = pd.DataFrame()
try:
    db_engine = create_engine(db_connection_str)
    df = pd.read_sql('SELECT "inputData" FROM transactions', con=db_engine)
    print(f"Sucesso! Carregamos {len(df)} transações REAIS para análise forense.")
    df.dropna(subset=['inputData'], inplace=True)
    df = df[df['inputData'].str.len() >= 10] # Garante que temos dados para extrair a assinatura
except Exception as e:
    print(f"--- ERRO ao carregar dados: {e} ---")

# --- PASSO 2: Extração da Impressão Digital (Function Signature) ---
if not df.empty:
    print("\n--- Passo 2: Extraindo a 'impressão digital' de cada transação ---")
    
    # Criamos uma nova coluna que contém apenas os 10 primeiros caracteres do inputData
    df['function_signature'] = df['inputData'].str.slice(0, 10)
    
    print("\n--- ANÁLISE DE ASSINATURAS DE FUNÇÃO ---")
    print("Esta é a distribuição real das funções que seu bot está capturando:")
    
    # Contamos a frequência de cada impressão digital encontrada
    signature_counts = df['function_signature'].value_counts()
    
    display(signature_counts)
else:
    print("\nNenhum dado para analisar. Deixe seu coletor rodar por mais tempo.")

--- Passo 1: Carregando dados da Mainnet do seu servidor ---
Sucesso! Carregamos 101 transações REAIS para análise forense.

--- Passo 2: Extraindo a 'impressão digital' de cada transação ---

--- ANÁLISE DE ASSINATURAS DE FUNÇÃO ---
Esta é a distribuição real das funções que seu bot está capturando:


function_signature
0x07ed2379    58
0x04e45aaf    10
0x5ae401dc    10
0x83800a8e     6
0xa76dfc3b     3
0xb858183f     2
0x89e7c650     2
0x175accdc     2
0x5023b4df     2
0xcc713a04     1
0xea76dddf     1
0xb68fb020     1
0x8770ba91     1
0x09b81346     1
0x89af926a     1
Name: count, dtype: int64

In [16]:
# ==============================================================================
# SCRIPT FINAL DE ENGENHARIA REVERSA - v11.0 (O Decodificador Mestre)
# ==============================================================================

import pandas as pd
from sqlalchemy import create_engine
from web3 import Web3
import json
from tqdm.auto import tqdm
import warnings

warnings.filterwarnings('ignore')

# --- PASSO 1: Carregar os Dados ---
print("--- Passo 1: Carregando dados da Mainnet do seu servidor ---")
SERVER_IP = "34.79.99.180" 
db_connection_str = f'postgresql://admin:supersecret@{SERVER_IP}:5433/mempool_data'
df = pd.DataFrame()
try:
    db_engine = create_engine(db_connection_str)
    # Filtramos apenas as transações do Roteador Universal
    UNIVERSAL_ROUTER_ADDRESS = "0x3fC91A3afd70395E496CB848274244cA6353f3F3"
    query = f"SELECT hash, \"inputData\" FROM transactions WHERE lower(to_address) = '{UNIVERSAL_ROUTER_ADDRESS.lower()}'"
    df = pd.read_sql(query, con=db_engine)
    print(f"Sucesso! Carregamos {len(df)} transações do Roteador Universal para a engenharia reversa.")
    df.dropna(subset=['inputData'], inplace=True)
except Exception as e:
    print(f"--- ERRO ao carregar dados: {e} ---")

# --- PASSO 2: A Engenharia Reversa ---
if not df.empty:
    print("\n--- Passo 2: Abrindo a 'Caixa-Preta' com o decodificador mestre ---")
    
    # Manuais de Instrução (ABIs)
    execute_abi = json.loads('[{"inputs":[{"type":"bytes","name":"commands","internalType":"bytes"},{"type":"bytes[]","name":"inputs","internalType":"bytes[]"},{"type":"uint256","name":"deadline","internalType":"uint256"}],"name":"execute","outputs":[],"stateMutability":"payable","type":"function"}]')
    v3_swap_abi = json.loads('[{"name":"V3_SWAP_EXACT_IN", "type":"function", "inputs":[{"name":"recipient","type":"address"},{"name":"amountIn","type":"uint256"},{"name":"amountOutMin","type":"uint256"},{"name":"path","type":"bytes"},{"name":"payerIsUserAgent","type":"bool"}]}]')
    v2_swap_abi = json.loads('[{"name":"V2_SWAP_EXACT_IN", "type":"function", "inputs":[{"name":"recipient","type":"address"},{"name":"amountIn","type":"uint256"},{"name":"amountOutMin","type":"uint256"},{"name":"path","type":"address[]"},{"name":"payerIsUserAgent","type":"bool"}]}]')

    w3 = Web3()
    execute_contract = w3.eth.contract(abi=execute_abi)
    COMMAND_DECODERS = {
        0x00: w3.eth.contract(abi=v3_swap_abi), # V3_SWAP_EXACT_IN
        0x08: w3.eth.contract(abi=v2_swap_abi)  # V2_SWAP_EXACT_IN
    }

    def open_black_box(input_data):
        try:
            input_bytes = bytes.fromhex(input_data[2:])
            _, func_params = execute_contract.decode_function_input(input_bytes)
            
            commands = func_params['commands']
            inputs = func_params['inputs']
            swaps_found = []

            for i, command_code in enumerate(commands):
                if command_code in COMMAND_DECODERS:
                    decoder = COMMAND_DECODERS[command_code]
                    internal_func, internal_params = decoder.decode_function_input(inputs[i])
                    
                    path = internal_params.get('path', [])
                    # O 'path' da V3 é um bytestring, da V2 é uma lista. Nós apenas confirmamos que existe.
                    is_path_valid = (isinstance(path, bytes) and len(path) > 0) or (isinstance(path, list) and len(path) > 0)

                    if is_path_valid:
                        swaps_found.append(internal_func.fn_name)
                        
            return swaps_found if swaps_found else None # Retorna a lista de swaps ou nada
        except Exception:
            return None

    tqdm.pandas(desc="Analisando as 'caixas-pretas'")
    df['decoded_swaps'] = df['inputData'].progress_apply(open_black_box)
    
    final_df = df.dropna(subset=['decoded_swaps']).copy()

    print(f"\nAnálise completa! Encontramos e decodificamos {len(final_df)} transações 'execute' contendo um ou mais swaps internos.")
    
    if not final_df.empty:
        all_swaps = final_df['decoded_swaps'].explode()
        swap_counts = all_swaps.value_counts()
        
        print("\n--- ANÁLISE FINAL DAS FUNÇÕES INTERNAS ---")
        print("Distribuição real dos tipos de swap encontrados dentro do 'execute':")
        display(swap_counts)
else:
    print("\nNenhum dado para analisar.")

--- Passo 1: Carregando dados da Mainnet do seu servidor ---
Sucesso! Carregamos 0 transações do Roteador Universal para a engenharia reversa.

Nenhum dado para analisar.


In [17]:
# ==============================================================================
# SCRIPT FINAL DE ANÁLISE - v8.0 (Análise do Dataset Completo)
# ==============================================================================

import pandas as pd
from sqlalchemy import create_engine
from web3 import Web3
import json
from tqdm.auto import tqdm
import warnings

warnings.filterwarnings('ignore')

# --- PASSO 1: Carregar TODOS os Dados Coletados ---
print("--- Passo 1: Carregando TODO o dataset da Mainnet do seu servidor ---")
# ATENÇÃO: Substitua pelo IP público do seu servidor
SERVER_IP = "34.79.99.180" 
db_connection_str = f'postgresql://admin:supersecret@{SERVER_IP}:5433/mempool_data'
df = pd.DataFrame()
try:
    db_engine = create_engine(db_connection_str)
    # A MUDANÇA CRUCIAL: Selecionamos TUDO da tabela, sem filtro WHERE
    df = pd.read_sql('SELECT hash, "inputData", to_address FROM transactions', con=db_engine)
    print(f"Sucesso! Carregamos um total de {len(df)} transações de TODOS os alvos para análise final.")
    df.dropna(subset=['inputData'], inplace=True)
    df = df[df['inputData'].str.len() > 10]
except Exception as e:
    print(f"--- ERRO ao carregar dados: {e} ---")

# --- PASSO 2: Decodificação Inteligente Baseada no Alvo ---
if not df.empty:
    print("\n--- Passo 2: Decodificando transações com múltiplos ABIs ---")
    
    # Nossos "Manuais de Instrução"
    v2_router_abi = json.loads('[{"name":"swapExactTokensForTokens","type":"function"},{"name":"swapExactETHForTokens","type":"function"}]')
    universal_router_abi = json.loads('[{"name":"execute","type":"function"}]')

    # Nossos "Decodificadores"
    contract_v2 = Web3().eth.contract(abi=v2_router_abi)
    contract_universal = Web3().eth.contract(abi=universal_router_abi)
    
    # Endereços dos nossos alvos para saber qual decodificador usar
    ROUTER_V2_ADDRESSES = ["0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D".lower(), "0xd9e1cE17f2641f24aE83637ab66a2cca9C378B9F".lower()]
    MODERN_ROUTER_ADDRESSES = ["0x68b3465833fb72A70ecDF485E0e4C7bD8665Fc45".lower(), "0x111111125421cA6dc452d289314280a0f8842A65".lower(), "0x3fC91A3afd70395E496CB848274244cA6353f3F3".lower()]

    def decode_smart(row):
        target_address = row['to_address'].lower()
        input_data = row['inputData']
        try:
            input_bytes = bytes.fromhex(input_data[2:])
            if target_address in ROUTER_V2_ADDRESSES:
                func_obj, _ = contract_v2.decode_function_input(input_bytes)
                return func_obj.fn_name
            elif target_address in MODERN_ROUTER_ADDRESSES:
                func_obj, _ = contract_universal.decode_function_input(input_bytes)
                return func_obj.fn_name
            else:
                return 'alvo_nao_mapeado'
        except Exception:
            return 'funcao_desconhecida'

    tqdm.pandas(desc="Decodificando todo o dataset")
    df['function_name'] = df.progress_apply(decode_smart, axis=1)
    
    print("\n--- ANÁLISE FINAL E COMPLETA ---")
    print("Distribuição real das funções em TODOS os alvos que você coletou:")
    
    display(df['function_name'].value_counts())
else:
    print("\nNenhum dado para analisar. Deixe seu coletor rodar por mais tempo.")

--- Passo 1: Carregando TODO o dataset da Mainnet do seu servidor ---
Sucesso! Carregamos um total de 456 transações de TODOS os alvos para análise final.

--- Passo 2: Decodificando transações com múltiplos ABIs ---


Decodificando todo o dataset:   0%|          | 0/456 [00:00<?, ?it/s]


--- ANÁLISE FINAL E COMPLETA ---
Distribuição real das funções em TODOS os alvos que você coletou:


function_name
funcao_desconhecida    454
alvo_nao_mapeado         2
Name: count, dtype: int64

In [18]:
# ==============================================================================
# SCRIPT FINAL DE ANÁLISE - v12.0 (O Decodificador Mestre Definitivo)
# ==============================================================================

import pandas as pd
from sqlalchemy import create_engine
from web3 import Web3
import json
import warnings

warnings.filterwarnings('ignore')

# --- PASSO 1: Carregar os Dados ---
print("--- Passo 1: Carregando dados da Mainnet do seu servidor na nuvem ---")
# ATENÇÃO: Substitua pelo IP público do seu servidor
SERVER_IP = "34.79.99.180" 
db_connection_str = f'postgresql://admin:supersecret@{SERVER_IP}:5433/mempool_data'
df = pd.DataFrame()
try:
    db_engine = create_engine(db_connection_str)
    df = pd.read_sql('SELECT hash, "inputData", to_address FROM transactions', con=db_engine)
    print(f"Sucesso! Carregamos {len(df)} transações REAIS para a engenharia reversa.")
    df.dropna(subset=['inputData'], inplace=True)
except Exception as e:
    print(f"--- ERRO ao carregar dados: {e} ---")

# --- PASSO 2: A Engenharia Reversa da Caixa-Preta ---
if not df.empty:
    print("\n--- Passo 2: Abrindo a 'Caixa-Preta' com o decodificador mestre ---")
    
    # "MANUAL MESTRE": O ABI COMPLETO da função 'execute' do Roteador Universal
    execute_abi_str = '[{"inputs":[{"type":"bytes","name":"commands","internalType":"bytes"},{"type":"bytes[]","name":"inputs","internalType":"bytes[]"},{"type":"uint256","name":"deadline","internalType":"uint256"}],"name":"execute","outputs":[],"stateMutability":"payable","type":"function"}]'
    execute_abi = json.loads(execute_abi_str)
    
    # "MANUAIS INTERNOS": Os ABIs para as funções de swap que podem estar DENTRO do 'execute'
    v3_swap_abi_str = '[{"name":"V3_SWAP_EXACT_IN","type":"function","inputs":[{"name":"recipient","type":"address"},{"name":"amountIn","type":"uint256"},{"name":"amountOutMin","type":"uint256"},{"name":"path","type":"bytes"},{"name":"payerIsUserAgent","type":"bool"}]}]'
    v2_swap_abi_str = '[{"name":"V2_SWAP_EXACT_IN","type":"function","inputs":[{"name":"recipient","type":"address"},{"name":"amountIn","type":"uint256"},{"name":"amountOutMin","type":"uint256"},{"name":"path","type":"address[]"},{"name":"payerIsUserAgent","type":"bool"}]}]'
    
    # Criamos os "Decodificadores"
    w3 = Web3()
    execute_contract = w3.eth.contract(abi=execute_abi)
    v3_swap_decoder = w3.eth.contract(abi=json.loads(v3_swap_abi_str))
    v2_swap_decoder = w3.eth.contract(abi=json.loads(v2_swap_abi_str))
    
    # Mapeamento dos códigos de comando para seus decodificadores específicos
    COMMAND_DECODERS = {
        0x00: {'decoder': v3_swap_decoder, 'name': 'V3_SWAP_EXACT_IN'},
        0x08: {'decoder': v2_swap_decoder, 'name': 'V2_SWAP_EXACT_IN'},
    }

    def open_black_box(input_data):
        try:
            # 1. Tenta abrir o "contêiner" execute
            input_bytes = bytes.fromhex(input_data[2:])
            func_obj, func_params = execute_contract.decode_function_input(input_bytes)
            
            if func_obj.fn_name == 'execute':
                commands = func_params['commands']
                inputs = func_params['inputs']
                swaps_found = []

                # 2. Loop através dos comandos internos
                for i, command_code in enumerate(commands):
                    if command_code in COMMAND_DECODERS:
                        # 3. Usa o decodificador correto para o comando de swap
                        decoder_info = COMMAND_DECODERS[command_code]
                        internal_func, internal_params = decoder_info['decoder'].decode_function_input(inputs[i])
                        swaps_found.append(decoder_info['name'])
                        
                return swaps_found if swaps_found else ['execute_sem_swap']
        except Exception:
            # Se não for uma chamada 'execute', retorna 'nao_e_execute'
            return ['nao_e_execute']
        return ['nao_e_execute']

    print("Analisando as 'caixas-pretas'...")
    df['decoded_functions'] = df['inputData'].apply(open_black_box)
    
    # "Explodimos" a lista para contar cada swap individualmente
    analysis_df = df.explode('decoded_functions')

    print("\n--- ANÁLISE FINAL E DEFINITIVA ---")
    print("Distribuição real das funções, incluindo as que estavam dentro do 'execute':")
    
    display(analysis_df['decoded_functions'].value_counts())
else:
    print("\nNenhum dado para analisar.")

--- Passo 1: Carregando dados da Mainnet do seu servidor na nuvem ---
Sucesso! Carregamos 498 transações REAIS para a engenharia reversa.

--- Passo 2: Abrindo a 'Caixa-Preta' com o decodificador mestre ---
Analisando as 'caixas-pretas'...

--- ANÁLISE FINAL E DEFINITIVA ---
Distribuição real das funções, incluindo as que estavam dentro do 'execute':


decoded_functions
nao_e_execute    498
Name: count, dtype: int64