In [1]:
#Bibliotecas para poder trabajar con Spark
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
#Configuración de Spark con Python
!pip install -q findspark
!pip install pyspark

#Estableciendo variable de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

#Buscando e inicializando la instalación de Spark
import findspark
findspark.init()
findspark.find()

[33m0% [Working][0m            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
[33m0% [Connecting to security.ubuntu.com (185.125.190.36)] [Connected to cloud.r-p[0m                                                                               Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:4 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [109 kB]
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:6 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Get:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease [18.1 kB]
Get:8 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 Packages [1,455 kB]
Get:9 http://archive.ubuntu.com/ubuntu jammy-updates/restricted amd64 Packages [1,415 kB]
Get:10 http://archive.ubuntu.com/ubuntu jammy-updates/universe am

'/content/spark-3.5.0-bin-hadoop3'

In [117]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml import PipelineModel, Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import split, col

from pyspark.ml import Transformer
from pyspark.ml.param import Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaTransformer
from pyspark.ml.classification import LogisticRegression


In [3]:
spark_session = SparkSession.builder.appName('Netflix').getOrCreate()
spark_session

In [119]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [121]:
df = spark_session.read.csv('/content/drive/My Drive/Colab Notebooks (2)/netflix_titles.csv', inferSchema=True, header=True)
df.show()

+--------+-------+--------------------+--------------------+--------------------+--------------------+-----------------+------------+--------+--------+--------------------+--------------------+
| show_id|   type|               title|            director|                cast|             country|       date_added|release_year|  rating|duration|           listed_in|         description|
+--------+-------+--------------------+--------------------+--------------------+--------------------+-----------------+------------+--------+--------+--------------------+--------------------+
|81145628|  Movie|Norm of the North...|Richard Finn, Tim...|Alan Marriott, An...|United States, In...|September 9, 2019|        2019|   TV-PG|  90 min|Children & Family...|Before planning a...|
|80117401|  Movie|Jandino: Whatever...|                NULL|    Jandino Asporaat|      United Kingdom|September 9, 2016|        2016|   TV-MA|  94 min|     Stand-Up Comedy|"Jandino Asporaat...|
|70234439|TV Show|  Transforme

In [122]:
df = df.withColumn("listed_in", split(col("listed_in"), ",").getItem(0))
df.show()

+--------+-------+--------------------+--------------------+--------------------+--------------------+-----------------+------------+--------+--------+--------------------+--------------------+
| show_id|   type|               title|            director|                cast|             country|       date_added|release_year|  rating|duration|           listed_in|         description|
+--------+-------+--------------------+--------------------+--------------------+--------------------+-----------------+------------+--------+--------+--------------------+--------------------+
|81145628|  Movie|Norm of the North...|Richard Finn, Tim...|Alan Marriott, An...|United States, In...|September 9, 2019|        2019|   TV-PG|  90 min|Children & Family...|Before planning a...|
|80117401|  Movie|Jandino: Whatever...|                NULL|    Jandino Asporaat|      United Kingdom|September 9, 2016|        2016|   TV-MA|  94 min|     Stand-Up Comedy|"Jandino Asporaat...|
|70234439|TV Show|  Transforme

In [123]:
df.printSchema()

root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)



In [124]:
df = df.dropna()

In [125]:
df = df.select("description", "type")  # Keep only the relevant columns

In [126]:
# Tokenize and remove stopwords
tokenizer = Tokenizer(inputCol="description", outputCol="words")
stop_words_remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered_words")

In [127]:
# Convert words to numerical features using TF-IDF
hashing_tf = HashingTF(inputCol=stop_words_remover.getOutputCol(), outputCol="raw_features", numFeatures=1000)
idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="features")

In [128]:
# Step 4: Split the Data
(training_data, testing_data) = df.randomSplit([0.8, 0.2], seed=42)

In [129]:
# Convert genre labels to numerical labels
string_indexer = StringIndexer(inputCol="type", outputCol="label")

In [130]:
# Step 5: Train a Text Classification Model
lr = LogisticRegression(featuresCol=idf.getOutputCol(), labelCol="label")
pipeline = Pipeline(stages=[tokenizer, stop_words_remover, hashing_tf, idf, string_indexer, lr])
model = pipeline.fit(training_data)

In [131]:
# Step 6: Make Predictions
predictions = model.transform(testing_data)
predictions.select("description", "type", "prediction").show()

+--------------------+-----+----------+
|         description| type|prediction|
+--------------------+-----+----------+
|"""Daily Show"" h...|Movie|       0.0|
|"""Parks and Recr...|Movie|       0.0|
|"""SNL"" star Mic...|Movie|       0.0|
|"A director and a...|Movie|       0.0|
|"A masked freedom...|Movie|       0.0|
|"After a one-nigh...|Movie|       0.0|
|"After winning a ...|Movie|       0.0|
|"April and Frank'...|Movie|       0.0|
|"Embracing his be...|Movie|       0.0|
|"Erik's peaceful ...|Movie|       0.0|
|"Everyone wants a...|Movie|       0.0|
|"Fearlessly funny...|Movie|       0.0|
|"Follow the meteo...|Movie|       0.0|
|"From Joe Berling...|Movie|       0.0|
|"In his first-eve...|Movie|       0.0|
|"In this intimate...|Movie|       0.0|
|"Known as the set...|Movie|       0.0|
|"The directors of...|Movie|       0.0|
|"This documentary...|Movie|       0.0|
|"Three teens spen...|Movie|       0.0|
+--------------------+-----+----------+
only showing top 20 rows



In [132]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

Accuracy: 0.9340974212034384


In [134]:
# Make predictions using the loaded model
new_descriptions = ["A suspenseful thriller with unexpected twists.", "A heartwarming family movie"]
new_data = spark_session.createDataFrame([(1, " ".join(new_descriptions))], ["id", "description"])
predictions = model.transform(new_data)
predictions.select("description", "prediction").show()

+--------------------+----------+
|         description|prediction|
+--------------------+----------+
|A suspenseful thr...|       0.0|
+--------------------+----------+



In [135]:
loaded_model = PipelineModel.load("netflix.h5")

In [136]:
new_data = spark_session.createDataFrame([(1, "A TV Show abput a Sample the bold of France with chefs who are pushing the boundaries of fine dining and reinventing a rich culinary tradition.")], ["id", "description"])
predictions = loaded_model.transform(new_data)

In [137]:
predictions.select("description", "prediction").show()

+--------------------+----------+
|         description|prediction|
+--------------------+----------+
|A TV Show abput a...|       0.0|
+--------------------+----------+

