In [144]:
"""
Notebook PySpark para Análise de Clientes, RFM, Clusterização, Recomendação e Churn.
"""

'\nNotebook PySpark para Análise de Clientes, RFM, Clusterização, Recomendação e Churn.\n'

In [145]:
# Importar bibliotecas necessárias
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col, count, isnan, when, trim, to_timestamp, datediff, current_date, unix_timestamp,col, udf, when
from pyspark.sql.types import StringType, IntegerType, FloatType, TimestampType
from pyspark.sql.window import Window

In [146]:
# 1. Inicializar SparkSession
spark = SparkSession.builder \
    .appName("Marketin-predictivo") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

2. Carregar os datasets

In [147]:
# Caminhos para os arquivos (ajuste se necessário)
path_clientes = "/home/jovyan/Marketing-Predictivo/CSV/clientes_anonimizados.csv"
path_produtos = "/home/jovyan/Marketing-Predictivo/CSV/productos_con_atributos.csv"
path_ordens = "/home/jovyan/Marketing-Predictivo/CSV/historico_ordenes.csv"

In [148]:
import sys
sys.path.append("/home/jovyan/GitHub/Marketing-Predictivo/CSV")

In [149]:
df_clientes = spark.read.csv(path_clientes, header=True, inferSchema=True, sep=',')
df_clientes.printSchema()
df_clientes.show(5, truncate=False)

root
 |-- user_id: integer (nullable = true)
 |-- user_registered: string (nullable = true)
 |-- user_status: integer (nullable = true)
 |-- billing_city: string (nullable = true)
 |-- billing_country: string (nullable = true)
 |-- billing_state: string (nullable = true)

+-------+-------------------+-----------+------------+---------------+-------------+
|user_id|user_registered    |user_status|billing_city|billing_country|billing_state|
+-------+-------------------+-----------+------------+---------------+-------------+
|4      |2024-04-05 15:38:09|0          |Lima        |PE             |LMA          |
|8      |2024-04-12 16:09:39|0          |AREQUIPA    |PE             |ARE          |
|10     |2024-04-12 17:03:18|0          |LIMA        |PE             |LIM          |
|13     |2024-04-15 22:08:21|0          |Arequipa    |PE             |ARE          |
|14     |2024-04-15 22:16:09|0          |NULL        |NULL           |NULL         |
+-------+-------------------+-----------+------

In [150]:
df_produtos = spark.read.csv(path_produtos, header=True, inferSchema=False, sep=',', multiLine=True, escape='"')
df_produtos = df_produtos.withColumn("product_id", col("product_id").cast(IntegerType())) \
                       .withColumn("_price", col("_price").cast(FloatType())) \
                       .withColumn("_regular_price", col("_regular_price").cast(FloatType())) \
                       .withColumn("_sale_price", col("_sale_price").cast(FloatType())) \
                       .withColumn("total_sales", col("total_sales").cast(IntegerType())) \
                       .withColumn("_stock", col("_stock").cast(FloatType()))
df_produtos.printSchema()
df_produtos.show(10, truncate=True)

root
 |-- product_id: integer (nullable = true)
 |-- custom_partnumber: string (nullable = true)
 |-- post_title: string (nullable = true)
 |-- post_excerpt: string (nullable = true)
 |-- _sku: string (nullable = true)
 |-- _price: float (nullable = true)
 |-- _regular_price: float (nullable = true)
 |-- _sale_price: float (nullable = true)
 |-- total_sales: integer (nullable = true)
 |-- _backorders: string (nullable = true)
 |-- _stock: float (nullable = true)
 |-- _stock_status: string (nullable = true)
 |-- minimum_allowed_quantity: string (nullable = true)
 |-- custom_condition: string (nullable = true)
 |-- custom_eta: string (nullable = true)
 |-- custom_moq: string (nullable = true)
 |-- _product_attributes: string (nullable = true)
 |-- pa_1: string (nullable = true)
 |-- pa_almacenamiento: string (nullable = true)
 |-- pa_color: string (nullable = true)
 |-- pa_condicion: string (nullable = true)
 |-- pa_disco-duro: string (nullable = true)
 |-- pa_emmc: string (nullable = tr

In [151]:
df_produtos.show(10, truncate=True)

+----------+-----------------+--------------------+--------------------+-------------------+------+--------------+-----------+-----------+-----------+------+-------------+------------------------+----------------+----------+----------+--------------------+----+-----------------+--------+------------+-------------+-------+-----------+--------+--------------+-------------+-------------+------+--------------------+-------------------+----------+----------------+
|product_id|custom_partnumber|          post_title|        post_excerpt|               _sku|_price|_regular_price|_sale_price|total_sales|_backorders|_stock|_stock_status|minimum_allowed_quantity|custom_condition|custom_eta|custom_moq| _product_attributes|pa_1|pa_almacenamiento|pa_color|pa_condicion|pa_disco-duro|pa_emmc|pa_garantia|pa_marca|pa_memoria-ram|  pa_pantalla|pa_procesador|pa_ram|pa_sistema-operativo|pa_tarjeta-de-video|pa_teclado|pa_unidad-solida|
+----------+-----------------+--------------------+--------------------+

In [152]:
df_ordens = spark.read.csv(path_ordens, header=True, inferSchema=True, sep=',')
df_ordens.printSchema()
df_ordens.show(5, truncate=False)

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_device_type: string (nullable = true)
 |-- order_session_visited_pages: integer (nullable = true)
 |-- order_session_start_time: string (nullable = true)
 |-- order_traffic_source_type: string (nullable = true)
 |-- order_utm_source_campaign: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_qty: integer (nullable = true)
 |-- product_net_revenue: double (nullable = true)
 |-- tax_amount: double (nullable = true)
 |-- product_gross_revenue: double (nullable = true)
 |-- status: string (nullable = true)

+--------+-------------------+-----------------+---------------------------+------------------------+-------------------------+-------------------------+----------+-----------+-----------+-------------------+----------+---------------------+------------+
|order_id|order_date         |order_device_type|order_session_

3. Limpeza e Pré-processamento dos Dados

3.1 Dataset Clientes

In [153]:
# Verificar nulos
df_clientes.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_clientes.columns]).show()

