In [12]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

# Netflix Content Analysis

This notebook performs a comprehensive analysis of the Netflix dataset, specifically focusing on content types, null value handling, and statistical summaries. The analysis includes:

- **Initial Exploration**: Loads and explores the dataset with schema inspection and a preview of the first few rows.
- **Data Cleaning**: Handles missing values by filling them with default placeholders and filters out incomplete or invalid rows.
- **Data Transformation**: Converts specific columns (e.g., release year and duration) into more suitable formats for analysis.
- **Content Filtering**: Focuses only on 'Movie' and 'TV Show' content, ensuring a relevant dataset for further exploration.
- **Deduplication**: Removes duplicate entries to ensure data integrity.
- **Statistical Summary**: Provides descriptive statistics and null value investigation to assess the quality of the data.
- **Categorical Analysis**: Analyzes the distribution of content types and top countries by content count.
- **Temporal Distribution**: Examines how content is distributed over time by release year and type.

This notebook aims to give an overview of Netflix's content landscape and help identify key trends and insights.


In [33]:
import pyspark.sql.functions as psf
from pyspark.sql import SparkSession

# Spark initialization with custom configuration
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Netflix Content Analysis") \
    .getOrCreate()

# Dataset ingestion
netflix_data = spark.read.csv('netflix_titles.csv', header=True, inferSchema=True)

# Initial data exploration
print("Schema of the Data:")
netflix_data.printSchema()

print("\nFirst 5 Rows of the Data:")
netflix_data.show(5, truncate=False)

# Content type filtering with expanded validation
accepted_content_types = ['Movie', 'TV Show']
netflix_data = netflix_data.filter(netflix_data['type'].isin(accepted_content_types))

# Comprehensive null value handling
netflix_data = netflix_data.fillna({
    'director': 'Uncredited',
    'cast': 'Ensemble',
    'country': 'International'
})

# Strict data validation
netflix_data = netflix_data.filter(
    netflix_data['type'].isNotNull() &
    netflix_data['title'].isNotNull()
)

# Type casting for better analysis
netflix_data = netflix_data.withColumn(
    'release_year',
    netflix_data['release_year'].cast('int')
)

# Advanced duration extraction
netflix_data = netflix_data.withColumn(
    'duration_minutes',
    psf.regexp_extract('duration', r'(\d+)', 1).cast('int')
)

# Deduplication
netflix_data = netflix_data.dropDuplicates(['show_id'])

# Reporting and analytics
total_entries = netflix_data.count()
column_count = len(netflix_data.columns)

print(f"\nTotal Entries: {total_entries:,}")
print(f"Total Columns: {column_count}")

# Null value investigation
print("\nNull Value Investigation:")
for column in netflix_data.columns:
    null_count = netflix_data.filter(netflix_data[column].isNull()).count()
    print(f"Nulls in '{column}': {null_count}")

# Statistical summaries
print("\nStatistical Summaries:")
netflix_data.describe().show(truncate=False)

# Categorical analyses
print("\nContent Type Distribution:")
netflix_data.groupBy('type').count().show(truncate=False)

print("\nTop 10 Countries with Most Content:")
netflix_data.groupBy('country').count().orderBy('count', ascending=False).show(10, truncate=False)

# Temporal distribution
print("\nTemporal Distribution of Content by Release Year and Type:")
netflix_data.groupBy('release_year', 'type').count().orderBy('release_year', ascending=True).show(truncate=False)


Schema of the Data:
root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)


First 5 Rows of the Data:
+-------+-------+---------------------+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+------------------+------------+------+---------+--------------------------------------------------

### Null Value Handling and Categorical Analysis

This section focuses on:
1. **Null Value Handling**:
   - Replacing missing values in key columns with meaningful defaults, such as "Uncredited" for directors, "Global" for country, and "Unspecified" for the date added.

2. **Categorical Analysis**:
   - Counts the number of unique categories in key categorical columns (`type`, `country`, `rating`, `listed_in`) to assess data diversity.

3. **Country-Based Content Distribution**:
   - Analyzes and displays the top 5 countries with the most content entries in the dataset, ranked by content count.

The results aim to highlight trends in categorical data and provide a geographical breakdown of Netflix's content.


In [34]:
from pyspark.sql.functions import col, lit, count, desc

# Comprehensive null value handling with expanded replacements
netflix_data = netflix_data.fillna({
    'director': 'Uncredited',
    'cast': 'Ensemble Cast',
    'country': 'Global',
    'date_added': 'Unspecified',
    'rating': 'Unrated',
    'release_year': 'Undetermined',
    'title': 'Untitled Content'
})

