### Importing the Necessary Modules

In [3]:
# Importing the Necessary Modules for Wallet Explorer for Realized and Unrealized PnL
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import requests
import os
import json
import datetime
import time 
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import List, Dict, Optional, Tuple
from dotenv import load_dotenv
from dune_client.client import DuneClient



In [16]:
# Load credentials from the local .env file and open a Dune client.
# DUNE_CLIENT holds the Dune API key and GECKO_API_KEY the CoinGecko key;
# os.getenv returns None for either one when the variable is not set.

load_dotenv()
API_KEY = os.getenv('DUNE_CLIENT')
GECKO_API = os.getenv('GECKO_API_KEY')
dune = DuneClient(API_KEY)
# Fetch up to 100 rows from the "wallet-explorer-drilldown" custom endpoint
# owned by "firstbml".
# NOTE(review): `response` is not read by any cell visible in this notebook;
# later cells call dune.get_latest_result(...) instead - confirm it is needed.
response = dune.get_custom_endpoint_result(
  "firstbml",
  "wallet-explorer-drilldown",
  limit = 100
)

In [14]:
query_id = 5687370  # numeric id of the saved Dune query

# Fetch the most recent stored result of the query from Dune
result = dune.get_latest_result(query_id)

# Convert the result rows to a pandas DataFrame
df = pd.DataFrame(result.result.rows)

# Widen the text display so every column is printed on one line
pd.set_option("display.max_columns", None)   # show all columns
pd.set_option("display.width", 2000)         # set a very wide display
pd.set_option("display.colheader_justify", "center")  # nicer alignment
# Print up to the first 100 rows
print(df.head(100))

     action            block_time          blockchain   gas_usd               token_in_address               token_in_amount token_in_symbol              token_out_address               token_out_amount token_out_symbol  trade_value_usd                      tx_hash                      
0     deposit  2025-08-26 15:17:35.000 UTC  ethereum   0.329005  0x0000000000000000000000000000000000000000      0.00314            ETH                                             None         NaN             None           14.067256     0xf4bc9f27926535c0c4a47c108a6a4d03c75991c072ed...
1     deposit  2025-08-24 19:59:11.000 UTC  ethereum   0.301943  0x0000000000000000000000000000000000000000      0.00100            ETH                                             None         NaN             None            4.792751     0x500839672aee63a5934bb5d23bbd6dc90eb58a091196...
2     deposit  2025-08-24 19:58:11.000 UTC  ethereum   0.239185  0x0000000000000000000000000000000000000000      0.00100            ETH 

In [17]:
# Request headers for CoinGecko calls made with a demo-plan API key.
headers = {  
    "accept": "application/json",  # Ask the API server to return the response as JSON
    "x-cg-demo-api-key": GECKO_API  # Demo-plan key header (the Pro plan uses a different header name)
}

In [24]:
query_id = 5687370  # numeric id of the saved Dune query

# Fetch the most recent stored result of the query from Dune
result = dune.get_latest_result(query_id)

# Convert the result rows to a pandas DataFrame
df = pd.DataFrame(result.result.rows)

# Persist a snapshot in the working directory: CSV for inspection and a
# pickle that keeps the column dtypes
df.to_csv('joblist.csv', index=False)
df.to_pickle('joblist.pkl')  # preserves data types
print("Results saved to joblist.csv and joblist.pkl")

# Widen the text display so every column is printed on one line
pd.set_option("display.max_columns", None)
pd.set_option("display.width", 2000)
pd.set_option("display.colheader_justify", "center")

# Print up to the first 100 rows
print(df.head(100))

Results saved to joblist.csv and joblist.pkl
     action            block_time          blockchain   gas_usd               token_in_address               token_in_amount token_in_symbol              token_out_address               token_out_amount token_out_symbol  trade_value_usd                      tx_hash                      
0     deposit  2025-08-26 15:17:35.000 UTC  ethereum   0.329005  0x0000000000000000000000000000000000000000      0.00314            ETH                                             None         NaN             None           14.067256     0xf4bc9f27926535c0c4a47c108a6a4d03c75991c072ed...
1     deposit  2025-08-24 19:59:11.000 UTC  ethereum   0.301943  0x0000000000000000000000000000000000000000      0.00100            ETH                                             None         NaN             None            4.792751     0x500839672aee63a5934bb5d23bbd6dc90eb58a091196...
2     deposit  2025-08-24 19:58:11.000 UTC  ethereum   0.239185  0x000000000000000000000000

In [25]:
# Re-fetch the latest stored result of the Dune query and rebuild `df`
# (this replaces any `df` left in the kernel by an earlier cell).
query_id = 5687370  
result = dune.get_latest_result(query_id)
df = pd.DataFrame(result.result.rows)

