In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, avg
from pyspark.sql.window import Window
from schemas import data_frames

# **Question 1**

> Select top 5 genres that degraded in ratings during 2019-2024



In [None]:
# Filter last 5 years and only valid ratings and genres, group results by startYear and genre
recent = (
    data_frames["title_basics"]
    .filter(col("startYear").isin([2019, 2024]))
    .join(data_frames["title_ratings"], "tconst")
    .filter(
        (col("averageRating").isNotNull()) &
        (col("genres").isNotNull()) &
        (col("numVotes") >= 1000) &
        (col("startYear").cast("int") <= 2025)
        )
    .groupBy("startYear", "genres")
    .agg(avg("averageRating").alias("avgRating"))
  )

# Compare 2019 vs. 2024
genres = (
    recent.groupBy("genres")
    .pivot("startYear", [2019, 2024])
    .agg(avg("avgRating"))
    .withColumnRenamed("2019", "rating_2019")
    .withColumnRenamed("2024", "rating_2024")
    .dropna(subset=["rating_2019", "rating_2024"])
  )

degraded_genres = (
    genres.withColumn("rating_diff", col("rating_2024") - col("rating_2019"))
    .orderBy(col("rating_diff").asc())
    .limit(5)
  )

upgraded_genres = (
    genres.withColumn("rating_diff", col("rating_2024") - col("rating_2019"))
    .orderBy(col("rating_diff").desc())
    .limit(5)
)

degraded_genres_pd = degraded_genres.select("genres", "rating_2019", "rating_2024", "rating_diff").toPandas()
degraded_genres_pd.style \
    .format({
        "rating_diff": "{:.2f}",
        "rating_2019": "{:.2f}",
        "rating_2024": "{:.2f}"
    }) \
    .set_properties(**{'background-color': '#f9f9f9', 'color': 'black'})


Unnamed: 0,genres,rating_2019,rating_2024,rating_diff
0,"Documentary,War",7.5,1.9,-5.6
1,"Drama,Fantasy,Mystery",7.62,4.75,-2.87
2,"Adventure,Biography,Drama",6.57,3.8,-2.77
3,"Animation,Comedy",6.75,4.36,-2.39
4,"Comedy,Fantasy",6.85,4.57,-2.28


In [51]:
upgraded_genres_pd = upgraded_genres.select("genres", "rating_2019", "rating_2024", "rating_diff").limit(20).toPandas()
upgraded_genres_pd.style \
    .format({
        "rating_diff": "{:.2f}",
        "rating_2019": "{:.2f}",
        "rating_2024": "{:.2f}"
    }) \
    .set_properties(**{'background-color': '#f9f9f9', 'color': 'black'})

Unnamed: 0,genres,rating_2019,rating_2024,rating_diff
0,"Comedy,Talk-Show",1.5,7.2,5.7
1,Music,3.7,7.87,4.17
2,"Crime,Horror,Thriller",3.5,7.3,3.8
3,Fantasy,4.1,6.9,2.8
4,"Drama,Family,Fantasy",6.3,9.0,2.7


# **Question 2**
> Analyze the career trajectory of directors by comparing their average movie ratings in their early career (first 5 films)
versus their established period (films after the first 5), for directors with at least 10 films and 15+ years of activity.

In [None]:
director_films = (
    data_frames["title_crew"]
    .join(data_frames["title_ratings"], "tconst")
    .join(data_frames["title_basics"], "tconst")
    .filter(
        col("directors").isNotNull() &
        col("startYear").isNotNull() &
        (col("startYear") > 0) &
        (col("numVotes") >= 1000)
    )
)

director_activity = (
    director_films
    .groupBy("directors")
    .agg(
        F.count("tconst").alias("num_movies"),
        F.max("startYear").alias("max_year"),
        F.min("startYear").alias("min_year")
    )
    .filter(
        (col("num_movies") >= 10) &
        (col("max_year") - col("min_year") >= 15)
    )
)

director_films_filtered = (
    director_films
    .join(director_activity, "directors")
    .filter(
        (col("num_movies") >= 10) &
        (col("max_year") - col("min_year") >= 15)
    )
)

windowSpec = Window.partitionBy("directors").orderBy("startYear")

director_films_with_order = director_films_filtered \
    .withColumn("film_order", F.row_number().over(windowSpec))

early_career = director_films_with_order.filter(col("film_order") <= 5)

windowSpec_total = Window.partitionBy("directors")
total_films = director_films_with_order.withColumn("total_films", F.count("film_order").over(windowSpec_total))
established_period = total_films.filter(col("film_order") > col("total_films") - 5)

