In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [None]:
# Create (or reuse) the Spark session that every later cell relies on,
# then report which Spark version the kernel is running against.
spark = (
    SparkSession.builder
    .appName("Netfilx data analysis")
    .getOrCreate()
)
print(spark.version)

In [None]:
file_path = "/data/netflix_titles.csv"  # Location of the raw CSV that is loaded into `netflix_data`

In [None]:
# Load the raw CSV: the first line supplies the column names and Spark
# infers each column's type from the data.
netflix_data = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', True)
    .load(file_path)
)
netflix_data.show()

In [None]:
# Row count of the raw frame; the null-percentage cells divide by this value.
total_rows = netflix_data.count()
print(f"Total rows: {total_rows}")

In [None]:
# Null report for the raw frame.
#
# All per-column null counts are computed in ONE aggregation (one Spark job)
# instead of running a separate filter + count job for every column.
# `when(...)` without `otherwise` yields NULL for non-matching rows and
# `count` skips NULLs, so each expression counts exactly the null cells.
# Positional aliases are used so unusual column names cannot clash.
null_count_exprs = [
    count(when(col(c).isNull(), 1)).alias(f"nulls_{i}")
    for i, c in enumerate(netflix_data.columns)
]
null_counts_row = netflix_data.agg(*null_count_exprs).first()

for c, null_count in zip(netflix_data.columns, null_counts_row):
    print(f"Column '{c}': {null_count} nulls")
    # Guard against an empty input file, which would otherwise raise
    # ZeroDivisionError here.
    percentage_null = null_count / total_rows * 100 if total_rows else 0.0
    print(f"Column '{c}': {percentage_null} %")

In [None]:
# Replace missing values in these three text columns with a placeholder;
# nulls in every other column are left untouched by this step.
fill_values = {
    'director': 'Unknown',
    'cast': 'Unknown',
    'country': 'Unknown',
}
netflix_data_null_fill = netflix_data.na.fill(fill_values)
netflix_data_null_fill.show()

In [None]:
# Null report after the placeholder fill.
#
# The columns are taken from the frame that is actually being inspected
# (`netflix_data_null_fill`), and all counts are computed in a single
# aggregation rather than one Spark job per column. `count` skips the NULLs
# that `when(...)` produces for non-null cells, so each expression counts
# only the null cells of its column.
filled_null_exprs = [
    count(when(col(c).isNull(), 1)).alias(f"nulls_{i}")
    for i, c in enumerate(netflix_data_null_fill.columns)
]
filled_null_row = netflix_data_null_fill.agg(*filled_null_exprs).first()

for c, null_count in zip(netflix_data_null_fill.columns, filled_null_row):
    # Guard against an empty input, which would otherwise raise
    # ZeroDivisionError here.
    percentage_null = null_count / total_rows * 100 if total_rows else 0.0
    print(f"Column '{c}': {percentage_null} %")

In [None]:
# Discard every row that still has a null in any column after the fill.
netflix_nulls_drop = netflix_data_null_fill.na.drop(how='any')
netflix_nulls_drop.show()

In [None]:
# Peek at the raw genre strings of rows typed as 'Movie'.
(
    netflix_nulls_drop
    .filter(col('type') == 'Movie')
    .select('listed_in')
    .show(truncate=False)
)

In [None]:
# Peek at the raw genre strings of rows typed as 'TV Show'.
(
    netflix_nulls_drop
    .filter(col('type') == 'TV Show')
    .select('listed_in')
    .show(truncate=False)
)

In [None]:
# A title is labelled "Global" when its genre string mentions
# "International", otherwise "Local". The label goes into `movie_type` for
# movies and `show_type` for TV shows; the column that does not apply to a
# row is left NULL because the outer `when` has no `otherwise`.
reach_label = (
    when(col("listed_in").contains("International"), "Global")
    .otherwise("Local")
)
show_movie_type_df = (
    netflix_nulls_drop
    .withColumn("movie_type", when(col("type") == "Movie", reach_label))
    .withColumn("show_type", when(col("type") == "TV Show", reach_label))
)
show_movie_type_df.show()