# Print column names, non-null counts and dtypes
print("Complete DataFrame info:")
df.info()

Complete DataFrame info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8 entries, 0 to 7
Data columns (total 12 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   action             8 non-null      object 
 1   block_time         8 non-null      object 
 2   blockchain         8 non-null      object 
 3   gas_usd            8 non-null      float64
 4   token_in_address   7 non-null      object 
 5   token_in_amount    7 non-null      float64
 6   token_in_symbol    7 non-null      object 
 7   token_out_address  1 non-null      object 
 8   token_out_amount   1 non-null      float64
 9   token_out_symbol   1 non-null      object 
 10  trade_value_usd    8 non-null      float64
 11  tx_hash            8 non-null      object 
dtypes: float64(4), object(8)
memory usage: 900.0+ bytes


In [23]:
import pandas as pd
import requests
from datetime import datetime
import time
from collections import defaultdict, deque
import os

# --- Configuration ---
# The CoinGecko key is read from the GECKO_API_KEY environment variable (the
# same variable the Dune setup cell reads after load_dotenv()) instead of being
# written into the notebook. When the variable is unset the value is None, and
# the request code below then sends no key at all rather than a placeholder
# string that the API would reject.
COINGECKO_API_KEY = os.getenv("GECKO_API_KEY")
REQUEST_DELAY = 1.5  # Seconds to pause between API calls to respect rate limits

# --- 1. Inspect the DataFrame Structure ---
print("DataFrame columns:", list(df.columns))
print("Data loaded from Dune. Shape:", df.shape)
print("\nFirst few rows:")
print(df.head())

# --- 2. Data Preprocessing ---
# Parse the timestamps, then derive the calendar date used for daily price lookups.
df['block_time'] = pd.to_datetime(df['block_time'])
df['date'] = df['block_time'].dt.date

# Per-row identifier built from the timestamp, the chain and the row index.
df['tx_identifier'] = df['block_time'].astype(str) + '_' + df['blockchain'] + '_' + df.index.astype(str)

# Long format: one row per token movement. Inflows keep their sign, outflows
# are negated, so summing `amount` per token gives the net position.
in_transactions = df.loc[df['token_in_amount'].notna()].assign(
    type='in',
    token_address=lambda frame: frame['token_in_address'],
    amount=lambda frame: frame['token_in_amount'],
    coingecko_id=None,  # filled in by the mapping step
)
out_transactions = df.loc[df['token_out_amount'].notna()].assign(
    type='out',
    token_address=lambda frame: frame['token_out_address'],
    amount=lambda frame: -frame['token_out_amount'],
    coingecko_id=None,  # filled in by the mapping step
)

# Columns shared by both halves of the long-format table
cols_to_keep = ['block_time', 'tx_identifier', 'token_address', 'amount', 'type', 'blockchain', 'date']

# Stack inflows on top of outflows and order the events chronologically
all_txs = pd.concat(
    [in_transactions[cols_to_keep], out_transactions[cols_to_keep]],
    ignore_index=True,
).sort_values('block_time')

print(f"Created unified transaction list: {len(all_txs)} events")

# --- 3. Map Token Addresses to CoinGecko IDs ---
print("Mapping token addresses to CoinGecko IDs...")

# Distinct, non-null contract addresses seen across all token movements
unique_token_addresses = all_txs.loc[all_txs['token_address'].notna(), 'token_address'].unique()
print(f"Found {len(unique_token_addresses)} unique token addresses to map.")

# Fetch the complete CoinGecko ID list (do this once)
def get_coingecko_id_list():
    """Download CoinGecko's full coin list including per-platform contract addresses.

    Returns:
        The decoded JSON payload (a list of coin entries) on success, or an
        empty list when the request fails, times out, returns an HTTP error
        status, or the body is not valid JSON. Failures are printed, not raised.
    """
    url = "https://api.coingecko.com/api/v3/coins/list?include_platform=true"
    try:
        # A timeout keeps the notebook from hanging forever on a stalled connection.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decode failures on requests versions whose
        # decode error does not derive from RequestException.
        print(f"Error fetching CoinGecko ID list: {e}")
        return []

print("Fetching CoinGecko coin list...")
coin_list = get_coingecko_id_list()
time.sleep(REQUEST_DELAY)

# Lower-cased contract address -> CoinGecko coin id, flattened across every
# platform a coin lists. The platform name is not part of the key, so when two
# entries share an address the later one in the list wins.
address_to_id = {
    address.lower(): coin['id']
    for coin in coin_list
    if 'platforms' in coin
    for platform, address in coin['platforms'].items()
    if address  # skip platforms listed with an empty address
}

print(f"Created mapping for {len(address_to_id)} token addresses")

# Map the IDs to our transactions
def map_to_coingecko_id(row):
    """Return the CoinGecko id for a transaction row, or None when unknown.

    The row's 'token_address' is lower-cased and looked up in the module-level
    `address_to_id` mapping; a missing/NaN address or an address absent from
    the mapping both yield None.
    """
    token_address = row['token_address']
    if pd.isna(token_address):
        return None

    # dict.get returns None for unknown addresses
    return address_to_id.get(token_address.lower())

# Resolve every transaction row to a CoinGecko id (None when unmapped)
all_txs['coingecko_id'] = all_txs.apply(map_to_coingecko_id, axis=1)

# Report how many rows received an id
mapped_count = all_txs['coingecko_id'].notna().sum()
print(f"Successfully mapped {mapped_count}/{len(all_txs)} transactions ({mapped_count/len(all_txs)*100:.1f}%)")

# List a few addresses that could not be mapped, to help debugging
unmapped_tokens = all_txs.loc[all_txs['coingecko_id'].isna(), 'token_address'].unique()
if len(unmapped_tokens) == 0:
    print("All tokens mapped successfully!")
else:
    print(f"Sample of unmapped tokens: {unmapped_tokens[:5]}")

# --- 4. Fetch Historical Prices ---
print("Fetching historical prices...")

# Get unique (coingecko_id, date) pairs
unique_price_points = all_txs[all_txs['coingecko_id'].notna()].copy()
unique_price_points['date_str'] = unique_price_points['date'].astype(str)
unique_price_points = unique_price_points[['coingecko_id', 'date_str']].drop_duplicates()

print(f"Need to fetch {len(unique_price_points)} historical price points")

MAX_RATE_LIMIT_RETRIES = 3   # attempts per price point when the API answers 429
RATE_LIMIT_BACKOFF = 30      # seconds to wait after a 429 before retrying
PRICE_REQUEST_TIMEOUT = 30   # seconds before a stalled request is abandoned


def fetch_historical_usd_price(coingecko_id, date_str):
    """Fetch the USD price of one coin on one day from CoinGecko's history endpoint.

    Args:
        coingecko_id: CoinGecko coin id.
        date_str: Day as 'YYYY-MM-DD'; converted to the 'dd-mm-yyyy' form the
            endpoint expects. A string that does not parse is sent unchanged.

    Returns:
        The USD price, or None when the request fails, the response has no
        usable USD price, or the API is still rate limiting after
        MAX_RATE_LIMIT_RETRIES attempts. Problems are printed, not raised.
    """
    try:
        coingecko_date_str = datetime.strptime(date_str, '%Y-%m-%d').strftime('%d-%m-%Y')
    except ValueError:
        coingecko_date_str = date_str

    url = f"https://api.coingecko.com/api/v3/coins/{coingecko_id}/history"
    params = {
        'date': coingecko_date_str,
        'localization': 'false'
    }
    if COINGECKO_API_KEY:
        params['x_cg_demo_api_key'] = COINGECKO_API_KEY

    for attempt in range(1, MAX_RATE_LIMIT_RETRIES + 1):
        try:
            response = requests.get(url, params=params, timeout=PRICE_REQUEST_TIMEOUT)
            if response.status_code == 429:
                # Retry the same price point after backing off; skipping it
                # would leave this transaction permanently unpriced.
                print("Rate limit hit. Increasing delay...")
                time.sleep(RATE_LIMIT_BACKOFF)
                continue

            response.raise_for_status()
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            print(f"Error fetching price for {coingecko_id} on {date_str}: {e}")
            return None

        if 'market_data' in data and 'current_price' in data['market_data']:
            usd_price = data['market_data']['current_price'].get('usd')
            if usd_price:
                print(f"✓ {coingecko_id} on {date_str}: ${usd_price}")
                return usd_price
            print(f"✗ No USD price for {coingecko_id} on {date_str}")
        else:
            print(f"✗ No market data for {coingecko_id} on {date_str}")
        return None

    print(f"Error fetching price for {coingecko_id} on {date_str}: "
          f"still rate limited after {MAX_RATE_LIMIT_RETRIES} attempts")
    return None


# historical_prices[coingecko_id][date_str] -> USD price (only successful lookups are stored)
historical_prices = {}
for idx, (coingecko_id, date_str) in enumerate(unique_price_points.values):
    prices_for_coin = historical_prices.setdefault(coingecko_id, {})

    if date_str not in prices_for_coin:
        usd_price = fetch_historical_usd_price(coingecko_id, date_str)
        if usd_price is not None:
            prices_for_coin[date_str] = usd_price

        # Respect rate limits
        time.sleep(REQUEST_DELAY)

    if idx % 5 == 0 and idx > 0:
        print(f"Processed {idx}/{len(unique_price_points)} price points")

# Add prices to transactions
def get_historical_price(row):
    """Look up the fetched USD price for a transaction row.

    Reads the row's 'coingecko_id' and 'date' and returns the matching entry
    of the module-level `historical_prices` mapping, or None when the row has
    no CoinGecko id or no price was stored for that coin and day.
    """
    coin_id = row['coingecko_id']
    if pd.isna(coin_id):
        return None
    prices_by_date = historical_prices.get(coin_id, {})
    return prices_by_date.get(str(row['date']))

# Attach the per-day price to each row and value the movement in USD;
# outflows carry a negative amount, so their USD value is negative too.
all_txs['price_at_tx'] = all_txs.apply(get_historical_price, axis=1)
all_txs['usd_value'] = all_txs['amount'].mul(all_txs['price_at_tx'])

priced_count = all_txs['price_at_tx'].notna().sum()
print(f"Successfully priced {priced_count}/{len(all_txs)} transactions")

# --- 5. FIFO Inventory Accounting ---
print("Running FIFO inventory accounting...")

fifo_inventory = defaultdict(deque)  # token id -> open lots, oldest first
realized_pnl = []                    # one entry per transaction, in processing order
holdings = defaultdict(float)        # token id -> net amount held

# Chronological order is what makes the lot queue first-in-first-out
all_txs_sorted = all_txs.sort_values('block_time')

for _, tx in all_txs_sorted.iterrows():
    token_id = tx['coingecko_id']
    price = tx['price_at_tx']

    # Rows without an id or a price cannot be valued; they contribute zero PnL
    if pd.isna(token_id) or pd.isna(price):
        realized_pnl.append(0)
        continue

    amount = tx['amount']

    if amount > 0:
        # Inflow: open a new lot at the day's price; no PnL is realized
        fifo_inventory[token_id].append({
            'amount': amount,
            'price': price,
            'timestamp': tx['block_time']
        })
        holdings[token_id] += amount
        realized_pnl.append(0)
        continue

    # Outflow (amount is negative): consume the oldest lots first
    to_sell = -amount
    tx_pnl = 0
    while to_sell > 0 and fifo_inventory[token_id]:
        lot = fifo_inventory[token_id][0]
        matched = min(to_sell, lot['amount'])

        # proceeds at the sale price minus the matched lot's cost basis
        tx_pnl += matched * price - matched * lot['price']

        to_sell -= matched
        lot['amount'] -= matched

        # Drop lots that are exhausted up to floating point noise
        if lot['amount'] <= 1e-10:
            fifo_inventory[token_id].popleft()

    holdings[token_id] += amount
    realized_pnl.append(tx_pnl)

# Entries line up with the row order of all_txs_sorted
all_txs_sorted['realized_pnl'] = realized_pnl

# --- 6. Calculate Current Holdings and Unrealized PnL ---
print("Calculating current holdings and unrealized PnL...")

# Fetch current prices only for tokens we still hold
current_prices = {}
tokens_with_holdings = {token_id: amount for token_id, amount in holdings.items() if amount > 0}

print(f"Fetching current prices for {len(tokens_with_holdings)} tokens with holdings...")

for token_id in tokens_with_holdings:
    url = "https://api.coingecko.com/api/v3/simple/price"
    params = {
        'ids': token_id,
        'vs_currencies': 'usd'
    }

    if COINGECKO_API_KEY:
        params['x_cg_demo_api_key'] = COINGECKO_API_KEY

    try:
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
        current_prices[token_id] = data[token_id]['usd']
        print(f"✓ Current price for {token_id}: ${current_prices[token_id]}")
    except (requests.exceptions.RequestException, KeyError, TypeError, ValueError) as e:
        # Leave the token out of current_prices. Recording a price of 0 would
        # report the whole position as a loss equal to its cost basis.
        # KeyError/TypeError cover a response that lacks this id or its USD quote.
        print(f"Error fetching current price for {token_id}: {e}")

    time.sleep(REQUEST_DELAY)

# Calculate unrealized PnL for each token that has a usable current price
unrealized_pnl_summary = []
for token_id, amount in tokens_with_holdings.items():
    if token_id not in current_prices:
        print(f"Skipping unrealized PnL for {token_id}: no current price available")
        continue

    current_value = amount * current_prices[token_id]

    # Cost basis of what is still held = sum over the remaining FIFO lots
    total_cost = sum(lot['amount'] * lot['price'] for lot in fifo_inventory[token_id])
    unrealized_pnl = current_value - total_cost

    unrealized_pnl_summary.append({
        'token': token_id,
        'amount': amount,
        'current_price': current_prices[token_id],
        'current_value': current_value,
        'average_cost': total_cost / amount if amount > 0 else 0,
        'unrealized_pnl': unrealized_pnl
    })

# --- 7. Create Final Output ---
print("\n=== RESULTS ===")

# Headline numbers: realized PnL summed over transactions, unrealized over holdings
total_realized = all_txs_sorted['realized_pnl'].sum()
print(f"Total Realized PnL: ${total_realized:,.2f}")

total_unrealized = sum(entry['unrealized_pnl'] for entry in unrealized_pnl_summary)
print(f"Total Unrealized PnL: ${total_unrealized:,.2f}")

# One line per token still held
print(f"\nCurrent Holdings ({len(unrealized_pnl_summary)} tokens):")
for holding in unrealized_pnl_summary:
    print(f"{holding['token']}: {holding['amount']:,.6f} tokens (Value: ${holding['current_value']:,.2f}, Unrealized PnL: ${holding['unrealized_pnl']:,.2f})")

# Persist the per-transaction table, including the realized_pnl column
all_txs_sorted.to_csv('wallet_pnl_analysis.csv', index=False)
print("\nDetailed transaction data saved to 'wallet_pnl_analysis.csv'")

# Remaining lot balances per token; tokens whose lots are all consumed are omitted
print("\nFinal Inventory State:")
for token_id, lots in fifo_inventory.items():
    if not lots:
        continue
    total_remaining = sum(lot['amount'] for lot in lots)
    print(f"{token_id}: {total_remaining:,.6f} tokens remaining in inventory")

DataFrame columns: ['block_time', 'blockchain', 'token_in_address', 'token_in_amount', 'token_out_address', 'token_out_amount', 'date']
Data loaded from Dune. Shape: (2, 7)

First few rows:
       block_time     blockchain               token_in_address               token_in_amount              token_out_address               token_out_amount     date   
0 2025-07-17 05:10:09  optimism   0x7f5c764cbc14f9669b88837ca1490cca17c31607        100        0x4200000000000000000000000000000000000042        250.0       2025-07-17
1 2025-08-03 06:43:19       bnb   0xe9e7cea3dedca5984780bafc599bd69add087d56         50        0xbb4CdB9CBd36B01bD1cBaEBF2De08d9173bc095c          0.2       2025-08-03
Created unified transaction list: 4 events
Mapping token addresses to CoinGecko IDs...
Found 4 unique token addresses to map.
Fetching CoinGecko coin list...
Created mapping for 21691 token addresses
Successfully mapped 4/4 transactions (100.0%)
All tokens mapped successfully!
Fetching historical prices..

In [28]:
import pandas as pd
import requests
import asyncio
import aiohttp
import json
import time
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional
import streamlit as st
from functools import lru_cache
import logging
from concurrent.futures import ThreadPoolExecutor
import threading

# Configure logging: INFO and above on the root logger, plus a module-level
# logger used by ScalablePnLCalculator for progress and failure messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ScalablePnLCalculator:
    """FIFO realized/unrealized PnL calculator backed by the public CoinGecko API.

    Use it as an async context manager so the HTTP session is opened and closed:

        async with ScalablePnLCalculator(api_key) as calculator:
            result = await calculator.calculate_wallet_pnl(df, wallet)

    Only rows whose `action` is 'deposit' (token inflow) or 'withdrawal'
    (token outflow) are processed; every other action is ignored.
    Inventory is keyed by lower-cased token address only, so the same address
    used on two chains shares one lot queue.
    """

    # Dune blockchain name -> CoinGecko asset-platform id
    _PLATFORM_MAP = {
        'ethereum': 'ethereum',
        'bnb': 'binance-smart-chain',
        'arbitrum': 'arbitrum-one',
        'optimism': 'optimistic-ethereum',
        'base': 'base'
    }

    def __init__(self, coingecko_api_key: Optional[str] = None):
        """Store the optional API key and create empty caches.

        Args:
            coingecko_api_key: CoinGecko demo-plan key, or None to call the
                API without a key.
        """
        self.coingecko_api_key = coingecko_api_key
        self.coingecko_cache = {}        # "<coin id>_<YYYY-MM-DD>" -> USD price
        self.token_mapping_cache = {}    # "<blockchain>_<address>" -> coin id
        self.executor = ThreadPoolExecutor(max_workers=5)
        self.cache_lock = threading.Lock()
        self.session = None
        self._coin_list = None           # full coin list, downloaded at most once

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
            self.session = None
        # Release the worker threads created in __init__.
        self.executor.shutdown(wait=False)

    def _get_coingecko_headers(self) -> Dict:
        """Get headers for CoinGecko API requests.

        Every request in this class targets the public host
        api.coingecko.com, which authenticates with the demo-plan header;
        the Pro header belongs to the separate Pro host.
        """
        if self.coingecko_api_key:
            return {'x-cg-demo-api-key': self.coingecko_api_key}
        return {}

    async def _load_coin_list(self) -> List[dict]:
        """Return CoinGecko's coin list with platform addresses, fetched once per instance.

        A non-200 response yields an empty list and is not cached, so a later
        call tries again.
        """
        cached = getattr(self, '_coin_list', None)
        if cached is not None:
            return cached

        url = "https://api.coingecko.com/api/v3/coins/list"
        params = {'include_platform': 'true'}
        async with self.session.get(url, params=params, headers=self._get_coingecko_headers()) as response:
            if response.status != 200:
                return []
            coins = await response.json()

        self._coin_list = coins
        return coins

    async def get_coingecko_id_from_address(self, token_address: str, blockchain: str) -> Optional[str]:
        """
        Dynamically map token address to CoinGecko ID using CoinGecko API.

        Lookup order: the in-memory cache, then the coin list (matching the
        address on the platform that corresponds to `blockchain`), then the
        per-contract endpoint.

        Returns:
            The coin id, or None when the blockchain is not in the platform
            map, the address is empty, no match is found, or a request fails
            (failures are logged as warnings, not raised).
        """
        address_lower = token_address.lower()
        cache_key = f"{blockchain}_{address_lower}"

        # Check cache first
        with self.cache_lock:
            if cache_key in self.token_mapping_cache:
                return self.token_mapping_cache[cache_key]

        platform = self._PLATFORM_MAP.get(blockchain)
        # An empty address would "match" every coin that lacks this platform.
        if platform is None or not address_lower:
            return None

        try:
            # Find coin with matching address on the correct platform
            for coin in await self._load_coin_list():
                platforms = coin.get('platforms') or {}
                if (platforms.get(platform) or '').lower() == address_lower:
                    with self.cache_lock:
                        self.token_mapping_cache[cache_key] = coin['id']
                    return coin['id']

            # If not found, try direct contract address lookup
            url = f"https://api.coingecko.com/api/v3/coins/{platform}/contract/{token_address}"
            async with self.session.get(url, headers=self._get_coingecko_headers()) as response:
                if response.status == 200:
                    coin_data = await response.json()
                    with self.cache_lock:
                        self.token_mapping_cache[cache_key] = coin_data['id']
                    return coin_data['id']

        except Exception as e:
            logger.warning(f"Failed to map token {token_address} on {blockchain}: {e}")

        return None

    async def get_historical_price(self, coingecko_id: str, timestamp: datetime) -> Optional[float]:
        """
        Get the USD price of a coin on the calendar day of `timestamp`, with caching.

        Results are memoized per (coin id, day) in `self.coingecko_cache`.
        functools.lru_cache must not wrap this coroutine function: it would
        cache the coroutine object itself, and awaiting that object a second
        time raises RuntimeError.

        Returns:
            The price, or None when the response is not HTTP 200, the payload
            has no USD price, or the request fails (logged as a warning).
        """
        cache_key = f"{coingecko_id}_{timestamp.date()}"

        with self.cache_lock:
            if cache_key in self.coingecko_cache:
                return self.coingecko_cache[cache_key]

        try:
            date_str = timestamp.strftime('%d-%m-%Y')
            url = f"https://api.coingecko.com/api/v3/coins/{coingecko_id}/history"
            params = {
                'date': date_str,
                'localization': 'false'
            }

            async with self.session.get(url, params=params, headers=self._get_coingecko_headers()) as response:
                if response.status == 200:
                    data = await response.json()
                    price = data['market_data']['current_price']['usd']

                    with self.cache_lock:
                        self.coingecko_cache[cache_key] = price

                    # Respect rate limits
                    await asyncio.sleep(0.5)
                    return price

        except Exception as e:
            logger.warning(f"Failed to get price for {coingecko_id} on {timestamp}: {e}")

        return None

    async def process_transactions_batch(self, df_batch: pd.DataFrame, inventory: Optional[Dict] = None) -> Dict:
        """
        Process a batch of transactions for FIFO PnL calculation.

        Args:
            df_batch: Rows with the columns action, blockchain, block_time,
                gas_usd, token_in_address, token_in_amount, token_out_address
                and token_out_amount, already in chronological order.
            inventory: Open lots carried over from earlier batches, as
                {token_address: [(amount, cost_basis, timestamp), ...]}.
                It is updated in place so withdrawals can consume deposits
                from previous batches. Omit it to start from an empty
                inventory.

        Returns:
            Dict with 'realized_pnl' (this batch only), 'inventory' (the dict
            that was passed in, or the new one) and 'transactions_processed'.
            Rows that cannot be mapped or priced are skipped; a row that
            raises is logged and skipped.
        """
        if inventory is None:
            inventory = {}
        realized_pnl = 0
        transactions_processed = []

        for _, row in df_batch.iterrows():
            try:
                action = str(row['action']).lower().strip()
                blockchain = str(row['blockchain']).lower().strip()

                if action == 'deposit' and pd.notna(row['token_in_address']):
                    # Handle deposit
                    token_address = str(row['token_in_address']).lower()
                    amount = float(row['token_in_amount'])
                    # A zero, negative or NaN amount cannot form a lot.
                    if not amount > 0:
                        continue

                    # Get CoinGecko ID
                    coingecko_id = await self.get_coingecko_id_from_address(token_address, blockchain)
                    if not coingecko_id:
                        continue

                    # Get historical price
                    price = await self.get_historical_price(coingecko_id, row['block_time'])
                    if not price:
                        continue

                    # Cost basis per token includes the gas paid, spread over the amount
                    gas_cost = float(row['gas_usd']) if pd.notna(row['gas_usd']) else 0
                    effective_cost_basis = price + (gas_cost / amount)

                    # Add to inventory
                    inventory.setdefault(token_address, []).append(
                        (amount, effective_cost_basis, row['block_time'])
                    )

                    transactions_processed.append({
                        'type': 'DEPOSIT',
                        'token': coingecko_id,
                        'amount': amount,
                        'cost_basis': effective_cost_basis,
                        'timestamp': row['block_time']
                    })

                elif action == 'withdrawal' and pd.notna(row['token_out_address']):
                    # Handle withdrawal
                    token_address = str(row['token_out_address']).lower()
                    amount = float(row['token_out_amount'])

                    # Get CoinGecko ID and the price on the withdrawal day
                    coingecko_id = await self.get_coingecko_id_from_address(token_address, blockchain)
                    if not coingecko_id:
                        continue

                    current_price = await self.get_historical_price(coingecko_id, row['block_time'])
                    if not current_price:
                        continue

                    # FIFO processing: consume the oldest lots first
                    lots = inventory.get(token_address)
                    remaining_amount = amount

                    while remaining_amount > 0 and lots:
                        lot_amount, lot_cost, lot_timestamp = lots[0]

                        take_amount = min(lot_amount, remaining_amount)
                        proceeds = take_amount * current_price
                        cost = take_amount * lot_cost
                        lot_pnl = proceeds - cost

                        realized_pnl += lot_pnl
                        remaining_amount -= take_amount

                        if take_amount == lot_amount:
                            lots.pop(0)
                        else:
                            lots[0] = (lot_amount - take_amount, lot_cost, lot_timestamp)

                        transactions_processed.append({
                            'type': 'WITHDRAWAL',
                            'token': coingecko_id,
                            'amount': take_amount,
                            'cost_basis': lot_cost,
                            'sale_price': current_price,
                            'pnl': lot_pnl,
                            'timestamp': row['block_time']
                        })

                    # The part of a withdrawal not covered by tracked lots
                    # realizes no PnL; surface it instead of dropping it silently.
                    if remaining_amount > 1e-9:
                        logger.warning(
                            f"Withdrawal of {coingecko_id} exceeds tracked inventory by {remaining_amount}"
                        )

            except Exception as e:
                logger.error(f"Error processing transaction: {e}")
                continue

        return {
            'realized_pnl': realized_pnl,
            'inventory': inventory,
            'transactions_processed': transactions_processed
        }

    @staticmethod
    def _token_chains(df: pd.DataFrame) -> Dict[str, str]:
        """Map each lower-cased token address in `df` to the first blockchain it appears on."""
        chains: Dict[str, str] = {}
        if 'blockchain' not in df.columns:
            return chains
        for column in ('token_in_address', 'token_out_address'):
            if column not in df.columns:
                continue
            pairs = df[[column, 'blockchain']].dropna()
            for address, chain in zip(pairs[column], pairs['blockchain']):
                chains.setdefault(str(address).lower(), str(chain).lower().strip())
        return chains

    async def calculate_wallet_pnl(self, df: pd.DataFrame, wallet_address: str) -> Dict:
        """
        Main method to calculate PnL for a wallet, processing rows in batches.

        One inventory is shared by all batches, so FIFO matching works across
        batch boundaries and the returned inventory holds only unsold lots.

        Args:
            df: Transactions in the column layout expected by
                process_transactions_batch; it is copied, not modified.
            wallet_address: Echoed back in the result; not used for lookups.

        Returns:
            Dict with wallet_address, realized_pnl, unrealized_pnl, total_pnl,
            current_inventory, processing_time (seconds) and
            total_transactions.
        """
        start_time = time.time()

        # Clean and sort data; a stable sort keeps the input order of rows
        # that share a timestamp.
        df_clean = df.copy()
        df_clean['block_time'] = pd.to_datetime(df_clean['block_time'])
        df_clean = df_clean.sort_values('block_time', kind='stable')

        total_transactions = len(df_clean)
        batch_size = 20
        all_results = {
            'realized_pnl': 0,
            'inventory': {},
            'transactions_processed': []
        }

        # Process in batches, threading the same inventory through each one
        for i in range(0, total_transactions, batch_size):
            batch = df_clean.iloc[i:i + batch_size]
            batch_results = await self.process_transactions_batch(batch, inventory=all_results['inventory'])

            all_results['realized_pnl'] += batch_results['realized_pnl']
            all_results['transactions_processed'].extend(batch_results['transactions_processed'])

            logger.info(f"Processed {i + len(batch)}/{total_transactions} transactions")

        # Calculate unrealized PnL. Each token is resolved on the chain it was
        # seen on; 'ethereum' is only the fallback for an unknown address.
        unrealized_pnl = 0
        current_time = datetime.now()
        token_chains = self._token_chains(df_clean)

        for token_address, lots in all_results['inventory'].items():
            if not lots:
                continue
            blockchain = token_chains.get(token_address, 'ethereum')
            coingecko_id = await self.get_coingecko_id_from_address(token_address, blockchain)
            if coingecko_id:
                # NOTE(review): "current" price comes from the history endpoint
                # for today's date - confirm it is populated for the running day.
                current_price = await self.get_historical_price(coingecko_id, current_time)
                if current_price:
                    for amount, cost_basis, _ in lots:
                        unrealized_pnl += amount * (current_price - cost_basis)

        processing_time = time.time() - start_time
        logger.info(f"PNL calculation completed in {processing_time:.2f} seconds")

        return {
            'wallet_address': wallet_address,
            'realized_pnl': all_results['realized_pnl'],
            'unrealized_pnl': unrealized_pnl,
            'total_pnl': all_results['realized_pnl'] + unrealized_pnl,
            'current_inventory': all_results['inventory'],
            'processing_time': processing_time,
            'total_transactions': total_transactions
        }

# Streamlit App Integration
async def run_pnl_calculation(wallet_address: str, df: pd.DataFrame):
    """Compute the PnL summary for one wallet.

    Builds a ScalablePnLCalculator with the "COINGECKO_API_KEY" Streamlit
    secret, enters it as an async context manager and returns the result of
    calculate_wallet_pnl(df, wallet_address).
    """
    api_key = st.secrets.get("COINGECKO_API_KEY")
    calculator_context = ScalablePnLCalculator(coingecko_api_key=api_key)
    async with calculator_context as calculator:
        pnl_summary = await calculator.calculate_wallet_pnl(df, wallet_address)
        return pnl_summary

def main():
    """Streamlit page: ask for a wallet address and show its PnL metrics.

    Nothing is computed until an address is entered and the button is
    pressed. The transactions come from the module-level `df`; the data
    loading call is still a commented-out placeholder.
    """
    # Example usage in Streamlit
    st.title("Wallet PnL Calculator")

    wallet_address = st.text_input("Enter Wallet Address")

    # The button is only rendered once an address has been typed
    if not (wallet_address and st.button("Calculate PnL")):
        return

    # Load data from Dune query (replace with your data loading logic)
    # df = load_wallet_data(wallet_address)

    # Run the async calculation to completion while a spinner is shown
    with st.spinner("Calculating PnL..."):
        result = asyncio.run(run_pnl_calculation(wallet_address, df))

    # Display results
    metric_fields = (
        ("Realized PnL", 'realized_pnl'),
        ("Unrealized PnL", 'unrealized_pnl'),
        ("Total PnL", 'total_pnl'),
    )
    for label, field in metric_fields:
        st.metric(label, f"${result[field]:,.2f}")
    st.info(f"Processed {result['total_transactions']} transactions in {result['processing_time']:.2f}s")

if __name__ == "__main__":
    main()

