In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

### Creating a Catalog and Schema

In [0]:
%sql
-- Create the catalog and schema that the rest of the notebook reads from and writes to
-- (/Volumes/netflix_catalog/netflix_schema/... and the table netflix_catalog.netflix_schema.cleaned_data).
-- IF NOT EXISTS makes this cell safe to re-run.
-- NOTE(review): the `data` volume holding netflix_titles.csv is not created here; presumably it is
-- created and populated separately — confirm.
CREATE CATALOG IF NOT EXISTS netflix_catalog;
CREATE SCHEMA IF NOT EXISTS netflix_catalog.netflix_schema;

### Data Reading

In [0]:
# Load the raw titles file; its first line holds the column names.
# No schema inference is requested, so every column is read as a string.
df = spark.read.csv("/Volumes/netflix_catalog/netflix_schema/data/netflix_titles.csv", header=True)

In [0]:
# First look at the raw data
display(df)

In [0]:
# One-row summary of the dataset's dimensions
display(
    spark.createDataFrame(
        data=[(df.count(), len(df.columns))],
        schema=["total_rows", "total_columns"],
    )
)

### Handling Duplicates

In [0]:
# Records that appear more than once (identical across every column), with their multiplicity
(
    df.groupBy(*df.columns)
      .agg(count("*").alias("count"))
      .where(col("count") > 1)
      .display()
)

In [0]:
# Keep a single copy of every fully duplicated row
df = df.distinct()

In [0]:
# Re-check for duplicates; after removing them above this should come back empty
df.groupBy(df.columns).count().filter(col("count") > 1).display()

### Handling Null Values

In [0]:
def find_null(df):
    """Display, for every column of `df`, the number and percentage of null values.

    Args:
        df: Spark DataFrame to profile.

    Returns:
        None. A summary DataFrame with columns (column, null_count, null_%) is shown via display().
    """
    # The star import of pyspark.sql.functions shadows Python's round(); use the builtin one.
    from builtins import round

    # Count the nulls of all columns in a single aggregation pass instead of launching
    # one filter/count job per column plus a full df.count() on every loop iteration.
    null_counts = df.agg(
        *[count(when(col(c).isNull(), 1)).alias(c) for c in df.columns]
    ).first()
    total_rows = df.count()

    rows = []
    for column, null_count in zip(df.columns, null_counts):
        # An empty DataFrame has no nulls; avoid dividing by zero.
        null_percent = round(null_count / total_rows * 100, 2) if total_rows else 0.0
        rows.append([column, null_count, null_percent])

    display(spark.createDataFrame(rows, ["column", "null_count", "null_%"]))

In [0]:
# Null profile of the de-duplicated data, before any null handling
find_null(df=df)

**How are null values handled?**<br>
show_id -> drop record<br>
title -> drop record<br>
date_added -> drop record <br>
release_year -> drop record <br>
director -> replace null with "NA" <br>
cast -> replace null with "NA" <br>
listed_in -> replace null with "NA" <br>
description -> replace null with "NA" <br>
duration -> replace null with "NA" <br>
type -> replace null with the mode (most frequent value) <br>
country -> replace null with the mode (most frequent value) <br>
rating -> replace null with the mode (most frequent value)

In [0]:
# A record missing its show_id, title, date_added or release_year is dropped entirely
df = df.na.drop(how="any", subset=["show_id", "title", "date_added", "release_year"])

In [0]:
# Descriptive columns: a missing value becomes the literal placeholder "NA"
df = df.na.fill({column: "NA" for column in ["director", "cast", "listed_in", "description", "duration"]})

In [0]:
# Replacing null values in type, country, rating by the most frequent non-null value.
# Nulls are excluded before computing the mode: groupBy() keeps a null group, and if it were the
# largest, fillna would be handed None and fail. Ties are broken by value so the result is stable.
mode_dict = {}
for col_name in ["type", "country", "rating"]:
    top = (
        df.filter(col(col_name).isNotNull())
          .groupBy(col_name)
          .count()
          .orderBy(desc("count"), col(col_name))
          .first()
    )
    if top is not None:  # column entirely null -> no value to fill with, leave it untouched
        mode_dict[col_name] = top[0]

