# TFM Final Project <img style="display: inline; align-right: 250px; position: absolute; right: 0px;" src="files/Logo-AIT-Red600x-8.webp" width="130"/>

👉 Berta Pfaff</br>
👉 Sergio Salvador</br>
👉 Francesc Vilaró</br>

# Project motivation

**Movie recommendation systems have become increasingly popular in recent years** due to the vast amount of movies available for viewers to watch. With the rise of streaming services like Netflix, Hulu, and Amazon Prime, **it has become harder for viewers to decide which movie to watch**, given the plethora of options available.

The proposed project aims to **build a movie recommender system using Python, leveraging the TMDB API to fetch movie metadata from 1980 until 2023**. The TMDB (The Movie Database) is an online database that provides comprehensive information related to movies, TV shows, and other forms of visual media. It is a community-driven platform that is curated and maintained by a team of editors and contributors who gather information from various sources, such as film studios, production companies, and fan communities, among others. The TMDB API (Application Programming Interface) provides developers with access to this wealth of data, allowing them to retrieve and use movie and TV show metadata in their own applications and projects.

The project will consist of three main parts:

1. **Database creation**: An Entity-Relationship model (ERM) will be first created, defining the entities, attributes, and relationships that need to be stored in the database. The next step is to create tables that correspond to the entities and attributes identified in the previous step. Each table should contain columns for the various attributes, along with appropriate data types and constraints.


2. **Data fetching**: Asynchronous calls to the TMDB API will be used to fetch movie data, such as title, genre, release date, and ratings, among others. Traditional methods such as synchronous API requests would be to slow and inefficient for this aplication. Finally, data will be stored in a mySQL database, allowing for high performance and fast access.


3. **EDA (Exploratory Data Analysis)**: Data visualization techniques will be applied to explore and understand the data. Graphs, charts, and histograms will be used to identify trends, patterns, and outliers, which can help improve the model's accuracy and obtain an overall understanding of the movie market in the last 40 years.


4. **Recommendation model**: A model will be built using advanced machine learning algorithms. The model will be trained on the movie metadata to generate personalized movie recommendations. the proposed model will utilize a complex algorithm that takes into consideration numerous variables, such as the movie's plot overview, the cast of actors, and the directors involved in its production, among other factors, to ultimately <u>recommend a set of five other movies that share similarities and patterns with the original movie</u>. This comprehensive approach not only provides a reliable and accurate way to suggest new movie options to viewers but also ensures that the recommended movies align with the viewer's preferences and tastes, resulting in a highly personalized and satisfying viewing experience.

Overall, this project aims to provide a convenient and personalized movie recommendation system that can help users discover new movies they will enjoy. Additionally, the project will also provide an opportunity to learn and apply several data science techniques learned throughout the master's degree, such as data retrieval, data visualization, and machine learning.

# Imported libraries

In [None]:
import mysql.connector
import pandas as pd
import warnings
import random
import requests
import time
from IPython.display import clear_output
import csv
from datetime import date, timedelta
from tqdm import tqdm
import ast
import asyncio
import aiohttp
import os
import json

## MySQL Workbench Database creation

### Connection to MySQL server

In [None]:
warnings.filterwarnings("ignore") # Warnings are disabled

db = mysql.connector.connect(
    host = "localhost",
    user = "root",
    password= "12345"
)

### Cursor object instantiation

In [None]:
cursor = db.cursor()

### Movies database creation

In [None]:
cursor.execute("CREATE DATABASE TMDB")
db.commit()

### Conection to TMDB database

In [None]:
#Connections
warnings.filterwarnings("ignore")
db = mysql.connector.connect(
    host = "localhost",
    user = "root",
    password= "12345",
    database= "TMDB"
)

## Entity-Relationship model (ERM)

<img src="files/DB_ERM_v2.png" style="width:700px">

## Table creation

### Primary table creation

In [None]:
cursor = db.cursor()
query_movie = """
    CREATE TABLE movie(
        id_movie int,
        original_title varchar(300),
        original_language varchar(20), 
        overview varchar(1000),
        popularity float,
        poster_path varchar(200),
        release_date varchar(20),
        title varchar(300),
        vote_average float,
        vote_count int,
        budget int, 
        revenue bigint,
        runtime int,
        PRIMARY KEY (id_movie)
    )
"""

