# OnchainBot Testing Notebook

This notebook contains tests to:

1. Test API connections for Ethereum and Solana ingestion modules.
2. Compute and display the profit and loss (PnL) of tracked wallets over different time windows.


## 1. API Connections Test


In [None]:
# Load environment variables
from dotenv import load_dotenv
import os

load_dotenv()

import asyncio
from pathlib import Path

# Ethereum ingestion
from ingestion.eth import load_wallets as load_eth_wallets, event_bus as eth_event_bus
# Solana ingestion
from ingestion.sol import load_wallets as load_sol_wallets, get_jupiter_price, event_bus as sol_event_bus

# Wallet loading check: resolve each wallet file (environment override first,
# default file name otherwise) and load it through the ingestion modules.
# NOTE(review): asyncio.run() raises RuntimeError when an event loop is already
# running in this thread -- confirm how this cell is executed.
eth_wallet_file = Path(os.getenv("ETH_WALLETS_FILE", "wallets_eth.json"))
sol_wallet_file = Path(os.getenv("SOL_WALLETS_FILE", "wallets_sol.json"))
eth_wallets = asyncio.run(load_eth_wallets(eth_wallet_file))
sol_wallets = asyncio.run(load_sol_wallets(sol_wallet_file))

for chain_label, loaded in (("ETH wallets:", eth_wallets), ("SOL wallets:", sol_wallets)):
    print(chain_label, loaded)

# Jupiter quote check: needs at least two loaded SOL entries to form a pair.
# NOTE(review): the first two wallet entries are passed as the quote's input
# and output -- confirm get_jupiter_price expects these rather than token mints.
if len(sol_wallets) < 2:
    print("Not enough SOL wallets to test Jupiter price")
else:
    token_in, token_out = sol_wallets[0], sol_wallets[1]
    sample_amount = 1_000_000
    quote_request = get_jupiter_price(token_in, token_out, sample_amount)
    price = asyncio.run(quote_request)
    print(f"Sample Jupiter quote for swapping {sample_amount} lamports from {token_in} to {token_out}: {price}")


## 2. Wallet PnL Analysis


In [None]:
import os
import asyncio
from datetime import datetime, timedelta

import pandas as pd
from web3 import Web3
from IPython.display import display

# Read the tracked addresses from the "address" column of each wallet JSON
# file; the file names come from the environment with local defaults.
eth_frame = pd.read_json(os.getenv("ETH_WALLETS_FILE", "wallets_eth.json"))
eth_wallets = eth_frame["address"].tolist()
sol_frame = pd.read_json(os.getenv("SOL_WALLETS_FILE", "wallets_sol.json"))
sol_wallets = sol_frame["address"].tolist()

# Build the Web3 HTTP provider from ALCHEMY_WS_URL: a websocket URL has its
# leading "wss://" scheme swapped for "https://"; any other value (including
# the empty default) is passed through unchanged.
alchemy_ws = os.getenv("ALCHEMY_WS_URL", "")
if alchemy_ws.startswith("wss"):
    alchemy_http = alchemy_ws.replace("wss://", "https://", 1)
else:
    alchemy_http = alchemy_ws
http_provider = Web3.HTTPProvider(alchemy_http)
w3 = Web3(http_provider)

def fetch_eth_swaps(wallet: str, start_timestamp: int):
    """
    Stub for retrieving an ETH wallet's swap transactions since start_timestamp.

    Both arguments are currently ignored and an empty list is always returned.
    A real implementation could use the Alchemy transaction API or web3 filters.
    """
    no_swaps = []
    return no_swaps

async def fetch_sol_swaps(wallet: str, start_timestamp: int):
    """
    Stub for retrieving a SOL wallet's decoded swap events since start_timestamp.

    Both arguments are currently ignored and an empty list is always returned.
    A real implementation could reuse the ingestion.sol subscription or REST
    endpoints.
    """
    no_swaps = []
    return no_swaps

# Define analysis periods
# Imported here so this cell stays self-contained; the cell's import header
# only brings in `datetime` and `timedelta`.
from datetime import timezone

# Use a timezone-aware "now": calling .timestamp() on the naive value returned
# by datetime.utcnow() interprets it as local time, which would shift every
# window start by the machine's UTC offset.
now = datetime.now(timezone.utc)
periods = {
    'Last Week': now - timedelta(weeks=1),
    'Last Month': now - timedelta(days=30),
    'Last 3 Months': now - timedelta(days=90),
    'Last Year': now - timedelta(days=365),
}

# Compute PnL summary: one DataFrame per period, one row per tracked wallet
# with columns chain / wallet / pnl / tx_count.
pnl_summary = {}
for label, start_dt in periods.items():
    # The fetch helpers take the window start as integer epoch seconds.
    start_ts = int(start_dt.timestamp())
    entries = []
    # ETH PnL: sum of (amountOutMin - amountIn) over the wallet's swaps;
    # keys missing from a swap dict count as 0.
    for w in eth_wallets:
        swaps = fetch_eth_swaps(w, start_ts)
        pnl = sum(tx.get('amountOutMin', 0) - tx.get('amountIn', 0) for tx in swaps)
        entries.append({'chain': 'ETH', 'wallet': w, 'pnl': pnl, 'tx_count': len(swaps)})
    # SOL PnL: as above, but the output amount is scaled by the swap's
    # 'price' (treated as 1 when absent).
    # NOTE(review): asyncio.run() raises RuntimeError when an event loop is
    # already running in this thread -- confirm how this cell is executed.
    for w in sol_wallets:
        swaps = asyncio.run(fetch_sol_swaps(w, start_ts))
        # Example: swap dict may include 'amountIn', 'amountOutMin' and 'price'
        pnl = sum((tx.get('amountOutMin', 0) * tx.get('price', 1)) - tx.get('amountIn', 0) for tx in swaps)
        entries.append({'chain': 'SOL', 'wallet': w, 'pnl': pnl, 'tx_count': len(swaps)})
    pnl_summary[label] = pd.DataFrame(entries)
    # Print the period heading, then render that period's table.
    print(f"\n## {label}")
    display(pnl_summary[label])
