# Objetivo: Processar e integrar dados do iFood com PySpark para análise de campanhas e comportamento de clientes.
Fluxo:

    Importa bibliotecas e inicia sessão Spark.

    Carrega e trata offers, profiles e transactions.

    Integra datasets via join.

    Calcula métricas de funil e comportamento.

    Gera base final para análise e modelagem.

# 1. Importando bibliotecas e Configuração do Spark

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum , explode, when, map_keys, stddev, mean, count, coalesce, array_contains, max as _max
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, ArrayType, MapType

### Criar sessão Spark

In [2]:
spark = SparkSession.builder \
    .appName("iFood Data Processing") \
    .getOrCreate()

In [3]:
RAW_PATH = "../data/raw/"
PROCESSED_PATH = "../data/processed/"

# 2. Carregando e Pré-processando os Dados

## Offers

As ofertas podem ser enviadas por múltiplos canais. Para facilitar a análise, transformamos a coluna channels em colunas binárias (mobile, social, web), onde 1 indica que o canal foi usado e 0 indica o contrário. A coluna email foi removida, pois todas as ofertas foram enviadas por e-mail, tornando-a uma variável com pouca utilidade para a análise preditiva.

In [4]:
offers_schema = StructType([
    StructField("id", StringType(), True),
    StructField("offer_type", StringType(), True),
    StructField("min_value", IntegerType(), True),
    StructField("duration", IntegerType(), True),
    StructField("discount_value", IntegerType(), True),
    StructField("channels", ArrayType(StringType()), True)
])

offers_df = spark.read.json(f"{RAW_PATH}offers.json", schema=offers_schema)
offers_df = offers_df.withColumnRenamed("id", "offer_id")  # renomear para evitar conflito com IDs de clientes

In [5]:
# Extrair todos os canais distintos
channels_list = offers_df.select(explode(col("channels")).alias("channel")) \
                       .distinct()
print(channels_list.show())

+-------+
|channel|
+-------+
| mobile|
|  email|
| social|
|    web|
+-------+

None


In [6]:
channels_list = ['mobile','email','social','web']
#Criar colunas binárias para cada canal
for channel in channels_list:
    offers_df = offers_df.withColumn(
        channel,
        array_contains(col("channels"), channel).cast("int")
    )

#retirando email pois todas as ofertas foram enviadas por emaiol, então não faz sentido acompanhar
#duration também sempre vazio
offers_df = offers_df.drop("channels","duration","email")

offers_df.show()

+--------------------+-------------+---------+--------------+------+------+---+
|            offer_id|   offer_type|min_value|discount_value|mobile|social|web|
+--------------------+-------------+---------+--------------+------+------+---+
|ae264e3637204a6fb...|         bogo|       10|            10|     1|     1|  0|
|4d5c57ea9a6940dd8...|         bogo|       10|            10|     1|     1|  1|
|3f207df678b143eea...|informational|        0|             0|     1|     0|  1|
|9b98b8c7a33c4b65b...|         bogo|        5|             5|     1|     0|  1|
|0b1e1539f2cc45b7b...|     discount|       20|             5|     0|     0|  1|
|2298d6c36e964ae4a...|     discount|        7|             3|     1|     1|  1|
|fafdcd668e3743c1b...|     discount|       10|             2|     1|     1|  1|
|5a8bc65990b245e5a...|informational|        0|             0|     1|     1|  0|
|f19421c1d4aa40978...|         bogo|        5|             5|     1|     1|  1|
|2906b810c7d441179...|     discount|    

## Profiles

O primeiro filtro remove clientes com o gênero 'O', que provavelmente representam dados faltantes ou categorias indefinidas. O segundo filtro remove os clientes com informações de gênero e/ou limite do cartão de crédito ausentes. Essa limpeza garante que o modelo só usará dados completos.