+-------+---------------+-----------+------------+---------------+-------------+
|user_id|user_registered|user_status|billing_city|billing_country|billing_state|
+-------+---------------+-----------+------------+---------------+-------------+
|      0|              0|          0|          13|             13|           14|
+-------+---------------+-----------+------------+---------------+-------------+



In [154]:
# Remover duplicatas baseadas em user_id
print(f"Registros antes da remoção de duplicatas (clientes): {df_clientes.count()}")
df_clientes = df_clientes.dropDuplicates(['user_id'])
print(f"Registros após a remoção de duplicatas (clientes): {df_clientes.count()}")

Registros antes da remoção de duplicatas (clientes): 253
Registros após a remoção de duplicatas (clientes): 253


In [155]:
# Normalizar coluna de data 'user_registered'
df_clientes = df_clientes.withColumn("user_registered_ts", to_timestamp(col("user_registered"), "yyyy-MM-dd HH:mm:ss"))
# Calcular dias desde o registro (exemplo de normalização para ML)
df_clientes = df_clientes.withColumn("dias_desde_registro", datediff(current_date(), col("user_registered_ts")))
df_clientes.select("user_id", "user_registered", "user_registered_ts", "dias_desde_registro").show(5)

+-------+-------------------+-------------------+-------------------+
|user_id|    user_registered| user_registered_ts|dias_desde_registro|
+-------+-------------------+-------------------+-------------------+
|      4|2024-04-05 15:38:09|2024-04-05 15:38:09|                392|
|      8|2024-04-12 16:09:39|2024-04-12 16:09:39|                385|
|     10|2024-04-12 17:03:18|2024-04-12 17:03:18|                385|
|     13|2024-04-15 22:08:21|2024-04-15 22:08:21|                382|
|     14|2024-04-15 22:16:09|2024-04-15 22:16:09|                382|
+-------+-------------------+-------------------+-------------------+
only showing top 5 rows



3.2 Dataset Produtos

In [156]:
df_produtos.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_produtos.columns]).show()

+----------+-----------------+----------+------------+----+------+--------------+-----------+-----------+-----------+------+-------------+------------------------+----------------+----------+----------+-------------------+----+-----------------+--------+------------+-------------+-------+-----------+--------+--------------+-----------+-------------+------+--------------------+-------------------+----------+----------------+
|product_id|custom_partnumber|post_title|post_excerpt|_sku|_price|_regular_price|_sale_price|total_sales|_backorders|_stock|_stock_status|minimum_allowed_quantity|custom_condition|custom_eta|custom_moq|_product_attributes|pa_1|pa_almacenamiento|pa_color|pa_condicion|pa_disco-duro|pa_emmc|pa_garantia|pa_marca|pa_memoria-ram|pa_pantalla|pa_procesador|pa_ram|pa_sistema-operativo|pa_tarjeta-de-video|pa_teclado|pa_unidad-solida|
+----------+-----------------+----------+------------+----+------+--------------+-----------+-----------+-----------+------+-------------+------

In [157]:

df_produtos.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in ["product_id", "_sku", "_price"]]).show()

+----------+----+------+
|product_id|_sku|_price|
+----------+----+------+
|         0|  58|    57|
+----------+----+------+



In [158]:
# Remover duplicatas baseadas em product_id
print(f"Registros antes da remoção de duplicatas (produtos): {df_produtos.count()}")
df_produtos = df_produtos.dropDuplicates(['product_id'])
print(f"Registros após a remoção de duplicatas (produtos): {df_produtos.count()}")

Registros antes da remoção de duplicatas (produtos): 5823
Registros após a remoção de duplicatas (produtos): 5823


In [159]:
# Limpar colunas de texto (ex: remover espaços extras)
if 'post_title' in df_produtos.columns:
    df_produtos = df_produtos.withColumn("post_title_cleaned", trim(col("post_title")))

3.3 Dataset Ordens

In [160]:
#Verificando Nulos
df_ordens.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_ordens.columns]).show()

+--------+----------+-----------------+---------------------------+------------------------+-------------------------+-------------------------+----------+-----------+-----------+-------------------+----------+---------------------+------+
|order_id|order_date|order_device_type|order_session_visited_pages|order_session_start_time|order_traffic_source_type|order_utm_source_campaign|product_id|customer_id|product_qty|product_net_revenue|tax_amount|product_gross_revenue|status|
+--------+----------+-----------------+---------------------------+------------------------+-------------------------+-------------------------+----------+-----------+-----------+-------------------+----------+---------------------+------+
|       0|         0|              402|                        402|                     402|                        0|                      402|         0|          0|          0|                  0|         0|                    0|     0|
+--------+----------+-----------------+-

In [161]:
# Remover duplicatas (considerar chave composta se necessário, ex: order_id, product_id)
print(f"Registros antes da remoção de duplicatas (ordens): {df_ordens.count()}")
df_ordens = df_ordens.dropDuplicates(['order_id', 'product_id'])
print(f"Registros após a remoção de duplicatas (ordens): {df_ordens.count()}")

Registros antes da remoção de duplicatas (ordens): 845
Registros após a remoção de duplicatas (ordens): 845


In [162]:
# Normalizar colunas de data 'order_date', 'order_session_start_time'
df_ordens = df_ordens.withColumn("order_date_ts", to_timestamp(col("order_date"), "yyyy-MM-dd HH:mm:ss"))
df_ordens = df_ordens.withColumn("session_start_ts", to_timestamp(col("order_session_start_time"), "yyyy-MM-dd HH:mm:ss"))

