# DeFi DEX Reputation Scoring Model

This notebook contains the AI scoring logic for DEX (Decentralized Exchange) transactions.
Your task is to integrate this logic into a production Kafka-based microservice.

## Model Overview
- **LP Scoring**: Analyzes liquidity provision patterns (deposits and withdrawals)
- **Swap Scoring**: Analyzes trading behavior and token diversity
- **Combined Scoring**: Weighted average of the LP and swap scores (60% LP, 40% swap), plus descriptive user tags

In [None]:
import json
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

# Sample wallet payload used by the test cells in this notebook.
# It holds one 'dexes' entry with two transactions: a swap and a deposit.
# The scoring code only reads 'action', 'timestamp', 'document_id', 'protocol',
# 'poolId', 'poolName', and each token's 'amountUSD' and 'symbol'.
sample_wallet_data = {
    "wallet_address": "0x742d35Cc6634C0532925a3b8D4C9db96590e4265",
    "data": [
        {
            # Entries whose protocolType is not "dexes" are skipped by preprocessing
            "protocolType": "dexes",
            "transactions": [
                {
                    # Swap: described by a tokenIn / tokenOut pair
                    "document_id": "507f1f77bcf86cd799439011",
                    "action": "swap",
                    # Parsed by preprocessing as Unix epoch seconds
                    "timestamp": 1703980800,
                    "caller": "0x742d35Cc6634C0532925a3b8D4C9db96590e4265",
                    "protocol": "uniswap_v3",
                    "poolId": "0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640",
                    "poolName": "Uniswap V3 USDC/WETH 0.05%",
                    "tokenIn": {
                        # 'amount' is not read by the scoring code
                        # (presumably raw on-chain units - TODO confirm)
                        "amount": 1000000000,
                        "amountUSD": 1000.0,
                        "address": "0xa0b86a33e6c3d4c3e6c3d4c3e6c3d4c3e6c3d4c3",
                        "symbol": "USDC"
                    },
                    "tokenOut": {
                        "amount": 500000000000000000,
                        "amountUSD": 1000.0,
                        "address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
                        "symbol": "WETH"
                    }
                },
                {
                    # Deposit: described by a token0 / token1 pair
                    "document_id": "507f1f77bcf86cd799439012",
                    "action": "deposit",
                    "timestamp": 1703980900,
                    "caller": "0x742d35Cc6634C0532925a3b8D4C9db96590e4265",
                    "protocol": "uniswap_v3",
                    "poolId": "0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640",
                    "poolName": "Uniswap V3 USDC/WETH 0.05%",
                    "token0": {
                        "amount": 500000000,
                        "amountUSD": 500.0,
                        "address": "0xa0b86a33e6c3d4c3e6c3d4c3e6c3d4c3e6c3d4c3",
                        "symbol": "USDC"
                    },
                    "token1": {
                        "amount": 250000000000000000,
                        "amountUSD": 500.0,
                        "address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
                        "symbol": "WETH"
                    }
                }
            ]
        }
    ]
}

print("Sample data loaded successfully")

## Data Preprocessing Functions

These functions convert JSON transaction data into pandas DataFrames for analysis.

In [None]:
def _token_amount_usd(token: Dict) -> float:
    """Return a token's ``amountUSD`` as a float; a missing or null value counts as 0."""
    return float(token.get('amountUSD') or 0)