In [7]:
profile_schema = StructType([
    StructField("age", IntegerType(), True),
    StructField("registeredon", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("id", StringType(), True),
    StructField("credit_card_limit", FloatType(), True)
])

profile_df = spark.read.json(f"{RAW_PATH}profile.json", schema=profile_schema)


In [8]:
profile_df = profile_df.where(col("gender")!='O')

In [9]:
# Temos 2175 registros de profiles sem 'gender' e/ou 'credit_card_limit'. Como essa informação pode ser bem relevante, vamos retirar esses casos sem registros
print((17000-profile_df.where((col("credit_card_limit").isNotNull()&col("gender").isNotNull()&col("age").isNotNull()&col("id").isNotNull())).count()))

2387


In [10]:
profile_df = profile_df.where((col("credit_card_limit").isNotNull()&col("gender").isNotNull())) \
                        .select("id","age","gender","credit_card_limit").withColumnRenamed("id", "account_id")

In [11]:
profile_df.show(10)

+--------------------+---+------+-----------------+
|          account_id|age|gender|credit_card_limit|
+--------------------+---+------+-----------------+
|0610b486422d4921a...| 55|     F|         112000.0|
|78afa995795e4d85b...| 75|     F|         100000.0|
|e2127556f4f64592b...| 68|     M|          70000.0|
|389bc3fa690240e79...| 65|     M|          53000.0|
|2eeac8d8feae4a8ca...| 58|     M|          51000.0|
|aa4862eba776480b8...| 61|     F|          57000.0|
|e12aeaf2d47d42479...| 26|     M|          46000.0|
|31dda685af34476ca...| 62|     F|          71000.0|
|62cf5e10845442329...| 49|     M|          52000.0|
|6445de3b47274c759...| 57|     M|          42000.0|
+--------------------+---+------+-----------------+
only showing top 10 rows


## Transactions
O campo value nas transações é semi-estruturado (JSON), com chaves inconsistentes (offer_id e offer id). Padronizamos esses campos para facilitar a manipulação. A coluna transaction_value é criada a partir do campo amount para registrar o valor da transação.

In [12]:
transactions_schema = StructType([
    StructField("event", StringType(), True),
    StructField("account_id", StringType(), True),
    StructField("time_since_test_start", IntegerType(), True),
    StructField("value", MapType(StringType(), StringType()), True)
])

transactions_df = spark.read.json(f"{RAW_PATH}transactions.json", schema=transactions_schema)
# Extrair todas as chaves presentes no campo 'value'
keys_df = transactions_df.select(explode(map_keys(col("value"))).alias("key"))

# Listar chaves distintas
distinct_keys = keys_df.distinct().collect()
distinct_keys_list = [row["key"] for row in distinct_keys]
print("Chaves encontradas dentro de 'value':", distinct_keys_list)

Chaves encontradas dentro de 'value': ['offer_id', 'offer id', 'amount', 'reward']


In [13]:
# Como temos offer_id e offer id, padronizamos abaixo para offer_id
# Criar colunas específicas a partir da chave dentro de 'value'
transactions_df_padronizado = transactions_df.withColumn("offer_id",coalesce(col("value").getItem("offer_id"),col("value").getItem("offer id"))) \
                                 .withColumn("transaction_value", col("value").getItem("amount").cast(FloatType())) \
                                 .withColumn("reward", col("value").getItem("reward").cast(FloatType())) \
                                 .drop("value")

In [14]:
transactions_df_padronizado = transactions_df_padronizado.fillna('no offer', subset=['offer_id'])

In [15]:
# Seleciona a coluna 'event' e encontra os valores distintos
valores_event = transactions_df_padronizado.select(col("event")).distinct()

# Mostra os resultados.
valores_event.show()

+---------------+
|          event|
+---------------+
|    transaction|
| offer received|
|offer completed|
|   offer viewed|
+---------------+



In [16]:
transactions_df_padronizado = transactions_df_padronizado \
    .withColumn("is_offer_received", when(col("event") == "offer received", 1).otherwise(0)) \
    .withColumn("is_offer_viewed", when(col("event") == "offer viewed", 1).otherwise(0)) \
    .withColumn("is_offer_completed", when(col("event") == "offer completed", 1).otherwise(0)) \
    .withColumn("is_transaction", when(col("event") == "transaction", 1).otherwise(0))

In [17]:
# Agregar por cliente (1 se ocorreu pelo menos uma vez, senão 0)
funnel_df = transactions_df_padronizado.groupBy("account_id","offer_id").agg(
    _max("is_offer_completed").alias("has_offer_completed")
)

### As transações são categorizadas por eventos (offer received, offer viewed, offer completed, transaction). Como nem todas as ofertas visualizadas são registradas, focamos na métrica has_offer_completed, que é mais confiável e representa o sucesso real de uma oferta. Agrupamos por cliente e oferta para criar nossa tabela de fatos, onde cada linha representa o resultado de uma oferta para um cliente.

### Outra coisa é que o reward é o discount_amount dos dados das ofertas, então a gente não precisa ter isso aqui. O mais importante é saber qual oferta foi completada ou não.

In [18]:
funnel_df = funnel_df.withColumn("has_offer_completed", col("has_offer_completed").cast("int"))
funnel_df.groupby("offer_id").agg(
    sum("has_offer_completed").alias("has_offer_completed_sum")
).show()

+--------------------+-----------------------+
|            offer_id|has_offer_completed_sum|
+--------------------+-----------------------+
|0b1e1539f2cc45b7b...|                   2978|
|4d5c57ea9a6940dd8...|                   2885|
|9b98b8c7a33c4b65b...|                   3784|
|f19421c1d4aa40978...|                   3741|
|fafdcd668e3743c1b...|                   4530|
|ae264e3637204a6fb...|                   3177|
|5a8bc65990b245e5a...|                      0|
|2298d6c36e964ae4a...|                   4421|
|2906b810c7d441179...|                   3480|
|            no offer|                      0|
|3f207df678b143eea...|                      0|
+--------------------+-----------------------+



In [19]:
funnel_profile_df = funnel_df.join(profile_df, on="account_id", how="left")
funnel_profile_df = funnel_profile_df.join(offers_df, on="offer_id", how="left")

In [20]:
funnel_profile_df.count()

79866

In [21]:
funnel_profile_df = funnel_profile_df.na.drop()

funnel_profile_df.count()

54453

In [22]:
funnel_profile_df.show()

+--------------------+--------------------+-------------------+---+------+-----------------+-------------+---------+--------------+------+------+---+
|            offer_id|          account_id|has_offer_completed|age|gender|credit_card_limit|   offer_type|min_value|discount_value|mobile|social|web|
+--------------------+--------------------+-------------------+---+------+-----------------+-------------+---------+--------------+------+------+---+
|f19421c1d4aa40978...|0861b9ca31b741bb8...|                  1| 73|     M|          92000.0|         bogo|        5|             5|     1|     1|  1|
|3f207df678b143eea...|4ecbfa77f6f84e779...|                  0| 83|     F|          58000.0|informational|        0|             0|     1|     0|  1|
|4d5c57ea9a6940dd8...|523e2d85f3d94eefb...|                  1| 47|     F|          65000.0|         bogo|       10|            10|     1|     1|  1|
|2906b810c7d441179...|d57bd2200e7b44c6b...|                  0| 56|     M|          47000.0|     dis

# Importante
Em nosso conjunto de dados, observamos que muitos clientes recebem múltiplas ofertas, porém não dispomos das informações sobre o momento exato em que as ofertas foram enviadas nem das datas das compras realizadas. Essa ausência de dados temporais impede uma análise precisa da relação causal entre oferta e retenção do cliente.

Isso representa um desafio, pois uma oferta pode gerar um efeito de “awareness” no cliente, aproximando-o do estabelecimento e influenciando compras futuras, ainda que a oferta em si não seja diretamente utilizada na compra. Portanto, a ausência da sequência temporal entre envio da oferta e compra dificulta distinguir quais ofertas foram efetivamente responsáveis pela fidelização.

Diante dessa limitação, neste projeto, focaremos na medição da conversão das ofertas com base nos dados disponíveis, buscando identificar as melhores combinações de ofertas a serem enviadas que resultem em compras efetivas.

In [23]:
# Nenhuma oferta informational está diretamente relacionada a uma compra
final_data = funnel_profile_df.where(col('offer_type') != "informational")

In [24]:
final_data.show()

+--------------------+--------------------+-------------------+---+------+-----------------+----------+---------+--------------+------+------+---+
|            offer_id|          account_id|has_offer_completed|age|gender|credit_card_limit|offer_type|min_value|discount_value|mobile|social|web|
+--------------------+--------------------+-------------------+---+------+-----------------+----------+---------+--------------+------+------+---+
|f19421c1d4aa40978...|0861b9ca31b741bb8...|                  1| 73|     M|          92000.0|      bogo|        5|             5|     1|     1|  1|
|4d5c57ea9a6940dd8...|523e2d85f3d94eefb...|                  1| 47|     F|          65000.0|      bogo|       10|            10|     1|     1|  1|
|2906b810c7d441179...|d57bd2200e7b44c6b...|                  0| 56|     M|          47000.0|  discount|       10|             2|     1|     0|  1|
|9b98b8c7a33c4b65b...|16fb582d943d42c4a...|                  1| 57|     F|          38000.0|      bogo|        5|     

### Análise rápida dos dados
Para entender melhor os grupos que completaram e os que não completaram as ofertas, analisamos algumas estatísticas descritivas. Os clientes que completam ofertas tendem a ser um pouco mais velhos, ter limites de cartão de crédito mais altos e as ofertas que aceitaram geralmente tinham valores de desconto e valores mínimos de compra menores. Isso sugere que os clientes mais velhos e com maior poder aquisitivo são mais propensos a usar ofertas, especialmente quando são fáceis de converter (menor valor mínimo).

In [25]:
# Média e desvio padrão por grupo
final_data.groupBy("has_offer_completed").agg(
    mean("age").alias("mean_age"),
    stddev("age").alias("stddev_age"),
    mean("credit_card_limit").alias("mean_credit"),
    stddev("credit_card_limit").alias("stddev_credit"),
    mean("min_value").alias("mean_min_value"),
    stddev("min_value").alias("stddev_min_value"),
    mean("discount_value").alias("mean_discount_value"),
    stddev("discount_value").alias("stddev_discount_value"),
).show()

+-------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+-------------------+---------------------+
|has_offer_completed|          mean_age|        stddev_age|      mean_credit|     stddev_credit|   mean_min_value|  stddev_min_value|mean_discount_value|stddev_discount_value|
+-------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+-------------------+---------------------+
|                  1| 55.78177588776215|16.848945085967937|69398.28444735215| 21642.38844430909|9.319303601933631| 4.221124809686318|  4.956747719259986|    2.904575018912144|
|                  0|51.738513261113184| 18.00163562655068|58589.09226746358|19971.073672570034|10.21417009089777|4.7791864949131835| 5.7705142572531445|    3.069217020187086|
+-------------------+------------------+------------------+-----------------+------------------+-----------------+------

In [26]:
#taxa de conversão por gênero
final_data.groupBy("gender").agg(
    count("*").alias("total"),
    sum("has_offer_completed").alias("conversions"),
    (sum("has_offer_completed") / count("*")).alias("conversion_rate")
).show()

+------+-----+-----------+------------------+
|gender|total|conversions|   conversion_rate|
+------+-----+-----------+------------------+
|     F|18207|      13259|0.7282363925962542|
|     M|25368|      14254|0.5618889940081994|
+------+-----+-----------+------------------+



In [27]:
# taxa de conversão por offer_type
final_data.groupBy("offer_type").agg(
    count("*").alias("total"),
    sum("has_offer_completed").alias("conversions"),
    (sum("has_offer_completed") / count("*")).alias("conversion_rate")
).show()

+----------+-----+-----------+------------------+
|offer_type|total|conversions|   conversion_rate|
+----------+-----+-----------+------------------+
|  discount|21816|      14525|0.6657957462412908|
|      bogo|21759|      12988|0.5969024311779034|
+----------+-----+-----------+------------------+



In [28]:
# taxa de conversão por offer_type
final_data.groupBy("offer_id").agg(
    count("*").alias("total"),
    sum("has_offer_completed").alias("conversions"),
    (sum("has_offer_completed") / count("*")).alias("conversion_rate")
).show()

+--------------------+-----+-----------+------------------+
|            offer_id|total|conversions|   conversion_rate|
+--------------------+-----+-----------+------------------+
|0b1e1539f2cc45b7b...| 5511|       2893|0.5249500998003992|
|4d5c57ea9a6940dd8...| 5449|       2824| 0.518260231235089|
|9b98b8c7a33c4b65b...| 5456|       3565|0.6534090909090909|
|f19421c1d4aa40978...| 5368|       3503|0.6525707898658718|
|fafdcd668e3743c1b...| 5472|       4194|0.7664473684210527|
|ae264e3637204a6fb...| 5486|       3096|0.5643456069996354|
|2298d6c36e964ae4a...| 5426|       4106|0.7567268706229267|
|2906b810c7d441179...| 5407|       3332|0.6162382097281301|
+--------------------+-----+-----------+------------------+



In [29]:
#Mesma ideia para os canais mobile, social, web (que são binários)
for channel in ["mobile", "social", "web"]:
    final_data.groupBy(channel).agg(
        count("*").alias("total"),
        sum("has_offer_completed").alias("conversions"),
        (sum("has_offer_completed") / count("*")).alias("conversion_rate")
    ).show()

+------+-----+-----------+------------------+
|mobile|total|conversions|   conversion_rate|
+------+-----+-----------+------------------+
|     1|38064|      24620|0.6468053804119378|
|     0| 5511|       2893|0.5249500998003992|
+------+-----+-----------+------------------+

+------+-----+-----------+------------------+
|social|total|conversions|   conversion_rate|
+------+-----+-----------+------------------+
|     1|27201|      17723|0.6515569280541157|
|     0|16374|       9790|0.5978991083424942|
+------+-----+-----------+------------------+

+---+-----+-----------+------------------+
|web|total|conversions|   conversion_rate|
+---+-----+-----------+------------------+
|  1|38089|      24417| 0.641051222137625|
|  0| 5486|       3096|0.5643456069996354|
+---+-----+-----------+------------------+



## Insights: 

A análise de gênero revela uma diferença significativa na taxa de conversão, com mulheres apresentando uma taxa de quase 73% contra 56% dos homens. Isso pode indicar uma maior receptividade do público feminino às ofertas ou uma segmentação mais eficaz para este grupo.

As ofertas de discount têm uma taxa de conversão ligeiramente maior (66.5%) em comparação com as bogo (59.6%). Isso pode ser devido à natureza mais direta e simples de uma oferta de desconto, enquanto as ofertas BOGO (Compre Um, Leve Outro) podem exigir uma compra maior ou mais específica.

Existem duas combinações de offers que estão com mais de 75% de conversão, outras duas apenas 52% e as outras por volta dos 60%.

Em relação ao canais de mkt, nota-se que quanto mais canais, melhor. Porém o que tem o menor impacto quando não utilizado é o social.

## 
Para preparar os dados para o modelo de machine learning, é necessário codificar as variáveis categóricas em formatos numéricos. As variáveis offer_type (bogo e discount) e gender (M e F) são transformadas em variáveis binárias (0 e 1). Essa etapa é fundamental para que o modelo possa processar essas informações corretamente.

In [30]:
final_data = final_data.withColumn(
    "offer_type",
    when(col("offer_type") == "bogo", 0).otherwise(1)
)

final_data = final_data.withColumn(
    "gender",
    when(col("gender") == "M", 0).otherwise(1)
)


In [31]:
final_data.show()

+--------------------+--------------------+-------------------+---+------+-----------------+----------+---------+--------------+------+------+---+
|            offer_id|          account_id|has_offer_completed|age|gender|credit_card_limit|offer_type|min_value|discount_value|mobile|social|web|
+--------------------+--------------------+-------------------+---+------+-----------------+----------+---------+--------------+------+------+---+
|f19421c1d4aa40978...|0861b9ca31b741bb8...|                  1| 73|     0|          92000.0|         0|        5|             5|     1|     1|  1|
|4d5c57ea9a6940dd8...|523e2d85f3d94eefb...|                  1| 47|     1|          65000.0|         0|       10|            10|     1|     1|  1|
|2906b810c7d441179...|d57bd2200e7b44c6b...|                  0| 56|     0|          47000.0|         1|       10|             2|     1|     0|  1|
|9b98b8c7a33c4b65b...|16fb582d943d42c4a...|                  1| 57|     1|          38000.0|         0|        5|     

In [32]:
final_data.groupby("has_offer_completed").count().show()

+-------------------+-----+
|has_offer_completed|count|
+-------------------+-----+
|                  1|27513|
|                  0|16062|
+-------------------+-----+



In [None]:
final_data.write.mode("overwrite").parquet("../data/processed/final_data_numeric.parquet")