In [163]:
# Calcular recência da ordem em dias (será útil para RFM)
df_ordens = df_ordens.withColumn("recencia_ordem_dias", datediff(current_date(), col("order_date_ts")))

In [164]:
# Converter datas para Unix timestamp (outra forma de normalização para ML)
df_ordens = df_ordens.withColumn("order_date_unix", unix_timestamp(col("order_date_ts")))
df_ordens = df_ordens.withColumn("session_start_unix", unix_timestamp(col("session_start_ts")))

In [165]:
df_ordens.select("order_id", "order_date", "order_date_ts", "recencia_ordem_dias", "order_date_unix").show(5)

+--------+-------------------+-------------------+-------------------+---------------+
|order_id|         order_date|      order_date_ts|recencia_ordem_dias|order_date_unix|
+--------+-------------------+-------------------+-------------------+---------------+
|    1918|2024-04-16 23:14:42|2024-04-16 23:14:42|                381|     1713309282|
|    2092|2024-04-18 10:06:57|2024-04-18 10:06:57|                379|     1713434817|
|    2289|2024-04-18 17:45:10|2024-04-18 17:45:10|                379|     1713462310|
|    2319|2024-04-19 09:53:17|2024-04-19 09:53:17|                378|     1713520397|
|    2392|2024-04-19 15:12:21|2024-04-19 15:12:21|                378|     1713539541|
+--------+-------------------+-------------------+-------------------+---------------+
only showing top 5 rows



Próximos passos: Análise RFM, Clusterização, Recomendação, Churn

4. Análise RFM (Recência, Frequência, Monetário)

In [166]:
from pyspark.sql.functions import max, countDistinct, sum, min 

Data de referência para cálculo da recência (pode ser a data mais recente no dataset + 1 dia ou a data atual)
Usaremos a data atual que já foi usada para calcular 'recencia_ordem_dias'

In [167]:
# Calcular Recência (R): Mínimo de dias desde a última compra por cliente
rfm_r = df_ordens.groupBy("customer_id") \
                 .agg(min("recencia_ordem_dias").alias("Recencia"))

In [168]:
# Calcular Frequência (F): Número total de ordens distintas por cliente
rfm_f = df_ordens.groupBy("customer_id") \
                 .agg(countDistinct("order_id").alias("Frequencia"))

In [169]:
# Calcular Monetário (M): Soma total do valor bruto gasto por cliente
# Usando product_gross_revenue. Se preferir o valor líquido, use product_net_revenue
rfm_m = df_ordens.groupBy("customer_id") \
                 .agg(sum("product_gross_revenue").alias("Monetario"))

In [170]:
# Juntar R, F, M
rfm_table = rfm_r.join(rfm_f, "customer_id", "inner") \
                 .join(rfm_m, "customer_id", "inner")

In [171]:
print("Tabela RFM calculada:")
rfm_table.show(10)

Tabela RFM calculada:
+-----------+--------+----------+------------------+
|customer_id|Recencia|Frequencia|         Monetario|
+-----------+--------+----------+------------------+
|        737|     134|         2|           4715.28|
|        516|     154|         1|            1044.3|
|        580|     197|         2|            1770.0|
|        513|     198|        13| 50761.23999999999|
|        613|     210|         1|            1534.0|
|        406|     199|         4| 9355.039999999999|
|        587|     221|         1|            1451.4|
|         26|     212|        53|218783.79999999996|
|        501|     176|         5|          24134.54|
|        577|     140|         1|             831.9|
+-----------+--------+----------+------------------+
only showing top 10 rows



In [172]:
# Atribuir Scores RFM (Exemplo: usando quantis)
# Dividir cada métrica em N quantis (e.g., 5 quantis para scores de 1 a 5)
from pyspark.ml.feature import QuantileDiscretizer

Scores: 1 (pior) a 5 (melhor). Recência: menor = melhor. Frequência/Monetário: maior = melhor.

In [173]:
# Recência Score (menor é melhor, então inverte os labels)
quantile_discretizer_r = QuantileDiscretizer(numBuckets=5, inputCol="Recencia", outputCol="R_Score_temp", relativeError=0.01)
rfm_table = quantile_discretizer_r.fit(rfm_table).transform(rfm_table)
rfm_table = rfm_table.withColumn("R_Score", (5 - col("R_Score_temp")).cast(IntegerType())) # Inverte para 5 ser o melhor

In [174]:
# Frequência Score (maior é melhor)
quantile_discretizer_f = QuantileDiscretizer(numBuckets=5, inputCol="Frequencia", outputCol="F_Score_temp", relativeError=0.01)
rfm_table = quantile_discretizer_f.fit(rfm_table).transform(rfm_table)
rfm_table = rfm_table.withColumn("F_Score", (col("F_Score_temp") + 1).cast(IntegerType())) # Ajusta para 1-5

In [175]:
# Monetário Score (maior é melhor)
quantile_discretizer_m = QuantileDiscretizer(numBuckets=5, inputCol="Monetario", outputCol="M_Score_temp", relativeError=0.01)
rfm_table = quantile_discretizer_m.fit(rfm_table).transform(rfm_table)
rfm_table = rfm_table.withColumn("M_Score", (col("M_Score_temp") + 1).cast(IntegerType())) # Ajusta para 1-5

In [176]:
# Combinar Scores RFM (ex: concatenar como string ou somar)
from pyspark.sql.functions import concat_ws

In [177]:
rfm_final = rfm_table.withColumn("RFM_Score", concat_ws("", col("R_Score"), col("F_Score"), col("M_Score")))
rfm_final = rfm_final.select("customer_id", "Recencia", "Frequencia", "Monetario", "R_Score", "F_Score", "M_Score", "RFM_Score")