df = df.fillna(mode_dict)

In [0]:
# Re-profile nulls: columns covered by the drop/fill plan above should now report 0 nulls
find_null(df=df)

### Column Analysis

In [0]:
# Helper: frequency table of a single column
def value_count(df, column):
    """Display how often each distinct value of `column` occurs, most frequent first.

    Args:
        df: Spark DataFrame to inspect.
        column: Name of the column to count (null values form their own group).
    """
    frequencies = df.groupBy(column).count()
    frequencies.orderBy(col("count").desc()).display()

In [0]:
# Inspect the data after null handling
display(df)

Column -> show_id

In [0]:
# Find rows where show_id does not start with "s" or does not end with a digit.
# The regex gets its own name instead of the generic `pattern`, which another cell also
# assigns and reads; sharing that global makes results depend on cell execution order.
#   ^s   : start of string, followed by 's'
#   .*   : any characters (zero or more)
#   \d$  : ends with a digit
show_id_pattern = r"^s.*\d$"
df.filter(~col("show_id").rlike(show_id_pattern)).display()

Column -> type

In [0]:
# Distribution of the 'type' column
value_count(df, column="type")

Column -> title

In [0]:
# Strip leading/trailing whitespace from titles
df = df.withColumn("title", trim(df["title"]))

Column -> country

In [0]:
# Distribution of the raw 'country' strings
value_count(df, column="country")

In [0]:
# Turn the comma-separated country string into an array of names
df = df.withColumn("country", split(col("country"), ","))
display(df)

In [0]:
# Remove the whitespace that surrounded each comma from every country name
df = df.withColumn("country", transform(col("country"), lambda name: trim(name)))
display(df)

In [0]:
# Re-join the cleaned names into one comma-separated string (no spaces after commas)
df = df.withColumn("country", concat_ws(",", col("country")))
display(df)

Column -> director

In [0]:
# Same list clean-up as for country: split on commas, trim each name, re-join with ","
df = df.withColumn(
    "director",
    concat_ws(",", transform(split(col("director"), ","), lambda name: trim(name))),
)
display(df)

In [0]:
# Verify the list clean-up: show rows where director or country still has whitespace at
# either end or next to a separating comma. Both columns were split on ",", trimmed per
# element and re-joined, so this should come back empty.
df.filter(
    col("director").rlike(r"^\s|\s$|\s,|,\s") | col("country").rlike(r"^\s|\s$|\s,|,\s")
).display()

Column -> date_added

In [0]:
# Distribution of the raw 'date_added' strings
value_count(df, column="date_added")

In [0]:
# Rows whose date_added begins or ends with a space character
df.where(col("date_added").like(" %") | col("date_added").like("% ")).display()

In [0]:
# Strip the surrounding spaces found above
df = df.withColumn("date_added", trim(df["date_added"]))
display(df)

In [0]:
# Finding rows in 'date_added' column that do not match the expected date format "MMMM d, yyyy"
# The regex checks for a full month name, a space, 1-2 digit day, comma, space, and 4 digit year

pattern = r"^(January|February|March|April|May|June|July|August|September|October|November|December) [1-9]{1}[0-9]?, \d{4}$"
df.filter(~col("date_added").rlike(pattern)).display()

In [0]:
df = df.filter(col("date_added").rlike(pattern))
df.display()

Column -> release_year

In [0]:
# Strip whitespace around release_year before it is cast to an integer later
df = df.withColumn("release_year", trim(df["release_year"]))

In [0]:
# Distribution of release years
value_count(df, column="release_year")

Column -> rating

In [0]:
# Strip whitespace around rating values
df = df.withColumn("rating", trim(df["rating"]))

In [0]:
# Distribution of ratings
value_count(df, column="rating")

