In [30]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract
from pyspark.sql.functions import regexp_extract, col, when
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, when, regexp_extract

In [31]:


# Initialize a Spark session
spark = (
    SparkSession.builder
    .appName("Netflix Analysis")
    .getOrCreate()
)

# Load the dataset into a PySpark DataFrame.
# Spark's CSV reader escapes quotes with a backslash by default. Free-text columns
# such as `description` presumably contain quotes written as doubled quotes ("")
# inside quoted fields (TODO confirm against the raw file); with the default escape
# those rows are split in the wrong place and values shift into the wrong columns.
# Using the quote character itself as the escape character parses them correctly.
df_spark = spark.read.csv(
    "/content/netflix_titles.csv",
    header=True,       # first line holds the column names
    inferSchema=True,  # let Spark infer column types (e.g. release_year as a number)
    escape='"',
)

# Show the first few rows to confirm
df_spark.show(5)


+-------+-------+--------------------+---------------+--------------------+-------------+------------------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|       director|                cast|      country|        date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+--------------------+---------------+--------------------+-------------+------------------+------------+------+---------+--------------------+--------------------+
|     s1|  Movie|Dick Johnson Is Dead|Kirsten Johnson|                NULL|United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|
|     s2|TV Show|       Blood & Water|           NULL|Ama Qamata, Khosi...| South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|
|     s3|TV Show|           Ganglands|Julien Leclercq|Sami Bouajila, Tr...|         NULL|Septem

In [32]:


# Derive a numeric runtime column: for rows whose type is "Movie", take the run of
# digits from `duration` and cast it to int; every other row gets NULL.
is_movie = col("type") == "Movie"
runtime_digits = regexp_extract(col("duration"), r"(\d+)", 0)
df_spark = df_spark.withColumn(
    "duration_minutes",
    when(is_movie, runtime_digits.cast("int")).otherwise(None),
)

# Preview the raw and derived duration columns side by side
preview_columns = ["title", "type", "duration", "duration_minutes"]
df_spark.select(*preview_columns).show(5)


+--------------------+-------+---------+----------------+
|               title|   type| duration|duration_minutes|
+--------------------+-------+---------+----------------+
|Dick Johnson Is Dead|  Movie|   90 min|              90|
|       Blood & Water|TV Show|2 Seasons|            NULL|
|           Ganglands|TV Show| 1 Season|            NULL|
|Jailbirds New Orl...|TV Show| 1 Season|            NULL|
|        Kota Factory|TV Show|2 Seasons|            NULL|
+--------------------+-------+---------+----------------+
only showing top 5 rows



In [33]:
# Clean-up: drop fully identical rows, then rows that lack a title or a type
deduplicated = df_spark.dropDuplicates()
with_required_fields = deduplicated.dropna(subset=["title", "type"])

# Movie runtime in minutes: digits pulled out of `duration` for "Movie" rows,
# NULL for every other row
movie_minutes = when(
    col("type") == "Movie",
    regexp_extract(col("duration"), r"(\d+)", 0).cast("int"),
).otherwise(None)
df_spark = with_required_fields.withColumn("duration_minutes", movie_minutes)

# Preview the cleaned data together with the derived column
df_spark.select("title", "type", "duration", "duration_minutes").show(5)


+--------------------+-----+--------+----------------+
|               title| type|duration|duration_minutes|
+--------------------+-----+--------+----------------+
|The Haunting in C...|Movie| 101 min|             101|
|What Dreams May Come|Movie| 114 min|             114|
|        Blue Miracle|Movie|  97 min|              97|
|   Playing for Keeps|Movie| 106 min|             106|
|     Phir Hera Pheri|Movie| 150 min|             150|
+--------------------+-----+--------+----------------+
only showing top 5 rows



