In [3]:
#!/usr/bin/env python3
"""
Market Screener for Grid Trading Pairs
Fetches all spot/USDT pairs from Binance and applies screening criteria
"""

import ccxt
import pandas as pd
import numpy as np
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import os
from pathlib import Path

# Root logging setup: INFO and above, each record rendered as
# "<timestamp> - <level> - <message>".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)

@dataclass
class PairData:
    """Screening result for one trading pair.

    MarketScreener.screen_single_pair builds one instance for every pair
    that passes all of its filters.
    """
    # Market symbol as passed to the screener, e.g. 'BTC/USDT'.
    symbol: str
    # Price used for the price filter; the screener fills it from the
    # ticker's 'last' field rather than from the final candle's close.
    close_price: float
    # Hurst exponent estimated from the pair's close prices.
    hurst_exponent: float
    # Sum of the 'volume' column over the last 7 OHLCV rows (base currency).
    volume_7d: float
    # Sum of volume * close over the last 7 OHLCV rows.
    volume_7d_usd: float
    # Local wall-clock time at which the pair was screened.
    last_update: datetime
    # Full candle history the metrics were computed from, when retained.
    ohlcv_data: Optional[pd.DataFrame] = None

@dataclass
class ScreeningCriteria:
    """Thresholds applied by the screener; checked once at construction."""
    min_price: float = 1.0                 # lowest acceptable price
    min_volume_7d_usd: float = 10_000_000  # lowest acceptable 7-day volume
    max_hurst: float = 0.5                 # below 0.5 is treated as mean-reverting
    years_of_data: int = 1                 # years of history to request
    top_n_pairs: int = 5                   # how many pairs to keep

    def __post_init__(self):
        """Raise ValueError for the first criterion that is out of range."""
        # Each entry pairs a lazily evaluated "is this invalid?" test with
        # its error message; entries are checked strictly in this order.
        violations = (
            (lambda: self.min_price <= 0,
             "min_price must be positive"),
            (lambda: self.min_volume_7d_usd <= 0,
             "min_volume_7d_usd must be positive"),
            (lambda: not 0 <= self.max_hurst <= 1,
             "max_hurst must be between 0 and 1"),
            (lambda: self.years_of_data <= 0,
             "years_of_data must be positive"),
            (lambda: self.top_n_pairs <= 0,
             "top_n_pairs must be positive"),
        )
        for is_violated, message in violations:
            if is_violated():
                raise ValueError(message)

class HurstCalculator:
    """Estimate the Hurst exponent of a price series with defensive fallbacks.

    Whenever a meaningful estimate cannot be produced (too few points, no
    variance, non-finite intermediate values, unexpected exception) the
    neutral value 0.5 is returned instead of raising.
    """

    # Fewest prices / log returns the estimator will work with.
    MIN_POINTS = 20
    # Returned whenever no meaningful estimate can be produced.
    NEUTRAL_HURST = 0.5

    @staticmethod
    def calculate_hurst(price_series: pd.Series) -> float:
        """
        Estimate the Hurst exponent from a single rescaled-range (R/S) value.

        The log returns of the whole series form one window: R is the range
        of their cumulative mean-adjusted sum, S is their standard deviation,
        and the estimate is log(R/S) / log(n). No regression over several
        window sizes is performed, so treat the result as a rough indicator.

        Args:
            price_series: Price series (typically close prices). NaN,
                infinite and non-positive entries are discarded before the
                returns are computed, because a log return is undefined
                for them.

        Returns:
            Hurst exponent clamped to [0, 1] (<0.5 = mean-reverting,
            >0.5 = trending), or 0.5 when it cannot be estimated.
        """
        neutral = HurstCalculator.NEUTRAL_HURST
        min_points = HurstCalculator.MIN_POINTS

        try:
            # Input validation
            if price_series is None or price_series.empty:
                logger.warning("Empty price series provided for Hurst calculation")
                return neutral

            if len(price_series) < min_points:
                logger.warning(f"Insufficient data for Hurst calculation: {len(price_series)} points")
                return neutral

            # Keep only finite, strictly positive prices. A zero or infinite
            # price would turn into +/-inf log returns, which poison the mean
            # and make the clamped result come out as 1.0.
            clean_series = price_series.replace([np.inf, -np.inf], np.nan).dropna()
            clean_series = clean_series[clean_series > 0]
            if len(clean_series) < min_points:
                logger.warning("Insufficient clean data after removing NaN values")
                return neutral

            # Log returns; the first element is NaN (nothing to compare with)
            # and is removed together with any other non-finite value.
            log_returns = np.log(clean_series / clean_series.shift(1))
            log_returns = log_returns[np.isfinite(log_returns)]
            if len(log_returns) < min_points:
                logger.warning("Insufficient log returns for Hurst calculation")
                return neutral

            # S: standard deviation of the returns
            S = log_returns.std()
            if S == 0:
                logger.warning("No variance in log returns, returning neutral Hurst")
                return neutral

            # R: range of the cumulative deviations from the mean return
            n = len(log_returns)
            cumsum_deviations = (log_returns - log_returns.mean()).cumsum()
            R = cumsum_deviations.max() - cumsum_deviations.min()
            if R == 0:
                logger.warning("Zero variance or range in R/S calculation")
                return neutral

            rs_ratio = R / S
            hurst = np.log(rs_ratio) / np.log(n) if rs_ratio > 0 else np.nan

            # NaN compares false against everything, so it must be rejected
            # explicitly before clamping.
            if not np.isfinite(hurst):
                logger.warning("Invalid R/S ratio, returning neutral Hurst")
                return neutral

            # Clamp between 0 and 1
            return float(min(1.0, max(0.0, hurst)))

        except Exception as e:
            logger.error(f"Error calculating Hurst exponent: {e}")
            return neutral  # Return neutral value on error

