In [1]:
import re
from typing import Optional

import pandas as pd
import wikipedia
from tqdm.notebook import tqdm

tqdm.pandas()

In [2]:
# Directory holding the raw input parquet files. The trailing slash is required because
# file names are appended directly to this string when the inputs are read.
DATA_PATH = "../data_raw/"

In [3]:
# Raw inputs, all stored as parquet files under DATA_PATH.
movies_raw: pd.DataFrame = pd.read_parquet(f"{DATA_PATH}movies.pq")  # movie names and genres
links_raw: pd.DataFrame = pd.read_parquet(f"{DATA_PATH}links.pq")  # ids from different datasets
tmdb_data: pd.DataFrame = pd.read_parquet(f"{DATA_PATH}tmdb_data.pq")  # data about 5000 movies from TMDB

## Add TMDB data

In [4]:
# Join movies -> links (on movieId) -> TMDB data (tmdbId == movie_id) in a single chain.
# Both merges use the default inner join, so rows without a match on either side are dropped.
# Columns present in both frames (title, genres) come out with _x / _y suffixes.
movies: pd.DataFrame = (
    movies_raw.merge(links_raw, on="movieId")
    .loc[:, ["movieId", "tmdbId", "title", "genres"]]
    .merge(tmdb_data, left_on="tmdbId", right_on="movie_id")
    .drop(columns="movie_id")
)

In [5]:
# Collect genres from both MovieLens (genres_x) and TMDB (genres_y).
# genres_x is a "|"-separated string; genres_y is turned into a plain list with list().
# dict.fromkeys de-duplicates while keeping first-seen order, so the resulting lists are the
# same on every run. Going through set() made the order depend on the per-process string
# hash seed, i.e. the saved genre lists differed between kernel restarts.
movies["genres"] = (
    movies["genres_x"].apply(lambda genres: genres.split("|")) + movies["genres_y"].apply(list)
).apply(lambda genres: list(dict.fromkeys(genres)))
# The per-source columns are no longer needed once the combined column exists.
movies = movies.drop(columns=["genres_x", "genres_y"])

In [6]:
# remove useless genres

def remove_no_genres(genres_list: list[str]) -> list[str]:
    """Drop the "(no genres listed)" placeholder when real genres are present.

    The placeholder is only removed if the list has more than one entry; a list
    without the placeholder, or with a single entry, is returned unchanged
    (the very same list object).
    """
    placeholder = "(no genres listed)"
    if placeholder not in genres_list or len(genres_list) <= 1:
        return genres_list
    return [name for name in genres_list if name != placeholder]


def remove_foreign(genres_list: list[str]) -> list[str]:
    """Strip the "Foreign" genre from a genre list.

    If "Foreign" is the only entry, the list is replaced by the
    "(no genres listed)" placeholder. Lists without "Foreign" are returned
    unchanged (the very same list object).
    """
    if "Foreign" not in genres_list:
        return genres_list
    if len(genres_list) <= 1:
        # "Foreign" was the only genre, so nothing meaningful is left.
        return ["(no genres listed)"]
    return [name for name in genres_list if name != "Foreign"]


# Clean every genre list: first drop the placeholder where real genres exist, then drop "Foreign".
movies.loc[:, "genres"] = movies["genres"].apply(
    lambda genres_list: remove_foreign(remove_no_genres(genres_list))
)

In [None]:
# merge genres that are duplicated under different names

def remove_duplicate_music(genres_list: list[str]) -> list[str]:
    """Fold the "Music" genre into "Musical".

    Every "Music" entry is removed and "Musical" is appended at the end unless
    it is already present. Lists without "Music" are returned unchanged
    (the very same list object).
    """
    if "Music" not in genres_list:
        return genres_list
    merged = [name for name in genres_list if name != "Music"]
    if "Musical" not in merged:
        merged.append("Musical")
    return merged


def remove_duplicate_scifi(genres_list: list[str]) -> list[str]:
    """Fold the "Science Fiction" genre into "Sci-Fi".

    Every "Science Fiction" entry is removed and "Sci-Fi" is appended at the
    end unless it is already present. Lists without "Science Fiction" are
    returned unchanged (the very same list object).
    """
    if "Science Fiction" not in genres_list:
        return genres_list
    merged = [name for name in genres_list if name != "Science Fiction"]
    if "Sci-Fi" not in merged:
        merged.append("Sci-Fi")
    return merged


# Normalise genre names: "Music" -> "Musical" first, then "Science Fiction" -> "Sci-Fi".
movies.loc[:, "genres"] = movies["genres"].apply(
    lambda genres_list: remove_duplicate_scifi(remove_duplicate_music(genres_list))
)

## Add movie plot from Wikipedia

In [35]:
# functions from https://towardsdatascience.com/movielens-1m-deep-dive-part-i-8acfeda1ad4


