In [1]:
import os
import csv
import requests
import boto3
from io import StringIO
from datetime import datetime, timedelta
import json
import pytz
import concurrent.futures
from threading import Lock
import time

# Add dotenv support for local development
try:
    from dotenv import load_dotenv
    load_dotenv()  # This will load your .env file from the same directory
    print("✅ .env file loaded successfully")
except ImportError:
    print("⚠️  python-dotenv not installed - using system environment variables")
except Exception as e:
    print(f"⚠️  Could not load .env file: {e}")

# Thread-safe counters
progress_lock = Lock()
processed_count = 0
api_call_count = 0

# Test event for when no event is provided (Jupyter/local testing)
TEST_EVENT = {
    "environment": "test",
    "max_workers": 10,
    "ticker_limit": 100,
    "page_limit": 5
}

def make_polygon_request(url, headers, symbol=None, max_retries=3):
    """Make a Polygon API request with retry logic"""
    global api_call_count
    
    for attempt in range(max_retries):
        try:
            with progress_lock:
                api_call_count += 1
                
            response = requests.get(url, headers=headers, timeout=15)
            
            if response.status_code == 200:
                data = response.json()
                if data.get('status') == 'OK':
                    return data
                else:
                    print(f"API returned non-OK status for {symbol}: {data.get('status')}")
                    return None
            elif response.status_code == 429:  # Rate limited
                wait_time = 2 ** attempt
                print(f"Rate limited on {symbol}, waiting {wait_time}s...")
                time.sleep(wait_time)
                continue
            else:
                print(f"HTTP {response.status_code} for {symbol}: {response.text[:200]}")
                return None
                
        except Exception as e:
            if attempt == max_retries - 1:
                print(f"Final attempt failed for {symbol}: {e}")
                return None
            else:
                time.sleep(1)
                continue
    
    return None

def get_all_us_stocks(api_key, page_limit=None, per_page_limit=1000):
    """Get all US stocks from Polygon tickers endpoint"""
    headers = {"Authorization": f"Bearer {api_key}"}
    all_tickers = []
    next_url = f"https://api.polygon.io/v3/reference/tickers?market=stocks&locale=us&active=true&limit={per_page_limit}"
    
    page_count = 0
    max_pages = page_limit or 100  # Default safety limit, but configurable
    while next_url and page_count < max_pages:
        page_count += 1
        print(f"Fetching page {page_count} of tickers...")
        
        data = make_polygon_request(next_url, headers, "tickers_page")
        if not data or 'results' not in data:
            break
            
        tickers = data['results']
        all_tickers.extend([ticker['ticker'] for ticker in tickers])
        
        # Get next page URL
        next_url = data.get('next_url')
        if next_url:
            # Check if next_url is already a full URL or just a path
            if next_url.startswith('https://'):
                # It's already a full URL, just add the API key
                if '?' in next_url:
                    next_url = f"{next_url}&apikey={api_key}"
                else:
                    next_url = f"{next_url}?apikey={api_key}"
            else:
                # It's just a path, prepend the domain
                if '?' in next_url:
                    next_url = f"https://api.polygon.io{next_url}&apikey={api_key}"
                else:
                    next_url = f"https://api.polygon.io{next_url}?apikey={api_key}"
        
        print(f"Page {page_count}: Found {len(tickers)} tickers, total: {len(all_tickers)}")
        
        # Small delay between pages
        time.sleep(0.1)
    
    if page_count >= max_pages:
        print(f"Reached page limit of {max_pages}. Total tickers collected: {len(all_tickers)}")
    else:
        print(f"Completed all pages. Total US stocks found: {len(all_tickers)}")
    return all_tickers