def preprocess_dex_transactions(wallet_data: Dict) -> pd.DataFrame:
    """
    Convert wallet transaction data to a DataFrame for DEX analysis.

    Only entries of ``wallet_data['data']`` whose ``protocolType`` is
    ``'dexes'`` are read; every transaction in them becomes one row.

    * ``swap`` rows get ``amount_usd`` = the larger of the tokenIn/tokenOut
      USD amounts, plus ``token_in_symbol`` / ``token_out_symbol``.
    * ``deposit`` / ``withdraw`` rows get ``amount_usd`` = token0 + token1
      USD amounts, plus ``token0_symbol`` / ``token1_symbol``.
    * Rows with any other action carry only the common columns.

    Token objects or USD amounts that are absent or JSON ``null`` are
    treated as empty / 0 instead of raising, and USD amounts are always
    stored as floats so that downstream sums stay JSON-serializable.

    Args:
        wallet_data: Wallet data with transactions

    Returns:
        DataFrame with processed transaction data. When at least one row
        exists, a ``datetime`` column is added by parsing ``timestamp`` as
        seconds since the Unix epoch. An empty DataFrame (no columns) is
        returned when there are no DEX transactions.

    Raises:
        KeyError: If ``wallet_address``, ``data``, ``protocolType``,
            ``transactions``, ``document_id``, ``action`` or ``timestamp``
            is missing.
    """
    rows = []
    wallet_address = wallet_data['wallet_address']

    for category_data in wallet_data['data']:
        if category_data['protocolType'] != 'dexes':
            continue

        for tx in category_data['transactions']:
            action = tx['action']
            row = {
                'wallet_address': wallet_address,
                'document_id': tx['document_id'],
                'action': action,
                'timestamp': tx['timestamp'],
                'protocol': tx.get('protocol', ''),
                'pool_id': tx.get('poolId', ''),
                'pool_name': tx.get('poolName', '')
            }

            # Handle different transaction types
            if action == 'swap':
                # `or {}` also covers an explicit null token object
                token_in = tx.get('tokenIn') or {}
                token_out = tx.get('tokenOut') or {}
                row['amount_usd'] = max(
                    _token_amount_usd(token_in),
                    _token_amount_usd(token_out)
                )
                row['token_in_symbol'] = token_in.get('symbol', '')
                row['token_out_symbol'] = token_out.get('symbol', '')

            elif action in ('deposit', 'withdraw'):
                token0 = tx.get('token0') or {}
                token1 = tx.get('token1') or {}
                row['amount_usd'] = (
                    _token_amount_usd(token0) + _token_amount_usd(token1)
                )
                row['token0_symbol'] = token0.get('symbol', '')
                row['token1_symbol'] = token1.get('symbol', '')

            rows.append(row)

    df = pd.DataFrame(rows)
    if not df.empty:
        df['datetime'] = pd.to_datetime(df['timestamp'], unit='s')

    return df

# Smoke test: run the preprocessing step on the sample wallet.
# `df` is reused by the later test cells.
df = preprocess_dex_transactions(sample_wallet_data)
print(f"Processed {df.shape[0]} transactions")
print(df.head())

## LP (Liquidity Provider) Scoring Functions

These functions analyze liquidity provision patterns and calculate LP reputation scores.

In [None]:
def calculate_lp_features(df: pd.DataFrame) -> Dict[str, float]:
    """
    Calculate LP-specific features from transaction data.

    All values are returned as native Python ``float`` / ``int`` (not numpy
    scalars) so the dictionary can be passed straight to ``json.dumps``.

    Args:
        df: DataFrame with DEX transactions. Reads the ``action``,
            ``timestamp`` and ``pool_id`` columns, and ``amount_usd`` when
            deposit or withdraw rows exist.

    Returns:
        Dictionary with LP features, or an empty dict when ``df`` is empty:

        * ``total_deposit_usd`` / ``total_withdraw_usd``: summed USD amounts
        * ``num_deposits`` / ``num_withdraws``: row counts
        * ``withdraw_ratio``: withdrawn / deposited USD (0.0 without deposits)
        * ``avg_hold_time_days``: result of ``calculate_holding_time``
        * ``account_age_days``: span between the first and last transaction
          of any action, in days
        * ``unique_pools``: distinct ``pool_id`` values over all rows in
          ``df`` (swap rows included)
    """
    if df.empty:
        return {}

    # Filter LP transactions
    deposits = df[df['action'] == 'deposit']
    withdraws = df[df['action'] == 'withdraw']

    # Basic metrics; the emptiness guard avoids touching 'amount_usd' when
    # no deposit/withdraw rows exist
    total_deposit_usd = float(deposits['amount_usd'].sum()) if not deposits.empty else 0.0
    total_withdraw_usd = float(withdraws['amount_usd'].sum()) if not withdraws.empty else 0.0
    num_deposits = len(deposits)
    num_withdraws = len(withdraws)

    # Calculate withdraw ratio
    withdraw_ratio = total_withdraw_usd / total_deposit_usd if total_deposit_usd > 0 else 0.0

    # Account age: df is known to be non-empty here, so max/min are defined
    account_age_days = float(df['timestamp'].max() - df['timestamp'].min()) / 86400

    # Calculate average holding time
    avg_hold_time_days = float(calculate_holding_time(deposits, withdraws))

    # Unique pools
    unique_pools = int(df['pool_id'].nunique())

    return {
        'total_deposit_usd': total_deposit_usd,
        'total_withdraw_usd': total_withdraw_usd,
        'num_deposits': num_deposits,
        'num_withdraws': num_withdraws,
        'withdraw_ratio': withdraw_ratio,
        'avg_hold_time_days': avg_hold_time_days,
        'account_age_days': account_age_days,
        'unique_pools': unique_pools
    }

