# YouTube Audio Crawler - Scale Testing v4

**Purpose:** Scale testing to validate cookie authentication stability

**Test Phases:**
- Phase 1: ✅ Done (3 videos, 100% success)
- Phase 2: Medium (20 videos, 1 channel)
- Phase 3: Multi-channel (3-5 channels, 5-10 videos each)
- Phase 4: Full scale (50 channels)

**Requirements:**
- Format: M4A, 44kHz exact (44000 Hz), mono, 192kbps
- Cookie authentication (fresh cookies required!)
- Uses FFmpeg audio filter for proper 44kHz resampling

## Step 1: Install Dependencies

In [None]:
# Notebook shell escape: install yt-dlp with pip in quiet mode.
!pip install -q yt-dlp
# Show only the first line of the `ffmpeg -version` output.
!ffmpeg -version | head -n 1
# Printed unconditionally; it does not check whether the commands above succeeded.
print("\n‚úÖ Dependencies installed")

## Step 2: Upload Fresh YouTube Cookies

**CRITICAL:** Export fresh cookies (less than 1 hour old)

1. Logout/login to YouTube
2. Use cookie extension to export
3. Upload here

In [None]:
from google.colab import files
import os
from datetime import datetime

# Ask for the exported cookie file through the Colab upload widget, pick the
# first uploaded file that looks like a cookie export, and print a short
# sanity report about it. The chosen name is published as COOKIES_FILE.
print("üì§ Upload youtube_cookies.txt:\n")
uploaded = files.upload()

COOKIES_FILE = None  # stays None when no uploaded file matches
for filename in uploaded.keys():
    # Accept anything with "cookie" in its name, or any .txt file.
    if not ('cookie' in filename.lower() or filename.endswith('.txt')):
        continue

    COOKIES_FILE = filename

    # Age is derived from the local file's modification time.
    # NOTE(review): this presumably reflects when the file was written by the
    # upload, not when the cookies were exported from the browser - confirm
    # before relying on the age warning below.
    stat = os.stat(filename)
    mod_time = datetime.fromtimestamp(stat.st_mtime)
    age_hours = (datetime.now() - mod_time).total_seconds() / 3600

    # Explicit encoding plus errors='replace': this is only a line count, so a
    # stray undecodable byte must not abort the cell with UnicodeDecodeError.
    with open(filename, 'r', encoding='utf-8', errors='replace') as f:
        lines = f.readlines()
    youtube_cookies = sum(1 for line in lines if 'youtube.com' in line)

    print(f"\n‚úÖ Cookie file: {filename}")
    print(f"üìä Lines: {len(lines)}")
    print(f"üç™ YouTube cookies: {youtube_cookies}")
    print(f"‚è∞ File age: {age_hours:.1f} hours")

    if age_hours > 2:
        print(f"\n‚ö†Ô∏è  WARNING: Cookie file is {age_hours:.1f} hours old")
        print("   Recommended: Export fresh cookies (< 1 hour old)")

    # Only the first matching file is used.
    break

if not COOKIES_FILE:
    print("\n‚ùå No cookies uploaded!")
else:
    print(f"\n‚úÖ Ready for testing with cookies")

## Step 3: Setup

In [None]:
import yt_dlp
import json
import time
import random
from pathlib import Path
from datetime import datetime

# Pacing knobs, deliberately conservative for stability (all in seconds).
SLEEP_MIN = 5               # lower bound of the random pre-download sleep
SLEEP_MAX = 10              # upper bound of the random pre-download sleep
SLEEP_BETWEEN_VIDEOS = 8    # fixed pause after each successful download

# Root folder for per-video output; created on first run, reused afterwards.
DOWNLOADS_DIR = Path('./downloads')
DOWNLOADS_DIR.mkdir(exist_ok=True)

# One-shot summary of the effective settings.
_setup_summary = [
    "‚úÖ Setup complete",
    f"üìÅ Downloads: {DOWNLOADS_DIR.absolute()}",
    f"üç™ Cookies: {COOKIES_FILE or 'None'}",
    f"‚è±Ô∏è  Sleep: {SLEEP_MIN}-{SLEEP_MAX}s before download, {SLEEP_BETWEEN_VIDEOS}s between videos",
]
print("\n".join(_setup_summary))

## Step 4: Download Functions with Enhanced Monitoring

