In [166]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PySparkDebug").master("local[*]").config("spark.driver.memory", "4g").config("spark.executor.memory", "4g").config("spark.python.worker.memory", "2g").getOrCreate()

In [167]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType

schema = StructType([
    StructField("adult", BooleanType(), True),
    StructField("belongs_to_collection", StringType(), True),
    StructField("budget", IntegerType(), True),
    StructField("genres", StringType(), True),
    StructField("homepage", StringType(), True),
    StructField("id", IntegerType(), True),
    StructField("imdb_id", StringType(), True),
    StructField("original_language", StringType(), True),
    StructField("original_title", StringType(), True),
    StructField("overview", StringType(), True),
    StructField("popularity", DoubleType(), True),
    StructField("poster_path", StringType(), True),
    StructField("production_companies", StringType(), True),
    StructField("production_countries", StringType(), True),
    StructField("release_date", StringType(), True),
    StructField("revenue", IntegerType(), True),
    StructField("runtime", DoubleType(), True),
    StructField("spoken_languages", StringType(), True),
    StructField("status", StringType(), True),
    StructField("tagline", StringType(), True),
    StructField("title", StringType(), True),
    StructField("video", BooleanType(), True),
    StructField("vote_average", DoubleType(), True),
    StructField("vote_count", IntegerType(), True)
])

In [168]:
file_path = "TheMoviesDataset\\movies_metadata.csv"
movies_df = spark.read.csv(file_path, header=True, schema=schema)

movies_df.show(5)

+-----+---------------------+--------+--------------------+--------------------+-----+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+---------+-------+--------------------+--------+--------------------+--------------------+-----+------------+----------+
|adult|belongs_to_collection|  budget|              genres|            homepage|   id|  imdb_id|original_language|      original_title|            overview|popularity|         poster_path|production_companies|production_countries|release_date|  revenue|runtime|    spoken_languages|  status|             tagline|               title|video|vote_average|vote_count|
+-----+---------------------+--------+--------------------+--------------------+-----+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+---------+-------+-----------

In [169]:
from pyspark.sql.functions import from_json, col, explode
from pyspark.sql.types import ArrayType, StructType, StructField, StringType

genres_schema = ArrayType(StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
]))

# Parse the genres column
movies_df = movies_df.withColumn("genres_parsed", from_json(col("genres"), genres_schema))

movies_df = movies_df.filter(col("genres_parsed").isNotNull() & col("vote_average").isNotNull())

movies_df.select("genres", "genres_parsed").show(5, truncate=False)