class BinanceDataFetcher:
    """Fetch Binance spot market data through ccxt, with an on-disk OHLCV cache.

    Candle histories are cached as JSON files under ``data_cache/`` (created
    in the current working directory) and reused for up to one hour.
    """

    # How long a cached OHLCV file is considered fresh.
    CACHE_TTL = timedelta(hours=1)
    # Candles requested per fetch_ohlcv call.
    PAGE_SIZE = 1000
    # Minimum number of candles a freshly fetched history must contain.
    MIN_CANDLES = 30

    def __init__(self, api_key: Optional[str] = None, api_secret: Optional[str] = None):
        """Create the ccxt Binance client and make sure the cache directory exists.

        Args:
            api_key: Optional API key; only used when api_secret is also given.
            api_secret: Optional API secret; only used when api_key is also given.
        """
        exchange_config = {
            "enableRateLimit": True,
            "options": {"defaultType": "spot"}  # Make sure it's spot, not futures
        }

        # Credentials are attached only when both halves are present.
        if api_key and api_secret:
            exchange_config.update({
                "apiKey": api_key,
                "secret": api_secret
            })
            logger.info("🔑 Using API credentials for enhanced data access")
        else:
            logger.info("🌐 Using public endpoints only")

        self.exchange = ccxt.binance(exchange_config)
        self.cache_dir = Path('data_cache')
        self.cache_dir.mkdir(exist_ok=True)

    @staticmethod
    def _quote_volume(ticker: Optional[Dict]) -> float:
        """Return a ticker's 'quoteVolume', treating a missing or None value as 0."""
        return (ticker or {}).get('quoteVolume') or 0

    def get_all_spot_usdt_pairs(self) -> List[str]:
        """Return every active spot */USDT symbol, most liquid first.

        Symbols are ordered by the 'quoteVolume' of their ticker. If the
        tickers cannot be fetched the unsorted list is returned; if the
        markets cannot be loaded (or contain no match) the built-in fallback
        list is returned.
        """
        try:
            markets = self.exchange.load_markets()
            usdt_pairs = [
                symbol
                for symbol, market in markets.items()
                if market.get('type') == 'spot'
                and market.get('quote') == 'USDT'
                and market.get('active', False)
            ]

            if not usdt_pairs:
                logger.warning("No USDT pairs found, using fallback list")
                return self._get_fallback_pairs()

            logger.info(f"Found {len(usdt_pairs)} active USDT spot pairs")

            try:
                # One request for all tickers, so every pair is ranked by its
                # real volume. Ranking only a slice of the list would leave
                # all other pairs with an implicit volume of 0.
                tickers = self.exchange.fetch_tickers()
                usdt_pairs.sort(
                    key=lambda symbol: self._quote_volume(tickers.get(symbol)),
                    reverse=True,
                )
                logger.info("Sorted pairs by trading volume")
            except Exception as e:
                logger.warning(f"Could not sort by volume: {e}")

            return usdt_pairs

        except Exception as e:
            logger.error(f"Error fetching spot pairs: {e}")
            return self._get_fallback_pairs()

    def _get_fallback_pairs(self) -> List[str]:
        """Return a hard-coded list of major USDT pairs."""
        logger.info("Using fallback list of major USDT pairs")
        return [
            'BTC/USDT', 'ETH/USDT', 'BNB/USDT', 'ADA/USDT', 'XRP/USDT',
            'SOL/USDT', 'DOT/USDT', 'DOGE/USDT', 'AVAX/USDT', 'SHIB/USDT',
            'MATIC/USDT', 'LTC/USDT', 'UNI/USDT', 'LINK/USDT', 'ATOM/USDT',
            'XLM/USDT', 'BCH/USDT', 'FIL/USDT', 'TRX/USDT', 'ETC/USDT',
            'VET/USDT', 'ICP/USDT', 'THETA/USDT', 'FTM/USDT', 'ALGO/USDT',
            'MANA/USDT', 'SAND/USDT', 'AXS/USDT', 'CRV/USDT', 'COMP/USDT',
            'MKR/USDT', 'SNX/USDT', 'YFI/USDT', 'AAVE/USDT', 'SUSHI/USDT'
        ]

    def _load_cached_ohlcv(self, cache_file: Path, symbol: str) -> Optional[pd.DataFrame]:
        """Return the cached candles for symbol, or None if absent, stale or unreadable."""
        if not cache_file.exists():
            return None

        cache_time = datetime.fromtimestamp(cache_file.stat().st_mtime)
        if datetime.now() - cache_time >= self.CACHE_TTL:
            return None

        logger.info(f"Loading {symbol} from cache")
        try:
            with open(cache_file, 'r') as f:
                df = pd.DataFrame(json.load(f))
            if df.empty:
                return None
            # Timestamps were written as strings; restore the datetime dtype
            # so cached and freshly fetched frames look the same to callers.
            if 'timestamp' in df.columns:
                df['timestamp'] = pd.to_datetime(df['timestamp'])
            return df
        except Exception as e:
            logger.warning(f"Error loading cache for {symbol}: {e}")
            return None

    def fetch_ohlcv_data(self, symbol: str, timeframe: str = '1d',
                        years: int = 1) -> Optional[pd.DataFrame]:
        """Return OHLCV candles for symbol, from cache when fresh, else from the exchange.

        Args:
            symbol: Market symbol such as 'BTC/USDT'.
            timeframe: ccxt timeframe string.
            years: Years of history to request; values outside 1..5 are
                replaced by 1.

        Returns:
            DataFrame with columns timestamp, open, high, low, close, volume
            sorted by timestamp, or None when the symbol is invalid, nothing
            could be fetched, fewer than 30 candles came back, or an
            unexpected error occurred.
        """
        try:
            # Input validation
            if not symbol or not isinstance(symbol, str):
                logger.error(f"Invalid symbol: {symbol}")
                return None

            if years <= 0 or years > 5:  # Limit to reasonable range
                logger.warning(f"Invalid years parameter: {years}, using 1 year")
                years = 1

            # Check cache first
            cache_file = self.cache_dir / f"{symbol.replace('/', '_')}_{timeframe}_{years}y.json"
            cached = self._load_cached_ohlcv(cache_file, symbol)
            if cached is not None:
                return cached

            # Start of the requested window as epoch milliseconds. Going
            # through timestamp() keeps the local-time "now" correct, whereas
            # formatting it and parsing the text as UTC shifts the window by
            # the local UTC offset.
            start_date = datetime.now() - timedelta(days=years * 365)
            from_ts = int(start_date.timestamp() * 1000)

            ohlcv_list = self._fetch_ohlcv_paginated(symbol, timeframe, from_ts)
            if not ohlcv_list:
                logger.warning(f"No data fetched for {symbol}")
                return None

            # Convert to DataFrame
            df = pd.DataFrame(ohlcv_list, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            df = df.sort_values('timestamp').drop_duplicates(subset=['timestamp'])
            df = df.reset_index(drop=True)

            # Validate data quality
            if len(df) < self.MIN_CANDLES:
                logger.warning(f"Insufficient data for {symbol}: {len(df)} days")
                return None

            # Caching is best effort: a write failure must not lose the data.
            try:
                with open(cache_file, 'w') as f:
                    # default=str turns the Timestamp objects into strings.
                    json.dump(df.to_dict('records'), f, default=str)
            except Exception as e:
                logger.warning(f"Error caching data for {symbol}: {e}")

            logger.info(f"Fetched {len(df)} candles for {symbol}")
            return df

        except Exception as e:
            logger.error(f"Error fetching OHLCV for {symbol}: {e}")
            return None

    def _fetch_ohlcv_paginated(self, symbol: str, timeframe: str, from_ts: int) -> List:
        """Fetch every candle from from_ts up to the most recent one, page by page.

        Pages of PAGE_SIZE candles are requested until the exchange returns a
        short or empty page. There is deliberately no cap on the total: a cap
        applied from the start of the window discards the newest candles for
        long histories.

        Returns:
            List of raw candle rows, oldest first. If a request fails, the
            candles collected so far are returned.
        """
        ohlcv_list: List = []
        since = from_ts

        try:
            # No explicit sleep between pages: the client is created with
            # enableRateLimit=True, which already throttles requests.
            while True:
                batch = self.exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=self.PAGE_SIZE)
                if not batch:
                    break

                ohlcv_list.extend(batch)

                # A short page means the newest candle has been reached.
                if len(batch) < self.PAGE_SIZE:
                    break

                # Continue 1 ms after the last candle to avoid duplicates.
                next_since = batch[-1][0] + 1
                if next_since <= since:
                    # Timestamps are not advancing; stop instead of looping forever.
                    break
                since = next_since

        except Exception as e:
            logger.warning(f"Error in pagination for {symbol}: {e}")

        return ohlcv_list

    def get_current_ticker_data(self, symbol: str) -> Optional[Dict]:
        """Return the current ticker for symbol, or None if the request fails."""
        try:
            return self.exchange.fetch_ticker(symbol)
        except Exception as e:
            logger.warning(f"Error fetching ticker for {symbol}: {e}")
            return None

    def calculate_volume_7d_usd(self, df: pd.DataFrame) -> float:
        """Approximate the traded value over the last 7 rows of df.

        Each row contributes volume * close. Returns 0.0 when df is missing,
        has fewer than 7 rows, lacks the 'volume' or 'close' column, contains
        NaN in those rows, or yields a negative total.
        """
        try:
            if df is None or df.empty:
                logger.warning("Empty DataFrame provided for volume calculation")
                return 0.0

            if len(df) < 7:
                logger.warning(f"Insufficient data for 7d volume calculation: {len(df)} days")
                return 0.0

            # Validate required columns
            required_cols = ['volume', 'close']
            if not all(col in df.columns for col in required_cols):
                logger.error(f"Missing required columns: {required_cols}")
                return 0.0

            # NOTE(review): the final row may be a still-forming candle, which
            # would understate the total - confirm whether that is acceptable.
            last_7_days = df.tail(7)

            if last_7_days['volume'].isna().any() or last_7_days['close'].isna().any():
                logger.warning("NaN values found in volume or close data")
                return 0.0

            # Calculate volume in USD (volume * close_price)
            volume_usd = (last_7_days['volume'] * last_7_days['close']).sum()

            if volume_usd < 0:
                logger.warning("Negative volume calculated, returning 0")
                return 0.0

            return float(volume_usd)

        except Exception as e:
            logger.error(f"Error calculating 7d volume: {e}")
            return 0.0