In [178]:
print("Tabela RFM final com Scores:")
rfm_final.show(10)

Tabela RFM final com Scores:
+-----------+--------+----------+------------------+-------+-------+-------+---------+
|customer_id|Recencia|Frequencia|         Monetario|R_Score|F_Score|M_Score|RFM_Score|
+-----------+--------+----------+------------------+-------+-------+-------+---------+
|        737|     134|         2|           4715.28|      5|      2|      3|      523|
|        516|     154|         1|            1044.3|      4|      2|      2|      422|
|        580|     197|         2|            1770.0|      3|      2|      2|      322|
|        513|     198|        13| 50761.23999999999|      3|      4|      5|      345|
|        613|     210|         1|            1534.0|      3|      2|      2|      322|
|        406|     199|         4| 9355.039999999999|      3|      3|      4|      334|
|        587|     221|         1|            1451.4|      2|      2|      2|      222|
|         26|     212|        53|218783.79999999996|      3|      4|      5|      345|
|        501| 

(Opcional) Salvar a tabela RFM
rfm_final.write.parquet("/path/to/save/rfm_final.parquet")

5. Clusterização de Clientes com K-Means (baseado em RFM)

In [179]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [180]:
# Selecionar as colunas RFM para clusterização
rfm_features = rfm_final.select("customer_id", "Recencia", "Frequencia", "Monetario")

In [181]:
# Montar o vetor de features
vec_assembler = VectorAssembler(inputCols=["Recencia", "Frequencia", "Monetario"], outputCol="rfm_features_raw")
rfm_vector = vec_assembler.transform(rfm_features)

In [182]:
# Escalar as features (K-Means é sensível à escala)
scaler = StandardScaler(inputCol="rfm_features_raw", outputCol="features", withStd=True, withMean=False)
scaler_model = scaler.fit(rfm_vector)
rfm_scaled = scaler_model.transform(rfm_vector)

In [183]:
print("Features RFM montadas e escaladas:")
rfm_scaled.select("customer_id", "features").show(5, truncate=False)

Features RFM montadas e escaladas:
+-----------+------------------------------------------------------------+
|customer_id|features                                                    |
+-----------+------------------------------------------------------------+
|737        |[2.1328196787611717,0.09615619884527073,0.06954858691027146]|
|516        |[2.45115097409866,0.04807809942263536,0.01540302788177934]  |
|580        |[3.1355632590742597,0.09615619884527073,0.02610682691827007]|
|513        |[3.151479823841134,0.6250152924942597,0.7487089869134278]   |
|613        |[3.342478601043627,0.04807809942263536,0.022625916662500727]|
+-----------+------------------------------------------------------------+
only showing top 5 rows



In [184]:
# Treinar o modelo K-Means
# Definir o número de clusters (k). Pode ser otimizado (ex: método do cotovelo ou silhouette)
# Vamos começar com k=5 como exemplo
k = 5
kmeans = KMeans(featuresCol="features", k=k, seed=1)
model = kmeans.fit(rfm_scaled)

In [185]:
# Fazer previsões (atribuir clusters aos clientes)
predictions = model.transform(rfm_scaled)

In [186]:
print(f"Clientes atribuídos a {k} clusters:")
predictions.select("customer_id", "prediction").show(10)

Clientes atribuídos a 5 clusters:
+-----------+----------+
|customer_id|prediction|
+-----------+----------+
|        737|         0|
|        516|         0|
|        580|         0|
|        513|         0|
|        613|         4|
|        406|         0|
|        587|         4|
|         26|         3|
|        501|         0|
|        577|         0|
+-----------+----------+
only showing top 10 rows



In [187]:
# Avaliar a clusterização (Silhouette Score)
# Nota: Calcular Silhouette pode ser computacionalmente intensivo em datasets grandes
evaluator = ClusteringEvaluator(featuresCol='features', metricName='silhouette', distanceMeasure='squaredEuclidean')
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Score com k={k}: {silhouette}")

Silhouette Score com k=5: 0.5710270590536596


In [188]:
# (Opcional) Explorar os centróides dos clusters
print("Centróides dos Clusters (em escala original aproximada - requer desescalonamento):")
centers = model.clusterCenters()
# Desescalonar os centros (aproximado, pois não armazenamos a média se withMean=True fosse usado)
# scaler_model.std é o desvio padrão usado
# original_center = scaled_center * std_dev
# print(scaler_model.std)
# for i, center in enumerate(centers):
#     # Precisa aplicar a transformação inversa do scaler
#     # Isso é mais complexo, pois StandardScaler não tem um 'inverse_transform' direto em PySpark ML
#     # Vamos mostrar os centros escalados por enquanto
#     print(f"Cluster {i}: {center}")

Centróides dos Clusters (em escala original aproximada - requer desescalonamento):


In [189]:
# Juntar as previsões de cluster com a tabela RFM final
clientes_clusterizados = rfm_final.join(predictions.select("customer_id", col("prediction").alias("cluster")), "customer_id", "inner")

In [190]:
print("Tabela final com RFM Scores e Cluster:")
clientes_clusterizados.show(10)

Tabela final com RFM Scores e Cluster:
+-----------+--------+----------+------------------+-------+-------+-------+---------+-------+
|customer_id|Recencia|Frequencia|         Monetario|R_Score|F_Score|M_Score|RFM_Score|cluster|
+-----------+--------+----------+------------------+-------+-------+-------+---------+-------+
|         26|     212|        53|218783.79999999996|      3|      4|      5|      345|      3|
|         29|     151|        26| 72460.25999999998|      4|      4|      5|      445|      3|
|         30|     189|        19|43949.100000000006|      3|      4|      5|      345|      0|
|         43|     146|        27|          65978.52|      4|      4|      5|      445|      3|
|         60|     197|        30|          79364.44|      3|      4|      5|      345|      3|
|         69|     358|         1|            1439.6|      1|      2|      2|      122|      2|
|         72|     379|         1|           1683.86|      1|      2|      2|      122|      2|
|         7

