In [1]:
import os

import findspark

# Locate the Spark installation. SPARK_HOME (when set) makes the notebook portable to other
# machines; otherwise fall back to the author's local install path.
findspark.init(os.environ.get('SPARK_HOME', '/home/gerardo-rodriguez/spark-4.0.0-bin-hadoop3'))

In [2]:
from pyspark.sql import SparkSession
# Single SparkSession for the whole notebook (getOrCreate reuses an active one if present).
spark = (
    SparkSession.builder
    .appName('ml')
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/27 10:14:41 WARN Utils: Your hostname, Lanz-Lenovo, resolves to a loopback address: 127.0.1.1; using 192.168.1.145 instead (on interface wlp2s0)
25/08/27 10:14:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/27 10:14:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/27 10:14:43 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Load the users table: first row supplies column names, Spark infers column types.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('../Create_ratings/netflix_users.csv')
)

In [4]:
# Inspect the schema Spark inferred for the users table.
df.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Subscription_Type: string (nullable = true)
 |-- Watch_Time_Hours: double (nullable = true)
 |-- Favorite_Genre: string (nullable = true)
 |-- Last_Login: date (nullable = true)



# Create Column Churn

In [5]:
from pyspark.sql.functions import col, datediff, current_date, when

In [6]:
# Rule-based churn label: 1 when the user watched fewer than 300 hours AND has not
# logged in for more than 60 days (relative to today's date); every other row gets 0.
low_watch_time = col('Watch_Time_Hours') < 300
inactive_over_60_days = datediff(current_date(), col('Last_Login')) > 60
df_churn = df.withColumn(
    'churn',
    when(low_watch_time & inactive_over_60_days, 1).otherwise(0),
)

In [7]:
# Days since the last login, measured against current_date(), so the values (and the
# churn label above) depend on the day the notebook is run.
days_since_login = datediff(current_date(), col('Last_Login'))
df_ml = df_churn.withColumn('days_login', days_since_login)
df_ml.show(n=5)

+-------+--------------+---+-------+-----------------+----------------+--------------+----------+-----+----------+
|User_ID|          Name|Age|Country|Subscription_Type|Watch_Time_Hours|Favorite_Genre|Last_Login|churn|days_login|
+-------+--------------+---+-------+-----------------+----------------+--------------+----------+-----+----------+
|      1|James Martinez| 18| France|          Premium|           80.26|         Drama|2024-05-12|    1|       472|
|      2|   John Miller| 23|    USA|          Premium|          321.75|        Sci-Fi|2025-02-05|    0|       203|
|      3|    Emma Davis| 60|     UK|            Basic|           35.89|        Comedy|2025-01-24|    1|       215|
|      4|   Emma Miller| 44|    USA|          Premium|          261.56|   Documentary|2024-03-25|    1|       520|
|      5|    Jane Smith| 68|    USA|         Standard|           909.3|         Drama|2025-01-14|    0|       225|
+-------+--------------+---+-------+-----------------+----------------+---------

# Classification Model for churn

In [8]:
from pyspark.ml.classification import (LogisticRegression, RandomForestClassifier)
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler

In [9]:
# Label-encode both categorical columns with a single multi-column StringIndexer.
indexer = StringIndexer(
    inputCols=['Subscription_Type', 'Country'],
    outputCols=['subscription_indexed', 'country_indexed'],
)
indexed = indexer.fit(dataset=df_ml)

In [10]:
# Append the *_indexed columns using the fitted indexer.
df_indexed = indexed.transform(dataset=df_ml)

In [11]:
# List the columns after indexing; the two *_indexed columns are appended at the end.
df_indexed.columns

['User_ID',
 'Name',
 'Age',
 'Country',
 'Subscription_Type',
 'Watch_Time_Hours',
 'Favorite_Genre',
 'Last_Login',
 'churn',
 'days_login',
 'subscription_indexed',
 'country_indexed']

## Selección de variables

Para la predicción de abandono (churn), se seleccionaron las siguientes variables por su relevancia potencial en el comportamiento del usuario:

Edad (Age): Los usuarios más jóvenes, al estar más familiarizados con la tecnología y con mayor disposición a explorar nuevas plataformas de streaming, podrían presentar una mayor probabilidad de migrar. En contraste, los usuarios de mayor edad tienden a mantener hábitos más estables, lo que podría reflejarse en una menor tasa de abandono.

Horas de visualización (Watch_Time_Hours): Un mayor tiempo de visualización es un fuerte indicador de compromiso con la plataforma. Si el usuario encuentra contenido de interés y dedica muchas horas, la probabilidad de abandono disminuye significativamente.

Días desde el último acceso (days_login): La inactividad prolongada es una señal directa de desinterés o desconexión con el servicio. Los usuarios que no han ingresado recientemente son más propensos a cancelar su suscripción.

Tipo de suscripción (Subscription_Type): Los usuarios con planes de mayor costo (como Premium) suelen compartir la cuenta con familiares o amigos. Este factor reduce la probabilidad de abandono, ya que la decisión involucra a más personas. En cambio, los planes básicos, generalmente individuales, presentan mayor volatilidad.

País (Country): La situación económica de cada país influye en la capacidad de pago. En regiones donde el valor del dólar representa un costo elevado, los usuarios pueden considerar la suscripción como un gasto prescindible, aumentando el riesgo de abandono.

## Features Selection

For churn prediction, the following variables were selected due to their potential relevance to user behavior:

Age: Younger users, being more familiar with technology and more inclined to explore alternative streaming platforms, may have a higher probability of leaving. In contrast, older users often maintain more stable habits, which may translate into lower churn rates.

Watch Time (Watch_Time_Hours): Higher watch time is a strong indicator of user engagement. If users find content of interest and dedicate significant hours to watching, the likelihood of churn decreases.

Days Since Last Login (days_login): Extended inactivity is a direct signal of disengagement. Users who have not logged in recently are more likely to cancel their subscription.

Subscription Type (Subscription_Type): Users with higher-tier plans (e.g., Premium) often share accounts with family or friends. This shared dependency lowers the probability of churn, as the decision to cancel involves multiple people. Conversely, basic plans, usually for individual use, tend to be more volatile.

Country: Economic conditions in each country affect users’ ability to pay. In regions where the dollar represents a relatively high cost, users may perceive the subscription as a non-essential expense, increasing churn risk.

In [12]:
# Churn features: numeric columns plus the two label-encoded categoricals.
# NOTE(review): Watch_Time_Hours and days_login are the same quantities the churn label is
# built from, so the model can recover the label rule directly.
churn_feature_cols = [
    'Age',
    'Watch_Time_Hours',
    'days_login',
    'subscription_indexed',
    'country_indexed',
]
asembler = VectorAssembler(inputCols=churn_feature_cols, outputCol='features')

In [13]:
# Pack the selected feature columns into a single `features` vector column.
df_assembled = asembler.transform(dataset=df_indexed)

In [14]:
# Confirm the assembled `features` vector column was added.
df_assembled.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Subscription_Type: string (nullable = true)
 |-- Watch_Time_Hours: double (nullable = true)
 |-- Favorite_Genre: string (nullable = true)
 |-- Last_Login: date (nullable = true)
 |-- churn: integer (nullable = false)
 |-- days_login: integer (nullable = true)
 |-- subscription_indexed: double (nullable = false)
 |-- country_indexed: double (nullable = false)
 |-- features: vector (nullable = true)



In [15]:
# 80/20 train/test split; the fixed seed makes the split repeatable for the same input.
train_data, test_data = df_assembled.randomSplit(weights=[0.8, 0.2], seed=1)

### Logistic Regression

In [16]:
# Logistic-regression baseline trained on the assembled feature vector.
logistic = LogisticRegression(labelCol='churn', featuresCol='features')
model = logistic.fit(dataset=train_data)

25/08/27 10:14:53 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [17]:
# Score the held-out split with the logistic model.
# NOTE(review): `model` is rebound later in the notebook (regression pipeline); re-running
# this cell after those cells would use that pipeline instead of the logistic model.
prediction = model.transform(dataset=test_data)

In [18]:
# Side-by-side view of the true label and the logistic prediction.
prediction.select('churn', 'prediction').show()

+-----+----------+
|churn|prediction|
+-----+----------+
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 20 rows


In [19]:
# Area under the ROC curve on the test predictions (the evaluator's default metric,
# spelled out here for clarity).
eva_log = BinaryClassificationEvaluator(labelCol='churn', metricName='areaUnderROC')
pred_log = eva_log.evaluate(prediction)

In [20]:
# Report the logistic-regression score computed above (BinaryClassificationEvaluator's
# default metric, area under ROC); the bare last expression is displayed by Jupyter.
print("Evaluation Logistic regression")
pred_log

Evaluation Logistic regression


0.999999415457866

### Random Forest

In [21]:
# Random forest: 200 trees, each grown up to depth 20, with a fixed seed.
rf = RandomForestClassifier(
    featuresCol='features',
    labelCol='churn',
    numTrees=200,
    maxDepth=20,
    seed=1,
)
model_rf = rf.fit(dataset=train_data)

25/08/27 10:15:08 WARN DAGScheduler: Broadcasting large task binary with size 1056.2 KiB
25/08/27 10:15:09 WARN DAGScheduler: Broadcasting large task binary with size 1535.9 KiB
25/08/27 10:15:10 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
25/08/27 10:15:11 WARN DAGScheduler: Broadcasting large task binary with size 2.8 MiB
25/08/27 10:15:12 WARN DAGScheduler: Broadcasting large task binary with size 3.5 MiB
25/08/27 10:15:14 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
25/08/27 10:15:15 WARN DAGScheduler: Broadcasting large task binary with size 4.9 MiB
25/08/27 10:15:16 WARN DAGScheduler: Broadcasting large task binary with size 5.4 MiB
25/08/27 10:15:17 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
25/08/27 10:15:18 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
25/08/27 10:15:19 WARN DAGScheduler: Broadcasting large task binary with size 5.3 MiB
25/08/27 10:15:20 WARN DAGScheduler: Broadcastin

In [22]:
# Score the held-out split with the random forest and peek at label vs prediction.
predict_rf = model_rf.transform(dataset=test_data)
predict_rf.select('churn', 'prediction').show(n=5)

25/08/27 10:15:23 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB


+-----+----------+
|churn|prediction|
+-----+----------+
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 5 rows


In [23]:
# One MulticlassClassificationEvaluator per metric; precision, recall and 'f1' are the
# class-weighted variants. Evaluated in the order accuracy, precision, recall, f1.
eva_accuracy, eva_precision, eva_recall, eva_f1 = (
    MulticlassClassificationEvaluator(labelCol='churn', metricName=metric_name)
    for metric_name in ('accuracy', 'weightedPrecision', 'weightedRecall', 'f1')
)

accuracy, precision, recall, f1 = (
    evaluator.evaluate(predict_rf)
    for evaluator in (eva_accuracy, eva_precision, eva_recall, eva_f1)
)

25/08/27 10:15:24 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB
25/08/27 10:15:24 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB
25/08/27 10:15:25 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB
25/08/27 10:15:26 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB


In [24]:
# Print the random-forest metrics, padded so the colons line up.
print("Métricas Random Forest")
for metric_label, metric_value in (
    ("Accuracy ", accuracy),
    ("Precision", precision),
    ("Recall   ", recall),
    ("F1 Score ", f1),
):
    print(f"{metric_label}: {metric_value:.4f}")

Métricas Random Forest
Accuracy : 0.9954
Precision: 0.9955
Recall   : 0.9954
F1 Score : 0.9954


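## Evaluation Metrics – Random Forest (Churn Prediction)

| Metric               | Value  |
|----------------------|--------|
| Accuracy             | 0.9954 |
| Precision (weighted) | 0.9955 |
| Recall (weighted)    | 0.9954 |
| F1 Score (weighted)  | 0.9954 |

**Note:** `churn` is defined directly from `Watch_Time_Hours < 300` and more than 60 days since `Last_Login`. Both `Watch_Time_Hours` and `days_login` are model features. The near-perfect scores, for both models, therefore reflect the models recovering this deterministic rule (target leakage), not real predictive power.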


# Linear Model

In [78]:
from pyspark.ml.regression import LinearRegression

In [79]:
# Load the titles table and keep only movies plus the columns used downstream.
# quote/escape='"' let quoted fields contain commas and doubled quotes.
show_columns = [
    'show_id', 'type', 'title', 'director', 'country', 'date_added',
    'release_year', 'rating', 'duration', 'listed_in', 'description',
]
df_show = spark.read.csv(
    '../Create_ratings/netflix_titles.csv',
    header=True,
    inferSchema=True,
    quote='"',
    escape='"',
)
df_show = df_show.select(show_columns).filter(df_show['type'] == 'Movie')

In [80]:
from pyspark.sql.functions import rand

In [81]:
# Synthetic regression target: a uniform random value in [0, 1) per row. The seed makes the
# generated target, and therefore every downstream regression metric, repeatable for the
# same input and partitioning.
df_show = df_show.withColumn('rate', rand(seed=1))

In [82]:
# Drop every row with a null in any column, then inspect the resulting schema.
df_show = df_show.dropna(how='any')
df_show.printSchema()

root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)
 |-- rate: double (nullable = false)



In [83]:
# List the movie-table columns, including the synthetic `rate` target.
df_show.columns

['show_id',
 'type',
 'title',
 'director',
 'country',
 'date_added',
 'release_year',
 'rating',
 'duration',
 'listed_in',
 'description',
 'rate']

In [84]:
# Narrow the movie table to the columns used in the regression section plus the target `rate`.
train_columns = ['title', 'director', 'release_year', 'duration', 'rating', 'rate']
df_train = df_show.select(train_columns)

df_train.printSchema()

root
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- rate: double (nullable = false)



In [85]:
# Label-encode the categorical regression inputs. Both indexers use handleInvalid='keep',
# so labels unseen at fit time map to an extra index instead of raising; this lets the
# fitted pipeline also score data it was not trained on.
# NOTE(review): a director index fed as a plain numeric feature imposes an arbitrary order
# on directors; one-hot encoding would be the usual choice for a linear model.
indexer_director = StringIndexer(inputCol='director', outputCol='director_indexed', handleInvalid='keep')
indexer_rating = StringIndexer(inputCol="rating", outputCol="rating_indexed", handleInvalid="keep")

In [86]:
from pyspark.sql.functions import year, month, day, regexp_replace, when

In [87]:
def preprocess(df):
    """Add numeric ``year`` and ``duration_num`` columns to a titles DataFrame.

    ``year`` is ``release_year`` cast to int. ``duration_num`` is ``duration`` with the
    substring " min" removed and the remainder cast to int (e.g. "90 min" -> 90).

    Args:
        df: DataFrame with string columns ``release_year`` and ``duration``.

    Returns:
        The input DataFrame with the two extra columns appended.
    """
    # Strip the unit so only the minute count is left for the cast.
    # NOTE(review): with ANSI mode enabled, a value that is not a clean integer after the
    # replace makes the cast fail instead of yielding null — verify against the data.
    minutes = regexp_replace(col("duration"), " min", "").cast("int")
    return (
        df.withColumn("year", col("release_year").cast('int'))
        .withColumn("duration_num", minutes)
    )
    

In [88]:
# Derive the numeric year / duration columns the assembler needs.
df_prepared = df_train.transform(preprocess)

In [89]:
# Feature vector for the linear regression: release year, runtime in minutes and the two
# label-encoded categoricals. (Despite the `_rf` suffix, this feeds LinearRegression.)
regression_feature_cols = ['year', 'duration_num', 'rating_indexed', 'director_indexed']
assembler_rf = VectorAssembler(inputCols=regression_feature_cols, outputCol='features')

In [98]:
# Linear regression on `rate`. regParam=20 is a large regularization penalty that strongly
# shrinks the coefficients; maxIter caps the optimizer's iterations.
lin_reg = LinearRegression(
    featuresCol='features',
    labelCol='rate',
    regParam=20,
    maxIter=150,
)

In [99]:
from pyspark.ml.pipeline import Pipeline

In [100]:
# Stage order matters: both indexers must run before the assembler consumes their outputs.
regression_stages = [indexer_director, indexer_rating, assembler_rf, lin_reg]
pipeline = Pipeline(stages=regression_stages)

In [106]:
# Fit indexers, assembler and regression in one call on the full prepared data.
# NOTE(review): this rebinds `model`, which earlier held the logistic-regression model.
model = pipeline.fit(dataset=df_prepared)

In [108]:
# Score the same data the pipeline was fitted on and compare target vs prediction.
predictions = model.transform(dataset=df_prepared)
predictions.select("title", "rate", "prediction").show(n=10, truncate=False)

+--------------------+--------------------+-------------------+
|title               |rate                |prediction         |
+--------------------+--------------------+-------------------+
|Dick Johnson Is Dead|0.058322911102679176|0.4982403671710855 |
|Sankofa             |0.6532677923170594  |0.49794850359588194|
|The Starling        |0.9694531196177445  |0.49821352435614297|
|Je Suis Karl        |0.4238528893905017  |0.4981699414748908 |
|Jeans               |0.6003057609903206  |0.4978976581005633 |
|Grown Ups           |0.9952457092022428  |0.498143409843807  |
|Dark Skies          |0.35172675562028033 |0.4981774360612422 |
|Paranoia            |0.8658451212854049  |0.49816129908150336|
|Birth of the Dragon |0.41232966044634056 |0.49820665464380065|
|Jaws                |0.7694839742121337  |0.4978221170266708 |
+--------------------+--------------------+-------------------+
only showing top 10 rows


In [109]:
from pyspark.ml.evaluation import RegressionEvaluator

## Nota sobre la calidad del modelo

Es importante recalcar que las métricas de evaluación (RMSE, MAE y R²) muestran que el modelo no tiene capacidad predictiva: R² ≈ 0, es decir, no mejora a predecir simplemente la media. Esto se debe a que la variable objetivo (`rate`) fue generada de manera completamente aleatoria con `rand()`. Por lo tanto, no existe una relación lógica o causal entre las variables predictoras y la variable objetivo. En consecuencia, el modelo no puede encontrar patrones reales y sus predicciones no son representativas de un escenario práctico. Además, las métricas se calculan sobre los mismos datos usados para entrenar el pipeline, por lo que no miden la capacidad de generalización. El objetivo de este ejercicio es únicamente demostrar la aplicación de técnicas de Machine Learning con PySpark y no obtener un modelo predictivo útil en la realidad.

It is important to highlight that the evaluation metrics (RMSE, MAE, and R²) show the model has no predictive power: R² ≈ 0, i.e. it does no better than always predicting the mean. This is because the target variable (`rate`) was generated entirely at random with `rand()`. There is therefore no logical or causal relationship between the predictor variables and the target. As a result, the model cannot identify real patterns, and its predictions are not representative of a practical scenario. In addition, the metrics are computed on the same data used to fit the pipeline, so they do not measure generalization. The purpose of this exercise is solely to demonstrate the application of Machine Learning techniques with PySpark, rather than to build a useful predictive model in reality.

In [110]:
# RMSE, R² and MAE of `prediction` against `rate`, each from its own evaluator
# (evaluated in that order).
evaluator_rmse, evaluator_r2, evaluator_mae = (
    RegressionEvaluator(labelCol="rate", predictionCol="prediction", metricName=metric_name)
    for metric_name in ("rmse", "r2", "mae")
)

rmse, r2, mae = (
    evaluator.evaluate(predictions)
    for evaluator in (evaluator_rmse, evaluator_r2, evaluator_mae)
)

In [111]:
# Print the regression metrics with aligned labels.
print("Métricas de Evaluación:")
for metric_label, metric_value in (("RMSE: ", rmse), ("MAE:  ", mae), ("R²:   ", r2)):
    print(f"{metric_label}{metric_value:.4f}")

Métricas de Evaluación:
RMSE: 0.2875
MAE:  0.2479
R²:   0.0000


# Evaluation Metrics – Linear Regression

| Metric | Value  |
|--------|--------|
| RMSE   | 0.2875 |
| MAE    | 0.2479 |
| R²     | 0.0000 |



# Kmeans

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import StandardScaler

In [None]:
# Reload the users table for the clustering section (header row + inferred types).
df_users = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('../Create_ratings/netflix_users.csv')
)

In [None]:
# Inspect the inferred schema of the users table loaded for clustering.
df_users.printSchema()

In [None]:
# List the users-table columns.
df_users.columns

In [None]:
# Label-encode Country and Subscription_Type for the clustering pipeline.
indexer_country = StringIndexer(outputCol='country_indexed', inputCol='Country')
indexer_sub = StringIndexer(outputCol='sub_indexed', inputCol='Subscription_Type')

In [None]:
# Cluster on country index and total watch time.
# NOTE(review): indexer_sub produces sub_indexed, but it is not an input here — confirm intentional.
kmeans_feature_cols = ['country_indexed', 'Watch_Time_Hours']
assembler_k = VectorAssembler(inputCols=kmeans_feature_cols, outputCol='features')

In [None]:
# Standardize the clustering features (library defaults: scale to unit std, no centering).
scale = StandardScaler(outputCol='features_scale', inputCol='features')

In [None]:
# Silhouette sweep over k = 2..10 to help choose the number of clusters.
# ClusteringEvaluator is imported here because its shared import cell sits further down the
# notebook; without it this cell raises NameError on a fresh kernel.
from pyspark.ml.evaluation import ClusteringEvaluator

val = list(range(2, 11))
# The evaluator does not depend on k, so build it once outside the loop.
eva_clu = ClusteringEvaluator(featuresCol='features_scale')

for i in val:
    k_mean = KMeans(k=i, featuresCol='features_scale', seed=1)
    pipeline_km = Pipeline(stages=[indexer_country, indexer_sub, assembler_k, scale, k_mean])
    model = pipeline_km.fit(df_users)
    predic = model.transform(df_users)

    print(f'prediction with k = {i}')
    print(eva_clu.evaluate(predic))
    

In [None]:
# Final clustering estimator with k = 4 (presumably picked from the silhouette sweep; its
# output is not recorded here).
k_mean = KMeans(featuresCol='features_scale', k=4, seed=1)

In [None]:
# Index -> assemble -> scale -> cluster, in that order.
clustering_stages = [indexer_country, indexer_sub, assembler_k, scale, k_mean]
pipeline_km = Pipeline(stages=clustering_stages)

In [None]:
# Fit the full clustering pipeline on the users table.
model_k = pipeline_km.fit(dataset=df_users)

In [None]:
# Assign every user to a cluster (`prediction` column).
prediction_k = model_k.transform(dataset=df_users)

In [None]:
# Peek at a few users with their assigned cluster.
prediction_k.select(['Age', 'Watch_Time_Hours', 'prediction']).show(n=5)

In [None]:
from pyspark.ml.evaluation import ClusteringEvaluator

In [None]:
# Silhouette evaluator (library default metric) over the scaled feature vector.
eva_clu = ClusteringEvaluator(predictionCol='prediction', featuresCol='features_scale')

In [None]:
# Read k from the fitted KMeans stage (last pipeline stage) so the label always matches the
# model that produced prediction_k; the bare last expression is displayed by Jupyter.
print(f'eva of kmean {model_k.stages[-1].getK()}')
eva_clu.evaluate(prediction_k)

In [None]:
from pyspark.sql.functions import avg

In [None]:
# Profile each cluster by mean Age and mean Watch_Time_Hours. Age is not a clustering
# feature (assembler_k uses country_indexed and Watch_Time_Hours), so its averages describe
# the clusters rather than define them.
# orderBy gives a stable row order; groupBy alone returns groups in no guaranteed order.
prediction_k.groupBy('prediction').agg(
    avg('Age'),
    avg('Watch_Time_Hours')
).orderBy('prediction').show()

# Evaluation Groups – Kmeans


|prediction|          avg(Age)|avg(Watch_Time_Hours)|
|----------|------------------|---------------------|
|         1| 46.54018632559608|    745.0765687667777|
|         3| 46.70972471086496|   255.25532008470475|
|         2| 46.51924005796168|    748.6533633875353|
|         0|46.169225898369476|   249.52558018046565|