In [0]:
# importando bibliotecas
import urllib.request, os
import requests
import tarfile
import io
import pandas as pd
from pyspark.sql import Window
from pyspark.sql import functions as F
from scipy.stats import ttest_ind
import matplotlib.pyplot as plt

In [0]:
# importanto conjunto de dados orders
orders = spark.read.json(
    "s3a://data-architect-test-source/order.json.gz"
)
orders.printSchema()
display(orders)
#print('Linhas totais:', orders.count()) -- está comentado pois consome muito tempo para executar


In [0]:
# importando conjunto de dados consumer
consumer = spark.read.csv(
    "s3a://data-architect-test-source/consumer.csv.gz",
    header=True,
    inferSchema=True
)
consumer.printSchema()
display(consumer)
print('Linhas totais:', consumer.count())

In [0]:
# importando conjunto de dados restaurant
restaurant = spark.read.csv(
    "s3a://data-architect-test-source/restaurant.csv.gz",
    header=True,
    inferSchema=True
)
restaurant.printSchema()
display(restaurant)
print('Linhas totais:', restaurant.count())

In [0]:
# importando conjunto de dados ab_test_ref
url = "https://data-architect-test-source.s3-sa-east-1.amazonaws.com/ab_test_ref.tar.gz"
response = requests.get(url)
tar_bytes = io.BytesIO(response.content)

with tarfile.open(fileobj=tar_bytes, mode="r:gz") as tar:
    csv_member = next(
        m for m in tar.getmembers()
        if m.name == "ab_test_ref.csv"
    )
    f = tar.extractfile(csv_member)
    df_pd = pd.read_csv(f, encoding="latin1")

ab_test_ref = spark.createDataFrame(df_pd)
display(ab_test_ref)
print('Linhas totais:', ab_test_ref.count())

In [0]:
# agrupando os dados em uma única tabela
orders_consumer = orders.join(
    consumer.select("customer_id",F.col("created_at").alias("consumer_created_at"), F.col("active").alias("consumer_active")),
    on="customer_id",
    how="left"
)

orders_consumer_ab = orders_consumer.join(
    ab_test_ref,
    on="customer_id",
    how="left"
)

base_dados = orders_consumer_ab.join(
    restaurant.select("id", F.col("created_at").alias("merchant_created_at"), F.col("enabled").alias("merchant_enabled"), "price_range", "average_ticket", "takeout_time", "delivery_time", "minimum_order_value", "merchant_zip_code", "merchant_city", "merchant_state", "merchant_country"),
    orders_consumer_ab["merchant_id"] == restaurant["id"],
    how="left"
)

display(base_dados)
print('Linhas totais:', base_dados.count())

In [0]:
# display(base_dados.summary()) -- está comentado pois consome muito tempo para executar

In [0]:
# verificando se existem dados nulos
null_counts = base_dados.select([
    F.sum(F.col(c).isNull().cast("int")).alias(c)
    for c in base_dados.columns
])

display(null_counts)

In [0]:
# identificando quais order_id são duplicados
group = base_dados.groupBy("order_id").agg(
    F.count("*").alias("total")
).orderBy(F.col("total").desc())

display(group)

In [0]:
# verificando qual variável está duplicando
display(orders.filter(F.col("order_id")=="acf2da39fc788852c55c8ec05beecb6fae39e1ef4a1a4e72eca4673d458f9c13"))


In [0]:
# excluindo linhas com customer_id nulo e removendo a coluna cpf que está fazendo os dados serem duplicados
base_dados_clean = base_dados.filter(F.col("customer_id").isNotNull()) \
    .drop("cpf") \
    .dropDuplicates()

display(base_dados_clean)
print('Linhas totais:', base_dados_clean.count())

In [0]:
# validando porque order_id continua duplicando
group = base_dados_clean.groupBy("order_id").agg(
    F.count("*").alias("total")
).orderBy(F.col("total").desc())

display(group)