def calculate_holding_time(deposits: pd.DataFrame, withdraws: pd.DataFrame,
                           current_time: Optional[float] = None) -> float:
    """
    Calculate the average holding time by matching deposits to withdraws.

    Each deposit is paired with the earliest withdraw whose timestamp is
    strictly later than the deposit's. Pools and amounts are not considered,
    and several deposits may be paired with the same withdraw. A deposit
    with no later withdraw is treated as still held until ``current_time``.

    Args:
        deposits: Deposit rows; only the ``timestamp`` column (epoch
            seconds) is read.
        withdraws: Withdraw rows; only the ``timestamp`` column is read.
        current_time: Epoch seconds used as the end point for deposits that
            were never withdrawn. Defaults to the current wall-clock time;
            pass a fixed value to get reproducible results.

    Returns:
        Mean holding time in days, or 0.0 when there are no deposits.
    """
    if deposits.empty:
        return 0.0

    if current_time is None:
        current_time = datetime.now().timestamp()

    deposit_ts = deposits['timestamp'].to_numpy(dtype=float)
    withdraw_ts = np.sort(withdraws['timestamp'].to_numpy(dtype=float))

    # side='right' gives the index of the first withdraw strictly after
    # each deposit; an index equal to the array size means "none found"
    next_idx = np.searchsorted(withdraw_ts, deposit_ts, side='right')
    has_exit = next_idx < withdraw_ts.size

    # Start from "still held" and overwrite where a later withdraw exists
    exit_ts = np.full(deposit_ts.shape, float(current_time))
    exit_ts[has_exit] = withdraw_ts[next_idx[has_exit]]

    holding_days = (exit_ts - deposit_ts) / 86400  # Convert to days
    return float(np.mean(holding_days))

def calculate_lp_score(features: Dict[str, float]) -> Tuple[float, Dict[str, Any]]:
    """
    Turn LP features into an LP reputation score.

    The score is the sum of five components:

    * volume (max 300): 300 points per 10,000 USD deposited
    * frequency (max 200): 20 points per deposit
    * retention (max 250 for a ratio of 0): 250 * (1 - withdraw_ratio),
      floored at 0
    * holding (max 150): 150 points per 30 days of average holding time
    * diversity (max 100): 20 points per unique pool

    Returns:
        Tuple of (score, score_breakdown); ``(0.0, {})`` for empty features.
    """
    if not features:
        return 0.0, {}

    deposited_usd = features['total_deposit_usd']
    volume_score = min(deposited_usd / 10000 * 300, 300)

    deposit_count = features['num_deposits']
    frequency_score = min(deposit_count * 20, 200)

    # NOTE(review): a withdraw_ratio of 0 yields the full 250 points, which
    # also applies to wallets that never deposited - confirm this is intended.
    kept_fraction = 1 - features['withdraw_ratio']
    retention_score = max(0, kept_fraction * 250)

    hold_days = features['avg_hold_time_days']
    holding_score = min(hold_days / 30 * 150, 150)

    pool_count = features['unique_pools']
    diversity_score = min(pool_count * 20, 100)

    total_score = volume_score + frequency_score + retention_score + holding_score + diversity_score

    breakdown = dict(
        volume_score=volume_score,
        frequency_score=frequency_score,
        retention_score=retention_score,
        holding_score=holding_score,
        diversity_score=diversity_score,
        total_lp_score=total_score,
    )
    return total_score, breakdown

# Smoke test: LP features and score for the preprocessed sample wallet
lp_features = calculate_lp_features(df)
lp_score, lp_breakdown = calculate_lp_score(lp_features)
print(
    f"LP Features: {lp_features}",
    f"LP Score: {lp_score}",
    f"LP Breakdown: {lp_breakdown}",
    sep="\n",
)