cursor.execute(query_movie)
db.commit()

In [None]:
cursor = db.cursor()

### Secondary tables

#### Streaming companies

##### str_comp table creation

In [None]:
query_str_comp = """
    CREATE TABLE str_comp (
    id_str_comp int AUTO_INCREMENT,
    name varchar(50),
    PRIMARY KEY (id_str_comp)
)
"""

cursor.execute(query_str_comp)
db.commit()

##### movie_str_comp junction table creation

In [None]:
query_movie_str_comp = """
    CREATE TABLE movie_str_comp (
    id_mov_str_comp int AUTO_INCREMENT,
    id_movie int,
    id_str_comp int,
    PRIMARY KEY (id_mov_str_comp),
    FOREIGN KEY (id_movie) REFERENCES movie(id_movie),
    FOREIGN KEY (id_str_comp) REFERENCES str_comp(id_str_comp)
)
"""

cursor.execute(query_movie_str_comp)
db.commit()

#### Production companies

##### prod_comp table creation

In [None]:
query_prod_comp = """
    CREATE TABLE prod_comp ( 
         id_prod_comp int NOT NULL,
         name varchar (100),
         origin_country varchar (20),

         PRIMARY KEY (id_prod_comp)
      )
"""
cursor.execute(query_prod_comp)
db.commit()

##### movie_prod_comp junction table creation

In [None]:
query_movie_prod_comp = """
    CREATE TABLE movie_prod_comp (
        id_mov_prod_comp int AUTO_INCREMENT,
        id_movie int,
        id_prod_comp int,
        
        PRIMARY KEY (id_mov_prod_comp),
        FOREIGN KEY (id_movie) REFERENCES movie(id_movie),
        FOREIGN KEY (id_prod_comp) REFERENCES prod_comp(id_prod_comp)
      )
"""
cursor.execute(query_movie_prod_comp)
db.commit()

#### Genres

##### genre table creation

In [None]:
query_genre = """
    CREATE TABLE genre ( 
         id_genre int NOT NULL,
         genre varchar (30),

         PRIMARY KEY(id_genre)
      )
"""
cursor.execute(query_genre)
db.commit()

##### movie_genre table creation

In [None]:
query_movie_genre = """
    CREATE TABLE movie_genre (
        id_mov_genre int AUTO_INCREMENT,
        id_movie int,
        id_genre int,

        PRIMARY KEY (id_mov_genre),
        FOREIGN KEY (id_movie) REFERENCES movie(id_movie),
        FOREIGN KEY (id_genre) REFERENCES genre(id_genre)
      )
"""
cursor.execute(query_movie_genre)
db.commit()

#### People

##### _person_ table creation

In [None]:
query_person = """
    CREATE TABLE person ( 
         id_person int NOT NULL,
         name varchar (50),
         gender int,

         PRIMARY KEY (id_person)
      )
"""
cursor.execute(query_person)
db.commit()

##### job table creation

In [None]:
query_job = """
    CREATE TABLE job (
        id_job int AUTO_INCREMENT,
        job_name varchar(50),
        
        PRIMARY KEY (id_job)
      )
"""
cursor.execute(query_job)
db.commit()

##### movie_person junction table creation

In [None]:
query_movie_person = """
    CREATE TABLE movie_person ( 
         id_mov_person int AUTO_INCREMENT,
         id_movie int,
         id_person int,
         id_job int,

         PRIMARY KEY (id_mov_person),
         FOREIGN KEY (id_movie) REFERENCES movie(id_movie),
         FOREIGN KEY (id_person) REFERENCES person(id_person),
         FOREIGN KEY (id_job) REFERENCES job(id_job)
      )
"""
cursor.execute(query_movie_person)
db.commit()

## API key

In [1]:
api_key = "ac6862efab2ddf803567630c9f474ab8"

## Independent tables generation

### Genres table

In [None]:
url_genres = "https://api.themoviedb.org/3/genre/movie/list"

query_params = {
                "api_key": api_key,
}

