> **Apenas configurações iniciais para configuirar o ambiente**

In [5]:
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

import findspark
findspark.init()


0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Connecting to security.ubuntu.com (185.125.190.81)] [Connected to cloud.r-p                                                                               Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:5 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:6 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:11 http://archive.ubuntu.com/ubuntu jammy-updates/

In [6]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Colab com PySpark") \
    .getOrCreate()


In [141]:
# Criando pasta para os arquivos
!mkdir -p /ifood-case/data/raw/

!mkdir -p /ifood-case/data/processed/

# Análise Exploratória (EDA)

In [147]:
!ls -l ifood-case/data/processed/


total 4
drwxr-xr-x 2 root root 4096 May  7 16:51 df_final_analytics.parquet


In [150]:
df_final_analytics = spark.read.parquet("ifood-case/data/processed/df_final_analytics.parquet")
df_final_analytics.show()


+--------------------+--------------------+------------------+---------------------+------------------+------------------+-----------------+------------------+----------------------+----------------------+--------------+------------------+--------------+--------------------+-----------+-------------+--------------+------------------+----+-----------------+-----------------------+
|          account_id|offer_received_count|offer_viewed_count|offer_completed_count|         view_rate|   completion_rate|transaction_count|total_amount_spent|avg_transaction_amount|std_transaction_amount|completed_bogo|completed_discount|completed_info|preferred_offer_type|channel_web|channel_email|channel_mobile| avg_channel_count| age|credit_card_limit|days_since_registration|
+--------------------+--------------------+------------------+---------------------+------------------+------------------+-----------------+------------------+----------------------+----------------------+--------------+--------------

---

# Montando os modelos

**Objetivo dos modelos:**
- Entender diferentes níveis de engajamento com as ofertas — para então aplicar estratégias diferentes para cada grupo.

1. Modelos multiclasses por estágio de engajamento
Criar um modelo com três classes. Para entender em que estágio o cliente trava

| Classe | Comportamento                     |
| ------ | --------------------------------- |
| `0`    | Recebe a oferta, **mas não vê**   |
| `1`    | Vê a oferta, **mas não completa** |
| `2`    | Vê e **completa a oferta**        |

Estratégia:

- Classe 0: revisar canal e momento do envio
- Classe 1: revisar conteúdo/valor da oferta
- Classe 2: manter estratégia parecida

# 1. Criar coluna de engajamento por estágio do funil

> Nível de engajamento (0 = não viu, 1 = viu mas não completou, 2 = completou)

In [None]:
df_final_analytics.filter(df_final_analytics.nome_da_coluna == "categoria_desejada").show()


> A maioria dos clientes está engajada (nível 2), o que é um bom indicativo.

> Há uma quantidade expressiva de clientes no nível 1 (quase 25%), que são ótimos alvos para estratégias de recuperação e reengajamento.

> O nível 0 representa um grupo muito pequeno (menos de 1%) e pode ser tratado com estratégias de primeiro engajamento, mas talvez não seja um foco prioritário de otimização de curto prazo (embora interessante para análise qualitativa ou modelos futuros de churn/ativação).

In [151]:
from pyspark.sql.functions import when, col

df_final_analytics = df_final_analytics.withColumn(
    "engagement_level",
    when(col("offer_viewed_count") == 0, 0)
    .when((col("offer_viewed_count") > 0) & (col("offer_completed_count") == 0), 1)
    .otherwise(2)
)

df_final_analytics.groupBy("engagement_level").count().orderBy("engagement_level").show()


+----------------+-----+
|engagement_level|count|
+----------------+-----+
|               0|  160|
|               1| 4133|
|               2|12707|
+----------------+-----+



## 1.2. Imputação com estratégia guiada por negócio nos valores nulos

> Como tratar os clientes expostos às campanhas, mas não engajados (consequentemente valores nulos)

| Coluna                                                   | Estratégia recomendada                              | Justificativa                                                          |
| -------------------------------------------------------- | --------------------------------------------------- | ---------------------------------------------------------------------- |
| `transaction_count`, `total_amount_spent`, `completed_*` | Preencher com `0`                                   | Indica ausência de comportamento, o que é informativo                  |
| `avg_transaction_amount`, `std_transaction_amount`       | Preencher com `0` ou `null`                         | Valor só faz sentido se houve transação — `0` indica "não gastou nada" |
| `preferred_offer_type`                                   | Preencher com `"none"`                              | Cliente ainda sem padrão de preferência                                |
| `age`, `credit_card_limit`, `days_since_registration`    | Imputar com média ou criar **faixa "desconhecido"** | Evita perder clientes por falta de cadastro completo                   |
| `view_rate`, `completion_rate`                           | Preencher com `0`                                   | Cliente que não viu ou não completou                                   |


In [None]:
# Valores nulos
df_final_analytics.filter(
    col("avg_transaction_amount").isNull() |
    col("completion_rate").isNull() |
    col("view_rate").isNull()
).show(10, truncate=False)


In [181]:
from pyspark.sql.functions import when, col, lit

df_model = df_final_analytics


df_model = df_model.fillna({
    "transaction_count": 0,
    "total_amount_spent": 0.0,
    "completed_bogo": 0,
    "completed_discount": 0,
    "completed_info": 0,
    "view_rate": 0.0,
    "completion_rate": 0.0,
    "channel_web": 0,
    "channel_email": 0,
    "channel_mobile": 0,
    "avg_channel_count": 0.0
})


> *Variáveis que só fazem sentido se houve transação*

In [182]:
df_model = df_model.withColumn("avg_transaction_amount",
    when(col("avg_transaction_amount").isNull(), None).otherwise(col("avg_transaction_amount"))
)

df_model = df_model.withColumn("std_transaction_amount",
    when(col("std_transaction_amount").isNull(), None).otherwise(col("std_transaction_amount"))
)


> *Tipo de oferta preferida — sem padrão ainda*

In [183]:
df_model = df_model.withColumn("preferred_offer_type",
    when(col("preferred_offer_type").isNull(), "none").otherwise(col("preferred_offer_type"))
)

> *Dados de cadastro — imputar "desconhecido" para preservar cliente*

> *Evita enviesamento estatístico: 0 não será usado como se fosse um gasto válido onde não houve.*

In [184]:
df_model = df_model.fillna({
    "age": -1,
    "credit_card_limit": -1.0,
    "days_since_registration": -1,
    "avg_transaction_amount": -1.0,
    "std_transaction_amount": -1.0
})


# 2.  Seleção de Features/Target e analise de desbalanceamento


> Features

- Essas features foram seçlecionadas pela relação direta com o comportamento do cliente e a estrutura da oferta:



> Target: engagement_level

  - 0: Nunca engajou

  - 1: Engajou parcialmente

  - 2: Engajou totalmente

> Desbalanceamento

- Avaliar impacto do desbalanceamento nos modelos (ex: com matriz de confusão).

- Testar estratégias como:

- Class weights (ex: no RandomForest ou Logistic Regression)

- Estratégia de avaliação focada em recall/precision de grupos minoritários

> *Primeiro teste com o modelo desbalanceado* `Random Forest`

In [190]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

features = [
    # Perfil do cliente
    "age",
    "credit_card_limit",
    "days_since_registration",

    # Comportamento de compra (geral)
    "transaction_count",
    "total_amount_spent",
    "avg_transaction_amount",
    "std_transaction_amount",

    # Canais de marketing
    "channel_web",
    "channel_email",
    "channel_mobile",
    "avg_channel_count"
]


# 2. Indexar coluna categórica
indexer = StringIndexer(inputCol="preferred_offer_type", outputCol="preferred_offer_type_index", handleInvalid="keep")

# 3. Vetorização
assembler = VectorAssembler(
    inputCols=[f for f in features if f != "preferred_offer_type"] + ["preferred_offer_type_index"],
    outputCol="features",
    handleInvalid="keep"  # ← evita erro com nulos ou valores inválidos
)


# 4. Classificador
rf = RandomForestClassifier(labelCol="engagement_level", featuresCol="features", seed=42)

# 5. Pipeline
pipeline = Pipeline(stages=[indexer, assembler, rf])

# 6. Divisão dos dados
train, test = df_model.randomSplit([0.8, 0.2], seed=42)

# 7. Treinamento
model = pipeline.fit(train)

# 8. Previsões
predictions = model.transform(test)

# 9. Avaliação
evaluator_acc = MulticlassClassificationEvaluator(
    labelCol="engagement_level", predictionCol="prediction", metricName="accuracy"
)
evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="engagement_level", predictionCol="prediction", metricName="f1"
)

