# Preprocess movie data and ratings

In [1]:
# Import libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import requests
import datetime
import threading
import re
import os
import time
import aiohttp
import asyncio

from tqdm.notebook import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
from difflib import SequenceMatcher
from dotenv import load_dotenv

In [2]:
# Load the raw movie table. Later cells read its "movieId" and "title" columns
# and join it to the TMDb links on "movieId".
raw_movie_df = pd.read_csv("../data/raw/movies.csv")

In [3]:
# Load variables from a local .env file into the environment, then read the
# TMDb credential. The request helpers below send api_key verbatim as the
# "Authorization" header; os.getenv returns None when API_KEY is not set.
# NOTE(review): presumably the stored value already includes the "Bearer "
# prefix expected by TMDb — confirm.
load_dotenv()
api_key = os.getenv('API_KEY')

In [4]:
# # Test code
# url = "https://api.themoviedb.org/3/search/movie"
# headers = {
#     "Authorization": api_key,
#     "Accept": "application/json"
# }
# params = {
#     "query": 'Postman,+The'
# }
# response = requests.get(url, headers=headers, params=params)
# response.json()

## Create a links.csv file that maps each movieId to its TMDb movie id

In [5]:
# Shared semaphore: each TMDb request in this notebook is issued inside
# `async with semaphore`, so at most 39 requests are in flight at once.
# NOTE(review): this bounds concurrency, not requests per second; 39 is
# presumably chosen to stay under TMDb's rate limit — confirm.
semaphore = asyncio.Semaphore(39)

In [6]:
# parse_title_year(title) gets the title and year
def parse_title_year(title):
    """Split a raw title such as ``"Toy Story (1995)"`` into title and year.

    The year is the four-digit number in the final pair of parentheses.
    Any other parenthesised fragments (for example alternative titles) are
    removed from the returned title.

    Args:
        title: Raw title string, optionally ending in ``(YYYY)``.

    Returns:
        ``(clean_title, year)`` where ``year`` is an int, or
        ``(title, None)`` with the input unchanged when no trailing
        ``(YYYY)`` is present.
    """
    # `\s*` before the end anchor tolerates trailing whitespace after the
    # closing parenthesis, which would otherwise make the year undetectable.
    match = re.match(r"^(.*)\((\d{4})\)\s*$", title)
    if match is None:
        return title, None

    name = match.group(1).strip()
    year = int(match.group(2))
    # Drop every remaining "(...)" fragment left in the name part.
    clean_title = re.sub(r"\s*\(.*?\)", "", name).strip()
    return clean_title, year

# search_tmdb(session, title, retries=3) searches TMDb API for title
async def search_tmdb(session, title, retries=3):
    """Query TMDb's movie search endpoint for ``title``.

    Args:
        session: Open ``aiohttp.ClientSession`` used to issue the request.
        title: Movie title sent as the ``query`` parameter.
        retries: Maximum number of attempts before giving up.

    Returns:
        The decoded JSON body on HTTP 200. ``None`` when the response has any
        other status below 500 except 429 (no retry), or when every attempt
        ends in a 429, a 5xx status, a timeout or a connection error.
    """
    url = "https://api.themoviedb.org/3/search/movie"
    headers = {
        "Authorization": api_key,
        "Accept": "application/json"
    }
    params = {
        "query": title
    }

    for attempt in range(retries):
        # The shared semaphore caps how many requests run concurrently.
        async with semaphore:
            try:
                async with session.get(url, headers=headers, params=params, timeout=5) as response:
                    if response.status == 200:
                        return await response.json()
                    elif response.status == 429:
                        tqdm.write(f"Rate limited. Sleeping... ({title})")
                        # Must be awaited: a bare asyncio.sleep(...) only
                        # creates a coroutine object and never pauses.
                        await asyncio.sleep(2)
                    elif response.status >= 500:
                        tqdm.write(f"Server error {response.status} while querying {title}. Retrying...")
                    else:
                        # Other statuses are treated as permanent failures.
                        tqdm.write(f"Client error {response.status} for {title}")
                        return None
            except asyncio.TimeoutError:
                tqdm.write(f"Timeout for {title}")
            except aiohttp.ClientError as e:
                tqdm.write(f"Connection error: {title} - {e}")
        # Short pause between attempts, taken after the semaphore slot is released.
        await asyncio.sleep(1)
    return None

