In [0]:
'''This notebook is used to analize tecnical case for the process selection of Ifood'''

'This notebook is used to analize tecnical case for the process selection of Ifood'

In [0]:
from pyspark.sql.functions import col, from_json, explode_outer, sum, round, countDistinct, avg, count, when, date_format, row_number,to_date
from pyspark.sql import DataFrame
from pyspark.sql.window import Window

# Leitura de dados dos datasets

In [0]:
df_order = spark.read.json("s3a://data-architect-test-source/order.json.gz")


In [0]:
df_consumers = (
    spark.read.option("header", "true")
    .option("inferSchema", "true")
    .csv("s3a://data-architect-test-source/consumer.csv.gz")
)

In [0]:
df_merchants = (
    spark.read.option("header", "true")
    .option("inferSchema", "true")
    .csv("s3a://data-architect-test-source/restaurant.csv.gz")
)

Para o df_ab_test:
- Foi realizada extração local dos arquivos utilizando 7-ZIP;
- O arquivo foI adicionados ao databricks para o DBFS;
- A seguir a leitura do arquivo para geração do dataframe

In [0]:
df_ab_test_users = spark.read.option("header", "true").option("inferSchema", "true").csv("/FileStore/tables/ab_test_ref.csv")


# Processamento de Dados

### Order Dataset
Para valores de orde_id que aparecem mais de uma vez, a data mais antiga será recuperada.

In [0]:
df_order = df_order.select(
    "customer_id",
    "order_id",
    "delivery_address_city",
    "delivery_address_country",
    "delivery_address_state",
    "items",
    "merchant_id",
    "order_created_at",
    "order_scheduled",
    "order_total_amount",
    "origin_platform",
)

In [0]:
window_spec = Window.partitionBy("order_id").orderBy("order_created_at")
df_order_ranking = df_order.withColumn("rank", row_number().over(window_spec))
df_order_first = df_order_ranking.where(col("rank") == 1).drop("rank")

In [0]:
df_orders = df_order_first.withColumn(
    "year_month_order", date_format("order_created_at", "yyyy-MM")
).withColumn("order_date", to_date("order_created_at"))

# Criação de Tabelas

In [0]:
spark.sql("CREATE DATABASE IF NOT EXISTS hive_metastore.analytics")


DataFrame[]

In [0]:
df_orders.coalesce(8).write \
    .mode("overwrite") \
    .format("delta") \
    .option("overwriteSchema", "true") \
    .saveAsTable("analytics.orders")


In [0]:
df_consumers.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("analytics.consumers")


In [0]:
df_merchants.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("analytics.merchants")


In [0]:
df_ab_test_users.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("analytics.ab_test_users")


# Análises


### 1 - Análise sobre a campanha de cupons de desconto

Foram definidos os indicadores: 
- % Pedidos = Total de pedidos de clientes com cupom / total de pedidos;
- Crescimento diário de pedidos (comparativo entre as segmentações);
- Valor médio de pedido = Valor médio de pedido de clientes que utilizam cupom;
- Mediana diária do valor total de pedidos (comparativos entre as segmentações)


## % Pedidos e crescimento diário de pedidos

Percentual de pedidos de clientes que receberam cupons em relação ao total de pedidos

In [0]:
%sql
WITH total_order AS(
   Select count(distinct order_id) As total
   from  analytics.orders 
),
order_from_target AS (
  Select count(distinct r.order_id) As total
  from analytics.orders r
  inner join analytics.ab_test_users a
  on a.customer_id = r.customer_id
where a.is_target = 'target')
SELECT
  o.total AS order_from_target,
  t.total AS total_order,
  round(o.total * 100.0 / t.total,2) AS percent_order_target
FROM order_from_target o, total_order t

order_from_target,total_order,percent_order_target
1416677,2432974,58.23


In [0]:
%sql

WITH pedidos_por_dia AS (
  SELECT
    r.order_date,
    a.is_target,
    COUNT(DISTINCT r.order_id) AS total_pedidos
  FROM analytics.orders r
  INNER JOIN analytics.ab_test_users a
    ON a.customer_id = r.customer_id
  GROUP BY r.order_date, a.is_target
),

crescimento_diario AS (
  SELECT
    order_date,
    is_target,
    total_pedidos,
    LAG(total_pedidos) OVER (
      PARTITION BY is_target
      ORDER BY order_date
    ) AS pedidos_dia_anterior
  FROM pedidos_por_dia
)

SELECT
  order_date,
  is_target,
  total_pedidos,
  pedidos_dia_anterior,
  CASE
    WHEN pedidos_dia_anterior IS NULL OR pedidos_dia_anterior = 0 THEN NULL
    ELSE ROUND(
      (total_pedidos - pedidos_dia_anterior) * 100.0 / pedidos_dia_anterior, 2
    )
  END AS taxa_crescimento_percentual
FROM crescimento_diario
ORDER BY order_date, is_target