In [0]:
# existem order_id iguais com datas diferentes, serão considerados como pedidos distintos
display(base_dados_clean.filter(F.col("order_id")=="003034fef123a8ee5cf6887069b5eccdbb26ef397348ab1ec2fad6629217bfc9"))


In [0]:
# selecionando apenas a colunas da base de dados que serão utilizadas
base_dados_selected = base_dados_clean.select(
    "customer_id",
    "order_id",
    "merchant_id",
    "order_created_at",
    "order_total_amount",
    "consumer_active",
    "is_target"
)

display(base_dados_selected)
print('Linhas totais:', base_dados_selected.count())

In [0]:
# métricas dos consumers para rfm
consumers_metrics = base_dados_selected.groupBy('customer_id').agg(
    F.min('order_created_at').alias('first_order'),
    F.max('order_created_at').alias('last_order'),
    F.count('order_id').alias('frequency'),
    F.sum('order_total_amount').alias('monetary'),
    F.first('is_target').alias('is_target'),
    F.countDistinct('merchant_id').alias('total_restaurants')
)

max_date = base_dados_selected.agg(F.max('order_created_at').alias('max_date')).collect()[0]['max_date']
consumers_metrics = consumers_metrics.withColumn('recency_days', F.datediff(F.lit(max_date), F.col('last_order')))

display(consumers_metrics)
print('Linhas totais:', consumers_metrics.count())

In [0]:
# métricas dos grupos target e control
group_metrics = consumers_metrics.groupBy("is_target").agg(
    F.countDistinct("customer_id").alias("total_customers"),
    F.sum("frequency").alias("total_orders"),
    F.stddev("frequency").alias("desv_frequency"),
    F.sum("total_restaurants").alias("total_restaurants"),
    F.sum("monetary").alias("total_monetary")
).withColumn(
    "orders_per_customer",
    F.try_divide(F.col("total_orders"), F.col("total_customers"))
).withColumn(
    "monetary_per_customer",
    F.try_divide(F.col("total_monetary"), F.col("total_customers"))
).withColumn(
    "tkt_medio_per_order",
    F.try_divide(F.col("total_monetary"), F.col("total_orders"))
).withColumn(
    "restaurants_per_customer",
    F.try_divide(F.col("total_restaurants"), F.col("total_customers"))
)

display(group_metrics)

In [0]:
# Selecionando colunas necessárias para teste de hipótese
df = consumers_metrics.select("frequency", "is_target")

target_freq = df.filter(F.col("is_target") == "target").select("frequency").toPandas()["frequency"]
control_freq = df.filter(F.col("is_target") == "control").select("frequency").toPandas()["frequency"]

# Teste t de Welch
t_stat, p_value = ttest_ind(target_freq, control_freq, equal_var=False)

print("t-statistic:", t_stat)
print(f"p-value: {p_value:.3f}")

In [0]:
# Resultado grupo teste 
# Parâmetros
coupon_value = 5.0
margin = 0.25

# Total de pedidos do grupo target
total_orders_target = group_metrics.filter(F.col("is_target") == "target").select("total_orders").collect()[0]["total_orders"]

# Total de customers no grupo target
total_customers_target = group_metrics.filter(F.col("is_target") == "target").select("total_customers").collect()[0]["total_customers"]

# Custo do cupom para todos os customers do grupo target
coupon_cost_target = total_customers_target * coupon_value

# Receita total gerada pelo grupo target
tkt_medio_target = group_metrics.filter(F.col("is_target") == "target").select("tkt_medio_per_order").collect()[0]["tkt_medio_per_order"]
revenue_target = total_orders_target * tkt_medio_target

# Margem total gerada
margin_target = revenue_target * margin

# ROI
roi = (margin_target - coupon_cost_target) / coupon_cost_target

print('Total de pedidos do grupo target:', total_orders_target)
print('Custo total dos cupons (R$):', coupon_cost_target)
print('Receita total gerada (R$):', revenue_target)
print('Margem total gerada (R$):', margin_target)
print('ROI:', roi)

In [0]:
# Simulação: grupo target com comportamento similar ao control
coupon_value = 5.0
margin = 0.25

