In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as f
import pyspark.sql.types as t

In [2]:
spark_session = (SparkSession.builder
                .master('local')
                .appName('Big Data Project')
                .config(conf=SparkConf())
                .getOrCreate())

### Define the schemas

In [3]:
name_basics_schema = t.StructType([
    t.StructField("nconst", t.StringType(), nullable=False),
    t.StructField("primaryName", t.StringType(), nullable=False),
    t.StructField("birthYear", t.IntegerType(), nullable=True),
    t.StructField("deathYear", t.IntegerType(), nullable=True),
    t.StructField("primaryProfession", t.StringType(), nullable=True),
    t.StructField("knownForTitles", t.StringType(), nullable=True)
])

In [157]:
# Define the schema for the 'title.basics.tsv' table
title_basics_schema = t.StructType([
    t.StructField("tconst", t.StringType(), nullable=False),
    t.StructField("titleType", t.StringType(), nullable=True),
    t.StructField("primaryTitle", t.StringType(), nullable=True),
    t.StructField("originalTitle", t.StringType(), nullable=True),
    t.StructField("isAdult", t.IntegerType(), nullable=True),
    t.StructField("startYear", t.IntegerType(), nullable=True),
    t.StructField("endYear", t.IntegerType(), nullable=True),
    t.StructField("runtimeMinutes", t.IntegerType(), nullable=True),
    t.StructField("genres", t.StringType(), nullable=True)
])


In [5]:
# Define the schema for the 'title.akas.tsv' table
title_akas_schema = t.StructType([
    t.StructField("titleId", t.StringType(), nullable=False),
    t.StructField("ordering", t.IntegerType(), nullable=True),
    t.StructField("title", t.StringType(), nullable=True),
    t.StructField("region", t.StringType(), nullable=True),
    t.StructField("language",t.StringType(), nullable=True),
    t.StructField("types", t.StringType(), nullable=True),
    t.StructField("attributes", t.StringType(), nullable=True),
    t.StructField("isOriginalTitle", t.IntegerType(), nullable=True)
])

In [6]:
# Define the schema for the 'title.crew.tsv' table
title_crew_schema = t.StructType([
    t.StructField("tconst", t.StringType(), nullable=False),
    t.StructField("directors", t.StringType(), nullable=True),
    t.StructField("writers", t.StringType(), nullable=True)
])

In [7]:
# Define the schema for the 'title.principals.tsv' table
title_principals_schema = t.StructType([
    t.StructField("tconst", t.StringType(), nullable=False),
    t.StructField("ordering", t.IntegerType(), nullable=True),
    t.StructField("nconst", t.StringType(), nullable=True),
    t.StructField("category", t.StringType(), nullable=True),
    t.StructField("job", t.StringType(), nullable=True),
    t.StructField("characters", t.StringType(), nullable=True)
])

In [8]:
# Define the schema for the 'title.ratings.tsv' table
title_ratings_schema = t.StructType([
    t.StructField("tconst", t.StringType(), nullable=False),
    t.StructField("averageRating", t.FloatType(), nullable=True),
    t.StructField("numVotes", t.IntegerType(), nullable=True)
])

In [9]:
# Define the schema for the 'title.episode.tsv' table
title_episode_schema = t.StructType([
    t.StructField("tconst", t.StringType(), nullable=False),
    t.StructField("parentTconst", t.StringType(), nullable=True),
    t.StructField("seasonNumber", t.IntegerType(), nullable=True),
    t.StructField("episodeNumber", t.IntegerType(), nullable=True)
])

### Read the datasets

In [158]:
name_basics_df = spark_session.read.csv("imdb/name.basics.tsv", sep=r'\t', header=True, schema=name_basics_schema)
title_basics_df = spark_session.read.csv("imdb/title.basics.tsv", sep=r'\t', header=True, schema=title_basics_schema)
title_akas_df = spark_session.read.csv("imdb/title.akas.tsv", sep=r'\t', header=True, schema=title_akas_schema)
title_crew_df = spark_session.read.csv("imdb/title.crew.tsv", sep=r'\t', header=True, schema=title_crew_schema)
title_principals_df = spark_session.read.csv("imdb/title.principals.tsv", sep=r'\t', header=True, schema=title_principals_schema)
title_ratings_df = spark_session.read.csv("imdb/title.ratings.tsv", sep=r'\t', header=True, schema=title_ratings_schema)
title_episode_df = spark_session.read.csv("imdb/title.episode.tsv", sep=r'\t', header=True, schema=title_episode_schema)

