# Milestone 1: Data Collection

## Requirements

- **Data Source**: movie data was scraped by using TMDb API
- **Dataset Attributes**:
  - Movie title
  - Genre(s)
  - Release year
  - IMDb rating
  - Number of user votes
  - Movie description
  - Poster URL

In [1]:
import csv
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional
import requests
#Constants

API_KEY = "tmdb-api-key"  # Replace with a valid TMDb API key
BASE_URL = "https://api.themoviedb.org/3"
OUTPUT_FILENAME = "top20k_movies_project2.csv"
MAX_TOTAL_MOVIES = 20000

# Year-based configuration
START_YEAR = 1935
END_YEAR = 2024

# Adjust these values for different retrieval strategies
DEFAULT_MAX_PAGES_PER_YEAR = 100
DEFAULT_MIN_VOTE_COUNT = 100
RECENT_YEARS = [2024, 2023, 2022, 2021, 2020]
RECENT_MAX_PAGES_PER_YEAR = 500
RECENT_MIN_VOTE_COUNT = 80
MAX_WORKERS_YEAR = 5    # Threads used to process multiple years concurrently
MAX_WORKERS_PAGE = 10   # Threads used to fetch pages within a year concurrently
MAX_WORKERS_CREDITS = 10  # Threads used to fetch credits concurrently