In [193]:
from pyspark.sql.functions import avg

In [194]:
# Analisar as características de cada cluster
print("\nAnálise das características médias por cluster:")
clientes_clusterizados.groupBy("cluster") \
    .agg(avg("Recencia").alias("Recencia_Media"), \
         avg("Frequencia").alias("Frequencia_Media"), \
         avg("Monetario").alias("Monetario_Medio"), \
         countDistinct("customer_id").alias("Num_Clientes")) \
    .orderBy("cluster") \
    .show()


Análise das características médias por cluster:
+-------+------------------+------------------+------------------+------------+
|cluster|    Recencia_Media|  Frequencia_Media|   Monetario_Medio|Num_Clientes|
+-------+------------------+------------------+------------------+------------+
|      0|162.75757575757575| 6.575757575757576| 17486.52727272728|          33|
|      1|             130.0|             172.0| 528263.5800000001|           1|
|      2| 316.2307692307692|1.0769230769230769|1704.3738461538462|          13|
|      3|171.55555555555554|28.444444444444443|106955.37310666667|           9|
|      4|237.41666666666666|2.4166666666666665|7017.2633333333315|          24|
+-------+------------------+------------------+------------------+------------+



(Opcional) Salvar os resultados da clusterização
clientes_clusterizados.write.parquet("/path/to/save/clientes_clusterizados.parquet")

6. Modelo de Recomendação de Produtos (ALS - Collaborative Filtering)

In [195]:
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import expr

Preparar dados para ALS: user_id, product_id, rating
Usaremos 'customer_id' como user, 'product_id' como item.
Como não temos ratings explícitos, podemos usar a frequência de compra (product_qty) ou simplesmente 1 para indicar interação.
Usar 1 é mais simples e comum para dados implícitos.

In [196]:
# Certificar que os IDs são inteiros e não nulos
als_data = df_ordens.select(
    col("customer_id").cast(IntegerType()),
    col("product_id").cast(IntegerType()),
    col("product_qty").cast(FloatType()) # Usando quantidade como 'rating' implícito
).na.drop() # Remover linhas onde customer_id ou product_id são nulos após cast

In [197]:
# Renomear colunas para o padrão do ALS
als_data = als_data.withColumnRenamed("customer_id", "userCol") \
                   .withColumnRenamed("product_id", "itemCol") \
                   .withColumnRenamed("product_qty", "ratingCol")

In [60]:
# Dividir dados em treino e teste (opcional, mas bom para avaliação)
(training, test) = als_data.randomSplit([0.8, 0.2], seed=1234)

In [61]:
# Construir e treinar o modelo ALS
# Parâmetros podem ser ajustados/otimizados via validação cruzada
als = ALS(maxIter=10, regParam=0.1, userCol="userCol", itemCol="itemCol", ratingCol="ratingCol",
          coldStartStrategy="drop", # Descarta usuários/itens frios nas predições
          implicitPrefs=True, # Indicar que os 'ratings' são implícitos (frequência de compra)
          alpha=1.0) # Parâmetro alpha para implicitPrefs

In [62]:
als_model = als.fit(training)

In [None]:
# Gerar Top 5 recomendações para cada usuário
user_recs = als_model.recommendForAllUsers(5) # Top 5 itens por usuário


Gerando Top 5 recomendações por usuário...


In [64]:
print("Top 5 Recomendações por Usuário (IDs de Produto):")
user_recs.show(10, truncate=False)

