# In [None]:
#!/usr/bin/env python3
"""
🚨 ULTRA-SCALE 5K+ DATA EXTRACTION TESTING

Patient, methodical testing approach for 5000+ data extraction validation.
Implements intelligent batching, real-time progress tracking, and comprehensive validation.

This test will take time but provides thorough validation of large-scale capabilities.
"""

import os, time, pathlib, pprint, requests, json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional

# Configuration - Auto-detect correct endpoint
def find_browser_endpoint(endpoints=None, default="http://browser:8004", timeout=2):
    """Return the first browser-service base URL whose ``/healthz`` answers HTTP 200.

    Args:
        endpoints: Candidate base URLs to probe, in order. ``None`` (the
            default) probes ``("http://browser:8004",)``.
        default: URL returned when no candidate answers with HTTP 200.
        timeout: Per-probe timeout in seconds, passed to ``urlopen``.

    Returns:
        The first healthy candidate, otherwise ``default``.
    """
    import urllib.request

    candidates = ("http://browser:8004",) if endpoints is None else endpoints

    for endpoint in candidates:
        try:
            with urllib.request.urlopen(f"{endpoint}/healthz", timeout=timeout) as response:
                healthy = response.status == 200
        except Exception:
            # Unreachable host, refused connection, timeout, HTTP error status,
            # malformed URL, ... all just mean "try the next candidate".
            # (A bare ``except:`` would also swallow KeyboardInterrupt/SystemExit.)
            continue
        if healthy:
            print(f"üîç Auto-detected browser endpoint: {endpoint}")
            return endpoint

    # No candidate responded healthily.
    return default

# Resolved once, when this module/cell runs: probes the candidate URLs in
# find_browser_endpoint() and falls back to its default URL if none is healthy.
# Every UltraScale5KTester instance copies this value into self.endpoint.
EP = find_browser_endpoint()  # Auto-detect correct endpoint