+-------------------------------------------------------------------------------------------------+-------------------------------------------------+
|genres                                                                                           |genres_parsed                                    |
+-------------------------------------------------------------------------------------------------+-------------------------------------------------+
|[{'id': 16, 'name': 'Animation'}, {'id': 35, 'name': 'Comedy'}, {'id': 10751, 'name': 'Family'}] |[{16, Animation}, {35, Comedy}, {10751, Family}] |
|[{'id': 12, 'name': 'Adventure'}, {'id': 14, 'name': 'Fantasy'}, {'id': 10751, 'name': 'Family'}]|[{12, Adventure}, {14, Fantasy}, {10751, Family}]|
|[{'id': 10749, 'name': 'Romance'}, {'id': 35, 'name': 'Comedy'}]                                 |[{10749, Romance}, {35, Comedy}]                 |
|[{'id': 35, 'name': 'Comedy'}, {'id': 18, 'name': 'Drama'}, {'id': 10749, 'name': 'Romance'}]    |[

In [170]:
movies_df.printSchema()

root
 |-- adult: boolean (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: integer (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: integer (nullable = true)
 |-- runtime: double (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: boolean (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: integ

In [171]:
movies_rdd = movies_df.rdd
filtered_rdd = movies_rdd.filter(
    lambda row: row["vote_average"] is not None and 0 <= row["vote_average"] <= 10
)

In [172]:
mapped_rdd = filtered_rdd.flatMap(
    lambda row: [
        (genre["name"], (row["vote_average"], 1))
        for genre in row["genres_parsed"] or []
        if genre and genre["name"]
    ]
)

reduced_rdd = mapped_rdd.reduceByKey(
    lambda a, b: (a[0] + b[0], a[1] + b[1])
)

average_ratings_rdd = reduced_rdd.mapValues(
    lambda x: x[0] / x[1]
)

results = average_ratings_rdd.collect()

for genre, avg_rating in sorted(results, key=lambda x: -x[1]):
    print(f"Genre: {genre}, Average Rating: {avg_rating:.2f}")


Genre: Animation, Average Rating: 6.27
Genre: History, Average Rating: 6.08
Genre: War, Average Rating: 6.00
Genre: Drama, Average Rating: 5.88
Genre: Crime, Average Rating: 5.85
Genre: Music, Average Rating: 5.83
Genre: Mystery, Average Rating: 5.80
Genre: Romance, Average Rating: 5.79
Genre: Documentary, Average Rating: 5.79
Genre: Fantasy, Average Rating: 5.76
Genre: Family, Average Rating: 5.74
Genre: Foreign, Average Rating: 5.70
Genre: Comedy, Average Rating: 5.69
Genre: Adventure, Average Rating: 5.65
Genre: Thriller, Average Rating: 5.59
Genre: Action, Average Rating: 5.55
Genre: Science Fiction, Average Rating: 5.30
Genre: Horror, Average Rating: 5.23
Genre: TV Movie, Average Rating: 5.21
Genre: Western, Average Rating: 5.15


In [173]:
movies_rdd = movies_df.rdd

genre_counts_rdd = movies_rdd.flatMap(
    lambda row: [
        (genre["name"], 1)
        for genre in row["genres_parsed"] or []
        if genre and genre["name"]
    ]
)

genre_counts = genre_counts_rdd.reduceByKey(lambda a, b: a + b)

results = genre_counts.collect()

for genre, count in sorted(results, key=lambda x: -x[1]):
    print(f"Genre: {genre}, Movie Count: {count}")


Genre: Drama, Movie Count: 18802
Genre: Comedy, Movie Count: 12031
Genre: Thriller, Movie Count: 7172
Genre: Romance, Movie Count: 6262
Genre: Action, Movie Count: 6095
Genre: Horror, Movie Count: 4396
Genre: Crime, Movie Count: 4010
Genre: Documentary, Movie Count: 3526
Genre: Adventure, Movie Count: 3219
Genre: Science Fiction, Movie Count: 2792
Genre: Family, Movie Count: 2527
Genre: Mystery, Movie Count: 2313
Genre: Fantasy, Movie Count: 2124
Genre: Animation, Movie Count: 1736
Genre: Foreign, Movie Count: 1508
Genre: Music, Movie Count: 1405
Genre: History, Movie Count: 1269
Genre: War, Movie Count: 1200
Genre: Western, Movie Count: 961
Genre: TV Movie, Movie Count: 705


In [174]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col

credits_schema = StructType([
    StructField("cast", StringType(), True),
    StructField("crew", StringType(), True),
    StructField("id", StringType(), True)
])

credits_path = "TheMoviesDataset\\credits.csv"

credits_df = spark.read.csv(credits_path, header=True, schema=credits_schema)

movies_df = movies_df.withColumn("id", col("id").cast("string"))

movies_df_trimmed = movies_df.select("id", "title", "vote_average", "popularity")
credits_df_trimmed = credits_df.select("id", "cast")

metadata_join_credits_df = movies_df_trimmed.join(credits_df_trimmed, on="id", how="inner")

metadata_join_credits_df.select("id", "title", "cast").show(5, truncate=False)


+-----+---------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [175]:
metadata_join_credits_rdd = metadata_join_credits_df.rdd

import ast

cast_stats_rdd = metadata_join_credits_df.rdd.flatMap(
    lambda row: [
        (cast_member.get("name"), (row["vote_average"], row["popularity"], 1))
        for cast_member in (
            ast.literal_eval(row["cast"]) if row["cast"] else []
        )
        if (
            isinstance(cast_member, dict)
            and cast_member.get("name")
            and row["vote_average"] is not None
            and row["popularity"] is not None
        )
    ]
)

reduced_cast_rdd = cast_stats_rdd.reduceByKey(
    lambda a, b: (
        a[0] + b[0],  # sum of vote_average
        a[1] + b[1],  # sum of popularity
        a[2] + b[2]   # count of movies
    )
)

average_cast_rdd = reduced_cast_rdd.mapValues(
    lambda x: (
        x[0] / x[2],  # avg vote
        x[1] / x[2],  # avg popularity
        x[2]          # num films
    )
)

results = average_cast_rdd.collect()

print("\nTop by average vote:")
for name, (avg_vote, avg_pop, count) in sorted(results, key=lambda x: -x[1][0])[:10]:
    print(f"{name}: Films={count}, Avg Vote={avg_vote:.2f}")

print("\nTop by average popularity:")
for name, (avg_vote, avg_pop, count) in sorted(results, key=lambda x: -x[1][1])[:10]:
    print(f"{name}: Films={count}, Avg Popularity={avg_pop:.2f}")

print("\nTom Hanks stats:")
for name, (avg_vote, avg_pop, count) in results:
    if name == "Tom Hanks":
        print(f"{name}: Films={count}, Avg Vote={avg_vote:.2f}, Avg Popularity={avg_pop:.2f}")



Top by average vote:
Bernard Marbaix: Films=1, Avg Vote=5507396.00
Jérémie Segard: Films=1, Avg Vote=5507396.00
François Olivier: Films=1, Avg Vote=5507396.00
Mireille Bailly: Films=2, Avg Vote=2753701.35
Stéphane Bissot: Films=3, Avg Vote=1835802.70
Déborah François: Films=8, Avg Vote=688430.09
Jérémie Renier: Films=13, Avg Vote=423651.35
Olivier Gourmet: Films=22, Avg Vote=250342.22
Saul Bamberger: Films=1, Avg Vote=96.00
Elias Meintjies: Films=1, Avg Vote=96.00

Top by average popularity:
Kyle Balda: Films=1, Avg Popularity=547.49
Alex Dowding: Films=1, Avg Popularity=547.49
Dave Rosenbaum: Films=2, Avg Popularity=275.30
Paul Thornley: Films=3, Avg Popularity=187.32
Vladislav Koulikov: Films=1, Avg Popularity=183.87
Kazy Tauginas: Films=1, Avg Popularity=183.87
Alexander Frekey: Films=1, Avg Popularity=183.87
Samantha Crawford: Films=1, Avg Popularity=183.87
Toby Leonard Moore: Films=1, Avg Popularity=183.87
Scott Tixier: Films=1, Avg Popularity=183.87

Tom Hanks stats:
Tom Hanks: 