Top 5 Recomendações por Usuário (IDs de Produto):
+-------+------------------------------------------------------------------------------------------------------------------+
|userCol|recommendations                                                                                                   |
+-------+------------------------------------------------------------------------------------------------------------------+
|580    |[{5674, 0.7960533}, {19808, 0.69510657}, {20412, 0.66070056}, {14536, 0.595119}, {17766, 0.53452855}]             |
|470    |[{5674, 0.9324528}, {19808, 0.9109026}, {20412, 0.86321026}, {14536, 0.66503054}, {17766, 0.6208252}]             |
|430    |[{2678, 1.2318783}, {13610, 1.1846702}, {16620, 1.1562933}, {13609, 1.1465605}, {5435, 1.1372563}]                |
|450    |[{12936, 0.5012792}, {13437, 0.34588683}, {12913, 0.34109274}, {12926, 0.28449896}, {1134, 0.23210894}]           |
|80     |[{6685, 1.0058322}, {11668, 1.0002928}, {5095, 0.9977284}, {20120,

In [65]:
# Formatar as recomendações para melhor visualização (ex: explodir a lista)
user_recs_formatted = user_recs.withColumn("rec_exp", expr("explode(recommendations)")) \
                               .select("userCol", col("rec_exp.itemCol").alias("product_id"), col("rec_exp.rating").alias("predicted_rating"))

In [66]:
print("Recomendações formatadas (Usuário, Produto ID, Rating Predito):")
user_recs_formatted.show(20)

Recomendações formatadas (Usuário, Produto ID, Rating Predito):
+-------+----------+----------------+
|userCol|product_id|predicted_rating|
+-------+----------+----------------+
|    580|      5674|       0.7960533|
|    580|     19808|      0.69510657|
|    580|     20412|      0.66070056|
|    580|     14536|        0.595119|
|    580|     17766|      0.53452855|
|    470|      5674|       0.9324528|
|    470|     19808|       0.9109026|
|    470|     20412|      0.86321026|
|    470|     14536|      0.66503054|
|    470|     17766|       0.6208252|
|    430|      2678|       1.2318783|
|    430|     13610|       1.1846702|
|    430|     16620|       1.1562933|
|    430|     13609|       1.1465605|
|    430|      5435|       1.1372563|
|    450|     12936|       0.5012792|
|    450|     13437|      0.34588683|
|    450|     12913|      0.34109274|
|    450|     12926|      0.28449896|
|    450|      1134|      0.23210894|
+-------+----------+----------------+
only showing top 20 rows

In [67]:
# Juntar com nomes dos produtos para tornar as recomendações mais legíveis
# Selecionar apenas id e nome do produto para o join
df_produtos_nomes = df_produtos.select(col("product_id").cast(IntegerType()), "post_title")

In [73]:
user_recs_final = user_recs_formatted.join(df_produtos_nomes, user_recs_formatted.product_id == df_produtos_nomes.product_id, "left") \
                                  .select(
    col("userCol"),
    df_produtos_nomes.product_id.alias("product_id"),
    col("post_title"),
    col("predicted_rating")
)

In [75]:
# Agrupar para mostrar top 5 por usuário de forma mais clara
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, collect_list

In [76]:
window_spec = Window.partitionBy("userCol").orderBy(col("predicted_rating").desc())

In [78]:
from pyspark.sql.functions import struct

In [79]:
user_recs_grouped = user_recs_final \
    .withColumn("rank", rank().over(window_spec)) \
    .where(col("rank") <= 5) \
    .groupBy("userCol") \
    .agg(collect_list(struct("product_id", "post_title", "predicted_rating")).alias("top_5_recomendacoes"))

In [80]:
user_recs_grouped.show(10, truncate=False)

+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userCol|top_5_recomendacoes                                                                                                                                                                                                                                                         |
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|26     |[{4429, ASUS ROG ALLY 7 GAMING CONSOLE, 1.1409168}, {1136, DELL G15 G5535-A643GRY-PUS, 1.1205758}, {4376, HP ENVY X360 15 EW1073CL, 0.9990194}, {10212, AP

Avaliação do Modelo ALS (RMSE)

In [96]:
# Importar bibliotecas necessárias
from pyspark.ml.evaluation import RegressionEvaluator

In [97]:
predictions_test = als_model.transform(test)
predictions_test.show(5)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="ratingCol", predictionCol="prediction")

rmse = evaluator.evaluate(predictions_test.na.drop(subset=["prediction"]))

print(f"\nRoot-Mean-Square Error (RMSE) no conjunto de teste = {rmse}")

+-------+-------+---------+------------+
|userCol|itemCol|ratingCol|  prediction|
+-------+-------+---------+------------+
|     26|   1145|      1.0| 0.063379936|
|     26|   5178|      1.0|  0.10409063|
|     26|   5428|      5.0|-0.073462605|
|     26|   5435|      3.0|  0.93182766|
|     26|  11832|      1.0|  0.10006907|
+-------+-------+---------+------------+
only showing top 5 rows


Root-Mean-Square Error (RMSE) no conjunto de teste = 75.39323965352727


In [102]:
evaluator_mae = RegressionEvaluator(metricName="mae", labelCol="ratingCol", predictionCol="prediction")
mae = evaluator_mae.evaluate(predictions_test)
print(f"Mean Absolute Error (MAE):    {mae:.4f}")

Mean Absolute Error (MAE):    14.5559


#### CHURN

7. Modelo de Previsão de Churn

In [81]:
from pyspark.sql.functions import datediff, current_date, lit, avg as spark_avg, count as spark_count, max as spark_max, struct
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

In [82]:
# 7.1 Definição de Churn e Criação de Labels
# Definir churn: Cliente que não fez compra nos últimos X dias (ex: 90 dias)
# Este valor pode ser ajustado com base no conhecimento do negócio
churn_threshold_days = 90

In [83]:
# Usar a tabela RFM que já tem a Recência (dias desde a última compra)
# Se Recencia > churn_threshold_days, então churn = 1, senão 0
# Certificar que rfm_final está disponível
if 'rfm_final' not in locals():
    print("Erro: Tabela rfm_final não encontrada. Execute a etapa RFM primeiro.")
    # Adicionar código para recarregar ou recalcular rfm_final se necessário
    # Por exemplo:
    # rfm_r = df_ordens.groupBy("customer_id").agg(spark_min("recencia_ordem_dias").alias("Recencia"))
    # rfm_f = df_ordens.groupBy("customer_id").agg(countDistinct("order_id").alias("Frequencia"))
    # rfm_m = df_ordens.groupBy("customer_id").agg(spark_sum("product_gross_revenue").alias("Monetario"))
    # rfm_final = rfm_r.join(rfm_f, "customer_id", "inner").join(rfm_m, "customer_id", "inner")

In [84]:
churn_labels = rfm_final.withColumn("churn_label",
                                    when(col("Recencia") > churn_threshold_days, 1).otherwise(0))

In [85]:
print(f"Definindo churn como inatividade > {churn_threshold_days} dias.")
churn_labels.groupBy("churn_label").count().show()

Definindo churn como inatividade > 90 dias.
+-----------+-----+
|churn_label|count|
+-----------+-----+
|          1|   80|
+-----------+-----+



In [86]:
# 7.2 Feature Engineering Adicional
# Juntar com df_clientes para obter 'dias_desde_registro'
# Certificar que df_clientes está disponível e processado
if 'df_clientes' not in locals() or 'dias_desde_registro' not in df_clientes.columns:
     print("Erro: df_clientes ou coluna 'dias_desde_registro' não encontrada.")
     # Recarregar/reprocessar df_clientes se necessário
     # df_clientes = spark.read.csv(path_clientes, header=True, inferSchema=True, sep=',')
     # df_clientes = df_clientes.dropDuplicates(['user_id'])
     # df_clientes = df_clientes.withColumn("user_registered_ts", to_timestamp(col("user_registered"), "yyyy-MM-dd HH:mm:ss"))
     # df_clientes = df_clientes.withColumn("dias_desde_registro", datediff(current_date(), col("user_registered_ts")))

In [87]:
churn_data = churn_labels.join(df_clientes.select(col("user_id").alias("customer_id_ref"), "dias_desde_registro"),
                               churn_labels.customer_id == col("customer_id_ref"), "left") \
                         .select("customer_id", "Recencia", "Frequencia", "Monetario", "dias_desde_registro", "churn_label") \
                         .na.fill(0, subset=["dias_desde_registro"]) # Preencher nulos em dias_desde_registro (ex: se cliente não encontrado)

In [88]:
print("Dados preparados para modelo de churn:")
churn_data.show(5)

Dados preparados para modelo de churn:
+-----------+--------+----------+-----------------+-------------------+-----------+
|customer_id|Recencia|Frequencia|        Monetario|dias_desde_registro|churn_label|
+-----------+--------+----------+-----------------+-------------------+-----------+
|        737|     134|         2|          4715.28|                  0|          1|
|        516|     154|         1|           1044.3|                  0|          1|
|        580|     197|         2|           1770.0|                  0|          1|
|        513|     198|        13|50761.23999999999|                  0|          1|
|        613|     210|         1|           1534.0|                  0|          1|
+-----------+--------+----------+-----------------+-------------------+-----------+
only showing top 5 rows



In [89]:
# 7.3 Preparação para Modelagem (VectorAssembler, Scaler, Split)
# Colunas de features
feature_cols = ["Recencia", "Frequencia", "Monetario", "dias_desde_registro"]

In [90]:
# Montar vetor de features
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw", handleInvalid="skip") # Skip rows with nulls

In [91]:
# Escalar features numéricas (importante para regressão logística)
scaler_churn = StandardScaler(inputCol="features_raw", outputCol="features", withStd=True, withMean=True)

In [92]:
# Criar pipeline para pré-processamento
preprocess_pipeline = Pipeline(stages=[assembler, scaler_churn])
preprocess_model = preprocess_pipeline.fit(churn_data)
final_churn_data = preprocess_model.transform(churn_data)

In [93]:
# Selecionar colunas finais e renomear label
final_churn_data = final_churn_data.select(col("customer_id"), col("features"), col("churn_label").alias("label"))

In [94]:
print("Dados finais com features escaladas e label:")
final_churn_data.show(5, truncate=False)

Dados finais com features escaladas e label:
+-----------+------------------------------------------------------------------------------------+-----+
|customer_id|features                                                                            |label|
+-----------+------------------------------------------------------------------------------------+-----+
|737        |[-1.2204026035000965,-0.33474376723009847,-0.34684949808465804,-0.6423398862187408] |1    |
|516        |[-0.9020713081626082,-0.3828218666527338,-0.40099505711315014,-0.6423398862187408]  |1    |
|580        |[-0.21765902318700828,-0.33474376723009847,-0.39029125807665943,-0.6423398862187408]|1    |
|513        |[-0.20174245842013386,0.1941153264188905,0.3323109019184983,-0.6423398862187408]    |1    |
|613        |[-0.010743681217640863,-0.3828218666527338,-0.39377216833242873,-0.6423398862187408]|1    |
+-----------+------------------------------------------------------------------------------------+-----+
only showi

In [95]:
# Verificar se há dados suficientes após pré-processamento
if final_churn_data.count() == 0:
    print("Erro: Nenhum dado restante após pré-processamento para o modelo de churn.")
else:
    # Dividir em treino e teste
    (train_data, test_data) = final_churn_data.randomSplit([0.8, 0.2], seed=42)
    print(f"Registros de treino: {train_data.count()}, Registros de teste: {test_data.count()}")

    # 7.4 Treinamento do Modelo (Ex: Regressão Logística)
    if train_data.count() > 0:
        lr = LogisticRegression(featuresCol="features", labelCol="label")
        lr_model = lr.fit(train_data)
        print("Modelo de Regressão Logística treinado.")

        # 7.5 Avaliação do Modelo
        if test_data.count() > 0:
            predictions_lr = lr_model.transform(test_data)
            print("Predições no conjunto de teste:")
            predictions_lr.select("customer_id", "label", "probability", "prediction").show(10, truncate=False)

            # Avaliar usando AUC
            evaluator_auc = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")
            # Lidar com possíveis NaNs na coluna de predição se houver
            auc = evaluator_auc.evaluate(predictions_lr.na.drop(subset=["rawPrediction"]))
            print(f"Área sob a curva ROC (AUC) no conjunto de teste: {auc}")

            # Avaliar usando outras métricas (Accuracy, Precision, Recall, F1)
            evaluator_multi = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
            accuracy = evaluator_multi.evaluate(predictions_lr, {evaluator_multi.metricName: "accuracy"})
            precision = evaluator_multi.evaluate(predictions_lr, {evaluator_multi.metricName: "weightedPrecision"})
            recall = evaluator_multi.evaluate(predictions_lr, {evaluator_multi.metricName: "weightedRecall"})
            f1 = evaluator_multi.evaluate(predictions_lr, {evaluator_multi.metricName: "f1"})

            print(f"Accuracy: {accuracy}")
            print(f"Precision: {precision}")
            print(f"Recall: {recall}")
            print(f"F1 Score: {f1}")
        else:
            print("Não há dados de teste suficientes para avaliação.")
    else:
        print("Não há dados de treino suficientes para treinar o modelo.")

Registros de treino: 63, Registros de teste: 17
Modelo de Regressão Logística treinado.
Predições no conjunto de teste:
+-----------+-----+-----------+----------+
|customer_id|label|probability|prediction|
+-----------+-----+-----------+----------+
|30         |1    |[0.0,1.0]  |1.0       |
|72         |1    |[0.0,1.0]  |1.0       |
|80         |1    |[0.0,1.0]  |1.0       |
|172        |1    |[0.0,1.0]  |1.0       |
|244        |1    |[0.0,1.0]  |1.0       |
|260        |1    |[0.0,1.0]  |1.0       |
|279        |1    |[0.0,1.0]  |1.0       |
|327        |1    |[0.0,1.0]  |1.0       |
|432        |1    |[0.0,1.0]  |1.0       |
|435        |1    |[0.0,1.0]  |1.0       |
+-----------+-----+-----------+----------+
only showing top 10 rows

Área sob a curva ROC (AUC) no conjunto de teste: 1.0
Accuracy: 1.0
Precision: 1.0
Recall: 1.0
F1 Score: 1.0


In [108]:
# 1. Avaliador AUC e AUC-PR
evaluator_auc = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")
evaluator_pr = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderPR")

# Lidar com possíveis NaNs na coluna de predição se houver (ex: coldStartStrategy)
valid_predictions = predictions_lr.na.drop(subset=["rawPrediction", "prediction", "label"])

if valid_predictions.count() == 0:
    print("Não há predições válidas para avaliação após remover NaNs.")
else: 
    print("Há predições válidas para avaliação após remover NaNs.")

Há predições válidas para avaliação após remover NaNs.


In [109]:
auc_score = evaluator_auc.evaluate(valid_predictions)
pr_auc_score = evaluator_pr.evaluate(valid_predictions)
print(f"Área sob a Curva ROC (AUC): {auc_score:.4f}")
print(f"Área sob a Curva Precision-Recall (AUC-PR): {pr_auc_score:.4f}")



Área sob a Curva ROC (AUC): 1.0000
Área sob a Curva Precision-Recall (AUC-PR): 1.0000


In [110]:
# 2. Avaliador Multiclasse para Accuracy, Precision, Recall, F1
evaluator_multi = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

accuracy = evaluator_multi.evaluate(valid_predictions, {evaluator_multi.metricName: "accuracy"})
precision_w = evaluator_multi.evaluate(valid_predictions, {evaluator_multi.metricName: "weightedPrecision"})
recall_w = evaluator_multi.evaluate(valid_predictions, {evaluator_multi.metricName: "weightedRecall"})
f1_w = evaluator_multi.evaluate(valid_predictions, {evaluator_multi.metricName: "f1"})

print(f"\nAccuracy: {accuracy:.4f}")
print(f"Precision Ponderada: {precision_w:.4f}")
print(f"Recall Ponderado: {recall_w:.4f}")
print(f"F1-Score Ponderado: {f1_w:.4f}")




Accuracy: 1.0000
Precision Ponderada: 1.0000
Recall Ponderado: 1.0000
F1-Score Ponderado: 1.0000


In [115]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Verifique as labels presentes
present_labels = valid_predictions.select("label").distinct().rdd.flatMap(lambda x: x).collect()

evaluator_multi = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

for label in present_labels:
    precision = evaluator_multi.evaluate(valid_predictions, {
        evaluator_multi.metricName: "precisionByLabel",
        evaluator_multi.metricLabel: label
    })
    recall = evaluator_multi.evaluate(valid_predictions, {
        evaluator_multi.metricName: "recallByLabel",
        evaluator_multi.metricLabel: label
    })
    f1 = evaluator_multi.evaluate(valid_predictions, {
        evaluator_multi.metricName: "f1",
        evaluator_multi.metricLabel: label
    })

    print(f"Label {label}: Precision = {precision:.4f}, Recall = {recall:.4f}, F1 = {f1:.4f}")

# 3. Métricas por Classe
print("\n-- Métricas por Classe --")
labels = [0.0, 1.0]
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# 🔍 Verifique quais classes realmente estão presentes
labels_presentes = valid_predictions.select("label").distinct().rdd.flatMap(lambda x: x).collect()

print("\n-- Métricas por Classe (presentes nos dados) --")
for label in sorted(labels_presentes):
    evaluator.setMetricLabel(label)

    evaluator.setMetricName("precisionByLabel")
    precision = evaluator.evaluate(valid_predictions)

    evaluator.setMetricName("recallByLabel")
    recall = evaluator.evaluate(valid_predictions)

    evaluator.setMetricName("f1")
    f1 = evaluator.evaluate(valid_predictions)

    print(f"Label {label}: Precision = {precision:.4f}, Recall = {recall:.4f}, F1 = {f1:.4f}")

    


Label 1: Precision = 1.0000, Recall = 1.0000, F1 = 1.0000

-- Métricas por Classe --

-- Métricas por Classe (presentes nos dados) --
Label 1: Precision = 1.0000, Recall = 1.0000, F1 = 1.0000


In [116]:
# 4. Matriz de Confusão (Cálculo com PySpark)
print("\n-- Matriz de Confusão --")
# Calcula TP, TN, FP, FN diretamente
tp = valid_predictions.filter("label = 1.0 AND prediction = 1.0").count()
tn = valid_predictions.filter("label = 0.0 AND prediction = 0.0").count()
fp = valid_predictions.filter("label = 0.0 AND prediction = 1.0").count()
fn = valid_predictions.filter("label = 1.0 AND prediction = 0.0").count()

print(f"                 Predito 0 | Predito 1")
print(f"Real 0 (Não Churn):  {tn:^8} | {fp:^8}")
print(f"Real 1 (Churn):      {fn:^8} | {tp:^8}")


-- Matriz de Confusão --
                 Predito 0 | Predito 1
Real 0 (Não Churn):     0     |    0    
Real 1 (Churn):         0     |    17   


Próximo passo: Finalizar e entregar o notebook

In [None]:
# Finalizar SparkSession
spark.stop()
print("SparkSession finalizada.")