## Etapa 1: Leitura dos dados e armazenamento 

In [0]:
%sql
-- Arquitetura: Estrutura de armazenamento
CREATE CATALOG IF NOT EXISTS main;
CREATE SCHEMA IF NOT EXISTS main.raw;
CREATE VOLUME IF NOT EXISTS main.raw.ifood_case;
CREATE SCHEMA IF NOT EXISTS main.gold;

In [0]:
TARGET_PATH = "/Volumes/workspace/raw/ifood_case/"

dbutils.fs.mkdirs(TARGET_PATH)

files = [
    "offers.json",
    "profile.json",
    "transactions.json"
]

for f in files:
    dbutils.fs.cp(
        f"/Workspace/ifood-case/data/raw/{f}",
        f"{TARGET_PATH}{f}"
    )

In [0]:
# Leitura dos arquivos brutos e armazenamento
RAW_PATH = TARGET_PATH
dfs = {}

for file in dbutils.fs.ls(RAW_PATH):
    if file.name.endswith('.json'):
        nome = file.name.replace('.json', '')
        dfs[nome] = (
            spark.read
            .option("multiline", "true")
            .json(file.path)
        )

# Dataframes
bronze_offers = dfs.get('offers')
bronze_profile = dfs.get('profile')
bronze_transactions = dfs.get('transactions')


## Etapa 2: Transformação dos dados

**Profile**

Contém atributos de cerca de 17k clientes:
- age (int): idade do cliente na criação da conta
- registeredon (int): data de criação da conta
- gender (string): gênero do cliente
- id (string): id do cliente
- credit_card_limit (float): limite do cartão registrado

In [0]:
# Visualização do DataFrame
bronze_profile.show()

+---+-----------------+------+--------------------+-------------+
|age|credit_card_limit|gender|                  id|registered_on|
+---+-----------------+------+--------------------+-------------+
|118|             NULL|  NULL|68be06ca386d4c319...|     20170212|
| 55|         112000.0|     F|0610b486422d4921a...|     20170715|
|118|             NULL|  NULL|38fe809add3b4fcf9...|     20180712|
| 75|         100000.0|     F|78afa995795e4d85b...|     20170509|
|118|             NULL|  NULL|a03223e636434f42a...|     20170804|
| 68|          70000.0|     M|e2127556f4f64592b...|     20180426|
|118|             NULL|  NULL|8ec6ce2a7e7949b1b...|     20170925|
|118|             NULL|  NULL|68617ca6246f4fbc8...|     20171002|
| 65|          53000.0|     M|389bc3fa690240e79...|     20180209|
|118|             NULL|  NULL|8974fc5686fe429db...|     20161122|
|118|             NULL|  NULL|c4863c7985cf408fa...|     20170824|
|118|             NULL|  NULL|148adfcaa27d485b8...|     20150919|
| 58|     

In [0]:
# Validação: Tamanho da base conforme as orientações
bronze_profile.count()

17000

In [0]:
# Validação: Tipos de dados das colunas do DataFrame
bronze_profile

DataFrame[age: bigint, credit_card_limit: double, gender: string, id: string, registered_on: string]

In [0]:
# Transformação: Converter tipos de dados
from pyspark.sql.functions import to_date, col

silver_profile = (
    bronze_profile
    .withColumn(
        "registered_on",
        to_date(col("registered_on"), "yyyyMMdd") # Campo string para data
    )
    .withColumn(
        "credit_card_limit",
        col("credit_card_limit").cast("decimal(10,2)") # Campo double para decimal
    )
    .withColumn(
        "age",
        col("age").cast("int") # Campo bigint para int
    )
)

# Visualização dos novos tipos de dados
silver_profile

DataFrame[age: int, credit_card_limit: decimal(10,2), gender: string, id: string, registered_on: date]

In [0]:
# Transformação: Remover duplicatas
silver_profile = silver_profile.dropDuplicates()

In [0]:
# Checagem: Quais são os possíveis preenchimentos do campo Gênero? 
display(
    silver_profile.groupBy("gender")
      .count()
      .orderBy("gender")
)
# F: feminino | M: masculino | O: outro | null: não informado
# Campo já normalizado 

gender,count
,2175
F,6129
M,8484
O,212


In [0]:
# Checagem: Qual a maior e menor idade da base? Preenchimento é consistente?
from pyspark.sql.functions import min, max

silver_profile.select( 
    min("age").alias("min_age"), 
    max("age").alias("max_age")
).show()
# 118 anos não é uma idade esperada

+-------+-------+
|min_age|max_age|
+-------+-------+
|     18|    118|
+-------+-------+



In [0]:
# Checagem: Quantos clientes têm 100 anos ou mais?
from pyspark.sql.functions import col, count

silver_profile.filter(
    col("age") >= 100
).groupBy("age")\
 .count()\
 .orderBy("age")\
 .show()
# 2.192 clientes têm 100 anos ou mais
# 2.175 clientes têm 118 anos (~13%)

+---+-----+
|age|count|
+---+-----+
|100|   12|
|101|    5|
|118| 2175|
+---+-----+



In [0]:
# Checagem: Qual o limite de crédito dos clientes que têm 118 anos? E o gênero?
silver_profile \
    .filter(col("age") == 118) \
    .select("credit_card_limit", "gender") \
    .distinct() \
    .show()
# credit_card_limit e gender não foram informados

+-----------------+------+
|credit_card_limit|gender|
+-----------------+------+
|             NULL|  NULL|
+-----------------+------+



