In [1]:
pip install -U yt-dlp

Collecting yt-dlp
  Downloading yt_dlp-2025.12.8-py3-none-any.whl (3.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m70.6 kB/s[0m eta [36m0:00:00[0m00:01[0m00:02[0mm
[?25hInstalling collected packages: yt-dlp
Successfully installed yt-dlp-2025.12.8

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [2]:
from yt_dlp import YoutubeDL

ydl_opts = {
    "quiet": True,
    "skip_download": True,
}

with YoutubeDL(ydl_opts) as ydl:
    info = ydl.extract_info(
        "https://www.youtube.com/@GoldminesTelefilms",
        download=False
    )

print(info.keys())


ERROR: [youtube] PDV1vkczRiM: Video unavailable. This content isn't available, try again later. The current session has been rate-limited by YouTube for up to an hour. It is recommended to use `-t sleep` to add a delay between video requests to avoid exceeding the rate limit. For more information, refer to  https://github.com/yt-dlp/yt-dlp/wiki/Extractors#this-content-isnt-available-try-again-later


DownloadError: ERROR: [youtube] PDV1vkczRiM: Video unavailable. This content isn't available, try again later. The current session has been rate-limited by YouTube for up to an hour. It is recommended to use `-t sleep` to add a delay between video requests to avoid exceeding the rate limit. For more information, refer to  https://github.com/yt-dlp/yt-dlp/wiki/Extractors#this-content-isnt-available-try-again-later

In [5]:
import time
import random
from yt_dlp import YoutubeDL
import concurrent.futures
from functools import partial

class YouTubeChannelExtractor:
    def __init__(self, channel_url, max_workers=3, min_delay=1, max_delay=3):
        """
        Initialize the YouTube channel extractor with rate limiting controls
        
        Args:
            channel_url: YouTube channel URL
            max_workers: Number of concurrent workers (be conservative)
            min_delay: Minimum delay between requests in seconds
            max_delay: Maximum delay between requests in seconds
        """
        self.channel_url = channel_url
        self.max_workers = max_workers
        self.min_delay = min_delay
        self.max_delay = max_delay
        
        # Base options for channel info extraction
        self.channel_opts = {
            "quiet": True,
            "skip_download": True,
            "extract_flat": True,  # Get basic info without downloading
            "playlistend": 50,  # Adjust based on your needs
        }
        
        # Options for video details extraction
        self.video_opts = {
            "quiet": True,
            "skip_download": True,
            "extract_flat": False,
        }
    
    def get_channel_videos(self):
        """Get list of video IDs from channel"""
        with YoutubeDL(self.channel_opts) as ydl:
            try:
                info = ydl.extract_info(self.channel_url, download=False)
                if 'entries' in info:
                    return [entry['id'] for entry in info['entries'] if entry]
                return []
            except Exception as e:
                print(f"Error getting channel info: {e}")
                return []
    
    def get_video_details(self, video_id):
        """Get details for a single video with delay"""
        # Random delay to avoid rate limiting
        time.sleep(random.uniform(self.min_delay, self.max_delay))
        
        try:
            with YoutubeDL(self.video_opts) as ydl:
                info = ydl.extract_info(video_id, download=False)
                
                # Extract only the required fields
                video_data = {
                    'youtube_id': info.get('id'),
                    'title': info.get('title'),
                    'duration': info.get('duration'),
                    'tags': info.get('tags', []),
                    'images': {
                        'thumbnail': info.get('thumbnail'),
                        'thumbnails': info.get('thumbnails', [])
                    }
                }
                return video_data
        except Exception as e:
            print(f"Error getting video {video_id}: {e}")
            return None
    
    def extract_all_videos(self):
        """Extract all videos from channel with rate limiting"""
        print("Fetching video list from channel...")
        video_ids = self.get_channel_videos()
        
        if not video_ids:
            print("No videos found or error fetching channel")
            return []
        
        print(f"Found {len(video_ids)} videos. Starting extraction...")
        
        # Use ThreadPoolExecutor for concurrent extraction
        all_video_data = []
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Create futures for all video IDs
            futures = []
            for video_id in video_ids:
                future = executor.submit(self.get_video_details, video_id)
                futures.append(future)
                
                # Small delay between submitting tasks
                time.sleep(0.1)
            
            # Collect results as they complete
            completed = 0
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                if result:
                    all_video_data.append(result)
                completed += 1
                if completed % 5 == 0:
                    print(f"Progress: {completed}/{len(video_ids)} videos processed")
                
                # Optional: Save progress periodically
                if completed % 20 == 0:
                    self._save_progress(all_video_data, completed)
        
        print(f"Extraction complete! Processed {len(all_video_data)} videos")
        return all_video_data
    
    def _save_progress(self, data, count):
        """Optional: Save progress to file"""
        import json
        with open(f'progress_{count}.json', 'w') as f:
            json.dump(data, f, indent=2)
        print(f"Progress saved after {count} videos")

# Alternative: Sequential extraction with adaptive delays
def extract_with_adaptive_delay(channel_url, batch_size=10, initial_delay=2):
    """
    Extract videos with adaptive delay - increases delay if errors occur
    """
    channel_extractor = YouTubeChannelExtractor(channel_url)
    video_ids = channel_extractor.get_channel_videos()
    
    all_data = []
    delay = initial_delay
    error_count = 0
    
    for i, video_id in enumerate(video_ids):
        print(f"Processing video {i+1}/{len(video_ids)}")
        
        try:
            # Apply current delay
            time.sleep(delay)
            
            # Get video details
            video_data = channel_extractor.get_video_details(video_id)
            if video_data:
                all_data.append(video_data)
                error_count = 0  # Reset error count on success
                
                # Gradually decrease delay if things are going well
                if error_count == 0 and delay > 1:
                    delay = max(1, delay * 0.9)  # Reduce delay by 10%
            else:
                error_count += 1
                
        except Exception as e:
            print(f"Error on video {video_id}: {e}")
            error_count += 1
        
        # If we get multiple errors, increase delay
        if error_count >= 3:
            delay = min(10, delay * 1.5)  # Increase by 50%, max 10 seconds
            print(f"Increased delay to {delay:.1f} seconds due to errors")
            error_count = 0  # Reset after adjusting
        
        # Save batch
        if (i + 1) % batch_size == 0:
            import json
            with open(f'batch_{(i+1)//batch_size}.json', 'w') as f:
                json.dump(all_data, f, indent=2)
            print(f"Saved batch {(i+1)//batch_size}")
    
    return all_data

# Fastest option (use with caution - higher risk of rate limiting)
def fast_extract(channel_url, max_videos=100):
    """
    Fast extraction with minimal delay - use only if you have high rate limits
    """
    opts = {
        "quiet": True,
        "skip_download": True,
        "extract_flat": False,
        "playlistend": max_videos,
        "ignoreerrors": True,  # Continue on errors
    }
    
    all_data = []
    
    with YoutubeDL(opts) as ydl:
        info = ydl.extract_info(channel_url, download=False)
        
        if 'entries' in info:
            for entry in info['entries']:
                if entry:  # Skip None entries from errors
                    video_data = {
                        'youtube_id': entry.get('id'),
                        'title': entry.get('title'),
                        'duration': entry.get('duration'),
                        'tags': entry.get('tags', []),
                        'images': {
                            'thumbnail': entry.get('thumbnail'),
                            'thumbnails': entry.get('thumbnails', [])
                        }
                    }
                    all_data.append(video_data)
                    
                    # Very small delay
                    time.sleep(0.5)
    
    return all_data



In [6]:
# Usage example:
data = None
if __name__ == "__main__":
    # Method 1: Conservative with rate limiting
    extractor = YouTubeChannelExtractor(
        "https://www.youtube.com/@GoldminesTelefilms",
        max_workers=2,  # Fewer workers = less likely to be rate limited
        min_delay=1.5,
        max_delay=3
    )
    data = extractor.extract_all_videos()
    
    # Method 2: Adaptive delay (good balance)
    # data = extract_with_adaptive_delay("https://www.youtube.com/@channelname")
    
    # Method 3: Fast extraction (risky)
    # data = fast_extract("https://www.youtube.com/@channelname", max_videos=50)
    
    # Print results
    print(f"\nExtracted {len(data)} videos")
    if data:
        print("\nFirst video details:")
        import json
        print(json.dumps(data[0], indent=2))

Fetching video list from channel...
Found 2 videos. Starting extraction...


ERROR: [generic] 'UCyoXW-Dse7fURq30EWl_CUA' is not a valid URL


Error getting video UCyoXW-Dse7fURq30EWl_CUA: ERROR: [generic] 'UCyoXW-Dse7fURq30EWl_CUA' is not a valid URL


ERROR: [generic] 'UCyoXW-Dse7fURq30EWl_CUA' is not a valid URL


Error getting video UCyoXW-Dse7fURq30EWl_CUA: ERROR: [generic] 'UCyoXW-Dse7fURq30EWl_CUA' is not a valid URL
Extraction complete! Processed 0 videos

Extracted 0 videos


In [10]:
import time
import random
import json
from yt_dlp import YoutubeDL

class GoldminesTelefilmsExtractor:
    def __init__(self, max_videos=50):
        """
        Initialize extractor for Goldmines Telefilms channel
        
        Args:
            max_videos: Maximum number of videos to extract
        """
        self.channel_url = "https://www.youtube.com/@GoldminesTelefilms/videos"
        self.max_videos = max_videos
        
        # Optimized options for this specific channel
        self.channel_opts = {
            "quiet": True,
            "skip_download": True,
            "ignoreerrors": True,
            "extract_flat": True,
            "playlistend": max_videos,
            "sleep_interval": 2,
            "sleep_interval_requests": 1,
            "no_warnings": True,  # Suppress warnings for cleaner output
        }
        
        # Video extraction options with better rate limiting
        self.video_opts = {
            "quiet": True,
            "skip_download": True,
            "ignoreerrors": True,
            "no_warnings": True,
            "extract_flat": False,
            "sleep_interval": 3,
            "sleep_interval_requests": 2,
            "max_sleep_interval": 15,
        }
    
    def get_video_list(self):
        """Get list of videos from the channel"""
        print("Fetching video list from Goldmines Telefilms channel...")
        
        with YoutubeDL(self.channel_opts) as ydl:
            try:
                info = ydl.extract_info(self.channel_url, download=False)
                
                if not info:
                    print("No data received from channel")
                    return []
                
                videos = []
                if 'entries' in info:
                    for entry in info['entries']:
                        if entry and 'id' in entry:
                            video_url = f"https://www.youtube.com/watch?v={entry['id']}"
                            videos.append({
                                'url': video_url,
                                'id': entry['id'],
                                'title': entry.get('title', 'Unknown'),
                            })
                
                print(f"Found {len(videos)} videos")
                return videos
                
            except Exception as e:
                print(f"Error fetching channel: {e}")
                return []
    
    def extract_video_with_retry(self, video_info, retry_count=0):
        """
        Extract a single video with retry logic
        
        Args:
            video_info: Dictionary with video URL and ID
            retry_count: Current retry attempt
        """
        max_retries = 2
        
        # Progressive delay: 3s first, 10s second, 30s third attempt
        base_delay = [3, 10, 30][min(retry_count, 2)]
        jitter = random.uniform(0.5, 2.0)
        delay = base_delay * jitter
        
        print(f"  Waiting {delay:.1f}s before request (attempt {retry_count + 1})...")
        time.sleep(delay)
        
        try:
            with YoutubeDL(self.video_opts) as ydl:
                info = ydl.extract_info(video_info['url'], download=False)
                
                if not info:
                    print(f"  No data for video {video_info['id']}")
                    return None
                
                video_data = {
                    'youtube_id': info.get('id'),
                    'title': info.get('title', 'Unknown Title'),
                    'duration': info.get('duration', 0),
                    'tags': info.get('tags', []),
                    'images': {
                        'thumbnail': info.get('thumbnail'),
                        'thumbnails': info.get('thumbnails', [])
                    }
                }
                
                print(f"  ✓ Extracted: {video_data['title'][:60]}...")
                return video_data
                
        except Exception as e:
            error_msg = str(e).lower()
            
            # Check for rate limiting
            if any(keyword in error_msg for keyword in ['rate', '429', 'unavailable', 'limit']):
                if retry_count < max_retries:
                    print(f"  Rate limited. Retrying in {delay*2:.1f}s...")
                    return self.extract_video_with_retry(video_info, retry_count + 1)
                else:
                    print(f"  Max retries exceeded for video {video_info['id']}")
                    return None
            else:
                print(f"  Error extracting video {video_info['id']}: {str(e)[:100]}")
                return None
    
    def extract_channel_data(self):
        """Main method to extract all channel data"""
        print("=" * 60)
        print("Goldmines Telefilms Channel Extractor")
        print("=" * 60)
        
        # Step 1: Get video list
        videos = self.get_video_list()
        
        if not videos:
            print("No videos found. Exiting.")
            return []
        
        print(f"\nStarting extraction of {len(videos)} videos...")
        print("-" * 60)
        
        all_data = []
        processed = 0
        successful = 0
        
        # Step 2: Extract each video with delays
        for i, video in enumerate(videos):
            processed += 1
            print(f"\n[{i+1}/{len(videos)}] Processing: {video['title'][:50]}...")
            print(f"  Video ID: {video['id']}")
            
            video_data = self.extract_video_with_retry(video)
            
            if video_data:
                all_data.append(video_data)
                successful += 1
                print(f"  Status: SUCCESS (Total: {successful})")
            else:
                print(f"  Status: FAILED")
            
            # Save progress every 5 videos
            if (i + 1) % 5 == 0:
                self._save_progress(all_data, i + 1)
                
                # Longer break every 10 videos
                if (i + 1) % 10 == 0:
                    long_break = random.uniform(15, 25)
                    print(f"\n{'-'*40}")
                    print(f"Taking extended break: {long_break:.1f} seconds")
                    print(f"{'-'*40}")
                    time.sleep(long_break)
        
        # Final save
        self._save_final_results(all_data)
        
        # Summary
        print("\n" + "=" * 60)
        print("EXTRACTION COMPLETE")
        print("=" * 60)
        print(f"Total videos processed: {processed}")
        print(f"Successfully extracted: {successful}")
        print(f"Failed: {processed - successful}")
        print(f"Success rate: {(successful/processed*100):.1f}%")
        
        if all_data:
            print(f"\nSample data from first video:")
            print(json.dumps(all_data[0], indent=2, ensure_ascii=False))
        
        return all_data
    
    def _save_progress(self, data, count):
        """Save progress to file"""
        filename = f'goldmines_progress_{count}.json'
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f"  Progress saved to {filename}")
    
    def _save_final_results(self, data):
        """Save final results"""
        from datetime import datetime
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f'goldmines_telefilms_{timestamp}.json'
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump({
                'channel': 'Goldmines Telefilms',
                'url': self.channel_url,
                'extracted_at': timestamp,
                'total_videos': len(data),
                'videos': data
            }, f, ensure_ascii=False, indent=2)
        
        print(f"\nFinal results saved to {filename}")
        
        # Also create a CSV for easy viewing
        self._create_csv(data, timestamp)
    
    def _create_csv(self, data, timestamp):
        """Create CSV file for easy viewing"""
        import csv
        
        csv_filename = f'goldmines_telefilms_{timestamp}.csv'
        
        with open(csv_filename, 'w', newline='', encoding='utf-8') as csvfile:
            fieldnames = ['youtube_id', 'title', 'duration', 'tags_count', 'thumbnail']
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            
            writer.writeheader()
            for video in data:
                writer.writerow({
                    'youtube_id': video.get('youtube_id', ''),
                    'title': video.get('title', ''),
                    'duration': video.get('duration', 0),
                    'tags_count': len(video.get('tags', [])),
                    'thumbnail': video.get('images', {}).get('thumbnail', '')
                })
        
        print(f"CSV file created: {csv_filename}")

# Quick test function
def quick_test(limit=5):
    """Quick test with small number of videos"""
    print("Running quick test with 5 videos...")
    extractor = GoldminesTelefilmsExtractor(max_videos=limit)
    
    # Test getting video list only
    videos = extractor.get_video_list()
    
    if videos:
        print(f"\nFirst {min(3, len(videos))} videos found:")
        for i, video in enumerate(videos[:3]):
            print(f"{i+1}. {video['title'][:50]}... (ID: {video['id']})")
        
        # Test extracting first video
        print(f"\nTesting extraction of first video...")
        test_video = extractor.extract_video_with_retry(videos[0])
        
        if test_video:
            print("\n✓ Test successful!")
            print(f"Title: {test_video['title']}")
            print(f"Duration: {test_video['duration']} seconds")
            print(f"Tags: {len(test_video['tags'])} tags")
            print(f"Thumbnail: {test_video['images']['thumbnail'][:80]}...")
            return True
        else:
            print("\n✗ Test failed - might be rate limited")
            return False
    return False

# Batch extraction with resume capability
def batch_extract(batch_size=10, start_from=0):
    """
    Extract in batches with resume capability
    
    Args:
        batch_size: Number of videos per batch
        start_from: Starting index (for resuming)
    """
    extractor = GoldminesTelefilmsExtractor(max_videos=100)  # Get up to 100 videos
    videos = extractor.get_video_list()
    
    if not videos:
        return []
    
    # Slice videos for this batch
    end_idx = min(start_from + batch_size, len(videos))
    batch_videos = videos[start_from:end_idx]
    
    print(f"\nProcessing batch: videos {start_from+1} to {end_idx}")
    
    batch_data = []
    for i, video in enumerate(batch_videos):
        print(f"\n[{i+1}/{len(batch_videos)}] Processing: {video['title'][:50]}...")
        
        video_data = extractor.extract_video_with_retry(video)
        if video_data:
            batch_data.append(video_data)
        
        # Save batch results
        if (i + 1) % 5 == 0:
            filename = f'batch_{start_from+1}_to_{start_from+i+1}.json'
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(batch_data, f, ensure_ascii=False, indent=2)
    
    return batch_data



In [16]:
import pandas as pd

# Load JSON list from file
df = pd.read_json("../../resource/youtube_data copy.json")

# Flatten nested images field
df["thumbnail"] = df["images"].apply(lambda x: x.get("thumbnail"))
df = df.drop(columns=["images"])

# Add playlist_id column based on duration (< 15 minutes)
df["playlist_id"] = df["duration"].apply(
    lambda x: 3 if x < 900 else None  # None → MySQL NULL
)

# Save to CSV
df.to_csv("../../resource/youtube_goldmines_data.csv", index=False)

print(df)




        youtube_id                                              title  \
0      VgPBhn77pp0  मिर्नालिनी रवि अथर्व को चैंलेंजे करती है | Gad...   
1      6WpZVxGT8xY  सूर्या दिलीप ताहिल से मिलने आया l Khatarnak Kh...   
2      UmGPzTkBwjI  विजय को देख कर इस गुंडे की बोलती बंद हो गई | B...   
3      Abtxr8argbM  अजय जिस लड़की से प्यार करता है उसपर उसके बड़े ...   
4      wliG1P9RMnY  इस औरत को पूरा यकीन है कि उसकी बेटी आज भी जिंद...   
...            ...                                                ...   
11167  bJ7Fvju9DFs  The Return of Rebel 2 (Billa) Hindi Dubbed Ful...   
11168  TgTibdqWHTg  Indian Soldier Never On Holiday (Thupakki) Hin...   
11169  EMWM2uN8WCQ  Jawaan (2018) New Released Hindi Dubbed Full M...   
11170  1m-M62XN6kM  Apradhi Kaun (Dongala Mutha) 2018 Official Hin...   
11171  bi2ZGhdUnFE  Jamai Raja (Mappillai) Full Hindi Dubbed Movie...   

       duration                                          thumbnail  \
0           745  https://i.ytimg.com/vi/VgPBhn77pp0/m