Name: Khushal Khan 2021261
Assignment 3

In [6]:
pip install pyspark


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.2[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip3 install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, desc, avg, max, min, split, length

# Initialize Spark Session
def init_spark_session(app_name):
    """Creates and returns a Spark session."""
    return SparkSession.builder.appName(app_name).getOrCreate()

In [None]:
# Loading Dataset
def load_dataset(spark_session, file_path):
    """Loads a dataset from a CSV file into a DataFrame."""
    return spark_session.read.csv(file_path, header=True, inferSchema=True)



In [9]:
# Basic Dataset Information
def display_basic_info(dataframe):
    """Displays schema and record count of the DataFrame."""
    print("\n--- Basic Dataset Information ---")
    dataframe.printSchema()
    print(f"Total Records: {dataframe.count()}")




In [10]:
# Top Directors by Titles
def analyze_top_directors(dataframe):
    """Displays top directors with the most titles."""
    print("\n--- Top Directors by Number of Titles ---")
    dataframe.groupBy("director") \
        .agg(count("*").alias("title_count")) \
        .orderBy(desc("title_count")) \
        .limit(10) \
        .show()



In [11]:
# Average Release Year by Content Type
def analyze_avg_release_year(dataframe):
    """Calculates and displays average release year by content type."""
    print("\n--- Average Release Year by Content Type ---")
    dataframe.groupBy("type") \
        .agg(avg("release_year").alias("avg_release_year")) \
        .orderBy("type") \
        .show()



In [12]:
# Content Duration Analysis
def analyze_content_duration(dataframe):
    """Analyzes and displays duration statistics by content type."""
    print("\n--- Content Duration Statistics ---")
    dataframe.withColumn("duration_numeric", split(col("duration"), " ")[0].cast("int")) \
        .groupBy("type") \
        .agg(
            avg("duration_numeric").alias("avg_duration"),
            max("duration_numeric").alias("max_duration"),
            min("duration_numeric").alias("min_duration")
        ) \
        .show()



In [13]:
# Top Countries by Genre Diversity
def analyze_genre_diversity(dataframe):
    """Displays countries with the most diverse genres."""
    print("\n--- Countries with the Most Diverse Genres ---")
    dataframe.groupBy("country") \
        .agg(count("listed_in").alias("genre_count")) \
        .orderBy(desc("genre_count")) \
        .limit(10) \
        .show()



In [14]:
# Titles with Longest Words
def analyze_longest_titles(dataframe):
    """Displays titles with the longest words."""
    print("\n--- Titles with Longest Words ---")
    dataframe.withColumn("title_length", length(col("title"))) \
        .orderBy(desc("title_length")) \
        .select("title", "title_length") \
        .limit(10) \
        .show()



In [15]:
# Group Content by Rating
def analyze_rating_distribution(dataframe):
    """Analyzes content count by rating."""
    print("\n--- Content Distribution by Rating ---")
    dataframe.groupBy("rating") \
        .agg(count("*").alias("count")) \
        .orderBy(desc("count")) \
        .show()



In [16]:
# Main EDA Function
def run_eda(dataframe):
    """Executes all EDA tasks on the given DataFrame."""
    display_basic_info(dataframe)
    analyze_top_directors(dataframe)
    analyze_avg_release_year(dataframe)
    analyze_content_duration(dataframe)
    analyze_genre_diversity(dataframe)
    analyze_longest_titles(dataframe)
    analyze_rating_distribution(dataframe)



In [17]:
# Script Execution
if __name__ == "__main__":
    # Initialize Spark Session
    spark = init_spark_session("Netflix Dataset EDA")

    # Define dataset path
    dataset_path = "netflix_titles.csv"

    # Load the dataset
    netflix_df = load_dataset(spark, dataset_path)

    # Run EDA
    run_eda(netflix_df)

    # Stop Spark Session
    spark.stop()


--- Basic Dataset Information ---
root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)

Total Records: 8809

--- Top Directors by Number of Titles ---
+--------------------+-----------+
|            director|title_count|
+--------------------+-----------+
|                NULL|       2636|
|       Rajiv Chilaka|         19|
|Raúl Campos, Jan ...|         18|
|        Marcus Raboy|         16|
|         Suhas Kadav|         16|
|           Jay Karas|         14|
| Cathy Garcia-Molina|         13|
|     Youssef Chahine|         12|
|     Martin Scorsese|         12|
|