# is_in_range(release_date, target_year, tolerance=1) checks if release date is in some range
def is_in_range(release_date, target_year, tolerance=1):
    """Tell whether a release date falls within ``tolerance`` years of a target.

    Args:
        release_date: Date string in ``YYYY-MM-DD`` form; empty or None
            yields False.
        target_year: Expected year (anything accepted by ``int``).
        tolerance: Allowed distance in years on either side of the target.

    Returns:
        True when the release year lies in
        ``[target_year - tolerance, target_year + tolerance]``, else False.

    Raises:
        ValueError: If ``release_date`` is non-empty but not ``YYYY-MM-DD``.
    """
    if release_date:
        parsed = datetime.datetime.strptime(release_date, "%Y-%m-%d")
        target = int(target_year)
        return target - tolerance <= parsed.year <= target + tolerance
    return False

# similar(a, b) checks the similarity of two strings
def similar(a, b):
    """Return the case-insensitive SequenceMatcher ratio of two strings.

    Args:
        a: First string.
        b: Second string.

    Returns:
        A float in ``[0, 1]``; 1.0 means the lower-cased strings are equal.
    """
    left = a.lower()
    right = b.lower()
    matcher = SequenceMatcher(isjunk=None, a=left, b=right)
    return matcher.ratio()

# get_best_match(results, title, year) gets the best matching movie id from API results
def get_best_match(results, title, year):
    """Pick the id of the search result that best matches ``title`` and ``year``.

    Each result with a release date is scored as the sum of:
      * title similarity times 0.7,
      * 0.2 when ``year`` is given and the release date is in range,
      * popularity / 1000, capped at 0.1.

    Args:
        results: Iterable of result dicts (keys read: ``title``,
            ``release_date``, ``popularity``, ``id``).
        title: Title to compare against.
        year: Expected release year, or None to skip the year bonus.

    Returns:
        The ``id`` of the highest-scoring result (the earliest one on ties),
        or None when no result has a release date and a score above zero.
        No minimum score threshold is applied.
    """
    top_candidate = None
    top_score = 0

    for candidate in results:
        candidate_date = candidate.get("release_date")
        if candidate_date:
            # Same accumulation order as the weights listed in the docstring.
            total = similar(candidate.get("title", ""), title) * 0.7
            if year and is_in_range(candidate_date, year):
                total += 0.2
            total += min(candidate.get("popularity", 0) / 1000, 0.1)

            # Strict comparison keeps the earliest result when scores tie.
            if total > top_score:
                top_candidate, top_score = candidate, total

    if not top_candidate:
        return None
    return top_candidate["id"]

# get_tmdb_id(session, title, year) searches TMDb API then returns best matching movie id
async def get_tmdb_id(session, title, year):
    """Search TMDb for ``title`` and return the best-matching movie id.

    Args:
        session: Open aiohttp session passed through to ``search_tmdb``.
        title: Cleaned movie title to search for.
        year: Expected release year, or None.

    Returns:
        The id chosen by ``get_best_match``, or None when the search returned
        nothing or the payload has no ``"results"`` key.
    """
    payload = await search_tmdb(session, title)
    if payload and "results" in payload:
        return get_best_match(payload["results"], title, year)
    return None

# process_row(session, row) resolves one row of the movie table to a TMDb id
async def process_row(session, row):
    """Look up the TMDb id for a single movie row.

    Args:
        session: Open aiohttp session used for the lookup.
        row: Mapping with ``"title"`` and ``"movieId"`` entries.

    Returns:
        ``{"movieId": ..., "tmdbId": ...}`` where ``tmdbId`` is an int when a
        match was found and the falsy lookup result (normally None) otherwise.
    """
    # Read both fields up front so a malformed row fails before any request.
    source_title, source_id = row["title"], row["movieId"]

    clean_title, release_year = parse_title_year(source_title)
    match_id = await get_tmdb_id(session, clean_title, release_year)

    return {
        "movieId": source_id,
        "tmdbId": int(match_id) if match_id else match_id
    }

In [7]:
# create_link(df=None) creates the link between movieId and tmdbId
async def create_link(df=None):
    """Resolve every movie row to a TMDb id.

    Args:
        df: DataFrame with ``"title"`` and ``"movieId"`` columns. Defaults to
            the notebook-level ``raw_movie_df`` so existing
            ``await create_link()`` calls keep working.

    Returns:
        A list of ``{"movieId": ..., "tmdbId": ...}`` dicts, in the order the
        lookups complete (not the order of the input rows).
    """
    if df is None:
        df = raw_movie_df

    async with aiohttp.ClientSession() as session:
        # One coroutine per row; concurrency is bounded inside the request helper.
        tasks = [process_row(session, row) for _, row in df.iterrows()]

        results = []
        # as_completed yields as lookups finish, so the progress bar advances steadily.
        for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
            results.append(await coro)

        return results

In [8]:
# results = await create_link()