def get_essential_stock_data(symbol, api_key):
    """Get essential trading data with minimal API calls (2 calls per ticker)"""
    headers = {"Authorization": f"Bearer {api_key}"}
    stock_data = {'ticker': symbol}
    
    # Get current time for column naming
    cst = pytz.timezone('America/Chicago')
    now_cst = datetime.now(cst)
    time_str = now_cst.strftime('%H:%M')
    
    # 1. Company Details (for name and market cap)
    company_url = f"https://api.polygon.io/v3/reference/tickers/{symbol}"
    company_data = make_polygon_request(company_url, headers, symbol)
    if company_data and 'results' in company_data:
        result = company_data['results']
        stock_data.update({
            'name': result.get('name', 'N/A'),
            'market_cap_millions': result.get('market_cap', 0) / 1_000_000 if result.get('market_cap') else 0,
            'primary_exchange': result.get('primary_exchange', 'N/A')
        })
    else:
        stock_data.update({
            'name': 'N/A',
            'market_cap_millions': 0,
            'primary_exchange': 'N/A'
        })
    
    # 2. Previous Day Aggregates (OHLCV)
    prev_day_url = f"https://api.polygon.io/v2/aggs/ticker/{symbol}/prev"
    prev_data = make_polygon_request(prev_day_url, headers, symbol)
    if prev_data and 'results' in prev_data and len(prev_data['results']) > 0:
        result = prev_data['results'][0]
        
        # Extract OHLCV data
        open_price = result.get('o')
        high_price = result.get('h')
        low_price = result.get('l')
        close_price = result.get('c')  # This is previous close
        volume = result.get('v')
        
        stock_data.update({
            'open': f"{open_price:.2f}" if open_price is not None else 'N/A',
            'high': f"{high_price:.2f}" if high_price is not None else 'N/A',
            'low': f"{low_price:.2f}" if low_price is not None else 'N/A',
            'previous_close': f"{close_price:.2f}" if close_price is not None else 'N/A',
            'volume': f"{int(volume)}" if volume is not None else 'N/A'
        })
        
        # Current price with timestamp (using previous close as proxy)
        current_price_col = f'current_price_{time_str}'
        stock_data[current_price_col] = stock_data['previous_close']
        
        # Calculate open_pct_chg (current price vs open price)
        open_pct_chg_col = f'open_pct_chg_{time_str}'
        if open_price is not None and close_price is not None and open_price != 0:
            pct_change = ((close_price - open_price) / open_price) * 100
            stock_data[open_pct_chg_col] = f"{pct_change:+.2f}%"
        else:
            stock_data[open_pct_chg_col] = 'N/A'
        
        # Store raw values for potential previous_pct_chg calculation
        stock_data['_raw_current_price'] = close_price if close_price is not None else None
        stock_data['_current_time'] = time_str
        
    else:
        stock_data.update({
            'open': 'N/A',
            'high': 'N/A', 
            'low': 'N/A',
            'previous_close': 'N/A',
            'volume': 'N/A'
        })
        
        # Still need the timestamped columns even if no data
        current_price_col = f'current_price_{time_str}'
        open_pct_chg_col = f'open_pct_chg_{time_str}'
        stock_data[current_price_col] = 'N/A'
        stock_data[open_pct_chg_col] = 'N/A'
        stock_data['_raw_current_price'] = None
        stock_data['_current_time'] = time_str
    
    # Format market cap
    if stock_data['market_cap_millions'] > 0:
        stock_data['market_cap_millions'] = f"{stock_data['market_cap_millions']:.2f}"
    else:
        stock_data['market_cap_millions'] = 'N/A'
    
    return stock_data

def process_single_ticker(symbol, api_key):
    """Process a single ticker with minimal API calls"""
    global processed_count
    
    try:
        stock_data = get_essential_stock_data(symbol, api_key)
        
        # Update progress counter
        with progress_lock:
            processed_count += 1
            if processed_count % 100 == 0:
                print(f"Processed {processed_count} tickers... (API calls: {api_call_count})")
        
        return stock_data, None
        
    except Exception as e:
        error_msg = f"Error processing {symbol}: {str(e)}"
        print(error_msg)
        return None, error_msg