In [None]:
# Turn the comma-separated genre string into an array column (`genres`),
# splitting on a comma plus any following whitespace, and retire the
# original string column.
genre_array = split(col("listed_in"), ",\\s*")
genre_df = (
    show_movie_type_df
    .withColumn("genres", genre_array)
    .drop('listed_in')
)
genre_df.select('genres').show(truncate=False)

In [None]:
# Derive time-series columns from the textual `date_added` value.
#
# `date_added` is trimmed before parsing: the "MMMM d, yyyy" pattern does not
# tolerate surrounding whitespace, so an untrimmed value such as
# " August 4, 2017" would fail to parse instead of yielding a date.
#
# `year`, `month`, `day` and `day_of_week` are produced with `date_format`
# and are therefore strings (e.g. "2017", "August", "4", "Friday").
tsa_columns_gen_df = (
    genre_df
    .withColumn('added_date', to_date(trim(col('date_added')), "MMMM d, yyyy"))
    .withColumn("year", date_format(col("added_date"), "yyyy"))
    .withColumn("month", date_format(col("added_date"), "MMMM"))
    .withColumn("day", date_format(col("added_date"), "d"))
    .withColumn("day_of_week", date_format(col("added_date"), "EEEE"))
)

tsa_columns_gen_df.show(truncate=False)

In [None]:
# 1. Strip a stray leading comma (with surrounding whitespace) from `country`.
# 2. Split the cleaned string into the array column `countries`.
# 3. Emit one row per array element, overwriting `country` with that single
#    element while keeping the full `countries` array alongside it.
leading_comma_removed = regexp_replace(col('country'), r'^\s*,\s*', '')
country_split_df = (
    tsa_columns_gen_df
    .withColumn("country", leading_comma_removed)
    .withColumn("countries", split(col("country"), r",\s*"))
)
country_explode_df = country_split_df.withColumn(
    "country",
    explode_outer(col("countries")),
)
country_explode_df.select('country', 'countries').show(truncate=False)

In [None]:
# Reference list of accepted country names; the file is read as a CSV with a
# header row, and the join below expects it to expose a `country` column.
valid_country_file_path = r"/data/valid_countries.txt"
valid_countries_df = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', True)
    .load(valid_country_file_path)
)

# Rows whose exploded `country` has no match in the reference list.
cleaned_df = country_explode_df.join(
    valid_countries_df,
    country_explode_df["country"] == valid_countries_df["country"],
    "left_anti",
)

# Only the distinct country values are needed on the driver, so project and
# de-duplicate inside Spark before collecting instead of pulling every full
# row across and de-duplicating in Python.
invalid_countries = [
    row.country for row in cleaned_df.select('country').distinct().collect()
]

invalid_countries

In [None]:
# Rows whose single `country` value was flagged as invalid get the
# one-element array ['Unknown']; all other rows keep their full `countries`
# array. The helper `countries` column is dropped afterwards.
is_invalid_country = col("country").isin(invalid_countries)
unknown_only = array(lit('Unknown'))
country_clean_df = (
    country_explode_df
    .withColumn(
        'country_clean',
        when(is_invalid_country, unknown_only).otherwise(col('countries')),
    )
    .drop('countries')
)

country_clean_df.show()

In [None]:
# Load the country-to-languages lookup from JSON. The "multiline" option lets
# a single JSON record span several lines of the file.
lang_map_file_path = r"/data/countries_languages.json"
lang_map_reader = spark.read.option("multiline", "true")
lang_map_df = lang_map_reader.json(lang_map_file_path)

lang_map_df.show(truncate=False)

In [None]:
# Attach each exploded row's languages by matching on `country`.
# NOTE(review): the inner join discards rows whose country is absent from the
# language lookup — confirm that this is intended.
join_df = lang_map_df.join(country_clean_df, on='country', how="inner")
(
    join_df
    .select('country', 'Languages', 'country_clean')
    .show(truncate=False)
)