# links_df = pd.DataFrame(results)
# links_df.sort_values(by='movieId', inplace=True)
# links_df['tmdbId'] = links_df['tmdbId'].astype('Int64')
# links_df.to_csv("../data/processed/links.csv", index=False)

In [9]:
# Reload the saved movieId -> tmdbId links. The nullable Int64 dtype keeps the
# ids integral even when a row has no TMDb id.
links_df = pd.read_csv("../data/processed/links.csv").astype({"tmdbId": "Int64"})

In [10]:
# Join each link row to its raw movie row on movieId (pd.merge defaults to an
# inner join), then drop rows with a missing value in any column.
movie_df = pd.merge(links_df, raw_movie_df, on='movieId')
movie_df.dropna(inplace=True)
# Print the column summary of the cleaned frame.
movie_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 61491 entries, 0 to 62422
Data columns (total 4 columns):
 #   Column   Non-Null Count  Dtype 
---  ------   --------------  ----- 
 0   movieId  61491 non-null  int64 
 1   tmdbId   61491 non-null  Int64 
 2   title    61491 non-null  object
 3   genres   61491 non-null  object
dtypes: Int64(1), int64(1), object(2)
memory usage: 2.4+ MB


## Enrich movie attributes

In [11]:
# Set the text columns to empty strings. This overwrites the `title` and
# `genres` values that came from the raw movie table and adds the other five
# columns.
# NOTE(review): presumably placeholders for the TMDb fields fetched below;
# enrich_data builds a new frame and only reads movieId/tmdbId from this one,
# so confirm these assignments are still needed.
movie_df['title'] = ""
movie_df['genres'] = ""
movie_df['keywords'] = ""
movie_df['overview'] = ""
movie_df['directors'] = ""
movie_df['actors'] = ""
movie_df['poster_path'] = ""
# Display the resulting frame as the cell output.
movie_df

Unnamed: 0,movieId,tmdbId,title,genres,keywords,overview,directors,actors,poster_path
0,1,862,,,,,,,
1,2,8844,,,,,,,
2,3,15602,,,,,,,
3,4,31357,,,,,,,
4,5,11862,,,,,,,
...,...,...,...,...,...,...,...,...,...
62418,209157,499546,,,,,,,
62419,209159,1218645,,,,,,,
62420,209163,553036,,,,,,,
62421,209169,634868,,,,,,,


In [15]:
# get_tmdb_movie(session, id, retries=3) gets one movie's details from the TMDb API
async def get_tmdb_movie(session, id, retries=3):
    """Fetch a movie's details, credits and keywords from TMDb.

    Args:
        session: Open ``aiohttp.ClientSession`` used to issue the request.
        id: TMDb movie id, interpolated into the request URL.
        retries: Maximum number of attempts before giving up.

    Returns:
        The decoded JSON body on HTTP 200. ``None`` when the response has any
        other status below 500 except 429 (no retry), or when every attempt
        ends in a 429, a 5xx status, a timeout or a connection error.
    """
    endpoint = f"https://api.themoviedb.org/3/movie/{id}"
    auth_headers = {"Authorization": api_key, "Accept": "application/json"}
    # Ask for credits and keywords in the same response as the movie details.
    query = {"append_to_response": "credits,keywords"}

    for _ in range(retries):
        # The shared semaphore caps how many requests run concurrently.
        async with semaphore:
            try:
                async with session.get(endpoint, headers=auth_headers, params=query, timeout=5) as reply:
                    status = reply.status
                    if status == 200:
                        return await reply.json()
                    if status != 429 and status < 500:
                        # Treated as a permanent failure: do not retry.
                        tqdm.write(f"Client error {status} for {id}")
                        return None
                    if status == 429:
                        tqdm.write(f"Rate limited. Sleeping... ({id})")
                        await asyncio.sleep(2)
                    else:
                        tqdm.write(f"Server error {status} while querying {id}. Retrying...")
            except asyncio.TimeoutError:
                tqdm.write(f"Timeout for {id}")
            except aiohttp.ClientError as err:
                tqdm.write(f"Connection error: {id} - {err}")
        # Short pause between attempts, taken after the semaphore slot is released.
        await asyncio.sleep(1)

    return None