In [0]:
# Transformação: Remover clientes com 118 anos
gold_profile = silver_profile.filter(col("age") != 118)
# 2.175 clientes com 118 foram retirados

gold_profile.count()
# 14.825 clientes na base final

14825

In [0]:
# Visualização da tabela final
gold_profile.show()

+---+-----------------+------+--------------------+-------------+
|age|credit_card_limit|gender|                  id|registered_on|
+---+-----------------+------+--------------------+-------------+
| 46|         73000.00|     F|021c1940868647efb...|   2015-02-21|
| 79|         42000.00|     M|88bea58132ac47379...|   2016-11-17|
| 20|         40000.00|     M|0969a11464224731b...|   2017-11-20|
| 48|         74000.00|     F|91e68a13d7e5471cb...|   2017-09-07|
| 57|         66000.00|     M|5a5f5e54f76249b38...|   2017-08-01|
| 59|         73000.00|     M|303bff336e8449d6a...|   2017-08-07|
| 37|         58000.00|     F|2b484f47c64741889...|   2017-10-13|
| 61|         98000.00|     M|8b70ca1f5f5a45bda...|   2017-11-21|
| 54|         47000.00|     F|f50ee71f3a3c4d169...|   2017-11-23|
| 73|         46000.00|     M|3a94b435ee0647ceb...|   2018-06-10|
| 53|         75000.00|     F|bb465e90882143b6a...|   2015-08-07|
| 34|         33000.00|     F|b56a93598fa5432b9...|   2017-11-03|
| 47|     

**Offers**

Contém os ids das ofertas e metadados de cada uma delas:
- id (string): id da oferta
- offer_type (string): o tipo da oferta (BOGO, discount, informational)
- min_value (int): valor mínimo para ativação da oferta
- duration (int): duração da oferta
- discount_value (int): valor do desconto
- channels (list of strings): canais de veiculação

In [0]:
# Visualização do DataFrame
bronze_offers.show()