In [None]:
def get_channel_videos(channel_url):
    """List the videos yt-dlp reports for a channel, without downloading.

    Runs ``YoutubeDL.extract_info`` with ``extract_flat`` enabled and, when
    ``COOKIES_FILE`` is set, with that file as ``cookiefile``.

    Args:
        channel_url: URL handed to ``extract_info``.

    Returns:
        A list of dicts with ``video_id``, ``url`` and ``title`` keys. The
        list is empty when extraction raises or the result has no
        ``entries``. Entries without an ``id`` are skipped, and a missing or
        empty title becomes ``'Unknown'``.
    """
    print(f"\nüîç Fetching: {channel_url}")

    ydl_opts = {
        'quiet': True,
        'extract_flat': True,
    }

    if COOKIES_FILE:
        ydl_opts['cookiefile'] = COOKIES_FILE
        print(f"   üç™ Using cookies")

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            result = ydl.extract_info(channel_url, download=False)

            if 'entries' in result:
                videos = []
                for entry in result['entries']:
                    if not entry:
                        continue

                    video_id = entry.get('id')
                    if not video_id:
                        # Without an id the URL would become "watch?v=None".
                        continue

                    videos.append({
                        'video_id': video_id,
                        'url': f"https://www.youtube.com/watch?v={video_id}",
                        # `or` also covers a title key that holds None, which
                        # dict.get's default would let through.
                        'title': entry.get('title') or 'Unknown',
                    })

                print(f"   ‚úÖ Found {len(videos)} videos")
                return videos
    except Exception as e:
        # Best-effort: report the problem and fall through to the empty list
        # so the caller can skip this channel.
        error_msg = str(e)
        print(f"   ‚ùå Error: {error_msg[:100]}")

        # Add a hint for the two failure modes this notebook cares about.
        lowered = error_msg.lower()
        if 'bot' in lowered:
            print(f"   ‚ö†Ô∏è  Bot detection - check cookies")
        elif 'cookies' in lowered and 'no longer valid' in lowered:
            print(f"   ‚ö†Ô∏è  Cookies expired - export fresh cookies")

    return []

def _classify_download_error(error_msg):
    """Map an exception message to an ``(error_type, error_detail)`` pair.

    ``error_type`` is one of ``'403_forbidden'``, ``'bot_detection'``,
    ``'cookies_expired'`` or ``'other'``. For ``'other'`` the detail is the
    first 100 characters of the message.
    """
    lowered = error_msg.lower()
    if '403' in error_msg or 'Forbidden' in error_msg:
        return '403_forbidden', '403 Forbidden'
    if 'bot' in lowered or 'sign in' in lowered:
        return 'bot_detection', 'Bot detection'
    if 'cookies' in lowered and 'no longer valid' in lowered:
        return 'cookies_expired', 'Cookies expired'
    return 'other', error_msg[:100]