accuracy = evaluator_acc.evaluate(predictions)
f1_score = evaluator_f1.evaluate(predictions)

print(f"✅ Acurácia: {accuracy:.4f}")
print(f"🎯 F1-score: {f1_score:.4f}")


✅ Acurácia: 0.9886
🎯 F1-score: 0.9829


> Overfitting?

In [198]:
# Avaliação no treino
train_predictions = model.transform(train)

train_acc = evaluator_acc.evaluate(train_predictions)
train_f1 = evaluator_f1.evaluate(train_predictions)

print(f"📘 Acurácia Treino: {train_acc:.4f}")
print(f"📘 F1-score Treino: {train_f1:.4f}")


📘 Acurácia Treino: 0.9909
📘 F1-score Treino: 0.9865


In [199]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

param_grid = ParamGridBuilder().addGrid(rf.numTrees, [10, 20, 50]).build()

crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator_f1,
    numFolds=5
)

cv_model = crossval.fit(train)
cv_predictions = cv_model.transform(test)

print("F1-score com validação cruzada:", evaluator_f1.evaluate(cv_predictions))


F1-score com validação cruzada: 0.9829267765922475


> *Matriz de Confusão*

> *Não esta conseguindo classificar a classe 0*

In [200]:
predictions.groupBy("engagement_level", "prediction").count().orderBy("engagement_level", "prediction").show()


