In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
# Obtain the Spark session for this notebook (application name "imdb_query");
# getOrCreate() hands back an already-running session when one exists.
spark = (
    SparkSession.builder
    .appName("imdb_query")
    .getOrCreate()
)

24/10/11 13:02:33 WARN Utils: Your hostname, codespaces-cba268 resolves to a loopback address: 127.0.0.1; using 10.0.1.237 instead (on interface eth0)
24/10/11 13:02:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/11 13:02:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from pathlib import Path

# Raw IMDb tables keyed by "<dataset>_<table>" (e.g. "title_basics"), one entry
# per "*.tsv.gz" file found in the data folder.
imdb_tables = {}
data_folder = Path("./data")

# Sorted so the tables are loaded (and printed) in the same order on every run;
# the unsorted glob order depends on the filesystem.
for file in sorted(data_folder.glob("*.tsv.gz")):
    # "title.basics.tsv.gz" -> "title_basics"
    table_name = "_".join(file.name.split(".")[:2])
    print(table_name)
    imdb_tables[table_name] = (
        spark.read.options(
            sep="\t",
            header=True,
            compression="gzip",
            nullValue=r"\N",  # the literal two-character string \N is read as null
        )
        # Read the globbed path itself rather than a hard-coded "./data/" prefix,
        # so changing data_folder is enough to point the loader elsewhere.
        .csv(str(file))
        # .limit(100)
    )

title_akas
title_principals
title_ratings
title_basics
name_basics
title_episode
title_crew


In [4]:
from transformation import bronze as xform_bronze

In [5]:
# Each bronze table has a submodule of the same name under xform_bronze that
# exposes a `transformation` callable; collect them keyed by table name.
bronze_transformation = {
    _bronze_name: getattr(xform_bronze, _bronze_name).transformation
    for _bronze_name in (
        "name_basics",
        "title_akas",
        "title_basics",
        "title_crew",
        "title_episode",
        "title_principals",
        "title_ratings",
    )
}

# Run every raw table through the bronze transformation registered under its name.
cleansed_imdb_tables = {
    _name: _raw_sdf.transform(bronze_transformation[_name])
    for _name, _raw_sdf in imdb_tables.items()
}

# The cleansed tables are now available as cleansed_imdb_tables["table_name"].

---

# Task 1

## Retrieve the top 10 movies with a minimum of 500 votes, ranked by:

`(numVotes/averageNumberOfVotes)*averageRating`

In [6]:
# Getting a list of movies
title_movie_sdf = (
    cleansed_imdb_tables["title_basics"]
    .filter(F.col("titleType") == "movie")
    .select("tconst")
)

# Getting the value of the average number of votes across all titles including movies and short
average_number_of_voters = (
    cleansed_imdb_tables["title_ratings"]
    .agg(F.avg("numVotes").alias("averageNumberOfVotes"))
    .head()["averageNumberOfVotes"]
)

print(f"{average_number_of_voters = }")

# Ranking Logics is given from the PDF
ranking_logics = (
    F.col("numVotes") * F.col("averageRating") / F.lit(average_number_of_voters)
)
ranked_title_sdf = (
    cleansed_imdb_tables["title_ratings"]
    .join(
        # Filtering the ratings table to movie only
        title_movie_sdf,
        on="tconst",
        how="inner",
    )
    .select(
        "tconst",
        F.col("numVotes"),
        ranking_logics.alias("ranking"),
    )
)

top_10_movies_with_min_500_votes_sdf = (
    ranked_title_sdf.filter(F.col("numVotes") >= 500)
    .orderBy(F.col("ranking").desc())
    .limit(10)
)

top_10_movies_with_min_500_votes_with_title_sdf = (
    top_10_movies_with_min_500_votes_sdf.join(
        cleansed_imdb_tables["title_basics"].select("tconst", "primaryTitle"),
        on="tconst",
        how="left",
    )
)

top_10_movies_with_min_500_votes_with_title_sdf.show(truncate=False)
top_10_movies_with_min_500_votes_with_title_sdf.cache()

average_number_of_voters = 757.0
+------+--------+-------+------------+
|tconst|numVotes|ranking|primaryTitle|
+------+--------+-------+------------+
+------+--------+-------+------------+



DataFrame[tconst: string, numVotes: int, ranking: double, primaryTitle: string]

# Task 2

## For these 10 movies, list the persons who are most often credited

In [7]:
# Task 2: among the principals credited on the top-10 movies, find the
# person(s) credited on the largest number of those movies.

# One row per (movie, credited person) for the top-10 movies.
top10_movies_with_credited_person_sdf = (
    top_10_movies_with_min_500_votes_with_title_sdf.select("tconst").join(
        cleansed_imdb_tables["title_principals"].select("tconst", "nconst"),
        on="tconst",
        # Inner join: movies without any row in title_principals drop out.
        how="inner",
    )
)

# Number of credits per person across the top-10 movies.
credit_counts_for_top10_movies_sdf = top10_movies_with_credited_person_sdf.groupBy(
    "nconst"
).count()

# Single-row DataFrame holding the highest credit count.
max_credit_count_sdf = credit_counts_for_top10_movies_sdf.agg(
    F.max("count").alias("count")
)

# Keep every person whose count equals the maximum. A plain
# orderBy(...).limit(1) would return only one of them, chosen arbitrarily,
# when several persons are tied for first place. The semi join keeps only the
# left-hand columns (nconst, count).
most_credited_person_for_top10_movies_sdf = credit_counts_for_top10_movies_sdf.join(
    max_credit_count_sdf,
    on="count",
    how="leftsemi",
)

most_credited_person_for_top10_movies_with_names_sdf = (
    most_credited_person_for_top10_movies_sdf.join(
        cleansed_imdb_tables["name_basics"].select(
            "nconst",
            "primaryName",
        ),
        on="nconst",
        # Left join: a person missing from name_basics is still listed,
        # with a null primaryName.
        how="left",
    )
    # Stable display order when several persons are tied.
    .orderBy("nconst")
)

most_credited_person_for_top10_movies_with_names_sdf.show()

+------+-----+-----------+
|nconst|count|primaryName|
+------+-----+-----------+
+------+-----+-----------+



## For these 10 movies, list their different titles

In [8]:
# Alternative-title rows from title_akas, restricted to the columns of interest.
_aka_columns_sdf = cleansed_imdb_tables["title_akas"].select(
    "tconst", "title", "region", "isOriginalTitle"
)

# Identifier and primary title of each top-10 movie.
_top_10_titles_sdf = top_10_movies_with_min_500_votes_with_title_sdf.select(
    "tconst", "primaryTitle"
)

# Left join so a movie without any title_akas row is still listed.
top_10_movies_with_aka_sdf = _top_10_titles_sdf.join(
    _aka_columns_sdf,
    on="tconst",
    how="left",
)

# One record per movie with the set of its distinct alternative titles.
_distinct_titles_per_movie_sdf = top_10_movies_with_aka_sdf.groupBy(
    "tconst", "primaryTitle"
).agg(F.collect_set("title"))

_distinct_titles_per_movie_sdf.show(n=10, truncate=False, vertical=True)

(0 rows)



24/10/11 13:02:46 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