# Retry logic configuration
RETRY_LIMIT = 3
RETRY_BACKOFF = 2  # seconds

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("movie_fetcher.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def fetch_json(url: str, params: Dict) -> Optional[Dict]:
    for attempt in range(1, RETRY_LIMIT + 1):
        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            logger.warning(
                f"Attempt {attempt} failed for URL: {url} with params: {params}. Error: {e}"
            )
            if attempt < RETRY_LIMIT:
                sleep_time = RETRY_BACKOFF ** attempt
                logger.info(f"Retrying after {sleep_time} seconds...")
                time.sleep(sleep_time)
            else:
                logger.error(
                    f"All {RETRY_LIMIT} attempts failed for URL: {url} with params: {params}."
                )
    return None


def fetch_genres() -> Dict[int, str]:
    url = f"{BASE_URL}/genre/movie/list"
    params = {"api_key": API_KEY, "language": "en-US"}
    data = fetch_json(url, params)
    if data and "genres" in data:
        genre_mapping = {genre["id"]: genre["name"] for genre in data["genres"]}
        logger.info(f"Fetched {len(genre_mapping)} genres.")
        return genre_mapping

    logger.error("Failed to fetch genres.")
    return {}


def fetch_movies_by_page(endpoint: str, page: int, filters: Optional[Dict] = None) -> Optional[Dict]:
    url = f"{BASE_URL}/{endpoint}"
    params = {
        "api_key": API_KEY,
        "language": "en-US",
        "page": page,
    }
    if filters:
        params.update(filters)
    return fetch_json(url, params)


def fetch_movie_credits(movie_id: int) -> Optional[Dict]:
    url = f"{BASE_URL}/movie/{movie_id}/credits"
    params = {"api_key": API_KEY, "language": "en-US"}
    return fetch_json(url, params)


def process_movie(movie: Dict, genres: Dict[int, str], unique_titles: set, min_vote_count: int) -> Optional[Dict]:
    title = movie.get("title")
    vote_count = movie.get("vote_count", 0)

    # Filter based on title uniqueness and minimum vote count
    if not title or title in unique_titles or vote_count < min_vote_count:
        return None

    unique_titles.add(title)

    movie_genres = [genres.get(g_id, "Unknown") for g_id in movie.get("genre_ids", [])]
    release_date = movie.get("release_date", "N/A")
    release_year = release_date.split("-")[0] if release_date != "N/A" else "N/A"
    poster_path = movie.get("poster_path")
    poster_url = f"https://image.tmdb.org/t/p/w500{poster_path}" if poster_path else "N/A"

    return {
        "id": movie.get("id"),
        "Title": title,
        "Genres": ", ".join(movie_genres),
        "Release Year": release_year,
        "IMDb Rating": movie.get("vote_average"),
        "Number of Votes": vote_count,
        "Description": movie.get("overview", "N/A"),
        "Poster URL": poster_url,
    }


def fetch_movies_for_year(year: int, genres: Dict[int, str], max_movies: int) -> List[Dict]:
    movies = []
    unique_titles = set()

    # Determine parameters based on whether the year is in the recent set
    if year in RECENT_YEARS:
        max_pages = RECENT_MAX_PAGES_PER_YEAR
        min_votes = RECENT_MIN_VOTE_COUNT
    else:
        max_pages = DEFAULT_MAX_PAGES_PER_YEAR
        min_votes = DEFAULT_MIN_VOTE_COUNT

    filters = {
        "primary_release_year": year,
        "sort_by": "vote_average.desc",
    }

    #fetch multiple pages of movie data
    with ThreadPoolExecutor(max_workers=MAX_WORKERS_PAGE) as executor:
        future_to_page = {
            executor.submit(fetch_movies_by_page, "discover/movie", page, filters): page
            for page in range(1, max_pages + 1)
        }

        for future in as_completed(future_to_page):
            page = future_to_page[future]
            data = future.result()
            if not data or "results" not in data:
                logger.warning(f"No data returned for year {year}, page {page}")
                continue

            for movie_data in data["results"]:
                processed = process_movie(movie_data, genres, unique_titles, min_votes)
                if processed:
                    movies.append(processed)
                    if len(movies) >= max_movies:
                        logger.debug(f"Reached max_movies limit: {max_movies} for year {year}")
                        return movies

    logger.info(f"Fetched {len(movies)} movies for year {year}.")
    return movies


def fetch_top_movies(genres: Dict[int, str], max_total_movies: int) -> List[Dict]:
    movies = []
    years = range(END_YEAR, START_YEAR - 1, -1)  # Start from the most recent year

    with ThreadPoolExecutor(max_workers=MAX_WORKERS_YEAR) as executor:
        future_to_year = {
            executor.submit(fetch_movies_for_year, year, genres, max_total_movies): year
            for year in years
        }

        for future in as_completed(future_to_year):
            year = future_to_year[future]
            try:
                year_movies = future.result()
                movies.extend(year_movies)
                logger.info(f"Total movies collected so far: {len(movies)}")

                if len(movies) >= max_total_movies:
                    logger.info(f"Reached the maximum limit of {max_total_movies} movies.")
                    break
            except Exception as e:
                logger.error(f"Error fetching movies for year {year}: {e}")

    logger.info(f"Total movies fetched: {len(movies)}")
    return movies[:max_total_movies]


def fetch_and_add_credits(movies: List[Dict], max_workers: int = MAX_WORKERS_CREDITS) -> None:
    logger.info("Starting to fetch credits for each movie...")
    start_time = time.time()
    total_movies = len(movies)
    completed = 0

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_movie = {executor.submit(fetch_movie_credits, m["id"]): m for m in movies}

        for future in as_completed(future_to_movie):
            movie = future_to_movie[future]
            try:
                credits = future.result()
                if credits:
                    directors = [member["name"] for member in credits.get("crew", []) if member.get("job") == "Director"]
                    actors = [member["name"] for member in credits.get("cast", [])[:3]]
                else:
                    directors = []
                    actors = []

                movie["Director"] = ", ".join(directors) if directors else "N/A"
                movie["Cast"] = ", ".join(actors) if actors else "N/A"
            except Exception as e:
                logger.error(f"Error fetching credits for movie ID {movie['id']}: {e}")
                movie["Director"] = "N/A"
                movie["Cast"] = "N/A"
            finally:
                completed += 1
                # Log progress every 100 movies or at the end
                if completed % 100 == 0 or completed == total_movies:
                    logger.info(f"Fetched credits for {completed}/{total_movies} movies.")

    elapsed_time = time.time() - start_time
    logger.info(f"Completed fetching credits for all movies in {elapsed_time:.2f} seconds.")


def save_to_csv(movies: List[Dict], filename: str = OUTPUT_FILENAME) -> None:
    if not movies:
        logger.warning("No movies to save.")
        return

    fieldnames = [
        "Title",
        "Genres",
        "Release Year",
        "IMDb Rating",
        "Number of Votes",
        "Description",
        "Poster URL",
        "Director",
        "Cast"
    ]

    try:
        with open(filename, "w", newline="", encoding="utf-8") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for movie in movies:
                # Exclude 'id' and ensure all fields are present
                row = {field: movie.get(field, "N/A") for field in fieldnames}
                writer.writerow(row)
        logger.info(f"Saved {len(movies)} movies to '{filename}'.")
    except IOError as e:
        logger.error(f"Failed to write to CSV file {filename}: {e}")


def main():
    """
    Main execution function:
      1. Fetch genre mappings.
      2. Fetch top movies across the defined years.
      3. Fetch credits (Director and Cast) for each movie.
      4. Save the final dataset into a CSV file.
    """
    logger.info("Starting movie data fetching process...")
    start_time = time.time()

    genres = fetch_genres()
    if not genres:
        logger.error("Cannot proceed without genre data.")
        return

    # Fetch top movies 
    movies = fetch_top_movies(genres, MAX_TOTAL_MOVIES)
    # Fetch credits for Director and Cast info
    fetch_and_add_credits(movies)
    # Save results to CSV
    save_to_csv(movies)

    elapsed_time = time.time() - start_time
    logger.info(f"Execution time: {elapsed_time:.2f} seconds.")


if __name__ == "__main__":
    main()


2024-12-09 18:28:33,201 [INFO] Starting movie data fetching process...
2024-12-09 18:28:33,525 [INFO] Fetched 19 genres.
2024-12-09 18:28:51,480 [INFO] Fetched 743 movies for year 2022.
2024-12-09 18:28:51,501 [INFO] Total movies collected so far: 743
2024-12-09 18:28:51,639 [INFO] Fetched 335 movies for year 2024.
2024-12-09 18:28:51,651 [INFO] Total movies collected so far: 1078
2024-12-09 18:28:51,791 [INFO] Fetched 623 movies for year 2023.
2024-12-09 18:28:51,804 [INFO] Total movies collected so far: 1701
2024-12-09 18:28:51,825 [INFO] Fetched 733 movies for year 2020.
2024-12-09 18:28:51,834 [INFO] Total movies collected so far: 2434
2024-12-09 18:28:51,911 [INFO] Fetched 774 movies for year 2021.
2024-12-09 18:28:51,917 [INFO] Total movies collected so far: 3208
2024-12-09 18:28:55,326 [INFO] Fetched 37 movies for year 2019.
2024-12-09 18:28:55,328 [INFO] Total movies collected so far: 3245
2024-12-09 18:28:55,585 [INFO] Fetched 22 movies for year 2018.
2024-12-09 18:28:55,590 [