+----------------+----------+-----+
|engagement_level|prediction|count|
+----------------+----------+-----+
|               0|       1.0|   21|
|               0|       2.0|   17|
|               1|       1.0|  811|
|               2|       2.0| 2480|
+----------------+----------+-----+



> *Métricas por Classe (Precision, Recall, F1)*

> *Parece estar sensivel ao desbalanceamento*

In [203]:
from sklearn.metrics import classification_report

# Converte para Pandas
pred_df = predictions.select("engagement_level", "prediction").toPandas()

# Exibe o relatório por classe
print(classification_report(pred_df["engagement_level"], pred_df["prediction"]))


              precision    recall  f1-score   support

           0       0.00      0.00      0.00        38
           1       0.97      1.00      0.99       811
           2       0.99      1.00      1.00      2480

    accuracy                           0.99      3329
   macro avg       0.66      0.67      0.66      3329
weighted avg       0.98      0.99      0.98      3329



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


> *Testando outros modelos mais sensiveis a esses tipo de distribuição*

> *Balanceamento das classes para utilizar com o modelo `Regressão Logística`*

In [213]:
from pyspark.sql.functions import col
from pyspark.sql import DataFrame
from pyspark.ml.classification import LogisticRegression

# Separa por classe
class_0 = train.filter(col("engagement_level") == 0)
class_1 = train.filter(col("engagement_level") == 1)
class_2 = train.filter(col("engagement_level") == 2)

# Oversample da classe 0
factor = int(class_2.count() / class_0.count())
class_0_oversampled = class_0.unionAll(class_0).limit(class_0.count() * factor)

# Novo train balanceado
train_balanced = class_0_oversampled.unionAll(class_1).unionAll(class_2)


In [208]:
# Indexador para variável categórica
indexer = StringIndexer(inputCol="preferred_offer_type", outputCol="preferred_offer_type_index", handleInvalid="keep")

# 🔗 Vetorização das features
assembler = VectorAssembler(
    inputCols=features + ["preferred_offer_type_index"],
    outputCol="features",
    handleInvalid="keep"
)

#  Modelo de Regressão Logística Multiclasse
lr = LogisticRegression(
    labelCol="engagement_level",
    featuresCol="features",
    maxIter=100,
    regParam=0.01,
    elasticNetParam=0.0,  # regressão L2 pura
    family="multinomial"
)

#  Pipeline
pipeline = Pipeline(stages=[indexer, assembler, lr])

#  Treinamento com dados balanceados
model = pipeline.fit(train_balanced)

#  Previsões no conjunto de teste original
predictions = model.transform(test)

#  Avaliação
evaluator_acc = MulticlassClassificationEvaluator(
    labelCol="engagement_level", predictionCol="prediction", metricName="accuracy"
)
evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="engagement_level", predictionCol="prediction", metricName="f1"
)

accuracy = evaluator_acc.evaluate(predictions)
f1_score = evaluator_f1.evaluate(predictions)

print(f" Logistic Regression (balanceado) - Acurácia: {accuracy:.4f}")
print(f" Logistic Regression (balanceado) - F1-score: {f1_score:.4f}")

 Logistic Regression (balanceado) - Acurácia: 0.9892
 Logistic Regression (balanceado) - F1-score: 0.9844


> com o balanceamento o modelo passou a reconhecer a classe 0, que antes era completamente ignorada

In [211]:
predictions.groupBy("engagement_level", "prediction").count().orderBy("engagement_level", "prediction").show()


+----------------+----------+-----+
|engagement_level|prediction|count|
+----------------+----------+-----+
|               0|       0.0|    2|
|               0|       1.0|   20|
|               0|       2.0|   16|
|               1|       1.0|  811|
|               2|       2.0| 2480|
+----------------+----------+-----+



# 3. Interpretar os resultados pensando no negócio

Prever o nível de engajamento de um cliente com cupons, antes de enviá-los.
Com isso, conseguimos:

- Evitar enviar cupons para quem não responde (classe 0).
  - nesse caso eu estava tentando pensar em alternativas pra conseguir chamar os clientes mesmo que seja enviando um informativo, porem não obtive sucesso por enquanto

- Ajustar canal e tipo de oferta para quem tem resposta moderada (classe 1).

- Investir mais nos clientes altamente engajados (classe 2).