class MarketScreener:
    """Screen Binance USDT spot pairs for grid-trading candidates.

    A pair passes when its price, 7-day volume and Hurst exponent satisfy the
    configured ScreeningCriteria; the passing pairs with the highest 7-day
    volume are kept in ``results``.
    """

    def __init__(self, api_key: Optional[str] = None, api_secret: Optional[str] = None,
                 criteria: "Optional[ScreeningCriteria]" = None):
        """Set up the data fetcher, Hurst calculator and criteria.

        Args:
            api_key: Optional Binance API key, forwarded to the data fetcher.
            api_secret: Optional Binance API secret, forwarded to the data fetcher.
            criteria: Screening thresholds; the defaults are used when omitted.
        """
        self.data_fetcher = BinanceDataFetcher(api_key, api_secret)
        self.hurst_calculator = HurstCalculator()
        self.criteria = criteria if criteria is not None else ScreeningCriteria()
        self.results: List[PairData] = []
        # Number of pairs submitted to the most recent screen_all_pairs run.
        self.pairs_screened = 0

    def screen_single_pair(self, symbol: str) -> Optional[PairData]:
        """Run every filter on one pair.

        Cheap ticker-based checks (price, estimated volume) run first so that
        candle history is only downloaded for pairs that might pass.

        Returns:
            PairData when the pair passes all filters, otherwise None. Errors
            are logged and also result in None.
        """
        try:
            # Input validation
            if not symbol or not isinstance(symbol, str):
                logger.error(f"Invalid symbol: {symbol}")
                return None

            logger.info(f"Screening {symbol}...")

            # Step 1: Get current ticker data for quick filters
            ticker_data = self.data_fetcher.get_current_ticker_data(symbol)
            if not ticker_data:
                logger.warning(f"No ticker data available for {symbol}")
                return None

            # Step 2: Price filter (quick). "or 0" also covers a key that is
            # present with a None value, which would otherwise raise when
            # compared with a number.
            current_price = ticker_data.get('last') or 0
            if current_price <= 0:
                logger.warning(f"Invalid price for {symbol}: {current_price}")
                return None

            if current_price < self.criteria.min_price:
                logger.debug(f"{symbol} filtered out: price ${current_price:.4f} < ${self.criteria.min_price}")
                return None

            # Step 3: Volume pre-filter (quick)
            volume_24h = ticker_data.get('quoteVolume') or 0
            if volume_24h <= 0:
                logger.warning(f"Invalid volume for {symbol}: {volume_24h}")
                return None

            # 24h volume * 7 approximates the 7d volume; only pairs below half
            # the threshold are rejected here so borderline pairs still get
            # the exact check in step 6.
            estimated_7d_volume = volume_24h * 7
            if estimated_7d_volume < self.criteria.min_volume_7d_usd * 0.5:
                logger.debug(f"{symbol} filtered out: estimated 7d volume ${estimated_7d_volume:,.0f} too low")
                return None

            # Step 4: Fetch OHLCV data for detailed analysis
            df = self.data_fetcher.fetch_ohlcv_data(symbol, years=self.criteria.years_of_data)
            if df is None or df.empty:
                logger.warning(f"No OHLCV data available for {symbol}")
                return None

            # Step 5: Data quality check
            if len(df) < 30:  # At least 30 days of data
                logger.warning(f"Insufficient data for {symbol}: {len(df)} days")
                return None

            # Step 6: Calculate 7-day volume in USD (detailed check)
            volume_7d_usd = self.data_fetcher.calculate_volume_7d_usd(df)
            if volume_7d_usd < self.criteria.min_volume_7d_usd:
                logger.debug(f"{symbol} filtered out: 7d volume ${volume_7d_usd:,.0f} < ${self.criteria.min_volume_7d_usd:,.0f}")
                return None

            # Step 7: Calculate Hurst exponent
            hurst = self.hurst_calculator.calculate_hurst(df['close'])
            if hurst >= self.criteria.max_hurst:
                logger.debug(f"{symbol} filtered out: Hurst {hurst:.4f} >= {self.criteria.max_hurst}")
                return None

            # Step 8: Calculate 7-day volume in base currency
            volume_7d = df.tail(7)['volume'].sum()

            # Step 9: Create pair data
            pair_data = PairData(
                symbol=symbol,
                close_price=current_price,
                hurst_exponent=hurst,
                volume_7d=volume_7d,
                volume_7d_usd=volume_7d_usd,
                last_update=datetime.now(),
                ohlcv_data=df
            )

            logger.info(f"✅ {symbol} passed all filters: "
                       f"Price=${current_price:.4f}, "
                       f"Hurst={hurst:.4f}, "
                       f"7d Volume=${volume_7d_usd:,.0f}")

            return pair_data

        except Exception as e:
            logger.error(f"Error screening {symbol}: {e}")
            return None

    def screen_all_pairs(self, max_workers: int = 3, max_pairs: int = 50) -> List[PairData]:
        """Screen the most liquid USDT spot pairs in parallel.

        Args:
            max_workers: Worker threads (1..10); other values fall back to 3.
            max_pairs: How many pairs from the head of the fetcher's list to
                screen; non-positive values fall back to 50.

        Returns:
            The passing pairs with the highest 7-day volume, at most
            ``criteria.top_n_pairs`` of them. Also stored in ``self.results``.
            An empty list is returned on failure.
        """
        try:
            # Input validation
            if max_workers <= 0 or max_workers > 10:
                logger.warning(f"Invalid max_workers: {max_workers}, using 3")
                max_workers = 3
            if max_pairs <= 0:
                logger.warning(f"Invalid max_pairs: {max_pairs}, using 50")
                max_pairs = 50

            # Get all USDT pairs
            pairs = self.data_fetcher.get_all_spot_usdt_pairs()
            if not pairs:
                logger.error("No pairs found to screen")
                return []

            # Limit pairs for performance (focus on top liquid pairs)
            if len(pairs) > max_pairs:
                pairs = pairs[:max_pairs]
                logger.info(f"Limited screening to top {max_pairs} most liquid pairs")

            total = len(pairs)
            logger.info(f"Starting screening of {total} pairs with {max_workers} workers...")

            results = []
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                future_to_pair = {
                    executor.submit(self.screen_single_pair, pair): pair
                    for pair in pairs
                }

                for completed, future in enumerate(as_completed(future_to_pair), 1):
                    pair = future_to_pair[future]
                    try:
                        # as_completed only yields finished futures, so
                        # result() returns immediately; a timeout here would
                        # never take effect.
                        result = future.result()
                    except Exception as e:
                        logger.error(f"Error processing {pair}: {e}")
                    else:
                        if result:
                            results.append(result)
                            logger.info(f"Progress: {completed}/{total} - Found {len(results)} valid pairs")

                    # Progress update every 10 pairs
                    if completed % 10 == 0:
                        logger.info(f"Progress: {completed}/{total} pairs processed")

            # Sort by volume (descending) and take top N
            results.sort(key=lambda x: x.volume_7d_usd, reverse=True)
            top_results = results[:self.criteria.top_n_pairs]

            self.results = top_results
            self.pairs_screened = total
            logger.info(f"Screening completed. {len(results)} pairs passed filters.")
            logger.info(f"Top {len(top_results)} pairs selected.")

            return top_results

        except Exception as e:
            logger.error(f"Error in screening process: {e}")
            return []

    def get_screening_summary(self) -> Dict:
        """Summarise the last screening run.

        Returns:
            ``{"message": ...}`` when there are no results; otherwise a dict
            with the number of pairs screened, the number selected, the
            criteria used and one ranked entry per selected pair.
        """
        if not self.results:
            return {"message": "No results available"}

        # pairs_screened is only set by screen_all_pairs; fall back to the
        # number of results when they were assigned some other way.
        screened = max(getattr(self, 'pairs_screened', 0), len(self.results))

        summary = {
            "total_pairs_screened": screened,
            "pairs_selected": len(self.results),
            "criteria": {
                "min_price": self.criteria.min_price,
                "min_volume_7d_usd": self.criteria.min_volume_7d_usd,
                "max_hurst": self.criteria.max_hurst,
                "years_of_data": self.criteria.years_of_data
            },
            "top_pairs": [
                {
                    "rank": rank,
                    "symbol": pair.symbol,
                    "close_price": pair.close_price,
                    "hurst_exponent": pair.hurst_exponent,
                    "volume_7d_usd": pair.volume_7d_usd,
                    "volume_7d": pair.volume_7d
                }
                for rank, pair in enumerate(self.results, 1)
            ]
        }

        return summary

    def save_results(self, filename: str = "screening_results.json"):
        """Write the summary and per-pair details to filename as JSON.

        Failures are logged rather than raised. The candle history held in
        each PairData is not written.
        """
        try:
            results_data = {
                "timestamp": datetime.now().isoformat(),
                "summary": self.get_screening_summary(),
                "detailed_results": [
                    {
                        "symbol": pair.symbol,
                        "close_price": pair.close_price,
                        "hurst_exponent": pair.hurst_exponent,
                        "volume_7d_usd": pair.volume_7d_usd,
                        "volume_7d": pair.volume_7d,
                        "last_update": pair.last_update.isoformat()
                    }
                    for pair in self.results
                ]
            }

            with open(filename, 'w') as f:
                # default=str covers values json cannot encode natively,
                # e.g. numpy scalars.
                json.dump(results_data, f, indent=2, default=str)

            logger.info(f"Results saved to {filename}")

        except Exception as e:
            logger.error(f"Error saving results: {e}")

