In [29]:
from pyspark.sql.functions import *
from pyspark.sql.types import (
    StructType, StructField, IntegerType, StringType,
    FloatType, ArrayType, DateType, LongType,
    BooleanType
)

In [30]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .appName("dev-app")\
        .master("spark://spark-master:7077")\
        .getOrCreate()

In [31]:
def read_json_file(file_path):
    df = spark.read \
                .option("mode", "PERMISSIVE") \
                .option("multiline","true") \
                .json(file_path)
    return df

In [32]:
movie_details_df = read_json_file("raw_2025_06_18_movie_details.json")
movie_details_clean = movie_details_df \
                    .select(col("id").cast(IntegerType()), 
                            "imdb_id", 
                            "original_title", 
                            "title", 
                            col("budget").cast(LongType()), 
                            transform(col("genres"), lambda x: struct(
                                                                        x["id"].cast(IntegerType()).alias("genre_id"),
                                                                        x["name"].cast(StringType()).alias("genre_name")
                                                                     )).alias("genres"), 
                            "origin_country",
                            col("popularity").cast(FloatType()),
                            transform(col("production_companies"), lambda x: struct(
                                                                                    x["id"].cast(IntegerType()).alias("company_id"),
                                                                                    x["name"].alias("company_name"),
                                                                                    x["origin_country"].alias("company_country")
                                                                                )).alias("production_companies"), 
                            col("release_date").cast(DateType()),
                            col("revenue").cast(LongType()), 
                            col("runtime").cast(IntegerType()), 
                            col("vote_average").cast(FloatType()), 
                            col("vote_count").cast(IntegerType())) \
                    .withColumn(
                        "genres",
                        when(size(col("genres")) == 0, lit(None)).otherwise(col("genres"))
                    ) \
                    .withColumn(
                        "production_companies",
                        when(size(col("production_companies")) == 0, lit(None)).otherwise(col("production_companies"))
                    ) \
                    .dropna(subset=["id","popularity"]) \
                    .dropDuplicates(["id"]) \
                    .withColumnRenamed("id", "movie_id") \
                    .replace(0, None, ["budget", "revenue"])

movie_details_clean.printSchema(), movie_details_clean.count(), movie_details_df.count()

                                                                                

root
 |-- movie_id: integer (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- title: string (nullable = true)
 |-- budget: long (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- genre_id: integer (nullable = true)
 |    |    |-- genre_name: string (nullable = true)
 |-- origin_country: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- popularity: float (nullable = true)
 |-- production_companies: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- company_id: integer (nullable = true)
 |    |    |-- company_name: string (nullable = true)
 |    |    |-- company_country: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: long (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- vote_average: float (nullable = true)
 |-- vote_count: integer (nullable = true)



                                                                                

(None, 9203, 9203)

In [33]:
credit_df = read_json_file("raw_2025_06_18_movie_credits.json")
credit_clean = credit_df \
                .withColumn("movie_id", col("id").cast(IntegerType())) \
                .drop("id") \
                .withColumn(
                        "cast",
                        transform(col("cast"), lambda x: struct(x["id"].cast(IntegerType()).alias("person_id"), 
                                                                x["gender"].cast(IntegerType()).alias("gender"),
                                                                x["name"],
                                                                x["order"].cast(IntegerType()).alias("order"),
                                                                x["known_for_department"].alias("department"),
                                                               )
                                 )
                    ) \
                .withColumn(
                        "crew",
                        transform(col("crew"), lambda x: struct(x["id"].cast(IntegerType()).alias("person_id"), 
                                                                x["gender"].cast(IntegerType()).alias("gender"),
                                                                x["name"],
                                                                x["department"],
                                                                x["job"]
                                                               )
                                 )
                    ) \
                .dropna(subset=["movie_id"]) \
                .dropDuplicates(["movie_id"])
credit_clean.printSchema(), credit_clean.count()

                                                                                

root
 |-- cast: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- person_id: integer (nullable = true)
 |    |    |-- gender: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- order: integer (nullable = true)
 |    |    |-- department: string (nullable = true)
 |-- crew: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- person_id: integer (nullable = true)
 |    |    |-- gender: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- department: string (nullable = true)
 |    |    |-- job: string (nullable = true)
 |-- movie_id: integer (nullable = true)



                                                                                

(None, 9231)

In [34]:
crew_filter_df = credit_clean \
            .select(
                "movie_id",
                explode("crew").alias("crew")
            ) \
            .select(
                "movie_id",
                "crew.*"
            ) \
            .where((col("job") == "Director")| (col("job") == "Writer") | (col("job") == "Screenplay"))


crew_filter_df.printSchema(), crew_filter_df.count()

root
 |-- movie_id: integer (nullable = true)
 |-- person_id: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- job: string (nullable = true)



                                                                                

(None, 25016)

In [35]:
cast_filter_df = credit_clean \
            .select(
                "movie_id",
                explode("cast").alias("cast")
            ) \
            .select(
                "movie_id",
                "cast.*"
            ) \
            .where(col("order") <= 20)


cast_filter_df.printSchema(), cast_filter_df.count()

root
 |-- movie_id: integer (nullable = true)
 |-- person_id: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- order: integer (nullable = true)
 |-- department: string (nullable = true)



                                                                                

(None, 160565)

In [36]:
box_office_df = read_json_file("raw_2025_06_18_box_office.json")
box_office_df.printSchema(), box_office_df.count()

root
 |-- budget: string (nullable = true)
 |-- domestic_opening: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- worldwide_gross: string (nullable = true)



(None, 8942)

In [37]:
box_office_clean = box_office_df \
                    .withColumn("worldwide_gross", regexp_replace(col("worldwide_gross"), "\\$|,", "").cast(LongType())) \
                    .withColumn("domestic_opening", regexp_replace(col("domestic_opening"), ",", "").cast(LongType())) \
                    .withColumn("budget", regexp_replace(col("budget"), ",", "").cast(LongType())) \
                    .dropna(subset=["imdb_id"]) \
                    .dropDuplicates(["imdb_id"])

In [38]:
box_office_clean.count(), box_office_clean.printSchema()

root
 |-- budget: long (nullable = true)
 |-- domestic_opening: long (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- worldwide_gross: long (nullable = true)



(8942, None)

In [39]:
# movie_details_clean.write.mode("overwrite").parquet("movie_details.parquet")
crew_filter_df.write.mode("overwrite").parquet("crew.parquet")
cast_filter_df.write.mode("overwrite").parquet("cast.parquet")
box_office_df.write.mode("overwrite").parquet("box_office.parquet")

                                                                                

In [40]:
spark.stop()