In [0]:
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

### Chọn feature

In [0]:
# Columns that are cast to double so they can be used as numeric inputs.
numeric_columns = ["duration_minutes", "duration_seasons", "release_year"]

# Read the silver table, keep only the target and candidate features, and
# restrict rows to the two target classes ("Movie" / "TV Show").
# The numeric columns are cast inside the projection, so the resulting column
# order is: type, duration_minutes, duration_seasons, release_year, rating.
df = (
    spark.table("netflix_catalog.silver.netflix_titles")
    .select("type", "duration_minutes", "duration_seasons", "release_year", "rating")
    .filter(col("type").isin("Movie", "TV Show"))
    .select(
        "type",
        *[col(name).cast("double").alias(name) for name in numeric_columns],
        "rating",
    )
)
df.display()

type,duration_minutes,duration_seasons,release_year,rating
Movie,90.0,1.0,2019.0,TV-PG
Movie,94.0,1.0,2016.0,TV-MA
TV Show,0.0,1.0,2013.0,TV-Y7-FV
TV Show,0.0,1.0,2016.0,TV-Y7
Movie,99.0,1.0,2017.0,TV-14
TV Show,0.0,1.0,2016.0,TV-MA
Movie,110.0,1.0,2014.0,R
Movie,60.0,1.0,2017.0,TV-MA
TV Show,0.0,1.0,2017.0,TV-MA
Movie,90.0,1.0,2014.0,R


### Chuẩn bị dữ liệu

In [0]:
# Pipeline preparation steps.
# Inputs packed into the "features" vector; "ratingIndex" is produced by
# ratingIndexer below, the other three come straight from df.
feature_columns = ["duration_minutes", "duration_seasons", "release_year", "ratingIndex"]

# Index the target column ("type" -> "label") and the rating string
# ("rating" -> "ratingIndex").
labelIndexer = StringIndexer(inputCol="type", outputCol="label")
ratingIndexer = StringIndexer(inputCol="rating", outputCol="ratingIndex")
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

stages = [labelIndexer, ratingIndexer, assembler]
pipeline = Pipeline(stages=stages)

# Fit the pipeline on the type-cast data, then apply the fitted stages to
# that same data.
# NOTE(review): the indexers are fitted on the full dataset, i.e. before any
# train/test split is made from df_prepared — confirm this is intended.
prep_model = pipeline.fit(df)
df_prepared = prep_model.transform(df)

df_prepared.show(5)

+-------+----------------+----------------+------------+--------+-----+-----------+--------------------+
|   type|duration_minutes|duration_seasons|release_year|  rating|label|ratingIndex|            features|
+-------+----------------+----------------+------------+--------+-----+-----------+--------------------+
|  Movie|            90.0|             1.0|      2019.0|   TV-PG|  0.0|        2.0|[90.0,1.0,2019.0,...|
|  Movie|            94.0|             1.0|      2016.0|   TV-MA|  0.0|        0.0|[94.0,1.0,2016.0,...|
|TV Show|             0.0|             1.0|      2013.0|TV-Y7-FV|  1.0|       10.0|[0.0,1.0,2013.0,1...|
|TV Show|             0.0|             1.0|      2016.0|   TV-Y7|  1.0|        7.0|[0.0,1.0,2016.0,7.0]|
|  Movie|            99.0|             1.0|      2017.0|   TV-14|  0.0|        1.0|[99.0,1.0,2017.0,...|
+-------+----------------+----------------+------------+--------+-----+-----------+--------------------+
only showing top 5 rows



### Chia dữ liệu train và test

In [0]:
# Relative weights for the (train, test) split; the fixed seed keeps the
# row assignment repeatable.
split_weights = [0.8, 0.2]
train_data, test_data = df_prepared.randomSplit(split_weights, seed=42)

### Train model

In [0]:
# Train a 20-tree random forest on the training split, reading the indexed
# target from "label" and the assembled inputs from "features".
# The seed is pinned explicitly (same value as the train/test split) so the
# forest's internal random sampling is repeatable; without it the result
# depends on the library's default seed.
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=20,
    seed=42,
)
model = rf.fit(train_data)

### Đánh giá model

In [0]:
# Score the held-out split with the trained model.
predictions = model.transform(test_data)

# Compare the "prediction" column against the indexed "label" column using
# the accuracy metric.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)

# NOTE(review): in the sample rows displayed earlier, every "TV Show" has
# duration_minutes == 0.0 while every "Movie" has a non-zero value, so a
# perfect score may simply reflect that feature encoding the label — confirm.
print(f"Accuracy = {accuracy:.4f}")

Accuracy = 1.0000


### Lưu model

In [0]:
# Destination for the trained classifier; anything already stored at this
# path is overwritten.
# NOTE(review): only `model` (the fitted random forest) is written here; the
# indexing/assembly stages are not part of this artifact — confirm that is
# what downstream consumers expect.
model_output_path = "abfss://model@netflixprojectdltp.dfs.core.windows.net/type_prediction"
model.write().overwrite().save(model_output_path)