In [159]:
dfs = [name_basics_df, title_basics_df, title_akas_df, title_crew_df, title_principals_df, title_ratings_df, title_episode_df]
df_names = ["name.basics", "title.basics.tsv", "title.akas.tsv", "title.crew.tsv", "title.principals.tsv", "title.ratings.tsv", "title.episode.tsv"]

In [174]:
for df, name in zip(dfs, df_names):
    print(name, ":", df.columns)
    df.show()

name.basics : ['nconst', 'primaryName', 'birthYear', 'deathYear', 'Profession', 'knownForTitle']
+---------+-------------+---------+---------+-------------+-------------+
|   nconst|  primaryName|birthYear|deathYear|   Profession|knownForTitle|
+---------+-------------+---------+---------+-------------+-------------+
|nm0000001| Fred Astaire|     1899|     1987|   soundtrack|    tt0072308|
|nm0000001| Fred Astaire|     1899|     1987|   soundtrack|    tt0031983|
|nm0000001| Fred Astaire|     1899|     1987|   soundtrack|    tt0053137|
|nm0000001| Fred Astaire|     1899|     1987|   soundtrack|    tt0050419|
|nm0000001| Fred Astaire|     1899|     1987|        actor|    tt0072308|
|nm0000001| Fred Astaire|     1899|     1987|        actor|    tt0031983|
|nm0000001| Fred Astaire|     1899|     1987|        actor|    tt0053137|
|nm0000001| Fred Astaire|     1899|     1987|        actor|    tt0050419|
|nm0000001| Fred Astaire|     1899|     1987|miscellaneous|    tt0072308|
|nm0000001| Fre

In [161]:
for df, name in zip(dfs, df_names):
    print(name, df.count())

name.basics 12727713
title.basics.tsv 10044533
title.akas.tsv 36761777
title.crew.tsv 10044533
title.principals.tsv 57431166
title.ratings.tsv 1334225
title.episode.tsv 7643524


### Data Preprocessing

#### Explode columns with multiple values

In [162]:
def explode_and_drop(df, old_col, new_col):
    df = df.withColumn(old_col, f.split(f.col(old_col), ","))
    df = df.select("*", f.explode(f.col(old_col)).alias(new_col))
    df = df.drop(old_col)
    return df

In [163]:
name_basics_df = explode_and_drop(name_basics_df, "primaryProfession", "Profession")

In [164]:
name_basics_df = explode_and_drop(name_basics_df, "knownForTitles", "knownForTitle")

In [165]:
title_basics_df = explode_and_drop(title_basics_df, "genres", "genre")

In [166]:
title_crew_df = explode_and_drop(title_crew_df, "directors", "director")

#### Drop useless columns

In [167]:
title_basics_df = title_basics_df.drop("endYear")

In [168]:
title_akas_df = title_akas_df.drop("language", "isOriginalTitle")

In [169]:
title_crew_df = title_crew_df.drop("writers")

In [170]:
title_principals_df = title_principals_df.drop("characters", "job")

In [171]:
dfs = [name_basics_df, title_basics_df, title_akas_df, title_crew_df, title_principals_df, title_ratings_df, title_episode_df]


### Business Questions

#### Q1: Which directors are associated with the highest number of titles (movies/TV shows) in the database?

In [None]:
directors_with_num_of_firected_films_df = title_crew_df.groupBy('director').agg(f.count("tconst").alias("Titles")).orderBy("Titles", ascending=False)
directors_with_num_of_firected_films_df.show(6)

#### Q2: Highest ranked films where there are more than 2 million votes Films directed by the director of the best rated film

In [25]:
highest_ranked_films_df_filtered = title_ratings_df.filter(title_ratings_df.numVotes > 2000000).orderBy("averageRating", ascending=False)

In [None]:
highest_ranked_films_df_filtered.show()

In [None]:
highest_ranked_films_df_joined = highest_ranked_films_df_filtered.join(title_basics_df.dropDuplicates((['tconst'])), on='tconst', how='inner')
highest_ranked_films_df_joined.show()

#### Q3: Which TV special for adults has the longest duration