def main():
    """Entry point: screen the market, then print, save and summarise the outcome.

    A keyboard interrupt is logged and swallowed; any other exception is
    logged and re-raised.
    """
    try:
        logger.info("🚀 Starting Market Screener for Grid Trading Pairs")
        logger.info("=" * 60)

        # Credentials are optional; both values are None when unavailable.
        key, secret = _load_api_credentials()
        screener = MarketScreener(api_key=key, api_secret=secret)

        # Time only the screening run itself.
        started_at = time.time()
        top_pairs = screener.screen_all_pairs(max_workers=3)
        elapsed = time.time() - started_at

        _display_results(top_pairs, elapsed)
        screener.save_results()
        _display_summary(screener)

    except KeyboardInterrupt:
        logger.info("\n⏹️  Screening interrupted by user")
    except Exception as e:
        logger.error(f"❌ Fatal error in main: {e}")
        raise

def _load_api_credentials() -> Tuple[Optional[str], Optional[str]]:
    """Return (api_key, api_secret) from api_config, or (None, None).

    (None, None) is returned both when api_config cannot be imported and
    when it yields an empty key or secret.
    """
    try:
        from api_config import get_api_credentials
        key, secret = get_api_credentials()
    except ImportError:
        logger.warning("⚠️  api_config.py not found, running without API credentials")
        return None, None

    if key and secret:
        logger.info("✅ API credentials loaded successfully")
        return key, secret

    # The module exists but at least one of the two values is missing.
    logger.error("❌ API credentials not configured!")
    logger.info("Please update api_config.py with your Binance API credentials")
    return None, None