## Swap Scoring Functions

These functions analyze trading behavior and calculate swap reputation scores.

In [None]:
def calculate_swap_features(df: pd.DataFrame) -> Dict[str, float]:
    """
    Calculate swap-specific features from transaction data.

    All values are returned as native Python ``float`` / ``int`` (not numpy
    scalars) so the dictionary can be passed straight to ``json.dumps``.

    Args:
        df: DataFrame with DEX transactions. Reads the ``action`` column
            and, for swap rows, ``amount_usd`` and ``pool_id``; the swap
            rows are also passed to ``calculate_token_diversity`` and
            ``calculate_swap_frequency``.

    Returns:
        Dictionary with swap features. An empty dict when ``df`` is empty;
        all-zero features when ``df`` contains no swap rows.
    """
    if df.empty:
        return {}

    # Filter swap transactions
    swaps = df[df['action'] == 'swap']

    if swaps.empty:
        return {
            'total_swap_volume': 0.0,
            'num_swaps': 0,
            'unique_pools_swapped': 0,
            'avg_swap_size': 0.0,
            'token_diversity_score': 0,
            'swap_frequency_score': 0.0
        }

    # Basic swap metrics (num_swaps is at least 1 past the guard above)
    total_swap_volume = float(swaps['amount_usd'].sum())
    num_swaps = len(swaps)
    unique_pools_swapped = int(swaps['pool_id'].nunique())
    avg_swap_size = total_swap_volume / num_swaps

    # Token diversity analysis
    token_diversity_score = int(calculate_token_diversity(swaps))

    # Swap frequency analysis
    swap_frequency_score = float(calculate_swap_frequency(swaps))

    return {
        'total_swap_volume': total_swap_volume,
        'num_swaps': num_swaps,
        'unique_pools_swapped': unique_pools_swapped,
        'avg_swap_size': avg_swap_size,
        'token_diversity_score': token_diversity_score,
        'swap_frequency_score': swap_frequency_score
    }

def calculate_token_diversity(swaps: pd.DataFrame) -> int:
    """
    Calculate token diversity score based on variety of tokens traded.

    Distinct symbols from ``token_in_symbol`` and ``token_out_symbol`` are
    pooled together. Each stablecoin symbol is worth 10 points and every
    other symbol 15 points; the total is capped at 150.

    Missing (NaN/None) and blank symbols are ignored, so a swap whose token
    symbol was not supplied does not count as an extra token.

    Args:
        swaps: Swap rows with ``token_in_symbol`` and ``token_out_symbol``
            columns.

    Returns:
        Diversity score between 0 and 150; 0 when ``swaps`` is empty.
    """
    if swaps.empty:
        return 0

    # Define stable tokens (matched case-sensitively)
    stable_tokens = {'USDC', 'USDT', 'DAI', 'LUSD', 'USDP', 'TUSD', 'FRAX'}

    # Get all unique tokens, dropping blank placeholders for unknown symbols
    tokens_in = set(swaps['token_in_symbol'].dropna())
    tokens_out = set(swaps['token_out_symbol'].dropna())
    all_tokens = {token for token in tokens_in | tokens_out if str(token).strip()}

    # Count stable vs volatile tokens
    stable_count = len(all_tokens & stable_tokens)
    volatile_count = len(all_tokens) - stable_count

    # Weighted diversity score (volatile tokens worth more)
    diversity_score = stable_count * 10 + volatile_count * 15

    return min(diversity_score, 150)  # Cap at 150 points

def calculate_swap_frequency(swaps: pd.DataFrame) -> float:
    """
    Score how frequently the wallet swaps.

    The score is derived from the mean gap, in hours, between consecutive
    swaps ordered by ``timestamp``: at most 1 hour -> 100, at most 24 -> 80,
    at most 168 -> 60, at most 720 -> 40, anything longer -> 20.
    Fewer than two swaps scores 0.0.
    """
    too_few_swaps = swaps.empty or len(swaps) < 2
    if too_few_swaps:
        return 0.0

    # Gaps between consecutive swaps, converted from seconds to hours
    ordered = swaps.sort_values('timestamp')
    gaps_seconds = ordered['timestamp'].diff().dropna()
    gaps_hours = gaps_seconds / 3600
    mean_gap_hours = gaps_hours.mean()

    # (upper bound on the mean gap in hours, score); first match wins
    score_tiers = (
        (1, 100.0),    # up to one hour
        (24, 80.0),    # up to one day
        (168, 60.0),   # up to one week
        (720, 40.0),   # up to 30 days
    )
    for max_gap_hours, tier_score in score_tiers:
        if mean_gap_hours <= max_gap_hours:
            return tier_score

    return 20.0