def lambda_handler(event=None, context=None):
    global processed_count, api_call_count
    processed_count = 0
    api_call_count = 0
    
    # Auto-detect mode: if no event provided, assume Jupyter/local mode
    jupyter_mode = event is None
    
    if jupyter_mode:
        event = TEST_EVENT
        print("🧪 No event provided - Running in JUPYTER/LOCAL mode with test event")
        print(f"Test event: {event}")
    else:
        print("🚀 Event provided - Running in LAMBDA mode")
    
    # Initialize AWS clients (skip if in Jupyter mode and no AWS setup)
    try:
        s3 = boto3.client('s3')
        sns = boto3.client('sns')
        aws_available = True
        if jupyter_mode:
            print("✅ AWS clients initialized successfully")
    except Exception as e:
        if jupyter_mode:
            print(f"⚠️  AWS clients not available (expected in Jupyter): {e}")
            s3 = None
            sns = None
            aws_available = False
        else:
            raise e
    
    # Get environment variables
    bucket_name = os.getenv('BUCKET_NAME')
    polygon_api_key = os.getenv('POLYGON_API_KEY')
    sns_topic_arn = os.getenv('SNS_TOPIC_ARN')
    
    if not polygon_api_key:
        raise ValueError("POLYGON_API_KEY environment variable is required")
    
    # Configuration
    is_test = event.get('environment') == 'test'
    max_workers = event.get('max_workers', 20)
    ticker_limit = event.get('ticker_limit', 5000 if is_test else None)
    page_limit = event.get('page_limit', None)
    
    # Generate file paths
    input_date = datetime.now().strftime('%Y%m%d')
    timestamp = datetime.now().strftime('%H%M')
    
    if jupyter_mode:
        # Jupyter mode: Use simple daily filename for appending
        output_file_name = f'stock_data_{input_date}.csv'
        output_file_key = output_file_name
    else:
        # Lambda mode: Use S3 path structure
        output_file_key = f'stock_data/{input_date}/stock_data_{input_date}.csv'
    
    # Check if we're appending to existing data
    existing_data = {}
    
    if jupyter_mode:
        # Try to read existing local file
        try:
            with open(output_file_name, 'r', encoding='utf-8') as f:
                existing_content = f.read()
                existing_reader = csv.DictReader(StringIO(existing_content))
                for row in existing_reader:
                    existing_data[row['ticker']] = row
            print(f"📂 Found existing local file with {len(existing_data)} tickers")
        except FileNotFoundError:
            print(f"📂 No existing local file found, creating new: {output_file_name}")
            existing_data = {}
        except Exception as e:
            print(f"📂 Error reading existing file: {e}")
            existing_data = {}
    elif aws_available:
        # Try to read existing S3 file
        try:
            existing_obj = s3.get_object(Bucket=bucket_name, Key=output_file_key)
            existing_content = existing_obj['Body'].read().decode('utf-8')
            existing_reader = csv.DictReader(StringIO(existing_content))
            for row in existing_reader:
                existing_data[row['ticker']] = row
            print(f"📂 Found existing S3 file with {len(existing_data)} tickers")
        except Exception as e:
            print(f"📂 No existing S3 file found, creating new: {e}")
            existing_data = {}
    
    try:
        print("Starting essential stock data collection...")
        start_time = time.time()
        
        # Get all US stock tickers
        if is_test:
            tickers = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA'][:ticker_limit] if ticker_limit else ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA']
            print(f"Test mode: Processing {len(tickers)} sample tickers")
        else:
            print("Fetching all US stock tickers...")
            all_tickers = get_all_us_stocks(polygon_api_key, page_limit=page_limit)
            tickers = all_tickers[:ticker_limit] if ticker_limit else all_tickers
            print(f"Production mode: Processing {len(tickers)} tickers")
        
        # Process all tickers in parallel
        valid_data = []
        errors = []
        
        print(f"Starting parallel processing with {max_workers} workers...")
        print(f"Making 2 API calls per ticker (company info + OHLCV data)")
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all tasks
            future_to_ticker = {
                executor.submit(process_single_ticker, ticker, polygon_api_key): ticker
                for ticker in tickers
            }
            
            # Collect results as they complete
            for future in concurrent.futures.as_completed(future_to_ticker):
                ticker = future_to_ticker[future]
                try:
                    stock_data, error = future.result()
                    if stock_data:
                        # Calculate previous_pct_chg if we have existing data
                        if ticker in existing_data:
                            current_time = stock_data['_current_time']
                            current_price = stock_data['_raw_current_price']
                            
                            # Find the most recent previous current_price column
                            prev_price_col = None
                            prev_price_value = None
                            
                            for col, value in existing_data[ticker].items():
                                if col.startswith('current_price_') and col != f'current_price_{current_time}':
                                    if value != 'N/A':
                                        try:
                                            prev_price_value = float(value)
                                            prev_price_col = col
                                        except:
                                            continue
                            
                            # Calculate previous_pct_chg
                            prev_pct_chg_col = f'previous_pct_chg_{current_time}'
                            if prev_price_value is not None and current_price is not None and prev_price_value != 0:
                                pct_change = ((current_price - prev_price_value) / prev_price_value) * 100
                                stock_data[prev_pct_chg_col] = f"{pct_change:+.2f}%"
                            else:
                                stock_data[prev_pct_chg_col] = 'N/A'
                            
                            # Merge with existing data (preserve old columns, add new ones)
                            merged_data = existing_data[ticker].copy()
                            merged_data.update(stock_data)
                            stock_data = merged_data
                        else:
                            # New ticker, add empty previous_pct_chg
                            current_time = stock_data['_current_time']
                            prev_pct_chg_col = f'previous_pct_chg_{current_time}'
                            stock_data[prev_pct_chg_col] = 'N/A'
                        
                        # Remove internal helper fields
                        stock_data.pop('_raw_current_price', None)
                        stock_data.pop('_current_time', None)
                        
                        valid_data.append(stock_data)
                    if error:
                        errors.append(error)
                except Exception as e:
                    error_msg = f"Exception processing {ticker}: {e}"
                    print(error_msg)
                    errors.append(error_msg)
        
        processing_time = time.time() - start_time
        print(f"Processing completed in {processing_time:.1f} seconds")
        print(f"Valid records: {len(valid_data)}, Errors: {len(errors)}, API calls: {api_call_count}")
        
        if not valid_data:
            raise Exception("No valid data collected")
        
        # Get the current time for the dynamic column names
        cst = pytz.timezone('America/Chicago')
        now_cst = datetime.now(cst)
        time_str = now_cst.strftime('%H:%M')
        current_price_col = f'current_price_{time_str}'
        open_pct_chg_col = f'open_pct_chg_{time_str}'
        prev_pct_chg_col = f'previous_pct_chg_{time_str}'
        
        # Collect all possible columns from the data
        all_columns = set()
        for row in valid_data:
            all_columns.update(row.keys())
        
        # Define base column order
        base_columns = [
            'ticker',
            'name', 
            'market_cap_millions',
            'volume',
            'previous_close',
            'open',
            'high',
            'low'
        ]
        
        # Separate timestamped columns and sort them chronologically
        current_price_cols = sorted([col for col in all_columns if col.startswith('current_price_')])
        open_pct_chg_cols = sorted([col for col in all_columns if col.startswith('open_pct_chg_')])
        prev_pct_chg_cols = sorted([col for col in all_columns if col.startswith('previous_pct_chg_')])
        
        # Build final column order: base columns + timestamped columns in chronological order
        fieldnames = base_columns.copy()
        
        # Add timestamped columns in sets, chronologically
        all_timestamps = set()
        for col in current_price_cols + open_pct_chg_cols + prev_pct_chg_cols:
            timestamp = col.split('_')[-1]  # Extract HH:MM
            all_timestamps.add(timestamp)
        
        # Sort timestamps chronologically
        sorted_timestamps = sorted(all_timestamps)
        
        # Add columns for each timestamp in order: current_price, open_pct_chg, previous_pct_chg
        for timestamp in sorted_timestamps:
            for col_type in ['current_price', 'open_pct_chg', 'previous_pct_chg']:
                col_name = f'{col_type}_{timestamp}'
                if col_name in all_columns:
                    fieldnames.append(col_name)
        
        # Create CSV output
        output = StringIO()
        writer = csv.DictWriter(output, fieldnames=fieldnames, extrasaction='ignore')
        writer.writeheader()
        
        # Sort by market cap (descending) then by ticker
        def sort_key(item):
            try:
                market_cap = float(item.get('market_cap_millions', 0)) if item.get('market_cap_millions') != 'N/A' else 0
                return (-market_cap, item.get('ticker', ''))
            except:
                return (0, item.get('ticker', ''))
        
        valid_data.sort(key=sort_key)
        
        for row in valid_data:
            writer.writerow(row)
        
        # Handle output based on mode
        if jupyter_mode:
            # Save locally in Jupyter mode with append functionality
            with open(output_file_name, 'w', newline='', encoding='utf-8') as f:
                f.write(output.getvalue())
            print(f"📁 Data {'appended to' if existing_data else 'saved as'} local file: {output_file_name}")
            file_location = output_file_name
        elif aws_available:
            # Upload to S3 in Lambda mode
            print(f"Uploading data to S3: {output_file_key}")
            s3.put_object(
                Bucket=bucket_name,
                Key=output_file_key,
                Body=output.getvalue(),
                ContentType='text/csv'
            )
            file_location = output_file_key
        else:
            # Fallback: save locally even in Lambda mode if AWS not available
            fallback_filename = f'stock_data_{input_date}_{timestamp}.csv'
            with open(fallback_filename, 'w', newline='', encoding='utf-8') as f:
                f.write(output.getvalue())
            print(f"📁 AWS not available, data saved locally as: {fallback_filename}")
            file_location = fallback_filename
        
        # Generate presigned URL (only if AWS available)
        presigned_url = None
        if aws_available and s3:
            try:
                presigned_url = s3.generate_presigned_url(
                    'get_object',
                    Params={'Bucket': bucket_name, 'Key': output_file_key},
                    ExpiresIn=604800  # 7 days
                )
            except Exception as e:
                print(f"Could not generate presigned URL: {e}")
        
        # Calculate statistics
        success_rate = (len(valid_data) / len(tickers)) * 100 if tickers else 0
        avg_api_calls_per_ticker = api_call_count / len(tickers) if tickers else 0
        
        # Send notification (only if AWS available)
        if aws_available and sns:
            message = (
                f'📊 ESSENTIAL STOCK DATA COLLECTION COMPLETE\n\n'
                f'🎯 STATISTICS:\n'
                f'• Environment: {"TEST" if is_test else "PRODUCTION"}\n'
                f'• Total tickers processed: {len(tickers):,}\n'
                f'• Successful records: {len(valid_data):,}\n'
                f'• Failed records: {len(errors):,}\n'
                f'• Success rate: {success_rate:.1f}%\n'
                f'• Processing time: {processing_time:.1f} seconds\n'
                f'• API calls made: {api_call_count:,} (2 per ticker)\n'
                f'• Avg API calls per ticker: {avg_api_calls_per_ticker:.1f}\n\n'
                f'📈 DATA INCLUDES:\n'
                f'• Ticker symbol\n'
                f'• Company name\n'
                f'• Market cap (millions)\n'
                f'• Primary exchange\n'
                f'• OHLCV data (Open, High, Low, Close, Volume)\n'
                f'• Current price\n\n'
                f'💾 Download link (expires in 7 days):\n{presigned_url or "N/A"}'
            )
            
            if errors and len(errors) <= 10:
                message += f'\n\n❌ ERRORS:\n' + '\n'.join(errors[:10])
            elif errors:
                message += f'\n\n❌ First 10 errors:\n' + '\n'.join(errors[:10])
            
            try:
                sns.publish(
                    TopicArn=sns_topic_arn,
                    Subject=f'📊 Essential Stock Data Complete - {len(valid_data):,} stocks',
                    Message=message
                )
            except Exception as e:
                print(f"Could not send SNS notification: {e}")
        
        # Print summary for Jupyter mode
        if jupyter_mode:
            print("\n" + "="*60)
            print("🎉 DATA COLLECTION COMPLETE!")
            print("="*60)
            print(f"📊 Processed {len(tickers):,} tickers")
            print(f"✅ Successful: {len(valid_data):,} ({success_rate:.1f}%)")
            print(f"❌ Errors: {len(errors):,}")
            print(f"⏱️  Time: {processing_time:.1f}s")
            print(f"🔌 API calls: {api_call_count:,} (2 per ticker)")
            print(f"📁 File: {file_location}")
            print(f"🔄 Mode: {'Appended to existing' if existing_data else 'Created new'} daily file")
            if errors:
                print(f"\n🚨 Sample errors: {errors[:3]}")
        
        return {
            'statusCode': 200,
            'body': {
                'message': 'Successfully collected essential stock data',
                'environment': 'test' if is_test else 'production',
                'mode': 'jupyter' if jupyter_mode else 'lambda',
                'append_mode': len(existing_data) > 0,
                'existing_tickers': len(existing_data),
                'total_tickers': len(tickers),
                'successful_records': len(valid_data),
                'errors': len(errors),
                'success_rate': f"{success_rate:.1f}%",
                'processing_time_seconds': round(processing_time, 1),
                'api_calls_made': api_call_count,
                'output_file': file_location,
                'download_url': presigned_url
            }
        }
        
    except Exception as e:
        error_message = f'Error collecting essential stock data: {str(e)}'
        print(f"Exception: {error_message}")
        
        # Send error notification (only if AWS available)
        if aws_available and sns:
            try:
                sns.publish(
                    TopicArn=sns_topic_arn,
                    Subject='❌ Essential Stock Data Collection Failed',
                    Message=f'Error: {error_message}\n\nProcessed {processed_count} tickers before failure.\nAPI calls made: {api_call_count}'
                )
            except Exception as sns_error:
                print(f"Could not send error notification: {sns_error}")
        
        return {
            'statusCode': 500,
            'body': {'error': error_message, 'processed_count': processed_count, 'api_calls': api_call_count}
        }