# Improved output for categorical analysis
print("\nCategorical Analysis:")
categorical_columns = ['type', 'country', 'rating', 'listed_in']
for column_name in categorical_columns:
    unique_value_count = netflix_data.select(column_name).distinct().count()
    print(f"  - '{column_name}': {unique_value_count:,} distinct categories")

# Improved country-based content distribution analysis
print("\nTop 5 Countries with Most Content:")
top_countries = netflix_data.groupBy('country') \
    .count() \
    .orderBy(desc('count')) \
    .limit(5) \
    .select(col('country').alias('Country'), col('count').alias('Content Count'))

top_countries.show(truncate=False)



Categorical Analysis:
  - 'type': 2 distinct categories
  - 'country': 768 distinct categories
  - 'rating': 36 distinct categories
  - 'listed_in': 534 distinct categories

Top 5 Countries with Most Content:
+--------------+-------------+
|Country       |Content Count|
+--------------+-------------+
|United States |2805         |
|India         |972          |
|International |831          |
|United Kingdom|419          |
|Japan         |245          |
+--------------+-------------+



### Temporal Analysis of Content Additions

This section focuses on parsing and analyzing the dates when content was added to Netflix:

1. **Date Parsing**:
   - Configures Spark's legacy time parsing policy to handle the provided date format.
   - Converts the `date_added` column into a proper date format (`MMMM d, yyyy`).

2. **Data Filtering**:
   - Filters out entries with invalid or null `date_added` values to ensure temporal accuracy.

3. **Year Extraction**:
   - Extracts the year from the `date_added` column to enable year-wise analysis of content additions.

4. **Temporal Distribution**:
   - Displays a summary of content additions grouped by year, sorted chronologically, to identify trends over time.

This analysis highlights when Netflix added the most content, offering insights into its content growth strategy.


In [35]:
# Configure Spark's time parsing policy for legacy compatibility
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

from pyspark.sql.functions import to_date, year, col

# Date parsing and transformation
netflix_data = netflix_data.withColumn(
    "date_added",
    to_date("date_added", "MMMM d, yyyy")
)

# Robust date filtering to remove invalid entries
netflix_data = netflix_data.filter(col("date_added").isNotNull())

# Extract year of addition for temporal analysis
netflix_data = netflix_data.withColumn("year_added", year("date_added"))

# Temporal distribution of content additions
print("\nTemporal Distribution of Content Additions by Year:")
temporal_distribution = netflix_data.groupBy("year_added") \
    .count() \
    .orderBy("year_added") \
    .select(
        col("year_added").alias("Year Added"),
        col("count").alias("Content Count")
    )

temporal_distribution.show(truncate=False)



Temporal Distribution of Content Additions by Year:
+----------+-------------+
|Year Added|Content Count|
+----------+-------------+
|2008      |2            |
|2009      |2            |
|2010      |1            |
|2011      |13           |
|2012      |3            |
|2013      |10           |
|2014      |23           |
|2015      |72           |
|2016      |418          |
|2017      |1162         |
|2018      |1623         |
|2019      |1997         |
|2020      |1872         |
|2021      |1491         |
+----------+-------------+



### Numeric Duration Extraction and Analysis

This section processes and analyzes the duration of Netflix content:

1. **Duration Transformation**:
   - Extracts numeric values from the `duration` column using a regular expression.
   - Converts the extracted values into integers for numerical analysis.

2. **Average Duration Calculation**:
   - Focuses on the `Movie` content type to calculate the average duration (in minutes).

3. **Output**:
   - Displays the average duration of movies in a clear, tabular format.

This analysis provides insights into the typical runtime of Netflix movies, enabling a better understanding of content characteristics.


In [36]:
# Extract numeric duration values from content duration strings
from pyspark.sql.functions import regexp_extract

# Transform duration into numeric values
netflix_data = netflix_data.withColumn(
    "duration_numeric",
    regexp_extract('duration', r'(\d+)', 1).cast('int')
)

# Compute average duration for movies
print("\nAverage Duration for Movies (in minutes):")
average_duration = netflix_data \
    .filter(netflix_data.type == 'Movie') \
    .groupBy('type') \
    .avg('duration_numeric') \
    .select(
        col("type").alias("Content Type"),
        col("avg(duration_numeric)").alias("Average Duration (Minutes)")
    )

average_duration.show(truncate=False)



Average Duration for Movies (in minutes):
+------------+--------------------------+
|Content Type|Average Duration (Minutes)|
+------------+--------------------------+
|Movie       |99.57911962035674         |
+------------+--------------------------+