In [34]:
# Configure the feature assembler: combine the listed input columns into a single
# "features" vector column. handleInvalid="skip" makes it drop rows that have
# null values in the feature columns.
feature_columns = ["release_year", "rating_index"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features", handleInvalid="skip")


In [35]:
# Initialize Spark session
# getOrCreate() returns the already-running session if one exists in this kernel.
spark = SparkSession.builder \
    .appName("Netflix Classification") \
    .getOrCreate()

# Small in-memory sample dataset used to test the classification pipeline.
# NOTE: this replaces whatever `df_spark` held before this cell ran.
data = [
    ("s1", "Movie", "Dick Johnson Is Dead", "Kirsten Johnson", "United States", 2020, "PG-13", "90 min"),
    ("s2", "TV Show", "Blood & Water", None, "South Africa", 2021, "TV-MA", "2 Seasons"),
    ("s3", "TV Show", "Ganglands", "Julien Leclercq", "France", 2021, "TV-MA", "1 Season"),
    ("s4", "TV Show", "Jailbirds New Orleans", None, "United States", 2021, "TV-MA", "1 Season"),
    ("s5", "TV Show", "Kota Factory", None, "India", 2021, "TV-MA", "2 Seasons")
]

# Create DataFrame
columns = ["show_id", "type", "title", "director", "country", "release_year", "rating", "duration"]
df_spark = spark.createDataFrame(data, columns)

# Convert 'release_year' to integer and 'duration' to minutes
# (only values of the form "<digits> min" produce a number; everything else is NULL)
df_spark = df_spark.withColumn("release_year", col("release_year").cast("int"))
df_spark = df_spark.withColumn("duration_minutes",
                               when(col("duration").rlike(r"(\d+) min"),
                                    regexp_extract(col("duration"), r"(\d+)", 0).cast("int"))
                               .otherwise(None))

# Drop rows with NULL values in 'release_year' and 'rating' (just a simple cleanup)
df_spark = df_spark.dropna(subset=["release_year", "rating"])

# Index the 'type' column (Movie vs TV Show) to convert it into a numeric label.
# The index given to each label is decided when the indexer is fitted on the
# training split, so it must not be assumed in advance.
indexer = StringIndexer(inputCol="type", outputCol="type_index")

# Index the 'rating' column to convert categorical ratings to numeric values;
# rows whose rating cannot be indexed are skipped.
rating_indexer = StringIndexer(inputCol="rating", outputCol="rating_index", handleInvalid="skip")

# Assemble features: we'll use 'release_year' and 'rating_index' as features for classification
assembler = VectorAssembler(inputCols=["release_year", "rating_index"], outputCol="features", handleInvalid="skip")

# RandomForestClassifier for classification.
# A fixed seed makes the trained forest reproducible across kernel restarts.
rf = RandomForestClassifier(labelCol="type_index", featuresCol="features", seed=1234)

# Set up the pipeline with the transformations
pipeline = Pipeline(stages=[rating_indexer, indexer, assembler, rf])

# Split the data into training and test sets
train_data, test_data = df_spark.randomSplit([0.8, 0.2], seed=1234)

# Fit the model
model = pipeline.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

# Map numeric predictions back to human-readable labels using the labels learned by
# the fitted 'type' indexer (stage 1 of the pipeline) instead of a hard-coded
# 0.0 -> "TV Show" / 1.0 -> "Movie" table, which is wrong whenever the training split
# orders the labels differently. Position i in the label list corresponds to index i.
type_labels = model.stages[1].labelsArray[0]
predicted_type = None
for label_index, label in enumerate(type_labels):
    is_this_label = col("prediction") == float(label_index)
    if predicted_type is None:
        predicted_type = when(is_this_label, label)
    else:
        predicted_type = predicted_type.when(is_this_label, label)

# Any prediction value without a known label falls back to "Unknown"
predictions = predictions.withColumn("predicted_type", predicted_type.otherwise("Unknown"))

# Show the predictions with the human-readable labels
predictions.select("title", "type", "predicted_type").show(5)


+-------------+-------+--------------+
|        title|   type|predicted_type|
+-------------+-------+--------------+
|Blood & Water|TV Show|       TV Show|
+-------------+-------+--------------+