# Count average ratings for each period of time
early_career_avg = early_career.groupBy("directors").agg(F.avg("averageRating").alias("early_career_avg"))
established_period_avg = established_period.groupBy("directors").agg(F.avg("averageRating").alias("established_period_avg"))

career_trajectory = (
    early_career_avg
    .join(established_period_avg, "directors")
    .orderBy(F.col("early_career_avg").desc())
)

directors_with_names = data_frames["name_basics"].select("nconst", "primaryName")

career_trajectory_with_names = (
    career_trajectory
    .join(directors_with_names, career_trajectory["directors"] == directors_with_names["nconst"], "left")
    .select(
        directors_with_names["primaryName"].alias("director_name"),
        career_trajectory["early_career_avg"],
        career_trajectory["established_period_avg"]
    )
    .orderBy(F.col("early_career_avg").desc())
)

total_records = career_trajectory_with_names.count()
print(f"Загальна кількість записів у відповіді: {total_records}")

career_trajectory_with_names_pd = career_trajectory_with_names.select("director_name", "early_career_avg", "established_period_avg").limit(20).toPandas()
career_trajectory_with_names_pd.style \
    .format({
        "early_career_avg": "{:.2f}",
        "established_period_avg": "{:.2f}",
    }) \
    .set_properties(**{'background-color': '#f9f9f9', 'color': 'black'})


Загальна кількість записів у відповіді: 1259


Unnamed: 0,director_name,early_career_avg,established_period_avg
0,Alan Ball,8.98,8.08
1,Vince Gilligan,8.9,9.42
2,Jennifer Getzinger,8.8,8.0
3,Ethan Spaulding,8.74,6.42
4,James Roday Rodriguez,8.74,6.64
5,Katsumi Tokoro,8.72,8.16
6,Alex Graves,8.7,8.36
7,R.W. Goodwin,8.68,8.04
8,Hideo Kojima,8.68,8.58
9,Mehran Modiri,8.64,8.2


# **Question 3**


> Identify movies that have the largest gap between critical reception (IMDb rating) and commercial success (number of votes) over the last 10 years, grouped by genre.


In [57]:
recent_movies = (
    data_frames["title_basics"]
    .filter(
        (F.col("startYear") >= 2015) &
        (F.col("startYear") <= 2025) &
        (col("genres") != "\\N")
    )
    .select("tconst", "genres")
)

movie_ratings = data_frames["title_ratings"].select("tconst", "averageRating", "numVotes")

movies_with_ratings = (
    recent_movies
    .join(movie_ratings, "tconst")
    .filter(
        F.col("averageRating").isNotNull() &
        F.col("numVotes").isNotNull()
    )
)

movies_with_ratings = movies_with_ratings.withColumn(
    "rating_vote_gap",
    (F.col("averageRating") - F.log(F.col("numVotes")))
)

genre_gap = (
    movies_with_ratings.groupBy("genres")
    .agg(
        F.max("rating_vote_gap").alias("max_gap"),
        F.avg("averageRating").alias("avg_rating"),
        F.avg("numVotes").alias("avg_votes")
    )
    .orderBy("max_gap", ascending=False)
)

total_records = genre_gap.count()
print(f"Загальна кількість записів у відповіді: {total_records}")

genre_gap_pd = genre_gap.select("genres", "max_gap", "avg_rating", "avg_votes").limit(20).toPandas()
genre_gap_pd.style \
    .format({
        "max_gap": "{:.2f}",
        "avg_rating": "{:.2f}",
        "avg_votes": "{:.2f}"
    }) \
    .set_properties(**{'background-color': '#f9f9f9', 'color': 'black'})

Загальна кількість записів у відповіді: 1653


Unnamed: 0,genres,max_gap,avg_rating,avg_votes
0,"Musical,Short",8.39,7.59,30.2
1,"Animation,Comedy",8.39,7.11,293.33
2,"Action,Comedy",8.39,6.39,1809.92
3,Documentary,8.39,7.31,105.07
4,"Comedy,Game-Show",8.39,7.29,57.74
5,"News,Talk-Show",8.39,6.96,36.42
6,Drama,8.39,7.05,429.87
7,"Action,Animation,Comedy",8.39,7.3,351.96
8,"Documentary,History",8.39,7.36,152.38
9,Reality-TV,8.39,7.11,39.94


# **Question 4**


> Compare average rating of series for each actor/actress with the average rating of films

In [None]:
movies = data_frames["title_basics"].filter(F.col("titleType") == "movie")
series = data_frames["title_basics"].filter(F.col("titleType") == "tvSeries")