order_date,is_target,total_pedidos,pedidos_dia_anterior,taxa_crescimento_percentual
2018-12-03,control,13907,,
2018-12-03,target,19744,,
2018-12-04,control,12508,13907.0,-10.06
2018-12-04,target,18164,19744.0,-8.0
2018-12-05,control,13120,12508.0,4.89
2018-12-05,target,18788,18164.0,3.44
2018-12-06,control,14259,13120.0,8.68
2018-12-06,target,20425,18788.0,8.71
2018-12-07,control,18590,14259.0,30.37
2018-12-07,target,26061,20425.0,27.59


Databricks visualization. Run in Databricks to view.

## Valor Médio de Pedidos

In [0]:
%sql
-- valor médio
select is_target,
       round(avg(order_total_amount),2) as avg_total_amount
from analytics.orders r
inner join analytics.ab_test_users a
on a.customer_id = r.customer_id
group by is_target

is_target,avg_total_amount
control,47.92
target,47.81


In [0]:
%sql
-- Mediana diária de valor dos pedidos
SELECT
  is_target,
  order_date,
  PERCENTILE_APPROX(order_total_amount, 0.5) AS median_total_amount
FROM analytics.orders r
INNER JOIN analytics.ab_test_users a
  ON a.customer_id = r.customer_id
GROUP BY is_target, order_date
ORDER BY order_date, is_target

is_target,order_date,median_total_amount
control,2018-12-03,36.0
target,2018-12-03,36.0
control,2018-12-04,35.5
target,2018-12-04,35.69
control,2018-12-05,36.3
target,2018-12-05,36.0
control,2018-12-06,36.7
target,2018-12-06,36.8
control,2018-12-07,40.0
target,2018-12-07,40.0


Databricks visualization. Run in Databricks to view.

## Verificação de resultados estatisticamente significativos para pedidos e para valor de pedidos

In [0]:
import pandas as pd
from scipy.stats import ttest_ind, mannwhitneyu
import matplotlib.pyplot as plt
import seaborn as sns

In [0]:
# Quantidade de pedidos por clientes com cupom
df_target = spark.sql(
    f"""SELECT r.customer_id,
               COUNT(DISTINCT r.order_id) AS order
  from analytics.orders r
  inner join analytics.ab_test_users a
  on a.customer_id = r.customer_id
where a.is_target = 'target'
group by r.customer_id
"""
)

In [0]:
# Quantidade de pedidos por clientes sem cupom
df_control = spark.sql(
    f"""SELECT r.customer_id,
               COUNT(DISTINCT r.order_id) AS order
  from analytics.orders r
  inner join analytics.ab_test_users a
  on a.customer_id = r.customer_id
where a.is_target = 'control'
group by r.customer_id
"""
)

In [0]:
order_target = df_target.toPandas()["order"].tolist()
order_control = df_control.toPandas()["order"].tolist()

In [0]:
stat, p_value = ttest_ind(order_target, order_control)

print(f"p-value: {p_value:.4f}")

if p_value < 0.05:
    print("Diferença estatisticamente significativa")
else:
    print("Sem diferença significativa")

p-value: 0.0000
Diferença estatisticamente significativa


In [0]:
# Valor de pedidos para clientes com cupom
df_amount_target = spark.sql(
    f"""SELECT r.customer_id,
               sum(r.order_total_amount) AS order_amount
  from analytics.orders r
  inner join analytics.ab_test_users a
  on a.customer_id = r.customer_id
where a.is_target = 'target'
group by r.customer_id
"""
)

In [0]:
# Valor de pedidos para clientes sem cupom
df_amount_control = spark.sql(
    f"""SELECT r.customer_id,
               sum(r.order_total_amount) AS order_amount
  from analytics.orders r
  inner join analytics.ab_test_users a
  on a.customer_id = r.customer_id
where a.is_target = 'control'
group by r.customer_id
"""
)

In [0]:
amount_target = df_amount_target.toPandas()["order_amount"].tolist()
amount_control = df_amount_control.toPandas()["order_amount"].tolist()

In [0]:

stat, p_value = mannwhitneyu(amount_target, amount_control, alternative='two-sided')


In [0]:
print(f"Valor-p: {p_value:.4f}")
if p_value < 0.05:
    print("Diferença estatisticamente significativa na receita por cliente.")
else:
    print("Sem diferença significativa na receita por cliente.")


Valor-p: 0.0000
Diferença estatisticamente significativa na receita por cliente.


Media do valor total por cliente e Lift Percentual para comprender o quanto o grupo B performou melhor em termos de receita em relação ao grupo A

In [0]:
import numpy as np
mean_target = np.mean(amount_target)
mean_control = np.mean(amount_control)

lift = (mean_target - mean_control) / mean_control

print(f"Média Target: R$ {mean_target:.2f}")
print(f"Média Control: R$ {mean_control:.2f}")
print(f"Lift percentual: {lift * 100:.2f}%")

Média Target: R$ 151.89
Média Control: R$ 134.33
Lift percentual: 13.07%


##  2. Análise de segmentação para teste A/B 