result = lambda_handler()
# For Lambda: gets called automatically with event

✅ .env file loaded successfully
🧪 No event provided - Running in JUPYTER/LOCAL mode with test event
Test event: {'environment': 'test', 'max_workers': 10, 'ticker_limit': 100, 'page_limit': 5}
✅ AWS clients initialized successfully
📂 No existing local file found, creating new: stock_data_20250604.csv
Starting essential stock data collection...
Test mode: Processing 5 sample tickers
Starting parallel processing with 10 workers...
Making 2 API calls per ticker (company info + OHLCV data)
Processing completed in 0.5 seconds
Valid records: 5, Errors: 0, API calls: 10
📁 Data saved as local file: stock_data_20250604.csv

🎉 DATA COLLECTION COMPLETE!
📊 Processed 5 tickers
✅ Successful: 5 (100.0%)
❌ Errors: 0
⏱️  Time: 0.5s
🔌 API calls: 10 (2 per ticker)
📁 File: stock_data_20250604.csv
🔄 Mode: Created new daily file


In [3]:
import os
import csv
import json
import asyncio
import pandas as pd
from datetime import datetime, timedelta
import pytz
from collections import defaultdict
import boto3
from io import StringIO
import logging
from typing import Dict, List, Set
import time

# Polygon WebSocket imports
from polygon import WebSocketClient
from polygon.websocket.models import WebSocketMessage, EquityTrade, EquityQuote, EquityAgg
from polygon import RESTClient