def download_video_audio(video_info, video_num, total_videos):
    """Download one video's audio track and write a JSON metadata sidecar.

    Output goes to ``DOWNLOADS_DIR/<video_id>/``. The audio is extracted by
    yt-dlp's ``FFmpegExtractAudio`` post-processor with the FFmpeg arguments
    configured below, and the metadata is saved as ``<video_id>.json``.

    Args:
        video_info: Dict with ``video_id``, ``url`` and ``title`` keys.
        video_num: 1-based position of this video, used in progress output.
        total_videos: Size of the current batch, used in progress output.

    Returns:
        ``(success, error_type, error_detail, duration)``. On success the two
        error fields are ``None``. ``duration`` is in seconds and includes the
        random pre-download sleep, but not the pause after a success.
    """
    video_id = video_info.get('video_id')
    video_url = video_info.get('url')
    # Guard against a None or missing title so the slice below cannot raise.
    title = video_info.get('title') or 'Unknown'

    print(f"\n{'='*60}")
    print(f"‚¨áÔ∏è  Video {video_num}/{total_videos}: {title[:50]}...")
    print(f"   ID: {video_id}")
    print(f"   URL: {video_url}")

    if not video_id or not video_url:
        # Without an id there is no output folder to build; report a normal
        # failure instead of raising and aborting the whole run.
        print(f"   ‚ùå Error: Missing video id or URL (0.0s)")
        return False, 'other', 'Missing video id or URL', 0.0

    video_dir = DOWNLOADS_DIR / video_id
    # parents=True so a removed downloads root is recreated.
    video_dir.mkdir(parents=True, exist_ok=True)

    start_time = time.time()

    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': str(video_dir / f"{video_id}.%(ext)s"),
        'no_warnings': False,
        'ignoreerrors': False,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'm4a',
            'preferredquality': '192',
        }],
        'postprocessor_args': [
            '-c:a', 'aac',              # Force AAC encoding (disable streamcopy)
            '-af', 'aresample=44000',   # Resample to 44kHz via filter
            '-ac', '1',                 # Mono
            '-b:a', '192k',             # Bitrate
        ],
    }

    if COOKIES_FILE:
        ydl_opts['cookiefile'] = COOKIES_FILE
        print(f"   üç™ Authenticated")

    try:
        # Random pause before every request to look less like a bot.
        sleep_time = random.uniform(SLEEP_MIN, SLEEP_MAX)
        print(f"   ‚è≥ Sleeping {sleep_time:.1f}s...")
        time.sleep(sleep_time)

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(video_url, download=True)

        # The audio fields are the configured target values, not values
        # measured from the produced file.
        metadata = {
            'video_id': video_id,
            'title': info.get('title'),
            'description': info.get('description'),
            'uploader': info.get('uploader'),
            'upload_date': info.get('upload_date'),
            'duration': info.get('duration'),
            'view_count': info.get('view_count'),
            'like_count': info.get('like_count'),
            'channel_id': info.get('channel_id'),
            'channel_url': info.get('channel_url'),
            'url': video_url,
            'audio_format': 'm4a',
            'sample_rate': 44000,  # 44kHz exact (client requirement)
            'channels': 1,
            'bitrate': 192,
            'downloaded_at': datetime.now().isoformat(),
            'authentication': 'cookies' if COOKIES_FILE else 'none'
        }

        with open(video_dir / f"{video_id}.json", 'w', encoding='utf-8') as f:
            json.dump(metadata, f, indent=2, ensure_ascii=False)

        duration = time.time() - start_time
        print(f"   ‚úÖ Success! ({duration:.1f}s)")

        # Fixed cool-down after a success; not counted in `duration`.
        time.sleep(SLEEP_BETWEEN_VIDEOS)
        return True, None, None, duration

    except Exception as e:
        # Any failure is reported to the caller as a classified result so one
        # bad video does not stop the batch.
        error_msg = str(e)
        duration = time.time() - start_time

        error_type, error_detail = _classify_download_error(error_msg)
        if error_type == 'other':
            print(f"   ‚ùå Error: {error_detail} ({duration:.1f}s)")
        else:
            print(f"   ‚ùå {error_detail} ({duration:.1f}s)")

        return False, error_type, error_detail, duration

# Confirmation shown once the function definitions in this cell have run.
print("‚úÖ Functions loaded with enhanced monitoring")

## Step 5: Scale Testing Configuration

**Choose your test phase:**

In [None]:
# ------------------------------------------------------------
# Scale-test configuration: exactly one phase should be active.
# Switch phases by commenting the active one and uncommenting
# one of the alternatives below.
# ------------------------------------------------------------

# Active: Phase 2 - one channel, capped at 20 videos.
TEST_PHASE = "Phase 2 - Medium Scale"
MAX_VIDEOS_PER_CHANNEL = 20
CHANNELS = [
    "https://www.youtube.com/channel/UCLFgJS-f6UKOJ3Xz0K8Kosg",  # leon (205 videos)
]

# Alternative: Phase 3 - several channels, 10 videos each.
# TEST_PHASE = "Phase 3 - Multi-Channel"
# CHANNELS = [
#     "https://www.youtube.com/channel/UCLFgJS-f6UKOJ3Xz0K8Kosg",  # leon
#     "https://www.youtube.com/channel/UC74T0OeGBT2bOcidVMwqqoQ",  # Ross_Liu
#     "https://www.youtube.com/channel/XXXXX",  # Add more
# ]
# MAX_VIDEOS_PER_CHANNEL = 10

# Alternative: Phase 4 - full channel list, no per-channel cap.
# TEST_PHASE = "Phase 4 - Full Scale"
# CHANNELS = [  # Load from file or paste 50 channels
#     # ... all 50 channels
# ]
# MAX_VIDEOS_PER_CHANNEL = None  # Download all