# Média de pedidos por customer do grupo control
orders_per_customer_control = group_metrics.filter(F.col("is_target") == "control").select("orders_per_customer").collect()[0]["orders_per_customer"]

# Total de customers no grupo target
total_customers_target = group_metrics.filter(F.col("is_target") == "target").select("total_customers").collect()[0]["total_customers"]

# Total de pedidos esperados no grupo target se tivesse a mesma média do control
expected_orders_target = orders_per_customer_control * total_customers_target

# Pedidos atuais do grupo target
total_orders_target = group_metrics.filter(F.col("is_target") == "target").select("total_orders").collect()[0]["total_orders"]

# Pedidos a menos gerados
lost_orders_target = expected_orders_target - total_orders_target

# Receita total gerada
tkt_medio_target = group_metrics.filter(F.col("is_target") == "target").select("tkt_medio_per_order").collect()[0]["tkt_medio_per_order"]
revenue_target = (total_orders_target + lost_orders_target) * tkt_medio_target

# Receita perdida
tkt_medio_target = group_metrics.filter(F.col("is_target") == "target").select("tkt_medio_per_order").collect()[0]["tkt_medio_per_order"]
lost_revenue_target = lost_orders_target * tkt_medio_target

# Margem total gerada
margin_target = revenue_target * margin

# Margem perdida
lost_margin_target = lost_revenue_target * margin

# Custo do cupom para todos os customers do grupo target
coupon_cost_target = total_customers_target * coupon_value

# Impacto líquido
net_impact_target = lost_margin_target + coupon_cost_target

print('Simulação: grupo target não recebendo cupom de R$5')
print('Custo total dos cupons (R$):', coupon_cost_target)
print('Pedidos a mais gerados:', lost_orders_target)
print('Pedidos totais:', total_orders_target + lost_orders_target)
print('Receita total final (R$):', revenue_target)
print('Receita perdida (R$):', lost_revenue_target)
print('Margem total final (R$):', margin_target)
print('Margem perdida (R$):', lost_margin_target)
print('Impacto líquido (R$):', net_impact_target)

In [0]:
# Simulação: grupo control recebe cupom de R$5
coupon_value = 5.0
margin = 0.25

# Média de pedidos por customer do grupo target
orders_per_customer_target = group_metrics.filter(F.col("is_target") == "target").select("orders_per_customer").collect()[0]["orders_per_customer"]

# Total de customers no grupo control
total_customers_control = group_metrics.filter(F.col("is_target") == "control").select("total_customers").collect()[0]["total_customers"]

# Total de pedidos esperados no grupo control se tivesse a mesma média do target
expected_orders_control = orders_per_customer_target * total_customers_control

# Pedidos atuais do grupo control
total_orders_control = group_metrics.filter(F.col("is_target") == "control").select("total_orders").collect()[0]["total_orders"]

# Pedidos a mais gerados
extra_orders_control = expected_orders_control - total_orders_control

# Receita total gerada
tkt_medio_control = group_metrics.filter(F.col("is_target") == "control").select("tkt_medio_per_order").collect()[0]["tkt_medio_per_order"]
revenue_control = (total_orders_control + extra_orders_control) * tkt_medio_control

# Receita adicional gerada
tkt_medio_control = group_metrics.filter(F.col("is_target") == "control").select("tkt_medio_per_order").collect()[0]["tkt_medio_per_order"]
inc_revenue_control = extra_orders_control * tkt_medio_control

# Margem adicional
margin_control = revenue_control * margin

# Margem adicional
inc_margin_control = inc_revenue_control * margin

# Custo do cupom para todos os customers do grupo control
coupon_cost_control = total_customers_control * coupon_value

# Impacto líquido
net_impact_control = inc_margin_control - coupon_cost_control

# ROI
roi = (margin_control - coupon_cost_control)/coupon_cost_control