In [None]:
tv_special_df = title_basics_df.filter((title_basics_df.titleType == "tvSpecial") & f.col("runtimeMinutes").isNotNull())
tv_special_adults_df = tv_special_df.where(f.col("isAdult") == 1).dropDuplicates(["tconst"]).orderBy("runtimeMinutes", ascending=False)
tv_special_adults_df.show()

#### Q4: What are the most popular genres for every content type (titleType)

In [96]:
# First, calculate the count of each genre within its corresponding titleType partition
content_genres_window = Window.partitionBy("titleType", "genre").orderBy("titleType")
content_genres_df = title_basics_df.withColumn("Count", f.count(f.col("genre")).over(content_genres_window)).select("titleType", "genre", "Count").distinct()
content_genres_df.show()

+---------+-----------+------+
|titleType|      genre| Count|
+---------+-----------+------+
|    movie|     Action| 54439|
|    movie|      Adult|  9215|
|    movie|  Adventure| 28445|
|    movie|  Animation|  9232|
|    movie|  Biography| 17416|
|    movie|     Comedy|110022|
|    movie|      Crime| 37602|
|    movie|Documentary|124083|
|    movie|      Drama|236527|
|    movie|     Family| 17805|
|    movie|    Fantasy| 15510|
|    movie|  Film-Noir|   882|
|    movie|  Game-Show|    25|
|    movie|    History| 14434|
|    movie|     Horror| 37622|
|    movie|      Music| 13781|
|    movie|    Musical| 10297|
|    movie|    Mystery| 17279|
|    movie|       News|  1449|
|    movie| Reality-TV|   523|
+---------+-----------+------+
only showing top 20 rows



In [97]:
# Then, find the genre with the highest count for each titleType
genre_rank_window = Window.partitionBy("titleType").orderBy(f.col("Count").desc())
content_genres_df_with_rank = content_genres_df.withColumn("Rank", f.max(f.col("Count")).over(genre_rank_window))
content_genres_max_df = content_genres_df_with_rank.where(f.col("Count") == f.col("Rank")).drop("Rank")
content_genres_max_df.show()

+------------+-----------+-------+
|   titleType|      genre|  Count|
+------------+-----------+-------+
|       movie|      Drama| 236527|
|       short|      Short| 887786|
|   tvEpisode|      Drama|2226176|
|tvMiniSeries|      Drama|  11733|
|     tvMovie|Documentary|  45742|
|     tvPilot|         \N|      1|
|    tvSeries|     Comedy|  57258|
|     tvShort|      Short|   9245|
|   tvSpecial|      Music|  11277|
|       video|      Short| 116017|
|   videoGame|     Action|  14571|
+------------+-----------+-------+



#### Q5: How many episodes are there on average per TV show?

In [181]:
episodes_df = title_episode_df.where((f.col("seasonNumber").isNotNull()) & (f.col("episodeNumber").isNotNull()))

average_episodes_joined = (
    episodes_df
    .groupBy("tconst")
    .agg(f.max("episodeNumber").alias("episodes_quantity"))
)

average_episodes_joined = (
    episodes_df
    .groupBy("tconst")
    .agg(
        f.avg("episodeNumber").alias("average_episodes"),
        f.sum("episodeNumber").alias("sum_episodes")
    )
)
average_episodes_joined.show()

+---------+----------------+------------+
|   tconst|average_episodes|sum_episodes|
+---------+----------------+------------+
|tt0041951|             9.0|           9|
|tt0042816|            17.0|          17|
|tt0043426|            42.0|          42|
|tt0043631|            16.0|          16|
|tt0043693|             8.0|           8|
|tt0043710|             3.0|           3|
|tt0044093|             6.0|           6|
|tt0044668|            16.0|          16|
|tt0044901|            46.0|          46|
|tt0045960|             3.0|           3|
|tt0046135|             5.0|           5|
|tt0046855|             4.0|           4|
|tt0046864|            20.0|          20|
|tt0047852|            15.0|          15|
|tt0047858|             9.0|           9|
|tt0048302|             6.0|           6|
|tt0048371|            11.0|          11|
|tt0049585|             5.0|           5|
|tt0049669|             2.0|           2|
|tt0050031|             4.0|           4|
+---------+----------------+------

In [180]:
episodes_df.join(
    title_basics_df,
    on="tconst",
    how="inner"
).orderBy(f.col("episodeNumber").desc()).show()