# Echo the effective configuration; a falsy cap is shown as "ALL".
_cap_label = MAX_VIDEOS_PER_CHANNEL or 'ALL'
_cookie_label = '‚úÖ Loaded' if COOKIES_FILE else '‚ùå None'
print(f"üìä Test Configuration: {TEST_PHASE}")
print(f"üìã Channels: {len(CHANNELS)}")
print(f"üìπ Max videos per channel: {_cap_label}")
print(f"üç™ Cookies: {_cookie_label}")

## Step 6: Start Scale Testing

In [None]:
# Accumulator for the whole run: counters, per-error tallies, per-video
# timings and one detail record per attempted video.
stats = {
    'test_phase': TEST_PHASE,
    'total_videos': 0,
    'success': 0,
    'failed': 0,
    'errors': dict.fromkeys(
        ('403_forbidden', 'bot_detection', 'cookies_expired', 'other'), 0
    ),
    'durations': [],
    'channels_processed': 0,
    'start_time': datetime.now(),
    'videos_details': [],
}

print(f"üöÄ Starting: {TEST_PHASE}")
print(f"‚è∞ {stats['start_time'].strftime('%Y-%m-%d %H:%M:%S')}")
print(f"üç™ Cookies: {'‚úÖ' if COOKIES_FILE else '‚ùå'}")
print("=" * 60)

_hash_rule = '#' * 60
for channel_no, channel in enumerate(CHANNELS, start=1):
    print(f"\n{_hash_rule}")
    print(f"üì∫ Channel {channel_no}/{len(CHANNELS)}: {channel}")
    print(_hash_rule)

    videos = get_channel_videos(channel)
    if not videos:
        # An empty listing and a failed listing are handled the same way.
        print("‚ö†Ô∏è  Skipping channel (no videos or error)")
        continue

    # A falsy cap (None/0) means "take everything".
    if MAX_VIDEOS_PER_CHANNEL:
        videos = videos[:MAX_VIDEOS_PER_CHANNEL]
        print(f"üìä Testing with {len(videos)} videos")

    stats['channels_processed'] += 1
    batch_size = len(videos)

    for video_no, video in enumerate(videos, start=1):
        stats['total_videos'] += 1

        ok, err_kind, err_text, elapsed = download_video_audio(
            video, video_no, batch_size
        )

        # One detail record per attempt, successful or not.
        stats['videos_details'].append({
            'video_id': video['video_id'],
            'title': video['title'],
            'success': ok,
            'error_type': err_kind,
            'error_detail': err_text,
            'duration': elapsed,
            'channel': channel,
        })
        stats['durations'].append(elapsed)

        if ok:
            stats['success'] += 1
        else:
            stats['failed'] += 1
            if err_kind:
                stats['errors'][err_kind] += 1

        # Running success rate, reported after every fifth attempted video.
        attempted = stats['total_videos']
        if attempted % 5 == 0:
            if attempted > 0:
                current_rate = (stats['success'] / attempted) * 100
            else:
                current_rate = 0
            print(f"\nüìä Progress: {attempted} videos, {current_rate:.1f}% success rate")

# Close out the run: stamp the end time and derive the wall-clock duration.
stats['end_time'] = datetime.now()
stats['duration'] = (stats['end_time'] - stats['start_time']).total_seconds()

_rule = "=" * 60
_stamp = '%Y-%m-%d %H:%M:%S'
print("\n" + _rule)
print("üìä SCALE TESTING RESULTS")
print(_rule)
print(f"üéØ Test Phase: {TEST_PHASE}")
print(f"‚è∞ Start: {stats['start_time'].strftime(_stamp)}")
print(f"‚è∞ End: {stats['end_time'].strftime(_stamp)}")
print(f"‚è±Ô∏è  Duration: {stats['duration']:.0f}s ({stats['duration']/60:.1f}min)")
print(f"\nüì∫ Channels processed: {stats['channels_processed']}/{len(CHANNELS)}")
print(f"üìπ Total videos: {stats['total_videos']}")
print(f"‚úÖ Success: {stats['success']}")
print(f"‚ùå Failed: {stats['failed']}")