class UltraScale5KTester:
    """
    Ultra-scale (5K+) data extraction tester for the browser service.

    Drives the ``/jobs/twitter`` endpoint on ``self.endpoint`` through three
    phases (orchestrated by ``run_complete_5k_test``):

    1. Progressive scale validation (10 -> 1000 posts per job) to establish a
       proven per-job capacity.
    2. Batched post extraction towards a 5,000-post target, using at most
       150 posts per job.
    3. Extraction of other data types (followers, following, media, likes,
       mentions), attempted only when phase 2 reported at least 1,000 posts.

    Every per-job analysis dict is appended to ``self.results["batches"]`` and
    human-readable problems to ``self.results["issues"]``.
    """

    def __init__(self, target_username="naval"):
        """Create a tester for ``target_username`` against the module-level ``EP`` endpoint."""
        self.target_username = target_username
        self.endpoint = EP
        self.results = {
            "target_username": target_username,
            "start_time": None,          # datetime, set by run_complete_5k_test
            "batches": [],               # one analysis dict per submitted job
            "total_extracted": {},       # filled by generate_comprehensive_report
            "quality_metrics": {},
            "performance_metrics": {},   # e.g. "proven_capacity" from phase 1
            "issues": [],
            "success": False
        }

    def _elapsed_since_start(self) -> float:
        """Seconds since ``results["start_time"]``, or 0.0 if the run has not started."""
        start = self.results.get("start_time")
        return time.time() - start.timestamp() if start is not None else 0.0

    def wait_for_job(self, job_id: str, every: int = 5,
                     timeout: Optional[float] = None) -> Dict[str, Any]:
        """Poll ``/jobs/{job_id}`` until the job reaches a terminal status.

        Args:
            job_id: Identifier returned by the job submission endpoint.
            every: Seconds to sleep between polls.
            timeout: Maximum seconds to wait. ``None`` (the default) waits
                indefinitely.

        Returns:
            The job record once its ``status`` is ``"finished"`` or ``"error"``.
            Returns ``{"status": "error", "error": <message>}`` when polling
            fails or the timeout elapses.
        """
        print(f"‚è≥ Monitoring job {job_id}...")
        start_time = time.time()

        while True:
            try:
                rec = requests.get(f"{self.endpoint}/jobs/{job_id}", timeout=10).json()
                status = rec["status"]
            except Exception as e:
                print(f"\n‚ùå Error checking job status: {e}")
                return {"status": "error", "error": str(e)}

            elapsed = time.time() - start_time
            if status in {"finished", "error"}:
                print(f"\n‚úÖ {status.upper()} in {elapsed:.1f}s")
                return rec

            print(f"\r‚è±Ô∏è  {rec.get('status_with_elapsed', status)} (Elapsed: {elapsed:.0f}s)", end="")

            # Without a deadline, a job stuck in a non-terminal (or unknown)
            # status would block the whole run forever.
            if timeout is not None and elapsed >= timeout:
                msg = f"Timed out after {elapsed:.0f}s waiting for job {job_id} (last status: {status})"
                print(f"\n‚ùå {msg}")
                return {"status": "error", "error": msg}

            time.sleep(every)

    def submit_job(self, payload: Dict[str, Any], test_name: str,
                   timeout: Optional[float] = None) -> Dict[str, Any]:
        """Submit a job to ``/jobs/twitter`` and wait for it to finish.

        Args:
            payload: JSON body for the job request.
            test_name: Label used in progress output.
            timeout: Optional wait deadline in seconds, forwarded to
                ``wait_for_job`` (``None`` waits indefinitely).

        Returns:
            The final job record, or ``{"status": "error", "error": ...}`` if
            submission fails.
        """
        print(f"\nüöÄ SUBMITTING: {test_name}")
        print(f"üìù Target: {payload.get('max_posts', 'N/A')} posts from @{payload.get('username', 'N/A')}")

        try:
            r = requests.post(f"{self.endpoint}/jobs/twitter", json=payload, timeout=30)
            r.raise_for_status()
            jid = r.json()["job_id"]
        except Exception as e:
            print(f"‚ùå Job submission failed: {e}")
            return {"status": "error", "error": str(e)}

        print(f"üÜî Job ID: {jid}")
        return self.wait_for_job(jid, timeout=timeout)

    def analyze_extraction_result(self, result: Dict[str, Any], test_name: str) -> Dict[str, Any]:
        """Summarize a job record into an analysis dict.

        The returned dict has keys ``test_name``, ``status``, ``success``,
        ``extracted``, ``target``, ``rate``, ``duration``, ``speed``,
        ``quality_score`` and ``issues``. Only ``status``, ``success``,
        ``extracted`` and ``issues`` are filled in here. Callers set the rest.

        ``extracted`` is ``len(data[0]["posts"])`` when the first item of
        ``result["result"]["data"]`` is a dict with a ``posts`` key. Otherwise
        it is ``len(data)`` when ``data`` is a list.
        """
        print(f"\nüìä ANALYZING: {test_name}")
        print("-" * 60)

        status = result.get("status", "unknown")
        analysis = {
            "test_name": test_name,
            "status": status,
            "success": False,
            "extracted": 0,
            "target": 0,
            "rate": 0,
            "duration": 0,
            "speed": 0,
            "quality_score": 0,
            "issues": []
        }

        if status == "error":
            error_msg = result.get('error', 'Unknown error')
            print(f"‚ùå FAILED: {error_msg}")
            analysis["issues"].append(f"Job failed: {error_msg}")
            return analysis

        res = result.get("result")
        # A missing or null/non-dict "result" is treated the same way.
        if not isinstance(res, dict):
            print(f"‚ùå No result data found")
            analysis["issues"].append("No result data in response")
            return analysis

        data = res.get("data", [])

        if not data:
            print(f"‚ö†Ô∏è NO DATA EXTRACTED - 0 items returned")
            analysis["issues"].append("No data extracted")
            return analysis

        # Extract posts count
        first_item = data[0] if isinstance(data, list) else {}
        posts_extracted = 0

        if isinstance(first_item, dict) and 'posts' in first_item:
            posts_extracted = len(first_item['posts'])
        elif isinstance(data, list):
            posts_extracted = len(data)

        analysis.update({
            "success": posts_extracted > 0,
            "extracted": posts_extracted
        })

        print(f"‚úÖ EXTRACTED: {posts_extracted:,} posts")

        return analysis

    def run_progressive_scale_test(self) -> bool:
        """
        Phase 1: progressive scale testing (10 -> 1000 posts per job).

        Submits one job per scale step, each with its own wait timeout. A step
        proves its size when at least 70% of the requested posts come back.
        A failed job at 150 posts or fewer aborts the phase immediately.

        Returns:
            True when a capacity of at least 150 posts was proven. The capacity
            is stored in ``self.results["performance_metrics"]["proven_capacity"]``.
            Otherwise False, with a message appended to ``self.results["issues"]``.
        """
        print(f"\nüî¨ PHASE 1: PROGRESSIVE SCALE VALIDATION")
        print("="*70)
        print(f"üéØ Purpose: Validate system capacity before 5K extraction")
        print(f"üìä Strategy: Progressive testing from 10 to 1000 posts")

        progressive_tests = [
            {"name": "Micro Scale", "posts": 10, "timeout": 120},
            {"name": "Small Scale", "posts": 50, "timeout": 180},
            {"name": "Medium Scale", "posts": 150, "timeout": 300},
            {"name": "Large Scale", "posts": 500, "timeout": 600},
            {"name": "Ultra Scale", "posts": 1000, "timeout": 900}
        ]

        baseline_proven = False
        max_proven_capacity = 0

        for i, test in enumerate(progressive_tests, 1):
            print(f"\nüìä Test {i}/{len(progressive_tests)}: {test['name']} ({test['posts']} posts)")
            print(f"‚è±Ô∏è Timeout: {test['timeout']} seconds")

            phase_start = time.time()

            payload = {
                "username": self.target_username,
                "scrape_posts": True,
                "max_posts": test['posts'],
                "scrape_level": 4
            }

            # Enforce the advertised per-step timeout so a stuck job cannot
            # block the run forever.
            result = self.submit_job(payload, f"Progressive Test {i}", timeout=test['timeout'])
            analysis = self.analyze_extraction_result(result, test['name'])

            phase_duration = time.time() - phase_start
            analysis["duration"] = phase_duration
            analysis["target"] = test['posts']

            if analysis["success"]:
                extraction_rate = (analysis["extracted"] / test['posts']) * 100
                speed = analysis["extracted"] / phase_duration if phase_duration > 0 else 0

                analysis["rate"] = extraction_rate
                analysis["speed"] = speed

                print(f"‚úÖ SUCCESS: {analysis['extracted']}/{test['posts']} posts ({extraction_rate:.1f}%)")
                print(f"üìà Speed: {speed:.2f} posts/second")
                print(f"‚è±Ô∏è Duration: {phase_duration:.1f}s")

                if extraction_rate >= 70:  # 70% success threshold
                    baseline_proven = True
                    max_proven_capacity = max(max_proven_capacity, test['posts'])
                    print(f"üéØ BASELINE PROVEN: {test['posts']} posts capacity confirmed")
                else:
                    print(f"‚ö†Ô∏è LOW EXTRACTION: Only {extraction_rate:.1f}% success rate")

            else:
                print(f"‚ùå FAILED: {' | '.join(analysis['issues'])}")
                print(f"‚è±Ô∏è Duration: {phase_duration:.1f}s")

                # If we fail at lower scales, stop progressive testing
                if test['posts'] <= 150:
                    print(f"üö® CRITICAL: Failed at {test['posts']} posts - infrastructure issues detected")
                    self.results["issues"].append(f"Failed at {test['posts']} posts - cannot proceed to 5K")
                    # Keep the failing step in the detailed results before aborting.
                    self.results["batches"].append(analysis)
                    return False

            self.results["batches"].append(analysis)

            # Brief cooldown between tests
            if i < len(progressive_tests):
                print(f"‚è≥ Cooling down 10 seconds...")
                time.sleep(10)

        if baseline_proven and max_proven_capacity >= 150:
            print(f"\nüéâ PHASE 1 SUCCESS: Proven capacity up to {max_proven_capacity:,} posts")
            self.results["performance_metrics"]["proven_capacity"] = max_proven_capacity
            return True

        print(f"\nüö® PHASE 1 FAILED: Maximum proven capacity only {max_proven_capacity} posts")
        self.results["issues"].append(f"Insufficient baseline capacity: {max_proven_capacity} posts")
        return False

    def run_intelligent_5k_batching(self, proven_capacity: int,
                                    max_batches: Optional[int] = None) -> int:
        """
        Phase 2: batched post extraction towards a 5,000-post target.

        Args:
            proven_capacity: Largest per-job post count proven in phase 1. Each
                batch requests ``min(proven_capacity, 150)`` posts.
            max_batches: Optional cap on the number of batches submitted.
                ``None`` (the default) submits every batch needed to reach
                5,000 posts. A fixed cap of 15 batches x 150 posts = 2,250
                posts could never reach the report's 2,500-post pass threshold.

        Returns:
            Sum of posts reported by successful batches. Stops early after 3
            consecutive failed batches.

        Raises:
            ValueError: if ``proven_capacity`` is not positive.
        """
        print(f"\nüöÄ PHASE 2: INTELLIGENT 5K+ BATCHING")
        print("="*70)
        print(f"üìä Proven Capacity: {proven_capacity} posts per job")

        # Calculate optimal batch strategy
        safe_batch_size = min(proven_capacity, 150)  # Use proven safe limit
        if safe_batch_size <= 0:
            raise ValueError(f"proven_capacity must be positive, got {proven_capacity}")
        target_5k = 5000
        num_batches = (target_5k + safe_batch_size - 1) // safe_batch_size  # Ceiling division

        print(f"üéØ Strategy: {num_batches} batches of {safe_batch_size} posts each")
        print(f"üìà Expected total: {num_batches * safe_batch_size} posts")
        print(f"‚è±Ô∏è Estimated time: {num_batches * 180} seconds ({(num_batches * 180)/60:.1f} minutes)")
        print(f"üö® PATIENCE REQUIRED: This will take time - progress shown below")

        test_batch_limit = num_batches if max_batches is None else min(num_batches, max_batches)

        total_5k_extracted = 0
        batch_failures = 0          # failed batches overall
        consecutive_failures = 0    # failed batches since the last success
        max_consecutive_failures = 3
        phase_start = time.time()   # ETA is based on this phase only

        print(f"\nüì¶ STARTING BATCH EXTRACTION (Testing {test_batch_limit} batches)")
        print("="*50)

        for batch_num in range(1, test_batch_limit + 1):
            print(f"\nüì¶ BATCH {batch_num}/{test_batch_limit}")
            print(f"üéØ Target: {safe_batch_size} posts")

            batch_start = time.time()

            # NOTE(review): posts_offset only helps if the service honors
            # batch_info. If it is ignored, every batch may return the same
            # posts and the cumulative total over-counts. Confirm server-side.
            payload = {
                "username": self.target_username,
                "scrape_posts": True,
                "max_posts": safe_batch_size,
                "scrape_level": 4,
                "batch_info": {
                    "batch_number": batch_num,
                    "total_batches": test_batch_limit,
                    "posts_offset": (batch_num - 1) * safe_batch_size
                }
            }

            result = self.submit_job(payload, f"5K Batch {batch_num}")
            analysis = self.analyze_extraction_result(result, f"Batch {batch_num}")

            now = time.time()
            batch_duration = now - batch_start
            elapsed_phase = now - phase_start
            elapsed_total = self._elapsed_since_start()

            analysis["duration"] = batch_duration
            analysis["target"] = safe_batch_size

            if analysis["success"]:
                consecutive_failures = 0
                batch_extracted = analysis["extracted"]
                previous_total = total_5k_extracted
                total_5k_extracted += batch_extracted
                extraction_rate = (batch_extracted / safe_batch_size) * 100
                speed = batch_extracted / batch_duration if batch_duration > 0 else 0

                analysis["rate"] = extraction_rate
                analysis["speed"] = speed

                print(f"‚úÖ BATCH SUCCESS: {batch_extracted}/{safe_batch_size} posts ({extraction_rate:.1f}%)")
                print(f"üìà Batch Speed: {speed:.2f} posts/second")
                print(f"üìä CUMULATIVE TOTAL: {total_5k_extracted:,} posts")
                print(f"‚è±Ô∏è Elapsed: {elapsed_total:.0f}s | This Batch: {batch_duration:.1f}s")

                # Progress tracking. Use this phase's elapsed time for the ETA;
                # the run-wide elapsed time also includes phase 1.
                progress = (batch_num / test_batch_limit) * 100
                if batch_num > 1:
                    avg_batch_time = elapsed_phase / batch_num
                    eta_remaining = avg_batch_time * (test_batch_limit - batch_num)
                    print(f"üìà Progress: {progress:.1f}% | ETA: {eta_remaining:.0f}s remaining")

                # Announce each milestone only on the batch that crosses it.
                if previous_total < 1000 <= total_5k_extracted:
                    print(f"üéØ MILESTONE: 1K posts achieved!")
                if previous_total < 2500 <= total_5k_extracted:
                    print(f"üéØ MILESTONE: 2.5K posts achieved!")

            else:
                batch_failures += 1
                consecutive_failures += 1
                print(f"‚ùå BATCH FAILED: {' | '.join(analysis['issues'])}")
                print(f"‚è±Ô∏è Duration: {batch_duration:.1f}s")
                print(f"üìä CUMULATIVE TOTAL: {total_5k_extracted:,} posts (no change)")

            # Record every batch, including the one that triggers a stop.
            self.results["batches"].append(analysis)

            # Stop if too many consecutive failures
            if consecutive_failures >= max_consecutive_failures:
                print(f"üö® STOPPING: {consecutive_failures} consecutive batch failures")
                self.results["issues"].append(
                    f"Stopped after {consecutive_failures} consecutive batch failures "
                    f"({batch_failures} failed batches in total)"
                )
                break

            # Brief pause between batches to avoid overwhelming the system
            if batch_num < test_batch_limit:
                print(f"‚è≥ Inter-batch cooldown 15 seconds...")
                time.sleep(15)

        return total_5k_extracted

    def run_multi_data_type_test(self, posts_extracted: int) -> Dict[str, int]:
        """
        Phase 3: extraction of followers, following, media, likes and mentions.

        Skipped (returning ``{}`` and recording an issue) when fewer than 1,000
        posts were extracted. Otherwise submits one job per data type.

        Returns:
            Mapping of data type -> extracted count (0 for a failed job).
        """
        if posts_extracted < 1000:
            print(f"\nüö® PHASE 3 SKIPPED: Insufficient posts extracted ({posts_extracted})")
            self.results["issues"].append("Multi-data type testing skipped due to low post extraction")
            return {}

        print(f"\nüîç PHASE 3: MULTI-DATA TYPE VALIDATION")
        print("="*70)
        print(f"üéØ Purpose: Validate extraction of all data types requested")
        print(f"üìä Scope: followers, following, media, likes, mentions")

        data_types_test = {
            "followers": {"max_followers": 1000},
            "following": {"max_following": 1000},
            "media": {"max_media": 500},
            "likes": {"max_likes": 500},
            "mentions": {"max_mentions": 500}
        }

        extraction_results = {}

        for index, (data_type, params) in enumerate(data_types_test.items(), 1):
            print(f"\nüìä Testing {data_type.upper()} extraction...")

            payload = {
                "username": self.target_username,
                f"scrape_{data_type}": True,
                **params,
                "scrape_level": 4
            }
            target_count = next(iter(params.values()))  # the single max_* value

            type_start = time.time()
            result = self.submit_job(payload, f"5K {data_type.title()}")
            # NOTE(review): analyze_extraction_result counts items with the same
            # posts-oriented logic; confirm the response shape per data type.
            analysis = self.analyze_extraction_result(result, f"{data_type.title()} Test")
            type_duration = time.time() - type_start

            analysis["duration"] = type_duration
            analysis["target"] = target_count

            if analysis["success"]:
                extracted_count = analysis["extracted"]
                rate = (extracted_count / target_count) * 100 if target_count > 0 else 0

                analysis["rate"] = rate

                print(f"‚úÖ {data_type.upper()}: {extracted_count:,}/{target_count:,} ({rate:.1f}%)")
                extraction_results[data_type] = extracted_count

            else:
                print(f"‚ùå {data_type.upper()} FAILED: {' | '.join(analysis['issues'])}")
                extraction_results[data_type] = 0

            self.results["batches"].append(analysis)
            if index < len(data_types_test):
                time.sleep(10)  # Brief pause between data types

        return extraction_results

    def generate_comprehensive_report(self, posts_extracted: int, multi_data_results: Dict[str, int]):
        """Print the final ultra-scale report and record the outcome.

        Args:
            posts_extracted: Total posts reported by phase 2 (0 if it did not run).
            multi_data_results: Per-data-type counts from phase 3 (may be empty).

        Returns:
            ``self.results``, with ``success`` set to True when at least 2,500
            posts (50% of the 5K target) were extracted. ``total_extracted`` is
            updated with the post count and the per-type counts.
        """
        print(f"\n" + "="*80)
        print(f"üìä ULTRA-SCALE 5K+ TESTING COMPREHENSIVE REPORT")
        print("="*80)

        # Executive Summary
        total_followers = multi_data_results.get("followers", 0)
        total_following = multi_data_results.get("following", 0)
        total_media = multi_data_results.get("media", 0)
        total_likes = multi_data_results.get("likes", 0)
        total_mentions = multi_data_results.get("mentions", 0)

        overall_items = posts_extracted + total_followers + total_following + total_media + total_likes + total_mentions
        total_duration = self._elapsed_since_start()

        print(f"üéØ EXECUTIVE SUMMARY:")
        print(f"   Target User: @{self.target_username}")
        print(f"   Test Duration: {total_duration:.1f} seconds ({total_duration/60:.1f} minutes)")
        print(f"   Total Data Extracted: {overall_items:,} items")

        # Data Breakdown
        print(f"\nüìä ULTRA-SCALE DATA EXTRACTION RESULTS:")
        print(f"   üìù Posts: {posts_extracted:,}")
        print(f"   üë• Followers: {total_followers:,}")
        print(f"   ‚û°Ô∏è Following: {total_following:,}")
        print(f"   üñºÔ∏è Media: {total_media:,}")
        print(f"   ‚ù§Ô∏è Likes: {total_likes:,}")
        print(f"   @Ô∏è‚É£ Mentions: {total_mentions:,}")

        # Success Assessment
        # NOTE(review): "All Data Types" only requires 3 of the 5 types to be
        # non-zero. Confirm whether the label or the threshold is intended.
        success_criteria = {
            "5K+ Posts": posts_extracted >= 5000,
            "2.5K+ Posts": posts_extracted >= 2500,
            "1K+ Posts": posts_extracted >= 1000,
            "Multi-Data Types": sum(multi_data_results.values()) > 0,
            "All Data Types": len([v for v in multi_data_results.values() if v > 0]) >= 3
        }

        print(f"\nüéØ SUCCESS CRITERIA ASSESSMENT:")
        for criteria, met in success_criteria.items():
            icon = "‚úÖ" if met else "‚ùå"
            print(f"   {icon} {criteria}")

        # Overall Success Determination
        if success_criteria["5K+ Posts"]:
            overall_success = "EXCELLENT - 5K+ ACHIEVED"
            capability_score = "PRODUCTION READY"
        elif success_criteria["2.5K+ Posts"]:
            overall_success = "GOOD - 2.5K+ ACHIEVED"
            capability_score = "PRODUCTION CAPABLE"
        elif success_criteria["1K+ Posts"]:
            overall_success = "FAIR - 1K+ ACHIEVED"
            capability_score = "LIMITED PRODUCTION"
        else:
            overall_success = "POOR - <1K POSTS"
            capability_score = "NOT PRODUCTION READY"

        print(f"\nüèÜ OVERALL ASSESSMENT:")
        print(f"   Result: {overall_success}")
        print(f"   Capability: {capability_score}")

        self.results["success"] = posts_extracted >= 2500  # 50% of 5K target
        self.results["total_extracted"]["posts"] = posts_extracted
        self.results["total_extracted"].update(multi_data_results)

        # Production Recommendations
        print(f"\nüí° PRODUCTION RECOMMENDATIONS:")
        if posts_extracted >= 2500:
            print(f"   ‚úÖ System ready for ultra-scale production use")
            print(f"   ‚úÖ Use intelligent batching for requests > 150 posts")
            print(f"   ‚úÖ Expected 5K extraction time: ~{(5000/150) * 180:.0f} seconds")
            print(f"   ‚úÖ Implement progress tracking for user visibility")
        elif posts_extracted >= 1000:
            print(f"   ‚ö†Ô∏è System capable of large-scale production with limits")
            print(f"   ‚ö†Ô∏è Recommended maximum: 1000-2000 posts per request")
            print(f"   ‚ö†Ô∏è Monitor system performance and adjust limits")
        else:
            print(f"   ‚ùå System requires infrastructure improvements")
            print(f"   ‚ùå Fix baseline extraction issues before production")

        print(f"\nüéØ 5K+ VALIDATION RESULT: {'‚úÖ PASSED' if self.results['success'] else '‚ùå FAILED'}")
        print(f"üìä System is {'READY' if self.results['success'] else 'NOT READY'} for ultra-scale production")

        return self.results

    def run_complete_5k_test(self, max_batches: Optional[int] = None):
        """
        Run the complete ultra-scale 5K+ testing suite.

        Checks ``/healthz``, runs phase 1 (progressive scale), phase 2 (batched
        5K extraction) and phase 3 (other data types), then prints the report.

        Args:
            max_batches: Optional cap on phase-2 batches, forwarded to
                ``run_intelligent_5k_batching``. ``None`` means all batches
                needed for 5,000 posts.

        Returns:
            ``self.results``. If the health check fails, it is returned early
            with the problem recorded in ``issues`` and no report is printed.
        """
        print("üö® ULTRA-SCALE 5K+ DATA EXTRACTION TESTING")
        print("="*80)
        print(f"üéØ TARGET: @{self.target_username}")
        print(f"üìä GOAL: Extract 5000+ posts, followers, following, media, likes, mentions")
        print(f"‚è±Ô∏è APPROACH: Patient, intelligent batching with real-time progress")
        print(f"üî¨ VALIDATION: Comprehensive data quality and architecture assessment")
        print(f"üö® ESTIMATED TIME: 30-60 minutes for complete validation")
        print("="*80)

        self.results["start_time"] = datetime.now()

        # Check API connectivity first
        try:
            test_response = requests.get(f"{self.endpoint}/healthz", timeout=5)
        except Exception as e:
            print(f"‚ùå API connectivity: Failed - {e}")
            self.results["issues"].append(f"API connectivity failed: {e}")
            return self.results

        if test_response.status_code != 200:
            print(f"‚ö†Ô∏è API connectivity: Unexpected response {test_response.status_code}")
            # Record the reason so the early exit is visible in the results.
            self.results["issues"].append(
                f"API connectivity returned unexpected status {test_response.status_code}"
            )
            return self.results
        print(f"‚úÖ API connectivity: Connected to {self.endpoint}")

        # Phase 1: Progressive Scale Testing
        baseline_success = self.run_progressive_scale_test()
        if not baseline_success:
            print(f"\nüö® STOPPING: Baseline testing failed - cannot proceed to 5K")
            return self.generate_comprehensive_report(0, {})

        # Phase 2: Intelligent 5K+ Batching
        proven_capacity = self.results["performance_metrics"].get("proven_capacity", 150)
        posts_extracted = self.run_intelligent_5k_batching(proven_capacity, max_batches=max_batches)

        # Phase 3: Multi-Data Type Testing
        multi_data_results = self.run_multi_data_type_test(posts_extracted)

        # Generate Final Report
        return self.generate_comprehensive_report(posts_extracted, multi_data_results)

# Initialize and run ultra-scale 5K testing.
# Guarded so that importing this module does not start a long network run.
# In a notebook cell or when executed as a script, __name__ == "__main__",
# so tester/final_results remain available as globals for inspection.
if __name__ == "__main__":
    print("üöÄ INITIALIZING ULTRA-SCALE 5K+ TESTING")
    print("="*50)

    # You can change the target username here
    TARGET_USERNAME = "naval"  # Change this to test different accounts

    tester = UltraScale5KTester(target_username=TARGET_USERNAME)
    final_results = tester.run_complete_5k_test()

    print(f"\nüéâ ULTRA-SCALE 5K+ TESTING COMPLETED!")
    print(f"üìä Final Results: {final_results['success']}")
    print(f"üìÅ Full results stored in tester.results for detailed analysis")