# Load environment variables
try:
    from dotenv import load_dotenv
    load_dotenv()
    print("✅ .env file loaded successfully")
except ImportError:
    print("⚠️  python-dotenv not installed - using system environment variables")

# Configuration
AWS_S3_ENABLED = True  # Toggle S3 upload
S3_BUCKET = os.getenv('BUCKET_NAME')
S3_PREFIX = "stock_data/real-time-monitor/"
POLL_INTERVAL = 60  # seconds
FILTER_START_DELAY = 420  # 7 minutes (420 seconds)
POLYGON_API_KEY = os.getenv('POLYGON_API_KEY')

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class NASDAQMonitor:
    def __init__(self):
        self.rest_client = RESTClient(POLYGON_API_KEY)
        self.ws_client = WebSocketClient(POLYGON_API_KEY, feed='sip')  # Premium real-time feed
        self.stocks_data = defaultdict(dict)  # Store latest data for each stock
        self.nasdaq_symbols = set()
        self.qualified_symbols = set()
        self.start_time = None
        self.filter_enabled = False
        self.running = True
        self.cst = pytz.timezone('America/Chicago')
        self.data_lock = asyncio.Lock()
        
        # S3 client
        if AWS_S3_ENABLED:
            self.s3_client = boto3.client('s3')
        
        # File paths
        self.date_str = datetime.now(self.cst).strftime('%Y%m%d')
        self.start_time_str = datetime.now(self.cst).strftime('%H%M')
        self.raw_file = f'nasdaq_monitor_raw_{self.date_str}_{self.start_time_str}.csv'
        self.filtered_file = f'nasdaq_monitor_filtered_{self.date_str}_{self.start_time_str}.csv'
        
        # CSV headers
        self.headers = [
            'timestamp', 'symbol', 'market_cap_millions', 'previous_close', 'open',
            'current_price', 'bid', 'ask', 'volume', 'day_high', 'day_low',
            'change_pct', 'change_from_open_pct', 'meets_criteria'
        ]
        
        # Initialize CSV files
        self._initialize_csv_files()
    
    def _initialize_csv_files(self):
        """Create CSV files with headers"""
        with open(self.raw_file, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=self.headers)
            writer.writeheader()
        logger.info(f"Created raw data file: {self.raw_file}")
    
    async def fetch_nasdaq_symbols(self):
        """Fetch all NASDAQ symbols from Polygon using efficient pagination"""
        logger.info("Fetching NASDAQ symbols...")
        symbols = []
        
        try:
            # Use pagination efficiently
            next_url = None
            page_count = 0
            
            while True:
                page_count += 1
                if next_url:
                    # Parse the cursor from next_url
                    tickers_response = self.rest_client.list_tickers(
                        market="stocks",
                        exchange="XNAS",
                        active=True,
                        limit=1000,
                        cursor=next_url.split('cursor=')[1] if 'cursor=' in next_url else None
                    )
                else:
                    tickers_response = self.rest_client.list_tickers(
                        market="stocks",
                        exchange="XNAS",
                        active=True,
                        limit=1000
                    )
                
                # Process the current page
                page_symbols = []
                for ticker in tickers_response:
                    page_symbols.append(ticker.ticker)
                
                symbols.extend(page_symbols)
                logger.info(f"Page {page_count}: Found {len(page_symbols)} symbols, total: {len(symbols)}")
                
                # Check if there's a next page
                if hasattr(tickers_response, 'next_url') and tickers_response.next_url:
                    next_url = tickers_response.next_url
                else:
                    break
                
                # Safety limit
                if len(symbols) >= 10000:
                    logger.warning(f"Reached safety limit of 10000 symbols")
                    break
            
            self.nasdaq_symbols = set(symbols)
            logger.info(f"Found {len(self.nasdaq_symbols)} NASDAQ symbols")
            
            # Fetch initial data using efficient snapshot API
            await self.fetch_initial_data()
            
        except Exception as e:
            logger.error(f"Error fetching NASDAQ symbols: {e}")
            # Fallback to a small test set
            self.nasdaq_symbols = {'AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA', 'META', 'NVDA', 'AMD', 'NFLX', 'TSLA'}
            logger.info(f"Using test set of {len(self.nasdaq_symbols)} symbols")
    
    async def fetch_initial_data(self):
        """Fetch initial data (market cap, open, previous close) for all symbols using minimal API calls"""
        logger.info("Fetching initial data for all symbols...")
        
        # Use snapshot endpoint for bulk data - much more efficient
        try:
            # Get all snapshots in one call
            snapshots = self.rest_client.get_snapshot_all("stocks")
            
            for snapshot in snapshots:
                symbol = snapshot.ticker
                if symbol in self.nasdaq_symbols:
                    async with self.data_lock:
                        self.stocks_data[symbol].update({
                            'market_cap_millions': getattr(snapshot, 'market_cap', 0) / 1_000_000 if hasattr(snapshot, 'market_cap') else 0,
                            'previous_close': snapshot.prev_day.close if snapshot.prev_day else 0,
                            'open': snapshot.day.open if snapshot.day else 0,
                            'volume': snapshot.day.volume if snapshot.day else 0,
                            'day_high': snapshot.day.high if snapshot.day else 0,
                            'day_low': snapshot.day.low if snapshot.day else float('inf'),
                            'current_price': snapshot.day.close if snapshot.day else 0
                        })
            
            logger.info(f"Initial data fetched for {len(self.stocks_data)} symbols using snapshot API")
            
        except Exception as e:
            logger.error(f"Error fetching snapshot data: {e}")
            # Fallback to individual calls only if snapshot fails
            logger.info("Falling back to individual API calls...")
            
            # Limit to top symbols to minimize calls
            limited_symbols = list(self.nasdaq_symbols)[:100]
            for symbol in limited_symbols:
                try:
                    # Get previous day data
                    prev_day = self.rest_client.get_previous_close(symbol)
                    if prev_day and len(prev_day) > 0:
                        async with self.data_lock:
                            self.stocks_data[symbol].update({
                                'market_cap_millions': 0,  # Skip market cap in fallback
                                'previous_close': prev_day[0].close,
                                'open': prev_day[0].open,
                                'volume': 0,
                                'day_high': 0,
                                'day_low': float('inf')
                            })
                except Exception as e:
                    logger.debug(f"Error fetching data for {symbol}: {e}")
    
    def calculate_qualifying_criteria(self, symbol: str) -> bool:
        """Check if stock meets all qualifying criteria"""
        data = self.stocks_data.get(symbol, {})
        
        # Get required values
        volume = data.get('volume', 0)
        prev_close = data.get('previous_close', 0)
        open_price = data.get('open', 0)
        current_price = data.get('current_price', 0)
        
        # Calculate change from previous close
        if prev_close > 0:
            change_pct = ((current_price - prev_close) / prev_close) * 100
        else:
            change_pct = 0
        
        # Check all criteria
        meets_criteria = (
            volume > 300_000 and
            change_pct >= 2.5 and
            prev_close >= 0.01 and
            current_price > open_price
        )
        
        return meets_criteria
    
    async def handle_message(self, message: WebSocketMessage):
        """Handle incoming WebSocket messages"""
        if isinstance(message, (EquityTrade, EquityQuote, EquityAgg)):
            symbol = message.symbol
            
            async with self.data_lock:
                if symbol not in self.stocks_data:
                    self.stocks_data[symbol] = {}
                
                # Update price and volume data
                if isinstance(message, EquityTrade):
                    self.stocks_data[symbol]['current_price'] = message.price
                    self.stocks_data[symbol]['volume'] = self.stocks_data[symbol].get('volume', 0) + message.size
                    
                    # Update high/low
                    current_high = self.stocks_data[symbol].get('day_high', 0)
                    current_low = self.stocks_data[symbol].get('day_low', float('inf'))
                    self.stocks_data[symbol]['day_high'] = max(current_high, message.price)
                    self.stocks_data[symbol]['day_low'] = min(current_low, message.price)
                
                elif isinstance(message, EquityQuote):
                    self.stocks_data[symbol]['bid'] = message.bid_price
                    self.stocks_data[symbol]['ask'] = message.ask_price
                
                elif isinstance(message, EquityAgg):
                    self.stocks_data[symbol]['current_price'] = message.close
                    self.stocks_data[symbol]['volume'] = message.volume
                    self.stocks_data[symbol]['day_high'] = message.high
                    self.stocks_data[symbol]['day_low'] = message.low
    
    async def write_data_snapshot(self):
        """Write current data snapshot to CSV"""
        timestamp = datetime.now(self.cst).strftime('%Y-%m-%d %H:%M:%S')
        rows = []
        
        async with self.data_lock:
            for symbol in self.nasdaq_symbols:
                if symbol not in self.stocks_data:
                    continue
                
                data = self.stocks_data[symbol]
                
                # Skip if no current price
                if 'current_price' not in data:
                    continue
                
                # Calculate percentages
                prev_close = data.get('previous_close', 0)
                open_price = data.get('open', 0)
                current_price = data.get('current_price', 0)
                
                change_pct = ((current_price - prev_close) / prev_close * 100) if prev_close > 0 else 0
                change_from_open_pct = ((current_price - open_price) / open_price * 100) if open_price > 0 else 0
                
                # Check if meets criteria
                meets_criteria = self.calculate_qualifying_criteria(symbol)
                
                row = {
                    'timestamp': timestamp,
                    'symbol': symbol,
                    'market_cap_millions': f"{data.get('market_cap_millions', 0):.2f}",
                    'previous_close': f"{prev_close:.2f}",
                    'open': f"{open_price:.2f}",
                    'current_price': f"{current_price:.2f}",
                    'bid': f"{data.get('bid', 0):.2f}",
                    'ask': f"{data.get('ask', 0):.2f}",
                    'volume': data.get('volume', 0),
                    'day_high': f"{data.get('day_high', 0):.2f}",
                    'day_low': f"{data.get('day_low', 0):.2f}" if data.get('day_low', float('inf')) != float('inf') else "0.00",
                    'change_pct': f"{change_pct:.2f}",
                    'change_from_open_pct': f"{change_from_open_pct:.2f}",
                    'meets_criteria': 'Y' if meets_criteria else 'N'
                }
                rows.append(row)
                
                # Track qualified symbols
                if meets_criteria:
                    self.qualified_symbols.add(symbol)
                elif symbol in self.qualified_symbols:
                    self.qualified_symbols.remove(symbol)
        
        # Write to raw file
        with open(self.raw_file, 'a', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=self.headers)
            writer.writerows(rows)
        
        # Write to filtered file if enabled and there are qualified stocks
        if self.filter_enabled and self.qualified_symbols:
            filtered_rows = [row for row in rows if row['meets_criteria'] == 'Y']
            
            # Create filtered file if it doesn't exist
            if not os.path.exists(self.filtered_file):
                with open(self.filtered_file, 'w', newline='') as f:
                    writer = csv.DictWriter(f, fieldnames=self.headers)
                    writer.writeheader()
            
            with open(self.filtered_file, 'a', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=self.headers)
                writer.writerows(filtered_rows)
        
        # Upload to S3 if enabled
        if AWS_S3_ENABLED:
            await self.upload_to_s3()
        
        logger.info(f"Data snapshot written - Total: {len(rows)}, Qualified: {len(self.qualified_symbols)}")
        
        # Log when no qualifiers but keep running
        if self.filter_enabled and len(self.qualified_symbols) == 0:
            logger.info("No qualifying stocks at this time, continuing to monitor...")
    
    async def upload_to_s3(self):
        """Upload CSV files to S3"""
        try:
            # Upload raw file
            with open(self.raw_file, 'rb') as f:
                self.s3_client.put_object(
                    Bucket=S3_BUCKET,
                    Key=f"{S3_PREFIX}{self.raw_file}",
                    Body=f.read()
                )
            
            # Upload filtered file if it exists
            if os.path.exists(self.filtered_file):
                with open(self.filtered_file, 'rb') as f:
                    self.s3_client.put_object(
                        Bucket=S3_BUCKET,
                        Key=f"{S3_PREFIX}{self.filtered_file}",
                        Body=f.read()
                    )
            
            logger.info("Files uploaded to S3")
        except Exception as e:
            logger.error(f"Error uploading to S3: {e}")
    
    async def periodic_writer(self):
        """Write data snapshots every minute"""
        while self.running:
            await asyncio.sleep(POLL_INTERVAL)
            
            # Enable filtering after 7 minutes
            if not self.filter_enabled and self.start_time:
                elapsed = time.time() - self.start_time
                if elapsed >= FILTER_START_DELAY:
                    self.filter_enabled = True
                    logger.info("Filtering enabled - creating filtered output file")
            
            await self.write_data_snapshot()
    
    async def run(self):
        """Main run loop"""
        self.start_time = time.time()
        
        # Fetch NASDAQ symbols
        await self.fetch_nasdaq_symbols()
        
        # Subscribe to all symbols
        logger.info(f"Subscribing to {len(self.nasdaq_symbols)} symbols...")
        
        # Subscribe in batches to avoid overwhelming the connection
        symbols_list = list(self.nasdaq_symbols)
        batch_size = 100
        
        # Create subscription strings for efficiency
        subscription_strings = []
        for i in range(0, len(symbols_list), batch_size):
            batch = symbols_list[i:i + batch_size]
            # Create comma-separated symbol list for each batch
            batch_str = ",".join(batch)
            subscription_strings.append(batch_str)
        
        # Subscribe to all data types for each batch
        for i, batch_str in enumerate(subscription_strings):
            logger.info(f"Subscribing to batch {i+1}/{len(subscription_strings)}")
            
            # Subscribe to trades
            self.ws_client.subscribe(f"T.{batch_str}")
            # Subscribe to quotes  
            self.ws_client.subscribe(f"Q.{batch_str}")
            # Subscribe to minute aggregates
            self.ws_client.subscribe(f"AM.{batch_str}")
            
            await asyncio.sleep(0.1)  # Small delay between batches
        
        logger.info("Subscriptions complete. Starting real-time monitoring...")
        
        # Start periodic writer
        writer_task = asyncio.create_task(self.periodic_writer())
        
        # Define sync handler to bridge to async
        def sync_handle_message(message):
            asyncio.create_task(self.handle_message(message))

        try:
            self.ws_client.run(sync_handle_message)
        except Exception as e:
            logger.error(f"Error in WebSocket run loop: {e}")
        finally:
            writer_task.cancel()
            await self.ws_client.close()
            logger.info("Monitor stopped")