In [None]:
def get_genres(url_genres, query_params):
    """
    This function sends a request to the TMDB API and returns a list of tuples 
    containing the id and name of all the available genres.
    """
    response = requests.get(url_genres, query_params).json()
    
    genres = [(genre['id'], genre['name']) for genre in response['genres']]
    
    return genres

In [None]:
genres = get_genres(url_genres, query_params)
genres[0]

In [None]:
def populate_genre(genres):
    insert_query = """
    INSERT INTO genre
    (id_genre, genre)
    VALUES(%s, %s)
    """
    
    cursor.executemany(insert_query, genres)
    db.commit()

In [None]:
populate_genre(genres)

### Streaming Companies table

In [None]:
url_str_comps = "https://api.themoviedb.org/3/watch/providers/movie"

query_params_str_comps = {
                "api_key": api_key,
                "watch-region": "ES"
}

In [None]:
def get_str_comps(url_str_comps, query_params_str_comp):
    """
    This function sends a request to the TMDB API and returns a list of tuples 
    containing the id and name of all the available streaming companies. 
    """
    response = requests.get(url_str_comps, query_params_str_comps).json()
    str_comps = [(result['provider_id'], result['provider_name']) for result in response['results']]
    
    return str_comps

In [None]:
str_comps = get_str_comps(url_str_comps, query_params_str_comps)
str_comps[0]

In [None]:
def populate_str_comps(str_comps):
    insert_query = """
    INSERT INTO str_comp
    (id_str_comp, name)
    VALUES(%s, %s)
    """
    
    cursor.executemany(insert_query, str_comps)
    db.commit()

In [None]:
populate_str_comps(str_comps)

### Jobs table 

In [None]:
jobs = ['Actor', 'Director','Screenplay']

In [None]:
def populate_job(jobs):
    insert_query = """
    INSERT INTO job
    (job_name)
    VALUES(%s)
    """
    for job in jobs:
        cursor.execute(insert_query, [job])
    db.commit()

In [None]:
populate_job(jobs)

## Movies IDs

In [None]:
# parameters

start_year = 1980
end_year = 2023

url_discover = "https://api.themoviedb.org/3/discover/movie"

In [None]:
def month_range_dict(start_year, end_year):
    """
    Returns a dictionary that maps the first day of a month to the last day of the month, given a start and end year.
    """
    month_range = {}
    start_date = date(start_year, 1, 1)
    end_date = date(end_year, 12, 31)

    # Iterate over all months between start and end dates
    while start_date < end_date:
        year = start_date.year
        month = start_date.month
        last_day = (date(year, month, 1) + timedelta(days=32)).replace(day=1) - timedelta(days=1)
        month_range[start_date.strftime('%Y-%m-%d')] = last_day.strftime('%Y-%m-%d')
        start_date = last_day + timedelta(days=1)

    return month_range

In [None]:
def total_pages(start_date, end_date, api_key, url_discover):
    param = {'api_key': api_key,
             'primary_release_date.gte': start_date,
             'primary_release_date.lte': end_date,
             'page': 500}
    
    return requests.get(url_discover, param).json()['total_pages']

In [None]:
total_pages('1980-01-01', '2023-12-31', api_key, url_discover)