Análise inicial do perfil dos clientes foram utilizadas as  variáveis:
- Quantidade de pedidos (quant_order);
- Valor total dos pedidos (total_order);
- Média do valor minimo dos resturantes atrelados aos pedidos (minimium_order_value);
- Ticket médio do restaurante (average_ticket)

In [0]:
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
import mlflow
mlflow.autolog(disable=True)

In [0]:
# Perfil destes clientes
df_customer_profile = spark.sql(f'''select a.customer_id,
       c.language,
       r.origin_platform,
       c.active,
       r.delivery_address_state,
       count(distinct r.order_id) as quant_order,
       round(sum(r.order_total_amount),2) as total_order,
       round(avg(m.minimum_order_value),2) as minimum_order_value,
       round(avg(m.average_ticket),2) as average_ticket
from analytics.ab_test_users a
inner join analytics.orders r
on a.customer_id = r.customer_id
left join analytics.merchants m
on m.id = r.merchant_id
left join analytics.consumers c
on c.customer_id = a.customer_id
where active = 'true' and m.enabled = 'true' and minimum_order_value is not null
group by a.customer_id,
       c.language,
       a.is_target,
       r.origin_platform,
       c.active,
       r.delivery_address_state
''')

In [0]:
# Obtendo as varáveis numéricas:
df_numeric_variables = df_customer_profile.select("customer_id","quant_order","total_order","minimum_order_value","average_ticket")

In [0]:
#Convertendo para pandas para utilizar recursos Python
df_pandas = df_numeric_variables.toPandas()


In [0]:
df_only_variables = df_pandas[["quant_order","total_order","minimum_order_value","average_ticket"]]

In [0]:
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df_only_variables)

In [0]:
kmeans = KMeans(n_clusters=2, random_state=42)
df_pandas["cluster"] = kmeans.fit_predict(X_scaled)

Exception ignored on calling ctypes callback function: <function _ThreadpoolInfo._find_modules_with_dl_iterate_phdr.<locals>.match_module_callback at 0x754460387240>
Traceback (most recent call last):
  File "/databricks/python/lib/python3.12/site-packages/threadpoolctl.py", line 400, in match_module_callback
    self._make_module_from_path(filepath)
  File "/databricks/python/lib/python3.12/site-packages/threadpoolctl.py", line 515, in _make_module_from_path
    module = module_class(filepath, prefix, user_api, internal_api)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.12/site-packages/threadpoolctl.py", line 606, in __init__
    self.version = self.get_version()
                   ^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.12/site-packages/threadpoolctl.py", line 646, in get_version
    config = get_config().split()
             ^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'split'


In [0]:
df_clusters = df_pandas[["customer_id", "cluster"]]

Resultados da clusterização:

In [0]:
#Quantidade de clientes por cluster
print(df_clusters["cluster"].value_counts())

0    641930
1     61544
Name: cluster, dtype: int64


In [0]:
# Características por cluster
print(df_clusters.groupby("cluster").mean())

         quant_order  total_order  minimum_order_value  average_ticket
cluster                                                               
0           1.758701    80.527619            88.602909       57.987138
1           9.144157   455.132043            94.654348       59.504087


Convertendo em um dataframe Pyspark

In [0]:
df_cluster_spark = spark.createDataFrame(df_clusters)

In [0]:
df_new_ab_test = df_cluster_spark.withColumn(
    "is_target", when(col("cluster") == 0, "control").otherwise("target")
).drop("cluster")

# Analise do Teste A/B com base no novo segmento

In [0]:
df_current_orders = spark.table("analytics.orders").join(df_new_ab_test, on="customer_id", how="inner")

In [0]:
df_new_amount_target = df_current_orders.where(col("is_target") == "target").groupBy("customer_id").agg(sum("order_total_amount").alias("order_amount"))
df_new_amount_control = df_current_orders.where(col("is_target") == "control").groupBy("customer_id").agg(sum("order_total_amount").alias("order_amount"))


In [0]:
amount_new_target = df_new_amount_target.toPandas()["order_amount"].tolist()
amount_new_control = df_new_amount_control.toPandas()["order_amount"].tolist()

In [0]:

stat, p_value = mannwhitneyu(amount_target, amount_control, alternative='two-sided')

print(f"Valor-p: {p_value:.4f}")
if p_value < 0.05:
    print("Diferença estatisticamente significativa na receita por cliente.")
else:
    print("Sem diferença significativa na receita por cliente.")


Valor-p: 0.0000
Diferença estatisticamente significativa na receita por cliente.


Nova Media do valor total de pedido por cliente e Lift Percentual para comprender o quanto o grupo B performou melhor em termos de receita em relação ao grupo A

In [0]:

mean_new_target = np.mean(amount_new_target)
mean_new_control = np.mean(amount_new_control)

lift = (mean_new_target - mean_new_control) / mean_control

print(f"Média novo Target: R$ {mean_new_target:.2f}")
print(f"Média novo Control: R$ {mean_new_control:.2f}")
print(f"Lift percentual: {lift * 100:.2f}%")

Média novo Target: R$ 624.01
Média novo Control: R$ 135.78
Lift percentual: 363.45%