# extract_data(data) extracts data from api call and format it
def extract_data(data):
    """Reduce a TMDb movie payload to the fields this notebook keeps.

    Args:
        data: Decoded movie payload (dict), or a falsy value.

    Returns:
        None for falsy input; otherwise a dict with ``title``, ``genres``
        (names), ``keywords`` (names), ``overview``, ``directors`` (names of
        crew whose job is "Director"), ``actors`` (up to five
        ``"name -> character"`` strings) and ``poster_path``.
    """
    if not data:
        return None

    info = {"title": data.get("title")}
    info["genres"] = [genre["name"] for genre in data.get("genres", [])]

    # Keywords are nested one level down: {"keywords": {"keywords": [...]}}.
    keyword_entries = data.get("keywords", {}).get("keywords", [])
    info["keywords"] = [entry["name"] for entry in keyword_entries]

    info["overview"] = data.get("overview")

    credits = data.get("credits", {})
    info["directors"] = [
        member["name"]
        for member in credits.get("crew", [])
        if member.get("job") == "Director"
    ]

    # Keep only the first five cast entries.
    leading_cast = credits.get("cast", [])[:5]
    info["actors"] = [
        f"{member['name']} -> {member['character']}" for member in leading_cast
    ]

    info["poster_path"] = data.get("poster_path")
    return info

# process_movie(session, row) fetches and flattens the TMDb data for one movie row
async def process_movie(session, row):
    """Build one enriched record for a movie row.

    Args:
        session: Open aiohttp session used for the TMDb request.
        row: Mapping with ``"tmdbId"`` and ``"movieId"`` entries.

    Returns:
        None when no data could be fetched; otherwise a flat dict in which the
        list-valued fields (genres, keywords, directors, actors) are joined
        with ``"|"``. ``tmdbId`` is returned as a string.
    """
    tmdb_key = str(row["tmdbId"])

    details = extract_data(await get_tmdb_movie(session, tmdb_key))
    if not details:
        return None

    def piped(field):
        # Collapse a list field into a single pipe-separated string.
        return "|".join(details[field])

    # Key order here determines the column order of the enriched frame.
    return {
        "movieId": row["movieId"],
        "tmdbId": tmdb_key,
        "title": details["title"],
        "genres": piped("genres"),
        "keywords": piped("keywords"),
        "overview": details["overview"],
        "directors": piped("directors"),
        "actors": piped("actors"),
        "poster_path": details["poster_path"]
    }
    
# enrich_data(movie_df) returns the enriched dataframe
async def enrich_data(movie_df):
    """Fetch TMDb details for every row and collect them into a DataFrame.

    Args:
        movie_df: DataFrame with ``"movieId"`` and ``"tmdbId"`` columns.

    Returns:
        A DataFrame with one row per movie whose lookup succeeded, in the
        order the lookups complete. Rows whose lookup fails are omitted.
    """
    enriched_rows = []
    async with aiohttp.ClientSession() as session:
        pending = []
        for _, row in movie_df.iterrows():
            pending.append(process_movie(session, row))

        # Consume lookups as they finish so the progress bar tracks real progress.
        progress = tqdm(asyncio.as_completed(pending), total=len(pending))
        for finished in progress:
            record = await finished
            if record:
                enriched_rows.append(record)

    return pd.DataFrame(enriched_rows)

In [16]:
# Fetch TMDb details for every row of movie_df (one request per movie, so this
# cell is slow) and save the result so later cells can reload it from disk.
# Movies whose lookup fails are omitted from enriched_df.
enriched_df = await enrich_data(movie_df)
enriched_df.to_csv("../data/processed/movies_enriched.csv", index=False)

  0%|          | 0/61491 [00:00<?, ?it/s]

Client error 404 for 1472855
Client error 404 for 1638573


In [19]:
# Reload the enriched table from disk and summarise it before cleaning.
enriched_df = pd.read_csv("../data/processed/movies_enriched.csv")
enriched_df.info()
# Drop every row that has a missing value in any column, then summarise again.
# NOTE(review): in the recorded output this keeps 45124 of 61489 rows, and
# `keywords` has the fewest non-null values (46691) — confirm that discarding
# movies without keywords is intended.
enriched_df.dropna(inplace=True)
enriched_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 61489 entries, 0 to 61488
Data columns (total 9 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   movieId      61489 non-null  int64 
 1   tmdbId       61489 non-null  int64 
 2   title        61489 non-null  object
 3   genres       60713 non-null  object
 4   keywords     46691 non-null  object
 5   overview     61152 non-null  object
 6   directors    61145 non-null  object
 7   actors       59418 non-null  object
 8   poster_path  60369 non-null  object
dtypes: int64(2), object(7)
memory usage: 4.2+ MB
<class 'pandas.core.frame.DataFrame'>
Index: 45124 entries, 2 to 61487
Data columns (total 9 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   movieId      45124 non-null  int64 
 1   tmdbId       45124 non-null  int64 
 2   title        45124 non-null  object
 3   genres       45124 non-null  object
 4   keywords     45124 non-null  object
 