+----------+------------+------------+-------------+---------+--------------------+--------------------+-------+---------+--------------+----------+
|    tconst|parentTconst|seasonNumber|episodeNumber|titleType|        primaryTitle|       originalTitle|isAdult|startYear|runtimeMinutes|     genre|
+----------+------------+------------+-------------+---------+--------------------+--------------------+-------+---------+--------------+----------+
|tt28396918|   tt0068069|           1|        97251|tvEpisode|    Episode #1.97251|    Episode #1.97251|      0|     2023|          null|     Drama|
|tt28396918|   tt0068069|           1|        97251|tvEpisode|    Episode #1.97251|    Episode #1.97251|      0|     2023|          null|   Romance|
| tt2093578|   tt0417331|          11|        91334|tvEpisode|Quick Fix: Denim ...|Quick Fix: Denim ...|      0|     2011|          null|Reality-TV|
| tt2093577|   tt0417331|          11|        72615|tvEpisode|HDIL Quick Fix: M...|HDIL Quick Fix: M...|  

#### !!!!!!!!Q6: Which actors or actresses have appeared in the most episodes (and their average rating)?

In [65]:
# Group by nconst and primaryName, and count the occurrences of each actor/actress in the knownForTitle column
actor_episode_counts_df = ( 
        name_basics_df
        .groupBy("nconst", "primaryName", "knownForTitle")
        .agg(f.count("knownForTitle").alias("episodes"))
)

# Order the results in descending order based on episode_count
sorted_actor_episode_counts_df = actor_episode_counts_df.orderBy(f.desc("episodes"))

In [83]:

# Join the sorted_actor_episode_counts_df with title.ratings_df to get the average rating for each actor/actress
result_df = sorted_actor_episode_counts_df.join(
    title_ratings_df,
    f.expr("knownForTitle = tconst"),
    how="inner"
)

# Calculate the sum of averageRating and episodes for each actor/actress
result_df = result_df.groupBy("nconst", "primaryName").agg(
    f.sum("averageRating").alias("total_rating"),
    f.sum("episodes").alias("total_episodes")
)

# Calculate the average rating for each actor/actress
result_df = result_df.withColumn("average_rating", f.col("total_rating") / f.col("total_episodes"))

result_df = result_df.select("primaryName", "average_rating", "total_episodes").show()


+--------------------+------------------+--------------+
|         primaryName|    average_rating|total_episodes|
+--------------------+------------------+--------------+
|Manuel Tamayo y Baus| 5.975000023841858|             4|
|       Arline Pretty| 5.200000047683716|             2|
|       Charles Dewey| 5.699999809265137|             1|
|   Kathleen O'Connor| 5.450000047683716|             2|
|          Ali Hubert|2.3749999602635703|            12|
|       Allan Simpson| 6.699999968210856|             3|
|       André Barsacq|              2.25|             6|
|     Captain Kearney| 5.599999904632568|             1|
|         Owen Wister| 3.087499976158142|             8|
|      Lorraine Aalbu|               6.0|             1|
|         Frank Rowan| 3.616666634877523|             6|
|      Anchise Brizzi|5.8999998569488525|             4|
|      Garland Weaver| 3.162499964237213|             8|
|       Lorraine Rugg| 5.699999809265137|             1|
|        Lyda Roberti| 3.037499

#### Q7: Actors that lived the most

In [151]:
current_year = f.year(f.current_date())
filtered_ages_df = name_basics_df.filter(name_basics_df.deathYear.isNotNull() & name_basics_df.birthYear.isNotNull())

# Calculate the age
age_df = filtered_ages_df.withColumn('age',f.col("deathYear") - f.col("birthYear"))

# Filter for actors and drop duplicates
actors_df = age_df.filter(f.col("Profession") == "actor").dropDuplicates(["nconst"])

# Order by age in descending order
result = actors_df.orderBy(f.col("age").desc())

result.show()

+---------+--------------------+---------+---------+----------+-------------+---+
|   nconst|         primaryName|birthYear|deathYear|Profession|knownForTitle|age|
+---------+--------------------+---------+---------+----------+-------------+---+
|nm0406304|    Giuseppe Ianigro|     1898|     2008|     actor|    tt0055753|110|
|nm1158663|        Nino Cochise|     1874|     1984|     actor|    tt0042171|110|
|nm0077980|     Octave Berthier|     1875|     1984|     actor|    tt0216961|109|
|nm0191102|       Hugues Cuénod|     1902|     2010|     actor|   tt13737592|108|
|nm0373535|   Johannes Heesters|     1903|     2011|     actor|    tt0027351|108|
|nm0516093|        Norman Lloyd|     1914|     2021|     actor|    tt0097165|107|
|nm7922088|Pappukutty Bhagav...|     1913|     2020|     actor|    tt0254797|107|
|nm0704527|         Milton Quon|     1913|     2019|     actor|    tt0106064|106|
|nm0350778|       Pierre Gérald|     1906|     2012|     actor|    tt0409184|106|
|nm0531042|     

