In [1]:
from pyspark.sql import functions as f
from pyspark.sql import Window
import sys
from pathlib import Path

In [None]:
#voltando para a raiz do projeto para poder trazer a funcao get_spark_session do Utils
PROJECT_ROOT = Path().resolve().parents[0]
sys.path.append(str(PROJECT_ROOT))

In [3]:
from src.ifood_case.utils import get_spark_session

spark = get_spark_session(app_name="ifood_case_exploration")

[32m2025-12-25 00:02:02.937[0m | [1mINFO    [0m | [36msrc.ifood_case.utils[0m:[36mget_spark_session[0m:[36m18[0m - [1mSubindo Spark Session: ifood_case_exploration[0m
your 131072x1 screen size is bogus. expect trouble
25/12/25 00:02:04 WARN Utils: Your hostname, DESKTOP-HH1RONB resolves to a loopback address: 127.0.1.1; using 172.24.183.91 instead (on interface eth0)
25/12/25 00:02:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/25 00:02:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
#informando caminho para leitura dos arquivos parquet
ORDERS_PATH = PROJECT_ROOT / "data" / "processed" / "orders"
CONSUMERS_PATH = PROJECT_ROOT / "data" / "processed" / "consumers"
RESTAURANTS_PATH = PROJECT_ROOT / "data" / "processed" / "restaurants"
AB_TEST_PATH = PROJECT_ROOT / "data" / "processed" / "ab_test_ref"
ORDERS_ITENS_PATH = PROJECT_ROOT / "data" / "processed" / "orders_itens"

orders = spark.read.parquet(str(ORDERS_PATH))
consumers = spark.read.parquet(str(CONSUMERS_PATH))
restaurants = spark.read.parquet(str(RESTAURANTS_PATH))
ab_test = spark.read.parquet(str(AB_TEST_PATH))
orders_itens = spark.read.parquet(str(ORDERS_ITENS_PATH))

In [5]:
#verificando a dsitribuição dos alvos da campanha
ab_test.printSchema()
ab_test.groupBy("is_target").count().show()

root
 |-- customer_id: string (nullable = true)
 |-- is_target: string (nullable = true)



                                                                                

+---------+------+
|is_target| count|
+---------+------+
|  control|360542|
|   target|445925|
+---------+------+



In [6]:
#verificando se todos os usuarios do teste possuem pedido (cobertura da campanha)
ab_users = ab_test.select("customer_id").distinct()

orders_users = orders.select("customer_id").distinct()

coverage = (
    ab_users
    .join(orders_users, "customer_id", "left")
    .withColumn("has_order", f.col("customer_id").isNotNull())
    .groupBy("has_order")
    .count()
)

coverage.show()


[Stage 8:>                                                          (0 + 8) / 8]

+---------+------+
|has_order| count|
+---------+------+
|     true|806467|
+---------+------+



                                                                                

In [27]:
df_tempo_de_casa = orders.join(consumers, "customer_id") \
    .withColumn("tempo_de_casa_days", f.datediff(f.col("order_date"), f.col("created_at")))

df_tempo_de_casa.select(
    f.min("tempo_de_casa_days").alias("min_dias_casa"),
    f.max("tempo_de_casa_days").alias("max_dias_casa")
).show()

[Stage 249:>                                                        (0 + 8) / 9]

+-------------+-------------+
|min_dias_casa|max_dias_casa|
+-------------+-------------+
|          240|          394|
+-------------+-------------+



                                                                                

Como a base já era ativa, pelo menos 240 dias de casa, o objetivo da campanha não era ativação, mas sim aumentar frequência e valor transacionado. Portanto, foquei em métricas de comportamento e monetização

In [None]:
#calculando as métricas principais por variante
#join na orders com teste_ab
df_analysis = orders.join(ab_test, "customer_id", "inner")


metrics_ab = (
    df_analysis
    .groupBy("is_target")
    .agg(
        f.count_distinct("customer_id").alias("n_users"),
        f.count("order_id").alias("total_orders"),
        f.regexp_replace(f.format_number(f.sum("order_total_amount"), 2), ",", "").alias("total_revenue"),
        f.round(f.count("order_id") / f.count_distinct("customer_id"), 2).alias("avg_frequency"),
        f.round(f.avg("order_total_amount"), 2).alias("avg_ticket"),
        f.round(f.sum("order_total_amount") / f.count_distinct("customer_id"), 2).alias("arpu")
    )
)

metrics_ab.show(truncate=False)

[Stage 119:>                                                        (0 + 8) / 9]

+---------+-------+------------+-------------+-------------+----------+------+
|is_target|n_users|total_orders|total_revenue|avg_frequency|avg_ticket|arpu  |
+---------+-------+------------+-------------+-------------+----------+------+
|control  |360528 |1010699     |48432203.49  |2.8          |47.92     |134.34|
|target   |445909 |1416614     |67729986.45  |3.18         |47.81     |151.89|
+---------+-------+------------+-------------+-------------+----------+------+



                                                                                

In [24]:
#com base nas metricas obtidas, trazendo lift percentual para facilitar a analise de resultado
w = Window.orderBy(f.col("is_target").asc())

lift_metrics = (
    metrics_ab
    .withColumn("control_orders", f.lag("total_orders").over(w))
    .withColumn("control_revenue", f.lag("total_revenue").over(w))
    .withColumn("control_freq", f.lag("avg_frequency").over(w))
    .withColumn("control_ticket", f.lag("avg_ticket").over(w))
    .withColumn("control_arpu", f.lag("arpu").over(w))
    .filter(f.col("is_target") == "target")
    .select(
        f.lit("target").alias("alvo"),
        f.round(((f.col("total_orders") / f.col("control_orders")) - 1) * 100, 2).alias("lift_orders"),
        f.round(((f.col("total_revenue") / f.col("control_revenue")) - 1) * 100, 2).alias("lift_revenue"),
        f.round(((f.col("avg_frequency") / f.col("control_freq")) - 1) * 100, 2).alias("lift_frequencia"),
        f.round(((f.col("avg_ticket") / f.col("control_ticket")) - 1) * 100, 2).alias("lift_ticket"),
        f.round(((f.col("arpu") / f.col("control_arpu")) - 1) * 100, 2).alias("lift_arpu")
    )
)

lift_metrics.show()



+------+-----------+------------+---------------+-----------+---------+
|  alvo|lift_orders|lift_revenue|lift_frequencia|lift_ticket|lift_arpu|
+------+-----------+------------+---------------+-----------+---------+
|target|      40.16|       39.84|          13.57|      -0.23|    13.06|
+------+-----------+------------+---------------+-----------+---------+



                                                                                

In [None]:
#analisando valor do desconto do item para tentar achar valor do cupom
df_items_ab = orders_itens.join(ab_test, "customer_id", "inner")

financeiro_ab = (
    df_items_ab
    .groupBy("is_target")
    .agg(
        f.sum("total_value").alias("faturamento_bruto"), # Valor sem cupom
        f.sum("item_discount").alias("custo_total_cupons"), # O valor do cupom propriamente dito
        f.count_distinct("order_id").alias("total_pedidos")
    )
    .withColumn("faturamento_liquido", f.col("faturamento_bruto") - f.col("custo_total_cupons"))
    .withColumn("desconto_medio_por_pedido", f.round(f.col("custo_total_cupons") / f.col("total_pedidos"), 2))
)

financeiro_ab.show()

[Stage 36:>                                                         (0 + 8) / 9]

+---------+--------------------+------------------+-------------+--------------------+-------------------------+
|is_target|   faturamento_bruto|custo_total_cupons|total_pedidos| faturamento_liquido|desconto_medio_por_pedido|
+---------+--------------------+------------------+-------------+--------------------+-------------------------+
|  control| 3.243658969999081E7|               0.0|      1010468| 3.243658969999081E7|                      0.0|
|   target|4.5718660160005346E7|               0.0|      1416276|4.5718660160005346E7|                      0.0|
+---------+--------------------+------------------+-------------+--------------------+-------------------------+



                                                                                

Invalidando analise utilizando o campo item_discount, pois todos os items possuem valor 0 nessa coluna. Muitos itens tambem estao com valor zero no item_value e consequentemente, no total_value.