In [None]:
def params_generator(start_year, end_year, api_key, url_discover):
    params = []
    total_pag = 0
    
    for start_date, end_date in month_range_dict(start_year, end_year).items():
        
        total_pag = total_pages(start_date, end_date, api_key, url_discover)
        
        page = 1
        
        while page <= total_pag:
            params.append({
                "api_key": api_key,
                "primary_release_date.gte": start_date,
                "primary_release_date.lte": end_date,
                "page": page
            })
            page += 1
            
    if not os.path.exists('data'):
        os.makedirs('data')
        
    fieldnames = ['api_key', 'primary_release_date.gte', 'primary_release_date.lte', 'page']
    
    with open(f'data/discover_params_{start_year}_{end_year}.csv', 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for param in params:
            writer.writerow(param)
        
    return params

In [None]:
params = params_generator(start_year, end_year, api_key, url_discover)

In [None]:
def read_discover_params_csv(csv_filepath):
    json_list = []
    with open(csv_filepath, 'r', newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            json_list.append(row)
    return json_list

In [None]:
params = read_discover_params_csv(f'data/discover_params_{start_year}_{end_year}.csv')

In [None]:
params[0]

In [None]:
def get_tasks_discover(session, url, params, processed_pages):
    tasks = []
    for i, param in enumerate(params):
        if processed_pages[i]==0:
            tasks.append((i, session.get(url, params=param, timeout = 15)))
    return tasks

In [None]:
processed_pages = {}

for i in range(len(params)):
    processed_pages[i]=0

In [None]:
processed_pages

In [None]:
async def discover_api_call(url, params, processed_pages, max_tries=3):
    results=[]
    exceptions = []
    page_counter = 0
    try_counter = 0
    max_step = 1000
    step = max_step
    while try_counter < max_tries:
        async with aiohttp.ClientSession() as session:
            tasks = get_tasks_discover(session, url, params, processed_pages)
            if not tasks:
                return results, exceptions
            if len(tasks) < max_step:
                step = len(tasks)
            for i in range(0, len(tasks), step): #len(tasks)
                batch = tasks[i:i+step]
                responses = await asyncio.gather(*[t[1] for t in batch], return_exceptions=True)
                #await asyncio.sleep()
                for j, response in enumerate(responses):
                    try:
                        movies_page = await response.json()
                        results.append(movies_page['results'])
                        processed_pages[batch[j][0]] = 1
                        page_counter += 1
                        clear_output(wait=True)
                        print(f'{page_counter} pages out of {len(params)} have been fetched')
                    except:
                        exceptions.append(response)
        try_counter += 1
    return results, exceptions

In [None]:
start = time.monotonic()
discovered_movies, exceptions = await discover_api_call(url_discover, params, processed_pages)
end = time.monotonic()

print(f'Elapsed time: {round(end - start, 2)} seconds')

In [None]:
def get_movie_ids(discovered_movies):
    
    if not os.path.exists('data'):
        os.makedirs('data')
    
    movie_ids = {movie['id']: 0
            for page in discovered_movies
            for movie in page}
    
    with open(f'data/ids_movies_{start_year}-{end_year}.csv', 'w') as f:
        for key in movie_ids.keys():
            f.write("%s,%s\n"%(key,movie_ids[key]))
            
    print(f'{len(movie_ids)} movie IDs have been saved succesfully')

In [None]:
get_movie_ids(discovered_movies)

In [None]:
def csv_to_dict(filepath):
    # create an empty dictionary to store the CSV data
    csv_dict = {}

    # open the CSV file in read mode
    with open(filepath, 'r') as f:

    # create a reader object to read the CSV data
        reader = csv.reader(f)

    # loop through each row in the CSV file
        for row in reader:
            csv_dict[row[0]] = ast.literal_eval(row[1])

    return csv_dict

## Movie details

In [None]:
movie = requests.get("https://api.themoviedb.org/3/movie/337800", params = details_params).json()
movie

In [None]:
url_details = "https://api.themoviedb.org/3/movie/"

details_params = {
                "api_key": "ac6862efab2ddf803567630c9f474ab8",
                "append_to_response": "credits"
}

In [None]:
ids_movies = csv_to_dict(f'data/ids_movies_{start_year}-{end_year}.csv')

In [None]:
def get_tasks_details(session, url_details, details_params, ids_movies):
    tasks = []
    for id_movie, processing_state in ids_movies.items():
        if processing_state==0:
            tasks.append(session.get(url_details + str(id_movie), params=details_params, timeout = 15))
    return tasks

In [None]:
async def details_api_call(url, details_params, ids_movies, output_file, max_tries=3):
    exceptions = []
    movie_counter = 0
    try_counter = 0
    max_step = 1000
    step = max_step
    existing_ids = set()
    if os.path.exists(output_file):
        with open(output_file, "r") as f:
            for line in f:
                movie = json.loads(line)
                existing_ids.add(str(movie['id']))
    with open(output_file, "a") as f:
        while try_counter < max_tries:
            async with aiohttp.ClientSession() as session:
                tasks = get_tasks_details(session, url, details_params, ids_movies)
                if not tasks:
                    return exceptions
                if len(tasks) < max_step:
                    step = len(tasks)
                for i in range(0, len(tasks), step):
                    batch = tasks[i:i+step]
                    responses = await asyncio.gather(*batch, return_exceptions=True)
                    for response in responses:
                        try:
                            movie = await response.json()
                            if movie['id'] and str(movie['id']) not in existing_ids:
                                json.dump(movie, f)
                                f.write('\n')
                                existing_ids.add(str(movie['id']))
                                ids_movies[str(movie['id'])] = 1
                                movie_counter += 1
                                clear_output(wait=True)
                                print(f'{movie_counter} movies out of {len(ids_movies)} have been fetched')
                        except:
                            exceptions.append(response)
            try_counter += 1
    return exceptions


In [None]:
start = time.monotonic()
detailed_movies = await details_api_call(url_details, details_params, ids_movies, output_file = 'data/processed_movies_ids.jsonl')
end = time.monotonic()

print(f'Elapsed time: {round(end - start, 2)} seconds')

## Streaming companies

In [None]:
ids_movies = csv_to_dict(f'data/ids_movies_{start_year}-{end_year}.csv')

In [None]:
str_comps_url = "https://api.themoviedb.org/3/movie/"

str_comps_endpoint = "/watch/providers"

str_comp_params = {
                "api_key": api_key,
                "watch-region": "ES"
}

In [None]:
def get_tasks_str_comps(session, url_str_comp, str_comps_endpoint, str_comp_params, ids_movies):
    tasks = []
    for id_movie, processing_state in ids_movies.items():
        if processing_state==0:
            tasks.append(session.get(str_comps_url + str(id_movie) + str_comps_endpoint, params=str_comp_params, timeout = 15))
    return tasks

In [None]:
async def str_comp_api_call(str_comps_url, str_comps_endpoint, str_comp_params, ids_movies, output_file, max_tries=3):
    exceptions = []
    movie_counter = 0
    try_counter = 0
    max_step = 1000
    step = max_step
    existing_ids = set()
    if os.path.exists(output_file):
        with open(output_file, "r") as f:
            for line in f:
                movie = json.loads(line)
                existing_ids.add(str(movie['id']))
                movie_counter += 1
                ids_movies[str(movie['id'])] = 1
    with open(output_file, "a") as f:
        while try_counter < max_tries:
            async with aiohttp.ClientSession() as session:
                tasks = get_tasks_str_comps(session, str_comps_url, str_comps_endpoint,str_comp_params, ids_movies)
                if not tasks:
                    return exceptions
                if len(tasks) < max_step:
                    step = len(tasks)
                for i in range(0, len(tasks), step):
                    batch = tasks[i:i+step]
                    responses = await asyncio.gather(*batch, return_exceptions=True)
                    for response in responses:
                        try:
                            str_comps = await response.json()
                            if str(str_comps['id']) not in existing_ids:
                                json.dump(str_comps, f)
                                f.write('\n')
                                existing_ids.add(str(str_comps['id']))
                                ids_movies[str(str_comps['id'])] = 1
                                movie_counter += 1
                                clear_output(wait=True)
                                print(f'Watch providers for {movie_counter} movies out of {len(ids_movies)} have been fetched')
                        except:
                            exceptions.append(response)
            try_counter += 1
    return exceptions, movie_counter

In [None]:
start = time.monotonic()
exceptions, movie_counter = await str_comp_api_call(str_comps_url, str_comps_endpoint, str_comp_params, ids_movies, output_file = 'data/str_comps.jsonl', max_tries=3)
end = time.monotonic()

print(f'Elapsed time: {round(end - start, 2)} seconds')

## Table population

In [None]:
def populate_movie(movies):
    insert_query = """
    INSERT INTO movie
    (id_movie, original_title, original_language, overview, popularity, poster_path, release_date, title, vote_average, vote_count, budget, revenue, runtime)
    VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    
    records = [(m['id'], m['original_title'], m['original_language'], m['overview'], m['popularity'], m['poster_path'], m['release_date'], m['title'], m['vote_average'], m['vote_count'], m['budget'], m['revenue'], m['runtime']) for m in movies]
    
    cursor.executemany(insert_query, records)
    
    db.commit()
    
    return cursor.rowcount

In [None]:
def populate_prod_comp(movies):
    insert_query = """
    INSERT IGNORE INTO prod_comp
    (id_prod_comp, name, origin_country)
    VALUES(%s, %s, %s)
    """
    
    prod_comps = [(prod_comp['id'], 
                   prod_comp['name'], 
                   prod_comp['origin_country']) 
                  for movie in movies
                  for prod_comp in movie['production_companies']]
                              
    cursor.executemany(insert_query, prod_comps)
    db.commit()
    
    return cursor.rowcount

In [None]:
def populate_movie_prod_comp(movies):
    insert_query = """
    INSERT INTO movie_prod_comp
    (id_movie, id_prod_comp)
    VALUES(%s, %s)
    """
    
    movie_prod_comp = [(m['id'],
                        prod_comp['id']) 
                       for m in movies 
                       for prod_comp in m['production_companies']]
    
    cursor.executemany(insert_query, movie_prod_comp)
    
    db.commit()
    
    return cursor.rowcount

In [None]:
def populate_movie_genre(movies):
    insert_query = """
    INSERT INTO movie_genre
    (id_movie, id_genre)
    VALUES(%s, %s)
    """
    movie_genre = [(m['id'], 
                     genre['id']) 
                    for m in movies 
                    for genre in m['genres']]
    
    cursor.executemany(insert_query, movie_genre)
    
    db.commit()
    
    return cursor.rowcount

In [None]:
def populate_person(movies, jobs):
    insert_query = """
    INSERT IGNORE INTO person
    (id_person, name, gender)
    VALUES(%s, %s, %s)
    """
    
    actors = [(actor['id'], 
               actor['name'], 
               actor['gender']) 
              for m in movies 
              for actor in m['credits']['cast'][0:7]]
    
    crew = [(crew_mem['id'], 
             crew_mem['name'], 
             crew_mem['gender']) 
            for m in movies 
            for crew_mem in m['credits']['crew']
            if crew_mem['job'] in jobs]
    
    cursor.executemany(insert_query, actors + crew)
    
    db.commit()
    
    return cursor.rowcount

In [None]:
def populate_movie_person(movies, jobs):
    insert_query = """
    INSERT INTO movie_person
    (id_movie, id_person, id_job)
    VALUES(%s, %s, %s)
    """
    
    movies_actors = [(m['id'], 
                      actor['id'], 
                      jobs.index('Actor') + 1)
                     for m in movies 
                     for actor in m['credits']['cast'][0:7]]
    
    movies_crew = [(m['id'], 
                    crew_mem['id'], 
                    jobs.index(crew_mem['job']) + 1) 
                   for m in movies 
                   for crew_mem in m['credits']['crew']
                   if crew_mem['job'] in jobs]
    
    cursor.executemany(insert_query, movies_actors + movies_crew)
    
    db.commit()
    
    return cursor.rowcount

In [None]:
def populate_movie_str_comp(str_comps):
    
    insert_query = """
    INSERT INTO movie_str_comp
    (id_movie, id_str_comp)
    VALUES(%s, %s)
    """
    flatrate_es_comps = []
    
    for str_comp in str_comps:
        id_movie = str_comp.get("id")
        flatrates = str_comp.get("results", {}).get("ES", {}).get("flatrate", [])
        for flatrate in flatrates:
            provider_id = flatrate.get("provider_id")
            flatrate_es_comps.append((id_movie, provider_id))
    
    cursor.executemany(insert_query, flatrate_es_comps)
    
    db.commit()
    
    return cursor.rowcount

In [None]:
jobs = ['Actor', 'Director','Screenplay']

# Define the chunk size
chunk_size = 1000

# Read the JSONL file in chunks
for chunk in pd.read_json('data/processed_movies.jsonl', lines=True, chunksize=chunk_size):
    # Convert the chunk to a list of dictionaries
    movies = chunk.to_dict(orient='records')
    
    populate_movie(movies)
    populate_prod_comp(movies)
    populate_movie_prod_comp(movies)
    populate_movie_genre(movies)
    populate_person(movies, jobs)
    populate_movie_person(movies, jobs)

In [None]:
# Define the chunk size
chunk_size = 1000

for chunk in pd.read_json('data/str_comps.jsonl', lines=True, chunksize=chunk_size):
    # Convert the chunk to a list of dictionaries
    str_comps = chunk.to_dict(orient='records')
    
    populate_movie_str_comp(str_comps)