#### Q8: Actors that worked in the film industry the most

In [176]:
# Join actors and movies on movie_id to get actor and movie details
joined_df = name_basics_df.join(title_basics_df,
    f.expr("knownForTitle = tconst"),
    how="inner"
)

# Group by actor_id and calculate earliest and oldest movie years
actor_year_window = Window.partitionBy("nconst")
filtered_years_df = joined_df.filter(joined_df.startYear.isNotNull())
start_end_years_df = (
                    filtered_years_df
                    .withColumn("earliest_movie_year", f.min("startYear").over(actor_year_window)) 
                    .withColumn("oldest_movie_year", f.max("startYear").over(actor_year_window)) 
                    .select("nconst", "earliest_movie_year", "oldest_movie_year").distinct())

years_worked_df = start_end_years_df.withColumn("years_worked", f.col("oldest_movie_year") - f.col("earliest_movie_year"))
years_worked_df.orderBy(f.col("years_worked").desc()).show()

+----------+-------------------+-----------------+------------+
|    nconst|earliest_movie_year|oldest_movie_year|years_worked|
+----------+-------------------+-----------------+------------+
| nm1485676|               1895|             2017|         122|
|nm12182942|               1900|             2020|         120|
| nm0703075|               1899|             2019|         120|
| nm0923185|               1897|             2015|         118|
| nm3777335|               1902|             2020|         118|
|nm13890136|               1905|             2023|         118|
| nm0502124|               1900|             2016|         116|
| nm2251467|               1904|             2020|         116|
| nm8577610|               1903|             2018|         115|
| nm0685283|               1909|             2023|         114|
| nm0122813|               1898|             2011|         113|
| nm4537461|               1908|             2021|         113|
| nm1375863|               1911|        

In [178]:
# Making sure dataset is tripping(not us)
joined_df = name_basics_df.join(title_basics_df,
    f.expr("knownForTitle = tconst"),
    how="inner"
)
joined_df.where(f.col("nconst")=="nm1485676").show()

+---------+------------+---------+---------+----------+-------------+----------+---------+--------------------+--------------------+-------+---------+--------------+-----------+
|   nconst| primaryName|birthYear|deathYear|Profession|knownForTitle|    tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|runtimeMinutes|      genre|
+---------+------------+---------+---------+----------+-------------+----------+---------+--------------------+--------------------+-------+---------+--------------+-----------+
|nm1485676|Princess Ali|     null|     null|   actress|    tt0229665| tt0229665|    short|        Princess Ali|        Princess Ali|      0|     1895|             1|      Short|
|nm1485676|Princess Ali|     null|     null|   actress|   tt11327742|tt11327742|    short|The Universe Mult...|The Universe Mult...|      0|     2017|          null|Documentary|
|nm1485676|Princess Ali|     null|     null|   actress|   tt11327742|tt11327742|    short|The Universe Mult...

#### Q9: What is the average rating of movies/tvShows/shorts in the database?

In [93]:
window_spec = Window.partitionBy("titleType")

titles_ratings_joined_df = title_basics_df.join(title_ratings_df, "tconst")

ratings_df_with_avg = titles_ratings_joined_df.withColumn("AverageRatingByType", f.avg("averageRating").over(window_spec))

ratings_df_with_avg.select("titleType", "AverageRatingByType").distinct().show()

+------------+-------------------+
|   titleType|AverageRatingByType|
+------------+-------------------+
|       movie|   6.13661492364121|
|       short|   6.87268124400951|
|   tvEpisode|  7.397082662738441|
|tvMiniSeries|  7.149319747241659|
|     tvMovie|  6.547916374691202|
|    tvSeries| 6.9275777417030024|
|     tvShort|  6.878082188289075|
|   tvSpecial|  6.864447639269466|
|       video|  6.561980644607639|
|   videoGame|  6.952307144809957|
+------------+-------------------+