print('Simulação: grupo control recebendo cupom de R$5')
print('Custo total dos cupons (R$):', coupon_cost_control)
print('Pedidos a mais gerados:', extra_orders_control)
print('Receita total final (R$):', revenue_control)
print('Receita adicional (R$):', inc_revenue_control)
print('Margem total final (R$):', margin_control)
print('Margem adicional (R$):', inc_margin_control)
print('Impacto líquido (R$):', net_impact_control)
print('ROI:', roi)

In [0]:
# validando se os cálculos estão corretos
print('total_customers_control:', total_customers_control)
print('orders_per_customer_target:', orders_per_customer_target)
print('expected_orders_control:', expected_orders_control)
print('total_orders_control:', total_orders_control)
print('extra_orders_control:', extra_orders_control)
print('tkt_medio_control:', tkt_medio_control)
print('inc_revenue_control:', inc_revenue_control)
print('inc_margin_control:', inc_margin_control)

In [0]:
# Quantis para cálculo do score RFM
quantiles = consumers_metrics.approxQuantile(
    ["recency_days", "frequency", "monetary"], 
    [0.2, 0.4, 0.6, 0.8], 
    0.0
)

qR, qF, qM = quantiles
print("Quantis calculados:")
print("Recency:", qR)
print("Frequency:", qF)
print("Monetary:", qM)


In [0]:
# Pontuando de 1 a 5 os clientes com base nos quantis
rfm = (
    consumers_metrics
    # Recency → quanto menor, melhor
    .withColumn(
        "R_score",
        F.when(F.col("recency_days") <= qR[0], 5)
         .when(F.col("recency_days") <= qR[1], 4)
         .when(F.col("recency_days") <= qR[2], 3)
         .when(F.col("recency_days") <= qR[3], 2)
         .otherwise(1)
    )
    # Frequency → quanto maior, melhor
    .withColumn(
        "F_score",
        F.when(F.col("frequency") <= qF[0], 1)
         .when(F.col("frequency") <= qF[1], 2)
         .when(F.col("frequency") <= qF[2], 3)
         .when(F.col("frequency") <= qF[3], 4)
         .otherwise(5)
    )
    # Monetary → quanto maior, melhor
    .withColumn(
        "M_score",
        F.when(F.col("monetary") <= qM[0], 1)
         .when(F.col("monetary") <= qM[1], 2)
         .when(F.col("monetary") <= qM[2], 3)
         .when(F.col("monetary") <= qM[3], 4)
         .otherwise(5)
    )
)

#Cálculo score
rfm = rfm.withColumn(
    "total_score",
    F.col("R_score") + F.col("F_score") + F.col("M_score")
)

rfm = rfm.withColumn(
    "segment",
    F.when(F.col("total_score") >= 14, "Premium")
     .when(F.col("total_score") >= 10, "Leal")
     .when(F.col("total_score") >= 5, "Em risco")
     .otherwise("Inativo")
)


rfm_dist = (rfm.groupBy("segment")
       .agg(
           F.countDistinct("customer_id").alias("qtde_clientes"),
           F.round(F.avg("monetary"), 2).alias("receita_media"),
           F.round(F.avg("frequency"), 2).alias("compras_medias"),
           F.round(F.avg("recency_days"), 2).alias("recencia_media")
       )
       .orderBy(F.desc("receita_media")))

display(rfm_dist)

In [0]:
# Converter apenas colunas necessárias para Pandas
pdf = rfm.select("segment", "frequency", "monetary").toPandas()

# Mapear segmentos para códigos numéricos (para colorir)
pdf["segment_code"] = pdf["segment"].astype("category").cat.codes

plt.figure(figsize=(8,6))

# Gráfico de dispersão
plt.scatter(
    pdf["frequency"],
    pdf["monetary"],
    c=pdf["segment_code"],     # cores automáticas por segmento
    cmap="viridis",            # colormap padrão permitido
    alpha=0.6
)

plt.xlabel("Frequency")
plt.ylabel("Monetary")
plt.title("Frequency vs Monetary por Segmento")
plt.colorbar(label="Segmento (códigos)")

plt.show()


In [0]:
# Quantidade de clientes target e control em cada segmento RFM
segment_counts = rfm.groupBy("is_target", "segment").agg(
    F.countDistinct("customer_id").alias("qtde_clientes")
)