In [None]:
# One row per (title row, language) pair.
df_exploded = join_df.withColumn("Language", explode("Languages"))

# For every distinct `country_clean` array, gather the unique languages.
# `collect_set` de-duplicates during aggregation, so the duplicates are never
# materialised (previously: `collect_list` followed by `array_distinct`).
# The element order inside the resulting array is not guaranteed.
languages_df = (
    df_exploded
    .groupBy("country_clean")
    .agg(collect_set("Language").alias("Languages_array"))
)
languages_df.show(truncate=False)


In [None]:
# Bring the aggregated language arrays back onto the joined rows, matching on
# the `country_clean` array; the left join keeps every row of `join_df`.
result_df = join_df.join(languages_df, on="country_clean", how="left")
(
    result_df
    .select('country_clean', 'Country', 'Languages_array', 'Languages')
    .show(truncate=False)
)

In [None]:
# Drop the per-country helper columns, give the aggregated columns their
# final names, and collapse rows that became identical as a result.
result_df_cols_dropped = (
    result_df
    .drop('Country', 'Languages')
    .withColumnRenamed('country_clean', 'released_countries')
    .withColumnRenamed('Languages_array', 'released_languages')
    .dropDuplicates()
)
(
    result_df_cols_dropped
    .select('show_id', 'released_countries', 'released_languages')
    .show(truncate=False)
)

In [None]:
# Make sure `release_year` is stored as an integer column.
release_year_as_int = col('release_year').cast('Integer')
result_df_cols_dropped = result_df_cols_dropped.withColumn(
    'release_year',
    release_year_as_int,
)
result_df_cols_dropped.show(truncate=False)

In [None]:
# Split the textual `duration` column into two typed columns:
#   movie_duration - for 'Movie' rows, the value with ' min' removed, as an integer
#   seasons        - for 'TV Show' rows, the digits of the value, as an integer
# Each column is NULL for rows of the other type. `duration` is then dropped.
duration_text = col('duration')
movie_minutes = regexp_replace(duration_text, ' min', '').cast('Integer')
season_count = regexp_replace(duration_text, r'\D', '').cast('Integer')

durations_df = (
    result_df_cols_dropped
    .withColumn(
        'movie_duration',
        when(col('type') == 'Movie', movie_minutes).otherwise(lit(None)),
    )
    .withColumn(
        'seasons',
        when(col('type') == 'TV Show', season_count).otherwise(lit(None)),
    )
    .drop('duration')
)

durations_df.show(truncate=False)


In [None]:
# Final column order of the cleaned dataset, grouped by theme.
columns = [
    # identity and descriptive fields
    'show_id', 'type', 'title', 'director', 'cast',
    'date_added', 'release_year', 'rating',
    # duration-derived fields
    'movie_duration', 'seasons',
    'description',
    # country / language enrichment
    'released_countries', 'released_languages',
    # classification and genres
    'movie_type', 'show_type', 'genres',
    # date-derived fields
    'added_date', 'year', 'month', 'day', 'day_of_week',
]
netflix_ordered_select = durations_df.select(*columns)
netflix_ordered_select.show()

In [None]:
# Persist the cleaned dataset as Parquet, replacing any previous output at
# the same location.
OUTPUT_PATH = '/target_data'
(
    netflix_ordered_select.write
    .mode('overwrite')
    .format('parquet')
    .save(OUTPUT_PATH)
)

In [None]:
# Read the Parquet output back and display it to check what was written.
netflix_cleaned_data = spark.read.format('parquet').load(OUTPUT_PATH)
netflix_cleaned_data.show(truncate=False)

In [None]:
# Print the schema of the in-memory frame that was written out.
# NOTE(review): this inspects `netflix_ordered_select`, not the frame read
# back from Parquet (`netflix_cleaned_data`) — confirm which one is intended.
netflix_ordered_select.printSchema()