def _display_results(results: List[PairData], duration: float) -> None:
    """Log the run time and one detail section per selected pair."""
    logger.info(f"\n⏱️  Screening completed in {duration:.2f} seconds")
    logger.info(f"📊 Found {len(results)} pairs that passed all criteria")

    # Nothing selected: say so and stop.
    if not results:
        logger.warning("❌ No pairs found that meet the criteria")
        return

    logger.info("\n🏆 TOP PAIRS FOR GRID TRADING:")
    logger.info("-" * 60)

    for rank, entry in enumerate(results, start=1):
        # One log record per line; the empty string separates the sections.
        section = (
            f"{rank}. {entry.symbol}",
            f"   💰 Price: ${entry.close_price:.4f}",
            f"   📈 Hurst: {entry.hurst_exponent:.4f}",
            f"   💵 7d Volume: ${entry.volume_7d_usd:,.0f}",
            f"   📊 7d Volume (base): {entry.volume_7d:,.2f}",
            "",
        )
        for line in section:
            logger.info(line)

def _display_summary(screener: MarketScreener) -> None:
    """Log the criteria of the last run; does nothing if the summary has none."""
    summary = screener.get_screening_summary()
    if "criteria" not in summary:
        # The summary carries no criteria when there are no results.
        return

    used = summary['criteria']
    logger.info("📋 SCREENING SUMMARY:")
    logger.info(f"   Min Price: ${used['min_price']}")
    logger.info(f"   Min 7d Volume: ${used['min_volume_7d_usd']:,.0f}")
    logger.info(f"   Max Hurst: {used['max_hurst']}")
    logger.info(f"   Data Period: {used['years_of_data']} years")