In [0]:
# Ratings containing "min" look like durations that ended up in the rating column
df.where(col("rating").like("%min%")).display()

In [0]:
# Some rows carry a duration (a value such as "<digits> min") in the rating column; see the check above.
# Detect them by value rather than a hard-coded list of show_ids, so any such row is fixed:
# move the value into duration and mark rating as the "NA" placeholder (replaced by the mode below).
# Only values of the exact form "<digits> min" are moved.
misplaced_duration = col("rating").rlike(r"^\d+ min$")

df = df.withColumn("duration", when(misplaced_duration, col("rating")).otherwise(col("duration"))) \
      .withColumn("rating", when(misplaced_duration, "NA").otherwise(col("rating")))

In [0]:
# Rows whose rating is exactly the "NA" placeholder. An exact match mirrors the == "NA" test used
# when replacing it; contains("NA") would also match any rating that merely includes "NA".
df.filter(col("rating") == "NA").display()

In [0]:
# Most frequent real rating, ignoring the "NA" placeholders
mode_rating = (
    df.where(col("rating") != "NA")
      .groupBy("rating")
      .count()
      .orderBy(col("count").desc())
      .first()["rating"]
)

# Swap each "NA" placeholder for that most frequent rating
df = df.withColumn(
    "rating",
    when(col("rating") == "NA", mode_rating).otherwise(col("rating")),
)

In [0]:
# Spot-check the rows whose rating/duration were repaired
df.where(col("show_id").isin("s5814", "s5542", "s5795")).display()

In [0]:
# Rating distribution after the repairs
value_count(df, column="rating")

Column -> duration

In [0]:
# Strip whitespace around duration values
df = df.withColumn("duration", trim(df["duration"]))

In [0]:
# Split "<number> <unit>" (e.g. "90 min") into duration="90" and duration_type="min".
# Values not of that form (such as the "NA" fill value) become null in both columns. Indexing
# split(...)[1] on a one-element array yields null, or raises when ANSI mode is on. A leftover
# "NA" in duration would turn into null, or an error under ANSI mode, in the later integer cast.
# \s+ also tolerates repeated spaces, which a split on a single " " would not.
df = df.withColumn(
        "duration_type",
        when(col("duration").rlike(r"^(\d+)\s+(\S.*)$"),
             regexp_extract(col("duration"), r"^(\d+)\s+(\S.*)$", 2))
     ) \
      .withColumn(
        "duration",
        when(col("duration").rlike(r"^(\d+)\s+(\S.*)$"),
             regexp_extract(col("duration"), r"^(\d+)\s+(\S.*)$", 1))
     )

Column -> duration_type

In [0]:
# Distribution of duration units
value_count(df, column="duration_type")

Column -> listed_in

In [0]:
# Strip whitespace around the whole listed_in string
df = df.withColumn("listed_in", trim(df["listed_in"]))

In [0]:
# Same list clean-up as for country/director: split on commas, trim each genre, re-join with ","
df = df.withColumn(
    "listed_in",
    concat_ws(",", transform(split(col("listed_in"), ","), lambda genre: trim(genre))),
)
display(df)

Column -> description

In [0]:
# Strip whitespace around descriptions
df = df.withColumn("description", trim(df["description"]))

### Data Type Conversion

In [0]:
# Convert the cleaned string columns to typed columns:
# date_added -> date (parsed as "MMMM d, yyyy"), release_year and duration -> int
df = (
    df.withColumn("date_added", to_date("date_added", "MMMM d, yyyy"))
      .withColumn("release_year", col("release_year").cast("int"))
      .withColumn("duration", col("duration").cast("int"))
)

display(df)

### Data Writing

In [0]:
# Persist the cleaned data as Delta files under the volume path, replacing any previous output
df.write.save("/Volumes/netflix_catalog/netflix_schema/cleaned", format="delta", mode="overwrite")

In [0]:
# Also register the cleaned data as a table, replacing any previous version
df.write.saveAsTable("netflix_catalog.netflix_schema.cleaned_data", mode="overwrite")