In [None]:
!pip install -q pyarrow pandas requests tqdm ratelimit

  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for ratelimit (setup.py) ... [?25l[?25hdone


In [None]:
# Mount Google Drive: INPUT_CSV and every output file (batches, checkpoint, log) live under /content/drive.
# force_remount=True asks Colab for a fresh mount even if /content/drive is already mounted (e.g. on re-run).
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
# Inject a browser-side timer: every 30000 ms it logs "Sending keep-alive" to the browser
# console and calls google.colab.kernel.proxyPort(0, {}).
# NOTE(review): presumably intended to stop Colab from disconnecting the session as idle during
# the long fetch; whether this call actually prevents disconnects is unverified. The timer runs
# in the browser tab that rendered this output, so it can only work while that tab stays open.
from IPython.display import display, Javascript
display(Javascript('''
function KeepAlive(){
    console.log("Sending keep-alive");
    google.colab.kernel.proxyPort(0, {})
}
setInterval(KeepAlive, 30000);
'''))

<IPython.core.display.Javascript object>

In [None]:
import json
import logging
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from getpass import getpass

import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import requests
from IPython.display import clear_output
from ratelimit import limits, sleep_and_retry
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from tqdm import tqdm

In [None]:
# --- Credentials ---------------------------------------------------------------
# The TMDB v3 API key is taken from the environment, or prompted for with getpass,
# so it never lives in the notebook source (a literal key leaks through every copy,
# export and commit of the notebook).
# SECURITY: a literal key used to be hard-coded in this cell; treat it as compromised
# and regenerate it in the TMDB account settings.
TMDB_API_KEY = os.environ.get("TMDB_API_KEY", "").strip() or getpass("TMDB API key: ").strip()
if not TMDB_API_KEY:
    raise ValueError("No TMDB API key: set the TMDB_API_KEY environment variable or enter it at the prompt")
# Request code reads the key back from the environment.
os.environ["TMDB_API_KEY"] = TMDB_API_KEY

# --- Paths ---------------------------------------------------------------------
# Input CSV must have an `id` column of TMDB movie ids; all outputs go to OUTPUT_DIR.
INPUT_CSV = "/content/drive/MyDrive/Colab/BigData/All_IDs.csv"
OUTPUT_DIR = "/content/drive/MyDrive/Colab/BigData"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# --- Client-side rate limit: at most MAX_REQUESTS requests per PERIOD_SECONDS ---
MAX_REQUESTS = 40
PERIOD_SECONDS = 10

# --- Batching ------------------------------------------------------------------
BATCH_SIZE = 10000  # ids per batch, i.e. per batch_<n>.parquet file
SAVE_EVERY = 5      # checkpoint interval, in batches

In [None]:
class _RedactSecretFilter(logging.Filter):
    """Mask the TMDB API key in every record a handler emits.

    The handlers below sit on the root logger, so they also receive records from
    libraries such as requests/urllib3, whose error and retry messages can contain
    the request URL -- including the ``api_key`` query parameter.
    """

    def filter(self, record):
        secret = os.environ.get("TMDB_API_KEY")
        if not secret:
            return True
        try:
            message = record.getMessage()
        except Exception:
            # Leave malformed records to the handler's own error reporting.
            return True
        if secret in message:
            record.msg = message.replace(secret, "***")
            record.args = ()
        return True


_log_handlers = [
    logging.FileHandler(f"{OUTPUT_DIR}/tmdb_fetcher.log", encoding="utf-8"),
    logging.StreamHandler(),
]
for _handler in _log_handlers:
    _handler.addFilter(_RedactSecretFilter())

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
    # Without force=True, basicConfig does nothing if the root logger already has
    # handlers (e.g. this cell was run before), and the log file is silently never attached.
    force=True,
)
logger = logging.getLogger()
logger.info("Logger initialized")

In [None]:
class RateLimitedSession(requests.Session):
    """A ``requests.Session`` that throttles outgoing requests on the client side.

    At most ``max_requests`` requests are started in any sliding window of
    ``period`` seconds. One session is shared by all worker threads of a batch,
    so the window bookkeeping is guarded by a lock. Without it, concurrent
    callers can all pass the limit check at the same moment and burst past it.

    Transient failures (429, 500, 502, 503, 504) on GET are retried by the
    mounted urllib3 ``Retry`` policy with exponential backoff. Those retries
    happen inside the adapter and are not counted against the client-side window.
    """

    def __init__(self, max_requests=MAX_REQUESTS, period=PERIOD_SECONDS):
        if max_requests < 1 or period <= 0:
            raise ValueError("max_requests must be >= 1 and period must be > 0")
        super().__init__()
        self.max_requests = max_requests
        self.period = period
        # time.monotonic() start times of requests still inside the window, oldest first.
        self.request_times = []
        self._rate_lock = threading.Lock()

        retry_strategy = Retry(
            total=5,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy, pool_connections=100, pool_maxsize=100)
        self.mount("https://", adapter)

    def request(self, method, url, **kwargs):
        """Block until the rate window has room, then send the request."""
        # The lock is held while sleeping on purpose: other threads queue behind it
        # and re-check the window once it is their turn, instead of all waking at once.
        with self._rate_lock:
            while True:
                now = time.monotonic()
                window_start = now - self.period
                self.request_times = [t for t in self.request_times if t > window_start]
                if len(self.request_times) < self.max_requests:
                    break
                # Seconds until the oldest request in the window expires (always > 0 here).
                sleep_time = self.period - (now - self.request_times[0])
                # Throttling is the steady state with many workers; logging it at WARNING floods the output.
                logger.debug(f"Rate limit reached. Sleeping for {sleep_time:.2f} seconds")
                time.sleep(sleep_time)
            self.request_times.append(time.monotonic())
        return super().request(method, url, **kwargs)

In [None]:
def _redact_api_key(text):
    """Replace the TMDB API key in ``text`` with ``***`` (no-op when the key is unset)."""
    api_key = os.environ.get("TMDB_API_KEY")
    return text.replace(api_key, "***") if api_key else text


def fetch_movie_details(session, movie_id):
    """Fetch one movie, with ``credits`` appended, from the TMDB v3 API.

    Args:
        session: HTTP session exposing ``get`` (normally a ``RateLimitedSession``).
        movie_id: TMDB movie id.

    Returns:
        ``(movie_id, payload)``. ``payload`` is the decoded JSON on success.
        On any failure (404, other HTTP status, network error, exhausted retries,
        invalid JSON) it is ``None``; errors are logged, never raised.
    """
    try:
        response = session.get(
            f"https://api.themoviedb.org/3/movie/{movie_id}",
            params={
                'api_key': os.environ["TMDB_API_KEY"],
                'append_to_response': 'credits'
            },
            timeout=30
        )
        response.raise_for_status()
        return movie_id, response.json()
    except requests.HTTPError as e:
        status = e.response.status_code if e.response is not None else "unknown"
        if status == 404:
            logger.debug(f"Movie ID {movie_id} not found (404)")
        else:
            logger.warning(f"HTTP error for movie {movie_id}: {status}")
    except Exception as e:
        # Connection/retry errors from requests embed the full request URL, including the
        # api_key query parameter, so the key must be masked before it reaches the log file.
        logger.error(f"Error fetching movie {movie_id}: {_redact_api_key(str(e))}")
    return movie_id, None

def _names(items):
    """Return the non-empty ``name`` fields of a list of objects.

    Tolerates ``None`` in place of the list, and skips entries that are not
    dicts or have no name.
    """
    return [
        item['name']
        for item in (items or [])
        if isinstance(item, dict) and item.get('name')
    ]


def _join_unique(names):
    """Join ``names`` with ', ', dropping repeats while keeping first-seen order.

    ``dict.fromkeys`` replaces ``set`` so the output is identical across runs
    (set order for strings depends on per-process hash randomization).
    """
    return ', '.join(dict.fromkeys(names))


def _crew_role(job, department):
    """Map a lowercased crew ``job``/``department`` to a role key, or ``None`` if untracked."""
    # Most specific title first: "director of photography" contains "director", so a
    # substring test for "director" would swallow every DoP. Directors need an exact
    # match, otherwise "Art Director", "Casting Director", "Assistant Director", ... are
    # counted as directors.
    if 'director of photography' in job or 'cinematographer' in job:
        return 'director_of_photography'
    if job == 'director':
        return 'director'
    if 'writer' in job or department == 'writing':
        return 'writers'
    if 'producer' in job:
        return 'producers'
    if 'composer' in job or (department == 'sound' and 'music' in job):
        return 'music_composer'
    return None


def process_movie_data(raw_data):
    """Flatten a TMDB movie payload (with ``credits``) into one flat record.

    Args:
        raw_data: decoded JSON from the movie details endpoint, or a falsy value.

    Returns:
        ``None`` for falsy input. Otherwise a dict of scalar fields copied from the
        payload, plus comma-separated strings for genres, production companies and
        countries, spoken languages, the first 20 cast entries, and de-duplicated
        crew names per role (director, director of photography, writers,
        producers, music composer).
    """
    if not raw_data:
        return None

    movie = raw_data
    credits = movie.get('credits') or {}
    if not isinstance(credits, dict):
        credits = {}

    crew_roles = {
        'director': [],
        'director_of_photography': [],
        'writers': [],
        'producers': [],
        'music_composer': []
    }

    for person in credits.get('crew') or []:
        if not isinstance(person, dict):
            continue
        name = person.get('name')
        if not name:
            continue
        # `or ''` guards against explicit nulls; .lower() on None would raise.
        role = _crew_role(
            (person.get('job') or '').lower(),
            (person.get('department') or '').lower(),
        )
        if role is not None:
            crew_roles[role].append(name)

    # First 20 cast entries in the order the API returned them.
    cast = _names((credits.get('cast') or [])[:20])

    return {
        'id': movie.get('id'),
        'title': movie.get('title'),
        'original_title': movie.get('original_title'),
        'release_date': movie.get('release_date'),
        'runtime': movie.get('runtime'),
        'budget': movie.get('budget'),
        'revenue': movie.get('revenue'),
        'popularity': movie.get('popularity'),
        'vote_average': movie.get('vote_average'),
        'vote_count': movie.get('vote_count'),
        'imdb_id': movie.get('imdb_id'),
        'genres': ', '.join(_names(movie.get('genres'))),
        'production_companies': ', '.join(_names(movie.get('production_companies'))),
        'production_countries': ', '.join(_names(movie.get('production_countries'))),
        'spoken_languages': ', '.join(_names(movie.get('spoken_languages'))),
        'cast': ', '.join(cast),
        'director': _join_unique(crew_roles['director']),
        'director_of_photography': _join_unique(crew_roles['director_of_photography']),
        'writers': _join_unique(crew_roles['writers']),
        'producers': _join_unique(crew_roles['producers']),
        'music_composer': _join_unique(crew_roles['music_composer'])
    }


In [None]:
def process_batch(movie_ids, batch_num, total_batches, checkpoint_file):
    """Fetch, transform and persist one batch of TMDB movie ids.

    Args:
        movie_ids: ids in this batch.
        batch_num: zero-based batch index, used in the output file name and checkpoint.
        total_batches: total number of batches (progress messages only).
        checkpoint_file: JSON checkpoint path, set to ``{"last_batch": batch_num}``
            once the batch is finished.

    Returns:
        Number of movies fetched and processed successfully.

    NOTE(review): ids that fail for transient reasons (timeouts, exhausted retries)
    count as failures and the batch is still checkpointed, so a resume does not retry them.
    """
    logger.info(f"Starting batch {batch_num+1}/{total_batches} with {len(movie_ids)} movies")

    results = []
    success_count = 0
    failure_count = 0

    # Both are context managers. On exit the executor waits for its workers first,
    # then the session closes its connection pools instead of leaking them per batch.
    with RateLimitedSession() as session, ThreadPoolExecutor(max_workers=20) as executor:
        futures = {executor.submit(fetch_movie_details, session, mid): mid for mid in movie_ids}

        for future in tqdm(
            as_completed(futures),
            total=len(movie_ids),
            desc=f"Batch {batch_num+1}/{total_batches}"
        ):
            movie_id, raw_data = future.result()
            processed = None
            if raw_data:
                try:
                    processed = process_movie_data(raw_data)
                except Exception:
                    # One malformed payload must not abort the other movies of the batch.
                    logger.exception(f"Failed to process movie {movie_id}")
            if processed:
                results.append(processed)
                success_count += 1
            else:
                failure_count += 1

    if results:
        df = pd.DataFrame(results)
        output_path = f"{OUTPUT_DIR}/batch_{batch_num}.parquet"
        # Write to a temp name, then rename into place, so an interrupted write
        # does not leave a truncated file under the final batch name.
        tmp_output_path = f"{output_path}.tmp"
        df.to_parquet(tmp_output_path)
        os.replace(tmp_output_path, output_path)
        logger.info(f"Saved batch {batch_num+1} with {len(df)} movies to {output_path}")

    # Same temp-then-rename pattern: a truncated checkpoint fails to parse and
    # makes main() start over from batch 0.
    tmp_checkpoint_file = f"{checkpoint_file}.tmp"
    with open(tmp_checkpoint_file, 'w') as f:
        json.dump({"last_batch": batch_num}, f)
    os.replace(tmp_checkpoint_file, checkpoint_file)

    logger.info(f"Batch {batch_num+1} complete. Success: {success_count}, Failures: {failure_count}")
    return success_count


In [None]:
def main():
    """Fetch details for every id in INPUT_CSV batch by batch, then combine the batches.

    Each batch of BATCH_SIZE ids goes through ``process_batch``, which writes
    ``batch_<n>.parquet`` and records the finished batch in ``checkpoint.json``.
    On restart the checkpoint is read and work resumes after the last finished
    batch. Finally every existing batch file is concatenated into
    ``full_tmdb_dataset.parquet``.
    """
    logger.info(f"Loading movie IDs from {INPUT_CSV}")
    id_df = pd.read_csv(INPUT_CSV)
    # Blank id cells are read as NaN, on which astype(int) raises.
    all_movie_ids = id_df['id'].dropna().astype(int).tolist()
    total_ids = len(all_movie_ids)
    logger.info(f"Loaded {total_ids:,} movie IDs")

    total_batches = (total_ids + BATCH_SIZE - 1) // BATCH_SIZE  # ceiling division
    checkpoint_file = f"{OUTPUT_DIR}/checkpoint.json"

    start_batch = 0
    if os.path.exists(checkpoint_file):
        try:
            with open(checkpoint_file, 'r') as f:
                checkpoint = json.load(f)
            last_batch = checkpoint.get("last_batch")
            # No recorded batch means nothing has finished yet: start at 0, not 1.
            start_batch = 0 if last_batch is None else int(last_batch) + 1
            logger.info(f"Resuming from batch {start_batch}")
        except (OSError, ValueError, TypeError, AttributeError) as e:
            # Unreadable file, invalid JSON or unexpected structure. Narrower than the
            # previous bare `except:`, which also swallowed KeyboardInterrupt.
            logger.warning(f"Could not read checkpoint file ({e}). Starting from beginning")
            start_batch = 0

    processed_count = 0
    for batch_num in range(start_batch, total_batches):
        start_idx = batch_num * BATCH_SIZE
        end_idx = min((batch_num + 1) * BATCH_SIZE, total_ids)
        batch_ids = all_movie_ids[start_idx:end_idx]

        # process_batch writes the checkpoint itself after every batch, so no
        # separate periodic checkpoint write is needed here.
        count = process_batch(batch_ids, batch_num, total_batches, checkpoint_file)
        processed_count += count

        # Progress report (replaces the previous batch's progress bars in the cell output).
        clear_output(wait=True)
        progress = (end_idx / total_ids) * 100
        logger.info(f"Progress: {end_idx:,}/{total_ids:,} IDs ({progress:.1f}%)")
        logger.info(f"Processed: {processed_count:,} successful records")

    logger.info("All batches processed successfully!")

    # Combine all batches into a single file
    logger.info("Combining all batches...")
    batch_files = [f"{OUTPUT_DIR}/batch_{i}.parquet" for i in range(total_batches)]
    existing_files = [path for path in batch_files if os.path.exists(path)]
    if not existing_files:
        # pd.concat on an empty sequence raises "No objects to concatenate".
        logger.warning("No batch files found; nothing to combine")
        return
    # ignore_index: every batch frame's index starts at 0, so keeping them would
    # produce duplicate index labels in the combined frame.
    full_df = pd.concat((pd.read_parquet(path) for path in existing_files), ignore_index=True)
    full_output = f"{OUTPUT_DIR}/full_tmdb_dataset.parquet"
    full_df.to_parquet(full_output)
    logger.info(f"Saved complete dataset to {full_output} ({len(full_df):,} records)")

if __name__ == "__main__":
    main()

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Batch 43/43: 100%|██████████| 6847/6847 [24:20<00:00,  4.69it/s]