# Run the screener only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()


2025-09-13 14:40:22,046 - INFO - 🚀 Starting Market Screener for Grid Trading Pairs
2025-09-13 14:40:22,051 - INFO - 🌐 Using public endpoints only
2025-09-13 14:40:24,367 - INFO - Found 419 active USDT spot pairs
2025-09-13 14:40:24,514 - INFO - Sorted pairs by trading volume
2025-09-13 14:40:24,515 - INFO - Limited screening to top 50 most liquid pairs
2025-09-13 14:40:24,515 - INFO - Starting screening of 50 pairs with 3 workers...
2025-09-13 14:40:24,516 - INFO - Screening USDC/USDT...
2025-09-13 14:40:24,519 - INFO - Screening ETH/USDT...
2025-09-13 14:40:24,519 - INFO - Screening BTC/USDT...
2025-09-13 14:40:24,660 - INFO - Screening SOL/USDT...
2025-09-13 14:40:24,969 - INFO - Fetched 365 candles for SOL/USDT
2025-09-13 14:40:24,974 - INFO - Screening DOGE/USDT...
2025-09-13 14:40:25,069 - INFO - Fetched 365 candles for ETH/USDT
2025-09-13 14:40:25,072 - INFO - Screening XRP/USDT...
2025-09-13 14:40:25,074 - INFO - Fetched 365 candles for BTC/USDT
2025-09-13 14:40:25,076 - INFO - 