# TMDB Movies Data Cleaning


In [17]:
# Locate the Spark installation so that pyspark can be imported.
# This cell runs before the first pyspark import in the notebook.
import findspark
findspark.init()

In [18]:
# Create the Spark session (or reuse an already running one) that the
# cells below read and write data through.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("MovieDataCleaning")
    .getOrCreate()
)


In [None]:
# Import necessary libraries for the data cleaning process
from pyspark.sql.functions import col, expr, when, concat_ws, size, to_date, round

# Helpers and entry point for cleaning the extracted movie data.
def _pipe_join_names(array_col, elements_sql=None):
    """Return a Column with the ``name`` fields of an array joined by ``|``.

    Args:
        array_col: Name of the array-of-structs column to flatten.
        elements_sql: Optional Spark SQL expression selecting the elements
            to keep (for example a ``slice`` or ``filter`` of the array).
            Defaults to the whole ``array_col``.

    Returns:
        A Column holding the pipe-separated names, or ``""`` when
        ``array_col`` is null.
    """
    elements = array_col if elements_sql is None else elements_sql
    names = expr(f"transform({elements}, x -> x.name)")
    return when(col(array_col).isNotNull(), concat_ws("|", names)).otherwise("")


def _array_size(array_col):
    """Return a Column with the length of ``array_col``, or 0 when it is null."""
    return when(col(array_col).isNotNull(), size(col(array_col))).otherwise(0)


def clean_movie_data(filepath):
    """Read the extracted movie Parquet data and return a flat, cleaned DataFrame.

    The function:
      * drops columns that are not used downstream,
      * flattens the nested collection / genre / language / country /
        company / cast / crew columns into plain strings and counts,
      * casts ``budget``, ``revenue``, ``popularity`` to double, ``id`` to
        long and ``release_date`` to a date (``yyyy-MM-dd``),
      * expresses budget and revenue in millions, rounded to 2 decimals,
      * selects the final columns in a fixed order.

    Args:
        filepath: Path of the Parquet dataset. It is read through the
            module-level ``spark`` session.

    Returns:
        A DataFrame with the columns listed in the final ``select``.
    """
    df = spark.read.parquet(filepath)

    # Drop irrelevant columns.
    df = df.drop("adult", "imdb_id", "original_title", "video", "homepage")

    # Collection name. coalesce covers both a missing collection and a
    # collection whose name is null, so the column is never null -- the same
    # guarantee the other flattened string columns give.
    df = df.withColumn(
        "collection_name",
        expr("coalesce(belongs_to_collection.name, '')"),
    )

    # Arrays of structs that are replaced in place by "name1|name2|...".
    for array_col in (
        "genres",
        "spoken_languages",
        "production_countries",
        "production_companies",
    ):
        df = df.withColumn(array_col, _pipe_join_names(array_col))

    # Names of the first five cast entries only.
    df = df.withColumn(
        "cast_names", _pipe_join_names("cast", "slice(cast, 1, 5)")
    )

    # Names of the crew entries whose job is exactly 'Director'.
    df = df.withColumn(
        "directors",
        _pipe_join_names("crew", "filter(crew, x -> x.job = 'Director')"),
    )

    # Sizes must be taken before the nested columns are dropped below.
    df = df.withColumn("cast_size", _array_size("cast"))
    df = df.withColumn("crew_size", _array_size("crew"))

    # Drop the original nested columns.
    df = df.drop("belongs_to_collection", "cast", "crew")

    # Convert column datatypes.
    # NOTE(review): values that cannot be cast are expected to come back as
    # null (not NaN) with Spark's non-ANSI cast behaviour -- confirm the
    # session's ANSI setting.
    df = (
        df.withColumn("budget", col("budget").cast("double"))
        .withColumn("revenue", col("revenue").cast("double"))
        .withColumn("id", col("id").cast("long"))
        .withColumn("popularity", col("popularity").cast("double"))
        .withColumn("release_date", to_date(col("release_date"), "yyyy-MM-dd"))
    )

    # Express budget and revenue in millions, rounded to two decimals.
    # `round` here is pyspark.sql.functions.round (imported at file level).
    df = (
        df.withColumn("budget_musd", round(col("budget") / 1000000, 2))
        .withColumn("revenue_musd", round(col("revenue") / 1000000, 2))
    )

    # Final column selection and order.
    df_clean = df.select(
        "id", "title", "tagline", "release_date", "genres", "collection_name",
        "original_language", "budget_musd", "revenue_musd", "production_companies",
        "production_countries", "vote_count", "vote_average", "popularity", "runtime",
        "overview", "spoken_languages", "poster_path", "cast_names", "cast_size",
        "directors", "crew_size",
    )

    return df_clean


In [52]:
# Clean the extracted movie data and preview the first five rows.
df = clean_movie_data("../data/extracted_movies_df.parquet")
df.show(n=5)

+------+--------------------+--------------------+------------+--------------------+--------------------+-----------------+-----------+------------+--------------------+--------------------+----------+------------+----------+-------+--------------------+----------------+--------------------+--------------------+---------+--------------------+---------+
|    id|               title|             tagline|release_date|              genres|     collection_name|original_language|budget_musd|revenue_musd|production_companies|production_countries|vote_count|vote_average|popularity|runtime|            overview|spoken_languages|         poster_path|          cast_names|cast_size|           directors|crew_size|
+------+--------------------+--------------------+------------+--------------------+--------------------+-----------------+-----------+------------+--------------------+--------------------+----------+------------+----------+-------+--------------------+----------------+-------------------

In [53]:
# Persist the cleaned DataFrame as Parquet, replacing any previous output
# at the same path.
df.write.parquet("../data/cleaned_movies_df.parquet", mode="overwrite")