+--------------------+--------------+--------+--------------------+---------+-------------+
|            channels|discount_value|duration|                  id|min_value|   offer_type|
+--------------------+--------------+--------+--------------------+---------+-------------+
|[email, mobile, s...|            10|     7.0|ae264e3637204a6fb...|       10|         bogo|
|[web, email, mobi...|            10|     5.0|4d5c57ea9a6940dd8...|       10|         bogo|
|[web, email, mobile]|             0|     4.0|3f207df678b143eea...|        0|informational|
|[web, email, mobile]|             5|     7.0|9b98b8c7a33c4b65b...|        5|         bogo|
|        [web, email]|             5|    10.0|0b1e1539f2cc45b7b...|       20|     discount|
|[web, email, mobi...|             3|     7.0|2298d6c36e964ae4a...|        7|     discount|
|[web, email, mobi...|             2|    10.0|fafdcd668e3743c1b...|       10|     discount|
|[email, mobile, s...|             0|     3.0|5a8bc65990b245e5a...|        0|inf

In [0]:
# Validação: Tipos de dados das colunas do DataFrame
bronze_offers

DataFrame[channels: array<string>, discount_value: bigint, duration: double, id: string, min_value: bigint, offer_type: string]

In [0]:
# Transformação: Converter tipos de dados
from pyspark.sql.functions import to_date, col

silver_offers = (
    bronze_offers
    .withColumn(
        "discount_value",
        col("discount_value").cast("decimal(10,2)") # Campo bigint para decimal
    )
    .withColumn(
        "min_value",
        col("min_value").cast("decimal(10,2)") # Campo bigint para decimal
    )
    .withColumn(
        "duration",
        col("duration").cast("int") # Campo double para int
    )
)

# Visualização dos novos tipos de dados
silver_offers

DataFrame[channels: array<string>, discount_value: decimal(10,2), duration: int, id: string, min_value: decimal(10,2), offer_type: string]

In [0]:
# Transformação: Remover duplicatas
gold_offers = silver_offers.dropDuplicates()

In [0]:
# Visualização da tabela final
gold_offers.show()

+--------------------+--------------+--------+--------------------+---------+-------------+
|            channels|discount_value|duration|                  id|min_value|   offer_type|
+--------------------+--------------+--------+--------------------+---------+-------------+
|        [web, email]|          5.00|      10|0b1e1539f2cc45b7b...|    20.00|     discount|
|[web, email, mobi...|          2.00|      10|fafdcd668e3743c1b...|    10.00|     discount|
|[web, email, mobi...|          5.00|       5|f19421c1d4aa40978...|     5.00|         bogo|
|[web, email, mobile]|          5.00|       7|9b98b8c7a33c4b65b...|     5.00|         bogo|
|[web, email, mobi...|         10.00|       5|4d5c57ea9a6940dd8...|    10.00|         bogo|
|[web, email, mobile]|          0.00|       4|3f207df678b143eea...|     0.00|informational|
|[email, mobile, s...|         10.00|       7|ae264e3637204a6fb...|    10.00|         bogo|
|[email, mobile, s...|          0.00|       3|5a8bc65990b245e5a...|     0.00|inf

**Transactions**
Contém cerca de 300k eventos:
- event (str): descrição do evento (transação, oferta recebida, etc.)
- account_id (str): id do cliente
- time_since_test_start (int): tempo desde o começo do teste em dias (t=0)
- value (json): registra offer_id, desconto (reward) ou valor da transação

In [0]:
# Visualização do DataFrame
bronze_transactions.show(truncate=False)

+--------------------------------+--------------+---------------------+----------------------------------------------------+
|account_id                      |event         |time_since_test_start|value                                               |
+--------------------------------+--------------+---------------------+----------------------------------------------------+
|78afa995795e4d85b5d9ceeca43f5fef|offer received|0.0                  |{NULL, 9b98b8c7a33c4b65b9aebfe6a799e6d9, NULL, NULL}|
|a03223e636434f42ac4c3df47e8bac43|offer received|0.0                  |{NULL, 0b1e1539f2cc45b7b9fa7c272da2e1d7, NULL, NULL}|
|e2127556f4f64592b11af22de27a7932|offer received|0.0                  |{NULL, 2906b810c7d4411798c6938adc9daaa5, NULL, NULL}|
|8ec6ce2a7e7949b1bf142def7d0e0586|offer received|0.0                  |{NULL, fafdcd668e3743c1bb461111dcafc2a4, NULL, NULL}|
|68617ca6246f4fbc85e91a2a49552598|offer received|0.0                  |{NULL, 4d5c57ea9a6940dd891ad53e9dbe8da0, NULL, NULL}|


In [0]:
# Validação: Tamanho da base conforme as orientações
bronze_transactions.count()

306534

In [0]:
# Validação: Tipos de dados das colunas do DataFrame
bronze_transactions

DataFrame[account_id: string, event: string, time_since_test_start: double, value: struct<amount:double,offer id:string,offer_id:string,reward:double>]

In [0]:
# Análise: Qual a diferença dos campos offer_id e offer id?
from pyspark.sql.functions import col, when, count

bronze_profile_analysis = (
    bronze_transactions
        .select(
            "event",
            col("value.`offer id`").alias("offer_id_1"),
            col("value.offer_id").alias("offer_id_2")
        )
        .groupBy("event")
        .agg(
            count(when(col("offer_id_1").isNotNull(), True)).alias("qt_offer_id_1"),
            count(when(col("offer_id_2").isNotNull(), True)).alias("qt_offer_id_2")
        )
)

bronze_profile_analysis.show()
# Os campos offer_id e offer id não são preenchidos ao mesmo tempo.
    # Ajuste a ser realizado: juntar os campos em uma única coluna

+---------------+-------------+-------------+
|          event|qt_offer_id_1|qt_offer_id_2|
+---------------+-------------+-------------+
| offer received|        76277|            0|
|   offer viewed|        57725|            0|
|offer completed|            0|        33579|
|    transaction|            0|            0|
+---------------+-------------+-------------+



In [0]:
# Transformação: Extração dos valores do struct
from pyspark.sql.functions import col, coalesce

silver_transactions = bronze_transactions.select(
    "account_id",
    "event",
    "time_since_test_start",
    col("value.amount").alias("amount"),
    coalesce(
        col("value.`offer id`"),
        col("value.offer_id")
    ).alias("offer_id"), # Transformação: Junta os campos em uma única coluna
    col("value.reward").alias("reward")
)

# Visualização da nova estrutura do DataFrame
silver_transactions.show(truncate=False)

+--------------------------------+--------------+---------------------+------+--------------------------------+------+
|account_id                      |event         |time_since_test_start|amount|offer_id                        |reward|
+--------------------------------+--------------+---------------------+------+--------------------------------+------+
|78afa995795e4d85b5d9ceeca43f5fef|offer received|0.0                  |NULL  |9b98b8c7a33c4b65b9aebfe6a799e6d9|NULL  |
|a03223e636434f42ac4c3df47e8bac43|offer received|0.0                  |NULL  |0b1e1539f2cc45b7b9fa7c272da2e1d7|NULL  |
|e2127556f4f64592b11af22de27a7932|offer received|0.0                  |NULL  |2906b810c7d4411798c6938adc9daaa5|NULL  |
|8ec6ce2a7e7949b1bf142def7d0e0586|offer received|0.0                  |NULL  |fafdcd668e3743c1bb461111dcafc2a4|NULL  |
|68617ca6246f4fbc85e91a2a49552598|offer received|0.0                  |NULL  |4d5c57ea9a6940dd891ad53e9dbe8da0|NULL  |
|389bc3fa690240e798340f5a15918d5c|offer received

In [0]:
# Validação: Tipos de dados das colunas do novo DataFrame
silver_transactions

DataFrame[account_id: string, event: string, time_since_test_start: double, amount: double, offer_id: string, reward: double]

In [0]:
# Transformação: Converter tipos de dados
from pyspark.sql.functions import to_date, col

silver_transactions = (
    silver_transactions
    .withColumn(
        "reward",
        col("reward").cast("decimal(10,2)") # Campo double para decimal
    )
    .withColumn(
        "amount",
        col("amount").cast("decimal(10,2)") # Campo double para int
    )
    .withColumn(
        "time_since_test_start",
        col("time_since_test_start").cast("int") # Campo double para int
    )
)

# Visualização dos novos tipos de dados
silver_transactions

DataFrame[account_id: string, event: string, time_since_test_start: int, amount: decimal(10,2), offer_id: string, reward: decimal(10,2)]

In [0]:
gold_transactions = silver_transactions.dropDuplicates()
# Visualização da tabela final
gold_transactions.count()
# 428 registros duplicados

306106

In [0]:
# Transformação: Criação da coluna ID único das transações
from pyspark.sql.functions import expr

gold_transactions = gold_transactions.withColumn("transaction_id", expr("uuid()"))

#Visualização do novo DataFrame
gold_transactions.show()

+--------------------+--------------+---------------------+------+--------------------+------+--------------------+
|          account_id|         event|time_since_test_start|amount|            offer_id|reward|      transaction_id|
+--------------------+--------------+---------------------+------+--------------------+------+--------------------+
|66e04cebe10343a9b...|offer received|                    0|  NULL|5a8bc65990b245e5a...|  NULL|bce684e6-759a-41e...|
|fad67e30e00241c9b...|offer received|                    0|  NULL|9b98b8c7a33c4b65b...|  NULL|952265ce-d321-4d0...|
|9538885fe02043eca...|offer received|                    0|  NULL|4d5c57ea9a6940dd8...|  NULL|4cc82331-f397-4ea...|
|0b6b453772ea4c3a9...|offer received|                    0|  NULL|5a8bc65990b245e5a...|  NULL|eb818c40-5b29-4f2...|
|a885874d0aae40fca...|offer received|                    0|  NULL|fafdcd668e3743c1b...|  NULL|d1c7e7e7-6515-48e...|
|5d02eeee2ca447b38...|offer received|                    0|  NULL|3f207d

In [0]:
# Checagem: Quais são os possíveis preenchimentos do campo Evento? 
display(
    gold_transactions.groupBy("event")
      .count()
      .orderBy(col("count").desc())
)
#Campo já normalizado

event,count
transaction,138930
offer received,76277
offer viewed,57725
offer completed,33174


In [0]:
# Análise de um cliente
cliente_1 = gold_transactions.filter(col("account_id") == "9fa9ae8f57894cc9a3b8a9bbe0fc1b2f").orderBy(col("time_since_test_start"))
cliente_1.show(n=50,truncate=False)

+--------------------------------+---------------+---------------------+------+--------------------------------+------+------------------------------------+
|account_id                      |event          |time_since_test_start|amount|offer_id                        |reward|transaction_id                      |
+--------------------------------+---------------+---------------------+------+--------------------------------+------+------------------------------------+
|9fa9ae8f57894cc9a3b8a9bbe0fc1b2f|transaction    |0                    |34.56 |NULL                            |NULL  |38d6f54c-a955-4f92-9010-1fc60de14ecc|
|9fa9ae8f57894cc9a3b8a9bbe0fc1b2f|offer viewed   |0                    |NULL  |2906b810c7d4411798c6938adc9daaa5|NULL  |47da10f3-fd24-47b4-9283-c8d91cdfacfb|
|9fa9ae8f57894cc9a3b8a9bbe0fc1b2f|offer received |0                    |NULL  |2906b810c7d4411798c6938adc9daaa5|NULL  |36b20c71-ab1a-46fd-b590-535d90b3044a|
|9fa9ae8f57894cc9a3b8a9bbe0fc1b2f|offer completed|0       

In [0]:
# Visualização da tabela final
gold_transactions.show()

+--------------------+--------------+---------------------+------+--------------------+------+--------------------+
|          account_id|         event|time_since_test_start|amount|            offer_id|reward|      transaction_id|
+--------------------+--------------+---------------------+------+--------------------+------+--------------------+
|66e04cebe10343a9b...|offer received|                    0|  NULL|5a8bc65990b245e5a...|  NULL|a3c86233-6526-45b...|
|fad67e30e00241c9b...|offer received|                    0|  NULL|9b98b8c7a33c4b65b...|  NULL|7d0b32cd-7c28-435...|
|9538885fe02043eca...|offer received|                    0|  NULL|4d5c57ea9a6940dd8...|  NULL|5cf0fe5a-0d5b-4bd...|
|0b6b453772ea4c3a9...|offer received|                    0|  NULL|5a8bc65990b245e5a...|  NULL|e7233127-fef8-4d6...|
|a885874d0aae40fca...|offer received|                    0|  NULL|fafdcd668e3743c1b...|  NULL|ff959f84-fd4f-43e...|
|5d02eeee2ca447b38...|offer received|                    0|  NULL|3f207d

## Etapa 3: Construção do dataset unificado

**Objetivo**: Unir as 3 tabelas construídas acima em uma base única para analisar o histórico de transações, ofertas e clientes

In [0]:
# Transformação: Unir as tabelas finais de transações e perfil do cliente
transactions_profile = gold_transactions.join(gold_profile, gold_transactions.account_id == gold_profile.id , how="inner")

# Escolha do inner: Todas as transações têm que ter um perfil do cliente associado
    # Com essa escolha, as transações de clientes com 118 anos foram excluídas da tabela

In [0]:
# Transformação: Unir a tabela final de ofertas e a tabela unificada de transações e perfil
transactions_profile_offers = transactions_profile.join(gold_offers, transactions_profile.offer_id == gold_offers.id, "left")

# Escolha do left: Nem toda transação tem uma oferta associada 

In [0]:
# Transformação: Seleção das colunas relevantes das tabelas unidas
gold_offer_engagement = transactions_profile_offers.select ("transaction_id", #ID da transação
                                              "event", # Evento da transação
                                              "amount", #Valor da transação
                                              "reward", # Desconto
                                              "time_since_test_start", #Tempo desde o início da oferta
                                              "account_id", #ID referente ao cliente
                                              "age", # Idade
                                              "gender", # Gênero
                                              "credit_card_limit", #Limite do cartão de crédito
                                              "registered_on", # Data da criação da conta
                                              "offer_id", # ID da oferta
                                              "offer_type", # Tipo da oferta
                                              "channels", # Canais da oferta
                                              "discount_value", # Valor do desconto 
                                              "duration", # Duração da oferta
                                              "min_value" # Valor mínimo da compra
                                              )

In [0]:
# Visualização da tabela unificada das tabelas transações, perfil e ofertas
gold_offer_engagement.show()

+--------------------+--------------+------+------+---------------------+--------------------+---+------+-----------------+-------------+--------------------+-------------+--------------------+--------------+--------+---------+
|      transaction_id|         event|amount|reward|time_since_test_start|          account_id|age|gender|credit_card_limit|registered_on|            offer_id|   offer_type|            channels|discount_value|duration|min_value|
+--------------------+--------------+------+------+---------------------+--------------------+---+------+-----------------+-------------+--------------------+-------------+--------------------+--------------+--------+---------+
|7d0b32cd-7c28-435...|offer received|  NULL|  NULL|                    0|fad67e30e00241c9b...| 50|     M|         35000.00|   2014-09-17|9b98b8c7a33c4b65b...|         bogo|[web, email, mobile]|          5.00|       7|     5.00|
|5cf0fe5a-0d5b-4bd...|offer received|  NULL|  NULL|                    0|9538885fe02043e

In [0]:
# Transformação: O evento transaction não tem oferta associada, por isso é importante realizar uma transformação para associar as transações que tiveram uma oferta associada
    # Como identificar se a transação tem uma oferta associada?
        # Regras de negócio: 
            # Cliente recebeu uma oferta  
            # Cliente realizou uma transação que atende aos critérios:
                # Valor (amount) maior ou igual ao valor mínimo da oferta (min_value);
                # A diferença entre tempo do teste (time_since_test_start) da transação e do recebimento da oferta tem que ser menor ou igual a duração da oferta
            # Cliente teve um evento de oferta completada/utilizada no mesmo dia da transação (time_since_test_start)
            # O cliente pode utilizar mais de uma oferta na mesma transação

# Exemplo: DataFrame -> transações de um cliente
gold_offer_engagement.filter(
    col("account_id") == '9fa9ae8f57894cc9a3b8a9bbe0fc1b2f'
).select( # Selecionando apenas as colunas que quero visualizar
    "transaction_id",
    "event",
    "amount",
    "reward",
    "time_since_test_start",
    "offer_id",
    "discount_value",
    "duration",
    "min_value"
).orderBy(
    col("time_since_test_start"),
    col("event") # Ordenação para que o evento transaction seja o último por dia desde o início do teste
).show(n=50,truncate=False)

+------------------------------------+---------------+------+------+---------------------+--------------------------------+--------------+--------+---------+
|transaction_id                      |event          |amount|reward|time_since_test_start|offer_id                        |discount_value|duration|min_value|
+------------------------------------+---------------+------+------+---------------------+--------------------------------+--------------+--------+---------+
|b814ed68-49bf-4292-a440-ea1461c95d3e|offer completed|NULL  |2.00  |0                    |2906b810c7d4411798c6938adc9daaa5|2.00          |7       |10.00    |
|36b20c71-ab1a-46fd-b590-535d90b3044a|offer received |NULL  |NULL  |0                    |2906b810c7d4411798c6938adc9daaa5|2.00          |7       |10.00    |
|47da10f3-fd24-47b4-9283-c8d91cdfacfb|offer viewed   |NULL  |NULL  |0                    |2906b810c7d4411798c6938adc9daaa5|2.00          |7       |10.00    |
|38d6f54c-a955-4f92-9010-1fc60de14ecc|transaction   

In [0]:
from pyspark.sql.functions import col

# DataFrame referente aos eventos de ofertas recebidas (offer received)
offers_received = gold_offer_engagement.filter(col("event") == "offer received") \
    .select(
        "account_id",
        "offer_id",
        "time_since_test_start",
        "min_value",
        "duration",
        "offer_type",
        "channels",
        "discount_value"
    ).withColumnRenamed("time_since_test_start", "time_offer_received")

# DataFrame referente aos eventos de transação (transaction)
transactions = gold_offer_engagement.filter(col("event") == "transaction") \
    .select(
        "account_id",
        "transaction_id",
        "amount",
        "time_since_test_start"
    )

# DataFrame referente aos eventos de oferta completada (offer completed)
offers_completed = gold_offer_engagement.filter(col("event") == "offer completed") \
    .select(
        "account_id",
        "offer_id",
        "time_since_test_start"
    ).withColumnRenamed("time_since_test_start", "time_offer_completed")

# União dos eventos de transações com as ofertas recebidas pelo mesmo cliente
transactions_with_offers = transactions.join(
    offers_received,
    on="account_id"
).filter(
    (col("amount") >= col("min_value")) &
    ((col("time_since_test_start") - col("time_offer_received")) <= col("duration"))
)

# Filtro das transações que têm uma oferta completada no mesmo dia
transactions_with_offers_final = transactions_with_offers.join(
    offers_completed,
    on=["account_id", "offer_id"]
).filter(
    col("time_since_test_start") == col("time_offer_completed")
)

# Dataframe final
transactions_with_offers_final = transactions_with_offers_final.select(
    "account_id",
    "transaction_id",
    "offer_id",
    "amount",
    "time_since_test_start",
    "min_value",
    "duration",
    "offer_type",
    "channels",
    "discount_value"
).distinct()

# Visualização da tabela de transações associadas às ofertas utilizadas
transactions_with_offers_final.show(truncate=False)

+--------------------------------+------------------------------------+--------------------------------+------+---------------------+---------+--------+----------+----------------------------+--------------+
|account_id                      |transaction_id                      |offer_id                        |amount|time_since_test_start|min_value|duration|offer_type|channels                    |discount_value|
+--------------------------------+------------------------------------+--------------------------------+------+---------------------+---------+--------+----------+----------------------------+--------------+
|8d49501a3a90477da47e6633109bf8f9|8e69af81-aad1-40a4-8290-a8abd5ae6236|2906b810c7d4411798c6938adc9daaa5|12.16 |21                   |10.00    |7       |discount  |[web, email, mobile]        |2.00          |
|8fecd95fe7b54cc2a130e35ba5a77f48|620bb3fb-73c3-4027-8af0-87f173f0b3aa|2906b810c7d4411798c6938adc9daaa5|26.22 |10                   |10.00    |7       |discount  |[web,

In [0]:
# Visualização das transações com ofertas do cliente selecionado
transactions_with_offers_final.filter(col("account_id") == 'e7fd200255cf48e1a75c4316da554941').show(truncate=False)

+--------------------------------+------------------------------------+--------------------------------+------+---------------------+---------+--------+----------+----------------------------+--------------+
|account_id                      |transaction_id                      |offer_id                        |amount|time_since_test_start|min_value|duration|offer_type|channels                    |discount_value|
+--------------------------------+------------------------------------+--------------------------------+------+---------------------+---------+--------+----------+----------------------------+--------------+
|e7fd200255cf48e1a75c4316da554941|2d6d2b40-761f-4916-b4a5-c0d26006dd2e|2298d6c36e964ae4a3e7e9706d1fb8c2|26.61 |28                   |7.00     |7       |discount  |[web, email, mobile, social]|3.00          |
|e7fd200255cf48e1a75c4316da554941|5914377a-66a4-43fe-a4dd-996e85d0dbd0|2906b810c7d4411798c6938adc9daaa5|32.81 |23                   |10.00    |7       |discount  |[web,

In [0]:
# Criar um Dataframe único
customer_engagement = (
    gold_offer_engagement.alias("oe")
    .join(
        transactions_with_offers_final.alias("t"),
        on = "transaction_id",
        how = "left"
    )
    .select(
        "oe.transaction_id",
        "oe.event",
        "oe.amount",
        "oe.reward",
        "oe.time_since_test_start",
        "oe.account_id",
        "oe.age",
        "oe.gender",
        "oe.credit_card_limit",
        "oe.registered_on",
        coalesce("t.offer_id", "oe.offer_id").alias("offer_id"),
        coalesce("t.offer_type","oe.offer_type").alias("offer_type"),
        coalesce("t.channels","oe.channels").alias("channels"),
        coalesce("t.discount_value","oe.discount_value").alias("discount_value"),
        coalesce("t.duration","oe.duration").alias("duration"),
        coalesce("t.min_value","oe.min_value").alias("min_value")
    )
)

# Visualização da tabela final
customer_engagement.show()

+--------------------+---------------+------+------+---------------------+--------------------+---+------+-----------------+-------------+--------------------+-------------+--------------------+--------------+--------+---------+
|      transaction_id|          event|amount|reward|time_since_test_start|          account_id|age|gender|credit_card_limit|registered_on|            offer_id|   offer_type|            channels|discount_value|duration|min_value|
+--------------------+---------------+------+------+---------------------+--------------------+---+------+-----------------+-------------+--------------------+-------------+--------------------+--------------+--------+---------+
|ec8da1da-519c-425...|   offer viewed|  NULL|  NULL|                    0|6e43570e4517416cb...| 67|     M|         89000.00|   2018-06-01|9b98b8c7a33c4b65b...|         bogo|[web, email, mobile]|          5.00|       7|     5.00|
|311fe35f-d20f-458...| offer received|  NULL|  NULL|                    0|b3f30c0aaf

In [0]:
#Armazenamento na tabela Delta
customer_engagement.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("main.gold.customer_engagement") #Tabela final

## Etapa 4: Análise da base final

###  Características da base

In [0]:
%sql
select * from main.gold.customer_engagement limit 10

transaction_id,event,amount,reward,time_since_test_start,account_id,age,gender,credit_card_limit,registered_on,offer_id,offer_type,channels,discount_value,duration,min_value
fa012a10-3c47-4215-a7d0-96ed84f448d5,transaction,16.76,,7,6e92f1561dab4dc180245974f546f5b5,20,F,66000.0,2017-12-27,fafdcd668e3743c1bb461111dcafc2a4,discount,"List(web, email, mobile, social)",2.0,10,10.0
fdd42eda-066f-4275-b548-9e049b929790,transaction,14.51,,26,9be0d577c14841bcb61232b6497db1cd,27,M,67000.0,2017-10-18,2906b810c7d4411798c6938adc9daaa5,discount,"List(web, email, mobile)",2.0,7,10.0
cb97d220-eb8e-47d2-96e7-6b9b88c5be52,transaction,9.78,,22,2a01fc19e5c942efa247588405f43b7a,18,M,51000.0,2017-12-08,9b98b8c7a33c4b65b9aebfe6a799e6d9,bogo,"List(web, email, mobile)",5.0,7,5.0
c2e770db-0452-473c-8d0c-8deaf3b1ba76,transaction,26.91,,0,1c8cf4af93464dcaa971cfcffc2cc1e5,73,M,97000.0,2017-07-25,2298d6c36e964ae4a3e7e9706d1fb8c2,discount,"List(web, email, mobile, social)",3.0,7,7.0
1d76cf9b-6f85-4f01-9d2a-8f4bc740f269,transaction,19.0,,10,6336d3be58b24cd0b30f8ebade7247e9,82,M,92000.0,2016-02-13,fafdcd668e3743c1bb461111dcafc2a4,discount,"List(web, email, mobile, social)",2.0,10,10.0
d0bc275e-905d-4def-b187-34d1cb21ad2d,transaction,7.51,,13,e7b245f7399746faac8b4184d500a21a,53,M,64000.0,2018-02-14,9b98b8c7a33c4b65b9aebfe6a799e6d9,bogo,"List(web, email, mobile)",5.0,7,5.0
a26c3ae4-b58b-4d42-b4d4-a011e7e06b72,transaction,17.87,,22,9f7292018f174f6b8bab62353703307c,52,F,113000.0,2017-09-25,ae264e3637204a6fb9bb56bc8210ddfd,bogo,"List(email, mobile, social)",10.0,7,10.0
7e4ff8a0-8ab3-43b7-8bed-730c3ed3c8cc,transaction,15.28,,22,08d06b602c844926a037b0b253901d52,44,M,84000.0,2015-05-22,f19421c1d4aa40978ebb69ca19b0e20d,bogo,"List(web, email, mobile, social)",5.0,5,5.0
c5516009-2c4c-4dbc-b3cc-8532d16aa15f,transaction,8.77,,21,996fb8e62375461f8f6ca0cf7d2eeaa3,47,M,39000.0,2018-06-07,f19421c1d4aa40978ebb69ca19b0e20d,bogo,"List(web, email, mobile, social)",5.0,5,5.0
6953c67d-30e9-4bec-b885-a21c603c35f5,transaction,22.89,,8,109847d6d26e41999625e2db5bacf2ef,51,F,101000.0,2017-11-24,9b98b8c7a33c4b65b9aebfe6a799e6d9,bogo,"List(web, email, mobile)",5.0,7,5.0


In [0]:
%sql
select
  count(distinct transaction_id) as qtde_tl_transacoes,
  count(case when event = 'transaction' then 1 else 0 end) as qtde_tl_compras,
  count(distinct account_id) as qtde_tl_clientes
from
  main.gold.customer_engagement

qtde_tl_transacoes,qtde_tl_compras,qtde_tl_clientes
272363,274670,14825


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
-- Todos os clientes da base receberam uma oferta?
with usuarios_com_oferta as 
(select
  distinct account_id
from
  main.gold.customer_engagement
where
  event = 'offer received'),

sem_ofertas as
(select
  distinct e.account_id
from
  main.gold.customer_engagement e 
  left join usuarios_com_oferta u on u.account_id = e.account_id
where
  u.account_id is null)

select
  count(distinct o.account_id) as qtde_clientes
from
  sem_ofertas o 
-- 5 clientes não receberam ofertas

qtde_clientes
5


In [0]:
%sql
select avg(credit_card_limit) from main.raw.customer_offer_transaction --where account_id = '3a4874d8f0ef42b9a1b72294902afea9'

avg(credit_card_limit)
64815.000471


In [0]:
%sql
WITH base as
(select
  account_id,
  count(*) as qtde,
  sum(amount) as valor
from
  main.raw.customer_offer_transaction
where
  event = 'transaction'
group by
  1)

SELECT avg(qtde),avg(valor) from base

avg(qtde),avg(valor)
9.212812160694895,127.934619


In [0]:
%sql select count(*) from main.raw.offer_engagement

count(*)
306106


In [0]:
%sql select count(*) from main.raw.customer_offer_transaction

count(*)
320311


In [0]:
%sql
select 
  case when 20 > age then '1. Abaixo de 20' 
       when age between 20 and 29 then '2. Faixa 20 a 29'
       when age between 30 and 39 then '3. Faixa 30 a 39'
       when age between 40 and 49 then '4. Faixa 40 a 49'
       when age between 50 and 59 then '5. Faixa 50 a 59'
       when age between 60 and 69 then '6. Faixa 60 a 69'
       when age between 70 and 79 then '7. Faixa 70 a 79'
       when age >= 80 then '8. Acima de 80'
       else 'N/A' end as faixa,
  count(account_id) as qtde_clientes
from 
  main.raw.customer_offer_transaction
group by
  1

faixa,qtde_clientes
5. Faixa 50 a 59,67311
4. Faixa 40 a 49,44404
6. Faixa 60 a 69,57057
3. Faixa 30 a 39,31328
2. Faixa 20 a 29,27729
7. Faixa 70 a 79,33356
8. Acima de 80,55049
1. Abaixo de 20,4077


In [0]:
%sql
select
  distinct event
from
  main.raw.offer_engagement

event
offer received
offer viewed
transaction
offer completed


In [0]:
%sql
select
  --account_id, count(*)
  *
from
  main.raw.offer_engagement
where
  account_id = '24115a61df25473e84a8a03f3c98de1a'
  --event = 'offer completed'
--group by 
  --1

transaction_id,event,amount,reward,time_since_test_start,account_id,age,gender,credit_card_limit,registered_on,offer_id,offer_type,channels,discount_value,duration,min_value
213a0e5a-6d87-4b38-b3fc-80846c09d1e8,transaction,7.16,,2,24115a61df25473e84a8a03f3c98de1a,54,M,39000.0,2018-05-23,,,,,,
214b1aa7-2b0e-4b6c-8dc0-138798632c40,transaction,12.35,,19,24115a61df25473e84a8a03f3c98de1a,54,M,39000.0,2018-05-23,,,,,,
318914de-7c5e-46f0-b031-3de1fc7a685b,offer viewed,,,7,24115a61df25473e84a8a03f3c98de1a,54,M,39000.0,2018-05-23,4d5c57ea9a6940dd891ad53e9dbe8da0,bogo,"List(web, email, mobile, social)",10.0,5.0,10.0
f69a7fed-a8c0-4cda-a4cd-a3e9c51cc46f,transaction,8.88,,11,24115a61df25473e84a8a03f3c98de1a,54,M,39000.0,2018-05-23,,,,,,
f0951421-0086-4f0a-bc75-1a97a3cbc4b8,offer received,,,7,24115a61df25473e84a8a03f3c98de1a,54,M,39000.0,2018-05-23,4d5c57ea9a6940dd891ad53e9dbe8da0,bogo,"List(web, email, mobile, social)",10.0,5.0,10.0
55c2ad56-f344-4035-a7f1-4fb61452b29b,offer completed,,10.0,25,24115a61df25473e84a8a03f3c98de1a,54,M,39000.0,2018-05-23,ae264e3637204a6fb9bb56bc8210ddfd,bogo,"List(email, mobile, social)",10.0,7.0,10.0
610dff04-5fd4-48ef-820f-76bed0e21b50,transaction,6.77,,26,24115a61df25473e84a8a03f3c98de1a,54,M,39000.0,2018-05-23,,,,,,
b35995ad-dce5-46cf-aaa0-5992410825a7,transaction,3.85,,7,24115a61df25473e84a8a03f3c98de1a,54,M,39000.0,2018-05-23,,,,,,
e878262d-a320-46c8-93cb-8eb6a1646282,transaction,8.92,,24,24115a61df25473e84a8a03f3c98de1a,54,M,39000.0,2018-05-23,,,,,,
9e6ef80a-e88f-4c1c-a70f-0b1f09f7b597,transaction,4.44,,4,24115a61df25473e84a8a03f3c98de1a,54,M,39000.0,2018-05-23,,,,,,


In [0]:
%sql
WITH offer_received AS (
    SELECT
        account_id,
        offer_id,
        offer_type,
        time_since_test_start AS received_time,
        duration,
        min_value
    FROM main.raw.offer_engagement
    WHERE event = 'offer received'
),

offer_completed AS (
    SELECT
        account_id,
        offer_id,
        time_since_test_start AS completed_time,
        reward
    FROM main.raw.offer_engagement
    WHERE event = 'offer completed'
),

transactions AS (
    SELECT
        transaction_id,
        account_id,
        time_since_test_start AS transaction_time,
        amount
    FROM main.raw.offer_engagement
    WHERE event = 'transaction'
)

SELECT
  DISTINCT
    t.transaction_id,
    t.account_id,
    t.amount,
    oc.offer_id,
    orc.offer_type,
    oc.reward,
    t.transaction_time,
    orc.received_time,
    orc.duration
FROM transactions t
JOIN offer_received orc
    ON t.account_id = orc.account_id
   AND t.transaction_time BETWEEN orc.received_time
                              AND orc.received_time + orc.duration
   AND t.amount >= orc.min_value
JOIN offer_completed oc
    ON t.account_id = oc.account_id
   AND oc.offer_id = orc.offer_id
--WHERE 
  --t.account_id = '24115a61df25473e84a8a03f3c98de1a'


transaction_id,account_id,amount,offer_id,offer_type,reward,transaction_time,received_time,duration
3fb04635-f3f1-449c-9b09-71e79c12c77e,141e082d59ef48759823ab1ddcfe1f7a,17.66,fafdcd668e3743c1bb461111dcafc2a4,discount,2.0,24,21,10
3d76479a-d8bb-43ad-9262-7af9373e9abb,872a3cbeb41a47ccabbd7408ac774672,18.96,fafdcd668e3743c1bb461111dcafc2a4,discount,2.0,20,14,10
19832009-263f-4ade-8cd4-e297d89dd96c,6a6412f9489146c5ac95a6a96ad40381,32.62,9b98b8c7a33c4b65b9aebfe6a799e6d9,bogo,5.0,20,17,7
ef5eda56-0c1a-4b18-ac64-dcb6cf45dbb4,ea1ecddf4814464cbc8fd299230b5f3a,12.73,ae264e3637204a6fb9bb56bc8210ddfd,bogo,10.0,8,7,7
31bd4ae7-4487-44c9-8e54-8a46eb4cdedd,7f09dd8ae6994415ada38d63383cf086,10.65,ae264e3637204a6fb9bb56bc8210ddfd,bogo,10.0,16,14,7
ae4d4c5e-9844-4745-8137-6e2171c067fd,6f3d8c186bc6431f87c401446dc728d4,24.33,ae264e3637204a6fb9bb56bc8210ddfd,bogo,10.0,21,17,7
483f3561-4ca0-463c-8fdd-14888a909ddc,05298f5390c7417c8dbbcbef9f081800,5.38,f19421c1d4aa40978ebb69ca19b0e20d,bogo,5.0,10,7,5
1335a458-23c9-4dd8-a8af-740db395edbc,aacf19731906456ea5347f15787e6771,37.81,0b1e1539f2cc45b7b9fa7c272da2e1d7,discount,5.0,27,17,10
b6c26ed9-6ffd-4dff-b21e-ab1e7612469c,952f9040dd1c4c64a0a1ee4e42fe23eb,25.23,2906b810c7d4411798c6938adc9daaa5,discount,2.0,15,14,7
f6f991dd-e0f3-4bc0-922c-66cdc4d91d7b,5bb68945637b469ab9b46c3853085550,11.16,2298d6c36e964ae4a3e7e9706d1fb8c2,discount,3.0,9,7,7