movie_ratings = (
    data_frames["title_ratings"].join(movies, "tconst", "inner")
    .select("tconst", "averageRating", "titleType")
)

series_ratings = (
    data_frames["title_ratings"].join(series, "tconst", "inner")
    .select("tconst", "averageRating", "titleType")
)

# Join with title_principals to get actor/actress 'nconst' and their participation in movies/series
movies_with_actors = (
    movie_ratings.join(data_frames["title_principals"], "tconst")
    .join(data_frames["name_basics"], "nconst")
    .select("nconst", "primaryName", "averageRating", "titleType", "tconst")
)

series_with_actors = (
    series_ratings.join(data_frames["title_principals"], "tconst")
    .join(data_frames["name_basics"], "nconst")
    .select("nconst", "primaryName", "averageRating", "titleType", "tconst")
)

# Group by actor/actress and calculate average ratings for movies and series
# For movies
movies_avg_ratings = (
    movies_with_actors.groupBy("primaryName")
    .agg(
        F.avg("averageRating").alias("avg_movie_rating"),
        F.count("tconst").alias("movie_count")
    )
)
# For series
series_avg_ratings = (
    series_with_actors.groupBy("primaryName")
    .agg(
        F.avg("averageRating").alias("avg_series_rating"),
        F.count("tconst").alias("series_count")
    )
)
# Filter for actors/actresses who have at least 5 titles in both movies and series
actors_with_both = (
    movies_avg_ratings.join(series_avg_ratings, "primaryName")
    .filter(
        (F.col("movie_count") >= 5) &
        (F.col("series_count") >= 5) &
        (F.col("primaryName") == "\\N")
    )
)

total_records = actors_with_both.count()
print(f"Загальна кількість записів у відповіді: {total_records}")

actors_comparison_pd = actors_with_both.select("primaryName", "avg_movie_rating", "avg_series_rating").limit(20).toPandas()
actors_comparison_pd.style \
    .format({
        "avg_movie_rating": "{:.2f}",
        "avg_series_rating": "{:.2f}",
    }) \
    .set_properties(**{'background-color': '#f9f9f9', 'color': 'black'})

Загальна кількість записів у відповіді: 26003


Unnamed: 0,primaryName,avg_movie_rating,avg_series_rating
0,'Weird Al' Yankovic,6.33,7.12
1,50 Cent,5.67,6.18
2,A.D. Miles,6.54,7.29
3,A.J. Benza,5.66,6.86
4,A.J. Henderson,5.63,7.22
5,A.J. Johnson,5.45,7.03
6,A.K. Hangal,6.37,7.48
7,AJ Michalka,5.65,7.19
8,Aadar Malik,7.02,5.12
9,Aadukalam Naren,5.82,7.1


# **Question 5**

> Determine how has the number of movies and TV series produced each year changed over time

In [37]:
title_basics = (
    data_frames["title_basics"]
    .filter(
        (F.col("startYear").isNotNull()) &
        (F.col("startYear") < 2025)
    )
)

title_basics_with_merged_types_df = title_basics.withColumn(
    "merged_title_type",
    F.when(F.col("titleType") == "tvMovie", "movie")
     .when(F.col("titleType") == "tvSeries", "tvMiniSeries")
     .otherwise(F.col("titleType"))
)

yearly_type_counts_df = (
    title_basics_with_merged_types_df
    .groupBy("startYear", "merged_title_type")
    .agg(F.count("*").alias("count"))
)

pivot_df = (
    yearly_type_counts_df
    .groupBy("startYear")
    .pivot("merged_title_type", ["movie", "tvMiniSeries"])
    .agg(F.first("count"))
)
pivot_df = pivot_df.dropna().orderBy("startYear")

renamed_df = (
    pivot_df
    .withColumnRenamed("movie", "movie_count")
    .withColumnRenamed("tvMiniSeries", "series_count")
)

window_spec = Window.orderBy("startYear")
lagged_df = (
    renamed_df
    .withColumn("prev_movie_count", F.lag("movie_count", 1, 0).over(window_spec))
    .withColumn("prev_series_count", F.lag("series_count", 1, 0).over(window_spec))
)

movies_year_df = (
    lagged_df
    .withColumn("movie_change", F.col("movie_count") - F.col("prev_movie_count"))
    .withColumn("series_change", F.col("series_count") - F.col("prev_series_count"))
    .orderBy(F.col("startYear").desc())
    .select("startYear", "movie_count", "series_count", "movie_change", "series_change")
)