def calculate_swap_score(features: Dict[str, float]) -> Tuple[float, Dict[str, Any]]:
    """
    Turn swap features into a swap reputation score.

    The score is the sum of five components:

    * volume (max 250): 250 points per 50,000 USD swapped
    * frequency (max 200): 10 points per swap
    * diversity: ``token_diversity_score`` taken as-is
    * activity: ``swap_frequency_score`` taken as-is
    * pool diversity (max 100): 25 points per unique pool swapped in

    Returns:
        Tuple of (score, score_breakdown); ``(0.0, {})`` for empty features.
    """
    if not features:
        return 0.0, {}

    swapped_usd = features['total_swap_volume']
    volume_score = min(swapped_usd / 50000 * 250, 250)

    swap_count = features['num_swaps']
    frequency_score = min(swap_count * 10, 200)

    # These two arrive already scored; no additional cap is applied here
    diversity_score = features['token_diversity_score']
    activity_score = features['swap_frequency_score']

    pool_count = features['unique_pools_swapped']
    pool_diversity_score = min(pool_count * 25, 100)

    total_score = volume_score + frequency_score + diversity_score + activity_score + pool_diversity_score

    breakdown = dict(
        volume_score=volume_score,
        frequency_score=frequency_score,
        diversity_score=diversity_score,
        activity_score=activity_score,
        pool_diversity_score=pool_diversity_score,
        total_swap_score=total_score,
    )
    return total_score, breakdown

# Smoke test: swap features and score for the preprocessed sample wallet
swap_features = calculate_swap_features(df)
swap_score, swap_breakdown = calculate_swap_score(swap_features)
print(
    f"Swap Features: {swap_features}",
    f"Swap Score: {swap_score}",
    f"Swap Breakdown: {swap_breakdown}",
    sep="\n",
)

## Combined Scoring and User Tagging

Final functions to combine LP and Swap scores and generate user tags.

In [None]:
def _first_matching_tag(value: float, tiers: List[Tuple[float, str]]) -> str:
    """Return the tag of the first (threshold, tag) tier that value strictly exceeds, else ''."""
    for threshold, tag in tiers:
        if value > threshold:
            return tag
    return ''


def generate_user_tags(lp_features: Dict, swap_features: Dict) -> List[str]:
    """
    Generate user tags based on LP and swap behavior.

    Six independent tag families are evaluated in a fixed order, and each
    contributes at most one tag (the highest tier whose threshold the value
    strictly exceeds): LP size, holding period, trading volume, trading
    frequency, token diversity, and pool diversity. Missing feature keys
    count as 0, so empty feature dicts produce no tags.

    The pool-diversity tag ('Multi-Pool LP') is evaluated independently of
    the token-diversity tag ('Diversified Trader'), so a wallet that
    qualifies for both receives both.

    Args:
        lp_features: LP feature dict; reads ``total_deposit_usd``,
            ``avg_hold_time_days`` and ``unique_pools``.
        swap_features: Swap feature dict; reads ``total_swap_volume``,
            ``num_swaps`` and ``token_diversity_score``.

    Returns:
        List of user tags
    """
    # (feature value, tiers ordered from highest to lowest threshold)
    tag_families = [
        # LP-based tags
        (lp_features.get('total_deposit_usd', 0), [
            (100000, 'Whale LP'),
            (10000, 'Large LP'),
            (1000, 'Medium LP'),
            (0, 'Small LP'),
        ]),
        # Holding behavior tags
        (lp_features.get('avg_hold_time_days', 0), [
            (90, 'Long-term Holder'),
            (30, 'Medium-term Holder'),
            (0, 'Short-term Holder'),
        ]),
        # Swap-based tags
        (swap_features.get('total_swap_volume', 0), [
            (500000, 'Whale Trader'),
            (50000, 'Large Trader'),
            (5000, 'Active Trader'),
            (0, 'Casual Trader'),
        ]),
        # Frequency tags
        (swap_features.get('num_swaps', 0), [
            (100, 'High Frequency Trader'),
            (20, 'Regular Trader'),
        ]),
        # Diversity tags: trader and LP diversity are separate families
        (swap_features.get('token_diversity_score', 0), [
            (100, 'Diversified Trader'),
        ]),
        (lp_features.get('unique_pools', 0), [
            (3, 'Multi-Pool LP'),
        ]),
    ]

    tags = []
    for value, tiers in tag_families:
        tag = _first_matching_tag(value, tiers)
        if tag:
            tags.append(tag)

    return tags