async def main():
    """Main function to run the monitor"""
    monitor = NASDAQMonitor()
    
    try:
        await monitor.run()
    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt, shutting down...")
        monitor.running = False
    except Exception as e:
        logger.error(f"Error in main loop: {e}")
        monitor.running = False

# Run the monitor
if __name__ == "__main__":
    # For Jupyter notebook, use:
    await main()
    
    # For script execution, use:
    # asyncio.run(main())

2025-06-04 01:54:15,318 - INFO - Created raw data file: nasdaq_monitor_raw_20250604_0154.csv
2025-06-04 01:54:15,319 - INFO - Fetching NASDAQ symbols...


✅ .env file loaded successfully


2025-06-04 01:54:16,875 - INFO - Page 1: Found 4965 symbols, total: 4965
2025-06-04 01:54:16,876 - INFO - Found 4920 NASDAQ symbols
2025-06-04 01:54:16,876 - INFO - Fetching initial data for all symbols...
2025-06-04 01:54:17,536 - INFO - Initial data fetched for 4866 symbols using snapshot API
2025-06-04 01:54:17,541 - INFO - Subscribing to 4920 symbols...
2025-06-04 01:54:17,542 - INFO - Subscribing to batch 1/50
2025-06-04 01:54:17,643 - INFO - Subscribing to batch 2/50
2025-06-04 01:54:17,745 - INFO - Subscribing to batch 3/50
2025-06-04 01:54:17,847 - INFO - Subscribing to batch 4/50
2025-06-04 01:54:17,949 - INFO - Subscribing to batch 5/50
2025-06-04 01:54:18,051 - INFO - Subscribing to batch 6/50
2025-06-04 01:54:18,153 - INFO - Subscribing to batch 7/50
2025-06-04 01:54:18,255 - INFO - Subscribing to batch 8/50
2025-06-04 01:54:18,357 - INFO - Subscribing to batch 9/50
2025-06-04 01:54:18,459 - INFO - Subscribing to batch 10/50
2025-06-04 01:54:18,561 - INFO - Subscribing to b