movies_year_df_pd = movies_year_df.limit(20).toPandas()

total_records = movies_year_df.count()
print(f"Загальна кількість записів у відповіді: {total_records}")

movies_year_df_pd.style.set_properties(**{'background-color': '#f9f9f9', 'color': 'black'})


Загальна кількість записів у відповіді: 98


Unnamed: 0,startYear,movie_count,series_count,movie_change,series_change
0,2024,21447,11134,-1089,-1381
1,2023,22536,12515,-520,-1686
2,2022,23056,14201,1324,-1841
3,2021,21732,16042,2445,-598
4,2020,19287,16640,-3357,1612
5,2019,22644,15028,-210,-235
6,2018,22854,15263,39,163
7,2017,22815,15100,228,759
8,2016,22587,14341,1352,615
9,2015,21235,13726,931,1291


# **Question 6**


> Find directors who have at least 10 films, and at least one film has a rating at least 2 points higher than the average rating of all their other works.


In [12]:
# Filter directors with at least 10 films
director_films = (
    data_frames["title_crew"]
    .withColumn("director", F.explode(F.split(col("directors"), ",")))
    .join(data_frames["title_ratings"], "tconst")
    .join(data_frames["title_basics"], "tconst")
    .filter(col("director").isNotNull())
)

director_activity = (
    director_films
    .groupBy("directors")
    .agg(
        F.count("tconst").alias("num_movies"),
        F.avg("averageRating").alias("career_avg_rating")
    )
    .filter(
        (F.col("num_movies") >= 10) &
        (col("directors") != "\\N")
    )
)

# Find the film with highest rating for each director and calculate the difference
director_films_with_ratings = director_films.join(director_activity, "directors")
director_films_with_ratings = director_films_with_ratings.withColumn(
    "rating_diff",
    F.col("averageRating") - F.col("career_avg_rating")
)

director_rating_diff = (
    director_films_with_ratings
    .groupBy("directors")
    .agg(
        F.max("averageRating").alias("max_rating"),
        F.avg("averageRating").alias("career_avg_rating"),
        F.count("tconst").alias("film_count")
    )
    .withColumn("rating_diff", F.col("max_rating") - F.col("career_avg_rating"))
    .filter(F.col("rating_diff") >= 2)
    .orderBy(col("rating_diff").desc())
)

# Join with directors' names
directors_with_names = data_frames["name_basics"].select("nconst", "primaryName")

director_films_with_ratings = (
    director_rating_diff
    .join(directors_with_names, director_films_with_ratings["directors"] == directors_with_names["nconst"], "left")
    .select(
        directors_with_names["primaryName"].alias("director_name"),
        "rating_diff",
        "career_avg_rating",
        "max_rating"
    )
)
director_rating_diff_pd = director_films_with_ratings.limit(20).toPandas()

total_records = director_films_with_ratings.count()
print(f"Загальна кількість записів у відповіді: {total_records}")

director_rating_diff_pd.style \
    .format({
        "rating_diff": "{:.2f}",
        "career_avg_rating": "{:.2f}",
        "max_rating": "{:.2f}"
    }) \
    .set_properties(**{'background-color': '#f9f9f9', 'color': 'black'})


Загальна кількість записів у відповіді: 3998


Unnamed: 0,director_name,rating_diff,career_avg_rating,max_rating
0,Roger Corman,2.12,5.38,7.5
1,Danny DeVito,2.12,6.78,8.9
2,Robert Duncan McNeill,2.02,7.78,9.8
3,Leonard Nimoy,2.04,6.06,8.1
4,Michael Bay,2.65,6.55,9.2
5,Kathryn Bigelow,2.33,6.87,9.2
6,John Frankenheimer,2.44,6.86,9.3
7,Sydney Pollack,2.07,7.23,9.3
8,Joel Schumacher,2.16,6.34,8.5
9,Corey Allen,2.48,7.12,9.6


# **Save data**

In [None]:

degraded_genres.write.option("header", "true").csv("data/results/degraded_genres.csv")
upgraded_genres.write.option("header", "true").csv("data/results/upgraded_genres.csv")
career_trajectory_with_names.write.option("header", "true").csv("data/results/diresctors_career_trajectory.csv")
genre_gap.write.option("header", "true").csv("data/results//genre_gap.csv")
actors_with_both.write.option("header", "true").csv("data/results/actors_with_series_and_movies.csv")
movies_year_df.write.option("header", "true").csv("data/results/series_and_movies_per_year.csv")
director_films_with_ratings.write.option("header", "true").csv("data/results/director_films_with_ratings.csv")