def calculate_final_score(lp_score: float, swap_score: float, 
                         lp_features: Dict, swap_features: Dict) -> Tuple[float, Dict[str, Any]]:
    """
    Blend the LP and swap scores into the final reputation score.

    The final score is 60% of ``lp_score`` plus 40% of ``swap_score``. The
    returned feature dict contains every LP feature, every swap feature
    (swap values win on a key clash), the user tags, and the three scores.

    Returns:
        Tuple of (final_score, complete_features)
    """
    # Fixed blend: LP behaviour counts for more than swap behaviour
    weights = {'lp': 0.6, 'swap': 0.4}
    final_score = (lp_score * weights['lp']) + (swap_score * weights['swap'])

    user_tags = generate_user_tags(lp_features, swap_features)

    # Merge feature dicts first, then append the derived fields
    complete_features = dict(lp_features)
    complete_features.update(swap_features)
    complete_features.update(
        user_tags=user_tags,
        lp_score=lp_score,
        swap_score=swap_score,
        final_score=final_score,
    )

    return final_score, complete_features

def process_wallet_complete(wallet_data: Dict) -> Tuple[float, Dict[str, Any]]:
    """
    Run the full scoring pipeline for one wallet.

    This is the entry point to call from the server: it preprocesses the
    raw payload, scores LP and swap behaviour, blends the two scores, and
    attaches both score breakdowns under ``score_breakdown``.

    Args:
        wallet_data: Raw wallet data from Kafka

    Returns:
        Tuple of (final_score, features). When preprocessing yields no rows
        the result is ``(0.0, {'error': 'No valid transactions found'})``.
    """
    transactions = preprocess_dex_transactions(wallet_data)
    if transactions.empty:
        return 0.0, {'error': 'No valid transactions found'}

    # LP side
    lp_feats = calculate_lp_features(transactions)
    lp_total, lp_parts = calculate_lp_score(lp_feats)

    # Swap side
    swap_feats = calculate_swap_features(transactions)
    swap_total, swap_parts = calculate_swap_score(swap_feats)

    # Blend both sides and collect every feature in one dict
    blended_score, all_features = calculate_final_score(
        lp_total, swap_total, lp_feats, swap_feats
    )
    all_features['score_breakdown'] = dict(
        lp_breakdown=lp_parts,
        swap_breakdown=swap_parts,
    )

    return blended_score, all_features

# Smoke test: run the whole pipeline on the sample wallet
final_score, features = process_wallet_complete(sample_wallet_data)
print(f"\nFinal Score: {final_score:.2f}")
print(f"User Tags: {features['user_tags']}")
print(f"LP Score: {features['lp_score']:.2f}")
print(f"Swap Score: {features['swap_score']:.2f}")

# Message shape the server is expected to publish.
# `df` is the frame produced by the preprocessing test cell above.
expected_output = dict(
    wallet_address=sample_wallet_data['wallet_address'],
    zscore=f"{final_score:.18f}",  # 18 decimal places for blockchain compatibility
    timestamp=int(datetime.now().timestamp()),
    categories=[
        dict(
            category='dexes',
            score=final_score,
            transaction_count=len(df),
            # Everything except the nested breakdown and the tag list
            features={
                key: value
                for key, value in features.items()
                if key not in ('score_breakdown', 'user_tags')
            },
        )
    ],
)

print(f"\nExpected Kafka Output:")
print(json.dumps(expected_output, indent=2))