# Rates and the verdict only make sense when at least one video was attempted.
if stats['total_videos'] > 0:
    success_rate = (stats['success'] / stats['total_videos']) * 100
    print(f"\nüìà Success Rate: {success_rate:.1f}%")

    if stats['durations']:
        avg_duration = sum(stats['durations']) / len(stats['durations'])
        print(f"‚è±Ô∏è  Average time per video: {avg_duration:.1f}s")

    print(f"\n‚ùå Error Breakdown:")
    for _label, _key in (
        ('403 Forbidden', '403_forbidden'),
        ('Bot detection', 'bot_detection'),
        ('Cookies expired', 'cookies_expired'),
        ('Other', 'other'),
    ):
        print(f"   {_label}: {stats['errors'][_key]}")

    # Verdict: first threshold the success rate reaches, otherwise POOR.
    print(f"\nüìä Analysis:")
    _verdicts = (
        (90, "   ‚úÖ EXCELLENT - Ready for next phase!"),
        (70, "   ‚ö†Ô∏è  GOOD - Minor issues, investigate errors"),
        (50, "   ‚ö†Ô∏è  MODERATE - Significant issues, need fixes"),
    )
    print(next(
        (_text for _floor, _text in _verdicts if success_rate >= _floor),
        "   ‚ùå POOR - Major issues, need debugging",
    ))

    if stats['errors']['cookies_expired'] > 0:
        print(f"   ‚ö†Ô∏è  Cookies expired during testing - export fresh cookies")
    # More than 20% of attempts ending in a 403 triggers the rotation hint.
    if stats['errors']['403_forbidden'] > stats['total_videos'] * 0.2:
        print(f"   ‚ö†Ô∏è  High 403 rate - may need IP rotation")

print("\n‚úÖ Scale testing complete!")

# Persist the stats as JSON. datetime objects are not JSON-serialisable, so
# the two timestamps are replaced by ISO-8601 strings in a shallow copy.
results_file = f"scale_test_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
stats_json = {
    **stats,
    'start_time': stats['start_time'].isoformat(),
    'end_time': stats['end_time'].isoformat(),
}
with open(results_file, 'w') as f:
    json.dump(stats_json, f, indent=2)

print(f"\nüíæ Results saved: {results_file}")

## Step 7: View Downloaded Files

In [None]:
# Walk the per-video folders under DOWNLOADS_DIR. A folder counts as complete
# only when it holds both an .m4a file and a .json file.
print("üìÇ Downloaded files:\n")

success_count = 0
failed_count = 0
total_mb = 0

_bytes_per_mb = 1024 * 1024
for folder in sorted(DOWNLOADS_DIR.iterdir()):
    if not folder.is_dir():
        continue

    audio_files = list(folder.glob('*.m4a'))
    sidecars = list(folder.glob('*.json'))

    if not (audio_files and sidecars):
        failed_count += 1
        print(f"‚ùå {folder.name} (incomplete)")
        continue

    # Only the first match of each kind is reported and sized.
    audio, sidecar = audio_files[0], sidecars[0]
    size_mb = audio.stat().st_size / _bytes_per_mb
    success_count += 1
    total_mb += size_mb
    print(f"‚úÖ {folder.name}")
    print(f"   ‚îî‚îÄ {audio.name} ({size_mb:.2f} MB)")
    print(f"   ‚îî‚îÄ {sidecar.name}")

print(f"\nüìä File Summary:")
print(f"   ‚úÖ Complete: {success_count}")
print(f"   ‚ùå Incomplete: {failed_count}")
print(f"   üíæ Total size: {total_mb:.2f} MB")
# Skip the average when nothing completed, to avoid dividing by zero.
if success_count > 0:
    print(f"   üìè Average: {total_mb/success_count:.2f} MB per file")

## Step 8: Create ZIP for Download

In [None]:
import shutil

# Bundle everything under DOWNLOADS_DIR into one timestamped ZIP archive whose
# name embeds the test phase (spaces replaced by underscores).
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
zip_name = f"scale_test_{TEST_PHASE.replace(' ', '_')}_{timestamp}"

print(f"üì¶ Creating ZIP: {zip_name}.zip")
shutil.make_archive(zip_name, 'zip', DOWNLOADS_DIR)
print(f"‚úÖ ZIP created!")

# List both artefacts: the archive and the JSON results file saved earlier.
print(f"\nüì• Files ready for download:")
print(f"   1. {zip_name}.zip (audio files)")
print(f"   2. {results_file} (test results)")

# Best-effort browser download. Catch Exception rather than using a bare
# except so that KeyboardInterrupt/SystemExit still propagate; any ordinary
# failure falls back to the manual-download hint.
try:
    files.download(f"{zip_name}.zip")
    files.download(results_file)
    print(f"\n‚¨áÔ∏è  Download started!")
except Exception:
    print(f"\nüí° Download from files panel (left sidebar)")