total_by_segment = rfm.groupBy("segment").agg(
    F.countDistinct("customer_id").alias("total_clientes")
)

result = segment_counts.join(
    total_by_segment, on="segment"
).withColumn(
    "percent_clientes",
    F.round(F.col("qtde_clientes") / F.col("total_clientes") * 100, 2)
).select(
    "is_target", "segment", "qtde_clientes", "percent_clientes"
).orderBy("is_target", "segment")

display(result)

In [0]:
# Simulação cupons por segmentação RFM

# Valores dos cupons por segmento
coupon_values = {"Premium": 3, "Leal": 4, "Em risco": 5, "Inativo": 6}
margin = 0.25

# Tabela de médias por segmento
rfm_stats = rfm_dist.select("segment", "receita_media", "compras_medias", "recencia_media").toPandas().set_index("segment")

# Quantidade de clientes por segmento
segment_counts_df = rfm.groupBy("segment").agg(F.countDistinct("customer_id").alias("qtde_clientes")).toPandas().set_index("segment")
segment_clients = segment_counts_df["qtde_clientes"].to_dict()

# Projeção de envio de cupom para 50% dos clientes de cada segmento
clients_coupon = {seg: int(0.5 * segment_clients.get(seg, 0)) for seg in coupon_values.keys()}

# Migração: 20% dos que receberam cupom migram para o próximo segmento
clients_migrate = {seg: int(0.2 * clients_coupon[seg]) for seg in coupon_values.keys()}

# Clientes que permanecem no segmento após ação
clients_stay = {seg: clients_coupon[seg] - clients_migrate[seg] for seg in coupon_values.keys()}

# Receita e margem dos que permanecem (comportamento do próprio segmento)
revenue_stay = {seg: clients_stay[seg] * rfm_stats.loc[seg, "receita_media"] for seg in coupon_values.keys()}
margin_stay = {seg: revenue_stay[seg] * margin for seg in coupon_values.keys()}

# Receita e margem dos que migram (comportamento do próximo segmento)
next_segment = {"Inativo": "Em risco", "Em risco": "Leal", "Leal": "Premium", "Premium": "Premium"}
revenue_migrate = {seg: clients_migrate[seg] * rfm_stats.loc[next_segment[seg], "receita_media"] for seg in coupon_values.keys()}
margin_migrate = {seg: revenue_migrate[seg] * margin for seg in coupon_values.keys()}

# Custo de cupom por segmento
coupon_cost = {seg: clients_coupon[seg] * coupon_values[seg] for seg in coupon_values.keys()}

# Quantidade de clientes em cada segmento após ação
final_clients = {
    "Premium": segment_clients.get("Premium", 0) + clients_migrate["Leal"],
    "Leal": segment_clients.get("Leal", 0) + clients_migrate["Em risco"] - clients_migrate["Leal"],
    "Em risco": segment_clients.get("Em risco", 0) + clients_migrate["Inativo"] - clients_migrate["Em risco"],
    "Inativo": segment_clients.get("Inativo", 0) - clients_migrate["Inativo"]
}

# Montando resultado
result_df = pd.DataFrame({
    "segmento": list(coupon_values.keys()),
    "clientes_cupom": [clients_coupon[seg] for seg in coupon_values.keys()],
    "custo_cupom": [coupon_cost[seg] for seg in coupon_values.keys()],
    "clientes_migraram": [clients_migrate[seg] for seg in coupon_values.keys()],
    "receita_migraram": [revenue_migrate[seg] for seg in coupon_values.keys()],
    "margem_migraram": [margin_migrate[seg] for seg in coupon_values.keys()],
    "clientes_permaneceram": [clients_stay[seg] for seg in coupon_values.keys()],
    "receita_permaneceram": [revenue_stay[seg] for seg in coupon_values.keys()],
    "margem_permaneceram": [margin_stay[seg] for seg in coupon_values.keys()],
    "clientes_final": [final_clients[seg] for seg in coupon_values.keys()]
})

display(result_df)