def get_wikipedia_page_name(movie_title: str) -> str:
    """Return the first Wikipedia search hit for a movie title.

    Args:
        movie_title: Title passed to ``wikipedia.search``.

    Returns:
        The first entry of the search results, or ``""`` when the search
        returned nothing.
    """
    search_hits: list[str] = wikipedia.search(movie_title)
    return search_hits[0] if len(search_hits) > 0 else ""


def get_movie_plot(page_name: str) -> Optional[str]:
    """Return the text of the "Plot" section of a Wikipedia page.

    The page is fetched with ``auto_suggest=False``. If the name is ambiguous,
    the first disambiguation option containing "film" is fetched instead.

    Args:
        page_name: Exact Wikipedia page title; may be ``""`` when the title
            search found nothing.

    Returns:
        The text between the "Plot ==" heading and the next heading that starts
        with an uppercase letter (newlines removed), or ``None`` when the page
        cannot be resolved or has no such section.
    """
    if not page_name:
        # An empty name means there is no page to fetch; skip the network call.
        return None
    try:
        try:
            movie_page_content = str(wikipedia.page(page_name, auto_suggest=False).content)
        except wikipedia.DisambiguationError as disamberror:
            # Ambiguous title: fall back to the first option that looks like a film page.
            film_option = next((option for option in disamberror.options if "film" in option), None)
            if film_option is None:
                return None
            movie_page_content = str(wikipedia.page(film_option, auto_suggest=False).content)
    except (wikipedia.PageError, wikipedia.DisambiguationError, KeyError):
        # Missing page, or the fallback option was itself ambiguous: treat as "no plot"
        # rather than aborting the whole progress_apply run.
        return None
    # Newlines are stripped first so the lazy group can span the whole section.
    re_groups = re.search("Plot ==(.*?)=+ [A-Z]", movie_page_content.replace("\n", ""))
    if re_groups:
        return re_groups.group(1)
    return None

In [None]:
# First pass: look up a Wikipedia page for every title_x value, then fetch its plot.
page_names = movies["title_x"].progress_apply(get_wikipedia_page_name)
movies["wikipedia_page_name"] = page_names
movies["movie_plot"] = movies["wikipedia_page_name"].progress_apply(get_movie_plot)
print(f'There are {movies["movie_plot"].isna().sum()} missing movie plots')

In [None]:
# Second pass: for rows that still have no plot, search again using title_y and re-fetch
# the plot. Rows that already have a plot keep their page name and plot unchanged.
movies["wikipedia_page_name"] = movies.progress_apply(
    lambda row: row["wikipedia_page_name"] if row["movie_plot"] else get_wikipedia_page_name(row["title_y"]),
    axis=1,
)
movies["movie_plot"] = movies.progress_apply(
    lambda row: row["movie_plot"] if row["movie_plot"] else get_movie_plot(row["wikipedia_page_name"]),
    axis=1,
)
print(f'There are {movies["movie_plot"].isna().sum()} missing movie plots')

In [None]:
# for some films the plot was not found because the wrong page was chosen;
#  in many cases the page with "(film)" in its title was the right one

def get_new_wikipedia_page_name(movie_title: str) -> str:
    """Return the best Wikipedia search hit for a movie title, preferring film pages.

    Args:
        movie_title: Title passed to ``wikipedia.search``.

    Returns:
        The first search result whose name contains "(film)"; if there is none,
        the first search result; ``""`` when the search returned nothing.
    """
    candidates = wikipedia.search(movie_title)
    if not len(candidates):
        return ""
    film_pages = (candidate for candidate in candidates if "(film)" in candidate)
    return next(film_pages, candidates[0])  # type: ignore [no-any-return]


# Third pass: rows still without a plot get a new page name that prefers "(film)" pages,
# and the plot is fetched again. Rows that already have a plot are left unchanged.
movies["wikipedia_page_name"] = movies.progress_apply(
    lambda row: row["wikipedia_page_name"] if row["movie_plot"] else get_new_wikipedia_page_name(row["title_y"]),
    axis=1,
)
movies["movie_plot"] = movies.progress_apply(
    lambda row: row["movie_plot"] if row["movie_plot"] else get_movie_plot(row["wikipedia_page_name"]),
    axis=1,
)
print(f'There are {movies["movie_plot"].isna().sum()} missing movie plots')

In [None]:
# Final column layout: give the kept columns their output names, then discard the helper
# columns that were only needed for the Wikipedia lookups.
movies = movies.rename(
    columns={
        "title_y": "title",
        "movieId": "movielens_id",
        "tmdbId": "tmdb_id",
        "popularity": "tmdb_popularity_score",
        "vote_average": "tmdb_rating_avg",
        "vote_count": "tmdb_votes_count",
    }
).drop(columns=["title_x", "wikipedia_page_name"])

## Save data

In [82]:
# Write the enriched movie table to parquet.
# NOTE(review): this path is relative to the working directory and does not use DATA_PATH
# ("../data_raw/") — confirm that "data/" is the intended output location and that it exists.
movies.to_parquet("data/movies_data.pq")