
# Notebook `1_data_processing.ipynb`

## Objetivo
Este notebook é responsável por todo o pipeline de processamento de dados do case, desde a ingestão dos dados brutos em formato JSON até a criação das tabelas finais de modelagem (ABTs). O processo segue a arquitetura Medallion, transformando os dados através das camadas Bronze, Silver e Gold.

## Instruções de Uso
**Configuração Obrigatória:** Antes de executar o notebook, defina a variável `PATH_VOLUME` na célula 6. Este caminho será o diretório raiz onde todas as tabelas das camadas Bronze, Silver e Gold serão salvas e lidas. Esta etapa foi desenvolvida no Databricks Free Edition, com o uso dos `Volumes`.

```PATH_VOLUME = "/Volumes/workspace/default/case_ifood"```

## Estrutura de Saída (Output)
Ao final da execução completa, as seguintes tabelas em formato Parquet serão geradas dentro do diretório definido em PATH_VOLUME:
```
.
└── PATH_VOLUME/
    ├── bronze/
    │   ├── profile/
    │   ├── offers/
    │   └── transactions/
    ├── silver/
    │   ├── profile/
    │   ├── offers/
    │   ├── offers_received/
    │   ├── offers_viewed/
    │   └── transactions/
    └── gold/
        ├── customer_daily_features/
        ├── daily_targets/
        ├── abt_offer/
        └── abt_baseline/
```


# Bibliotecas utilizadas

In [0]:
from pyspark.sql import Window
import pyspark.sql.functions as F
import pyspark.sql.types as T


# Funções e variáveis

In [0]:
def save_as_parquet(df, path):
    df.write.mode("overwrite").parquet(path)

In [0]:
# Path das bases raw
PATH_VOLUME = "/Volumes/workspace/default/case_ifood"


# 0) Leitura das bases *raw*

In [0]:
profile_raw = spark.read.json(f"{PATH_VOLUME}/raw/profile.json")
offers_raw = spark.read.json(f"{PATH_VOLUME}/raw/offers.json")
transactions_raw = spark.read.json(f"{PATH_VOLUME}/raw/transactions.json")


# 1) Análise Exploratória dos Dados Brutos (*Raw*)

Antes de iniciar as transformações, vamos realizar uma análise exploratória nos dados brutos para entender sua estrutura, identificar inconsistências e validar algumas hipóteses iniciais. Esta análise guiará as decisões de limpeza e enriquecimento nas próximas etapas.

**Algumas premissas importantes para o restante do projeto serão definidas nesta etapa.**


## 1.1) Tabela `profile`

**Observações e Hipóteses:**

* **Idade:** A estatística descritiva mostra uma idade máxima de 118 anos, o que é atípico. Uma contagem específica revela que esse valor aparece 2175 vezes, sugerindo ser um valor padrão para dados faltantes ou inválidos, já que outras variáveis como gênero e limite do cartão possuem esta quantidade de nulos também.

* **Gênero:** Existe uma categoria 'O' (Outro) e um número significativo de valores nulos (2175). Para simplificar o modelo, podemos agrupar 'O' e nulos em uma única categoria, como 'Desconhecido'.

* **Limite do Cartão de Crédito:** Os valores parecem muito maiores do que valores reais e a contagem de nulos é considerável. Esta variável, assim como a idade, podem estar "anonimizadas" de certa forma.

---

**Premissas:**
- **Idade 118:** tudo indica que se trata de um placeholder para um valor nulo, portanto será tratado como tal.

- **Valores de Idade e Limite de Crédito:** Os valores de idade e limite de cartão de crédito não parecem realistas. Assumimos que os dados foram anonimizados, mas a ordem foi preservada. Isso significa que, se um cliente A é mais velho que o cliente B nos dados, ele também é na vida real. Essa premissa nos permite usar essas variáveis para encontrar padrões, mesmo sem saber os valores exatos.

In [0]:
print("Schema da tabela profile:")
profile_raw.printSchema()

print("\nEstatísticas descritivas da idade:")
profile_raw.describe("age").show()

print(f"\nContagem de idade igual a 118: {profile_raw.filter(F.col('age') == 118).count()}")

print("\nDistribuição de gênero:")
profile_raw.groupBy("gender").count().show()

print("\nEstatísticas do limite do cartão de crédito:")
profile_raw.describe("credit_card_limit").show()

Schema da tabela profile:
root
 |-- age: long (nullable = true)
 |-- credit_card_limit: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: string (nullable = true)
 |-- registered_on: string (nullable = true)


Estatísticas descritivas da idade:
+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|             17000|
|   mean| 62.53141176470588|
| stddev|26.738579945767242|
|    min|                18|
|    max|               118|
+-------+------------------+


Contagem de idade igual a 118: 2175

Distribuição de gênero:
+------+-----+
|gender|count|
+------+-----+
|  NULL| 2175|
|     M| 8484|
|     O|  212|
|     F| 6129|
+------+-----+


Estatísticas do limite do cartão de crédito:
+-------+-----------------+
|summary|credit_card_limit|
+-------+-----------------+
|  count|            14825|
|   mean| 65404.9915682968|
| stddev|21598.29941022947|
|    min|          30000.0|
|    max|         120000.0|
+-------+---------


## 1.2) Tabela `offers`

**Observações e Hipóteses:**

* **Tipos de Oferta:** Existem três categorias de ofertas: `BOGO` (Compre Um, Leve Outro), `discount` (desconto direto) e `informational` (informativa, sem benefício monetário direto). As categorias `BOGO` e `discount` aparecem com a mesma frequência.

* **Duração:** As ofertas têm uma duração que varia de 3 a 10 dias. A análise de quartis pode ajudar a segmentar as ofertas entre curta e longa duração.

* **Canais de Veiculação:** As ofertas são distribuídas por quatro canais: `web`, `email`, `mobile`, e `social` e todos eles são utilizados com frequência.

* **Valores Monetários:** As colunas `min_value` (valor mínimo para ativação) e `discount_value` (valor do desconto) são centrais para as ofertas do tipo `discount` e `BOGO`. A relação entre essas duas variáveis pode ser utilizada para criar uma feature de "atratividade" da oferta (ex: `discount_value` / `min_value`).

In [0]:
print("Schema da tabela offers:")
offers_raw.printSchema()

print("\nDistribuição dos tipos de oferta:")
offers_raw.groupBy("offer_type").count().show()

print("\nEstatísticas descritivas da duração das ofertas (em dias):")
offers_raw.describe("duration").show()

print("\nAnálise dos canais de veiculação:")
offers_raw.withColumn("channel", F.explode("channels")).groupBy("channel").count().show()

print("\nEstatísticas do valor mínimo para ativação e do valor do desconto:")
offers_raw.describe("min_value", "discount_value").show()

Schema da tabela offers:
root
 |-- channels: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- discount_value: long (nullable = true)
 |-- duration: double (nullable = true)
 |-- id: string (nullable = true)
 |-- min_value: long (nullable = true)
 |-- offer_type: string (nullable = true)


Distribuição dos tipos de oferta:
+-------------+-----+
|   offer_type|count|
+-------------+-----+
|         bogo|    4|
|informational|    2|
|     discount|    4|
+-------------+-----+


Estatísticas descritivas da duração das ofertas (em dias):
+-------+------------------+
|summary|          duration|
+-------+------------------+
|  count|                10|
|   mean|               6.5|
| stddev|2.3213980461973533|
|    min|               3.0|
|    max|              10.0|
+-------+------------------+


Análise dos canais de veiculação:
+-------+-----+
|channel|count|
+-------+-----+
| mobile|    9|
|  email|   10|
| social|    6|
|    web|    8|
+-------+-----+


Estatí


## 1.3) Tabela `transactions`

**Observações e Hipóteses:**

* **Distribuição de Eventos:** A base de transações registra quatro tipos de eventos: `transaction`, `offer received`, `offer viewed`, e `offer completed`. O evento `transaction` é o mais frequente, seguido por `offer received`.

* **Simultaneidade de Eventos:** Uma observação relevante surge ao analisar eventos que ocorrem para o mesmo cliente no mesmo instante de tempo (`time_since_test_start`). Nota-se uma alta frequência da combinação dos eventos `['offer completed', 'transaction']`.

* **Hipótese de Vinculação:** Esta coocorrência sugere uma forte hipótese de que, para cada oferta completada, um evento de transação correspondente é gerado.

---

**Premissas:**

* **Utilização de cupom:** a coluna `value` não mostra se a transação usou cupom, mas todo `offer completed` ocorre junto de uma única transação do cliente. Assim, vou considerar que essa transação usou o cupom correspondente.


In [0]:
print("Schema da tabela transactions:")
transactions_raw.printSchema()

print("Contagem por tipo de evento:")
transactions_raw.groupBy('event').count().orderBy(F.desc('count')).show()

event_pairs_df = transactions_raw.groupBy("account_id", "time_since_test_start").agg(
    F.sort_array(F.collect_set("event")).alias("events_at_same_time")
)

print("\nCombinações de eventos que ocorrem simultaneamente:")
event_pairs_df.groupBy("events_at_same_time").count().orderBy(F.desc('count')).show(truncate=False)

count_multiple = (
    transactions_raw
    .filter(F.col('event').isin(['offer completed', 'transaction']))
    .groupBy("account_id", "time_since_test_start")
    .agg(
        F.sum(F.when(F.col("event") == "transaction", 1).otherwise(0)).alias("transaction_count"),
        F.sum(F.when(F.col("event") == "offer completed", 1).otherwise(0)).alias("offer_completed_count")
    )
    .filter((F.col("offer_completed_count") > 0))
    .select('transaction_count')
    .distinct()
)

print(f"Quantidade de transações que ocorrem no mesmo instante que uma 'offer completed':")
count_multiple.show()

Schema da tabela transactions:
root
 |-- account_id: string (nullable = true)
 |-- event: string (nullable = true)
 |-- time_since_test_start: double (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- amount: double (nullable = true)
 |    |-- offer id: string (nullable = true)
 |    |-- offer_id: string (nullable = true)
 |    |-- reward: double (nullable = true)

Contagem por tipo de evento:
+---------------+------+
|          event| count|
+---------------+------+
|    transaction|138953|
| offer received| 76277|
|   offer viewed| 57725|
|offer completed| 33579|
+---------------+------+


Combinações de eventos que ocorrem simultaneamente:
+------------------------------------------------------------+------+
|events_at_same_time                                         |count |
+------------------------------------------------------------+------+
|[transaction]                                               |101895|
|[offer received]                                      


# 2) Estruturação de Dados (*Raw → Bronze*)

O objetivo desta etapa é converter os dados brutos (`JSON`) para um formato estruturado e otimizado (`parquet`), estabelecendo uma base de dados limpa e rastreável para as fases de processamento subsequentes.

As operações principais são:

* **Conversão para Parquet:** Os dados são persistidos em Parquet, um formato colunar que otimiza a performance de leitura e a compressão em ambientes de processamento distribuído como o Spark.

* **Padronização do Schema:** As colunas são convertidas para tipos de dados consistentes (e.g., `string`, `double`, `date`). Esta padronização é essencial para garantir a integridade dos dados e prevenir erros nas próximas etapas.

* **Rastreabilidade da Ingestão:** Uma coluna `execution_date` é inserida para registrar a data da carga, permitindo o versionamento e a governança sobre os dados ingeridos.

In [0]:
profile_bronze = (
    profile_raw
    .withColumn("age", F.col("age").cast("int"))
    .withColumn("credit_card_limit", F.col("credit_card_limit").cast("double"))
    .withColumn("registered_on", F.to_date(F.col("registered_on"), format='yyyyMMdd').cast("date"))
    .withColumn("id", F.col("id").cast("string"))
    .withColumn("gender", F.col("gender").cast("string"))
    .withColumn('execution_date', F.current_date())
    .withColumnRenamed('id', 'account_id')
    .select('account_id', 'registered_on', 'age', 'gender', 'credit_card_limit', 'execution_date')
)

offers_bronze = (
    offers_raw
    .withColumn('channels', F.col('channels').cast(T.ArrayType(T.StringType())))
    .withColumn('discount_value', F.col('discount_value').cast("double"))
    .withColumn('duration', F.col('duration').cast("double"))
    .withColumn('id', F.col('id').cast("string"))
    .withColumn('min_value', F.col('min_value').cast("double"))
    .withColumn('offer_type', F.col('offer_type').cast("string"))
    .withColumn('execution_date', F.current_date())
    .withColumnRenamed('id', 'offer_id')
    .select('offer_id', 'channels', 'duration', 'discount_value', 'min_value', 'offer_type', 'execution_date')
)

transactions_bronze = (
    transactions_raw
    .withColumn('time_since_test_start', F.col('time_since_test_start').cast("double"))
    .withColumn('event', F.col('event').cast("string"))
    .withColumn('account_id', F.col('account_id').cast("string"))
    .withColumn('amount', F.col('value.amount').cast("double"))
    .withColumn('offer id', F.col('value.offer id').cast("string"))
    .withColumn('offer_id', F.col('value.offer_id').cast("string"))
    .withColumn('reward', F.col('value.reward').cast("double"))
    .withColumn('execution_date', F.current_date())
    .select('event', 'account_id', 'time_since_test_start', 'amount', 'offer_id', 'offer id', 'reward', 'execution_date')
)

In [0]:
save_as_parquet(profile_bronze, f"{PATH_VOLUME}/bronze/profile")
save_as_parquet(offers_bronze, f"{PATH_VOLUME}/bronze/offers")
save_as_parquet(transactions_bronze, f"{PATH_VOLUME}/bronze/transactions")


## 2.1) Checagem dos dados


### 2.1.1) Tabela `bronze/profile`

In [0]:
total_profiles = profile_bronze.count()
print(f"Total de cadastros na tabela: {total_profiles:,}")

total_unique_profiles = profile_bronze.select('account_id').distinct().count()
print(f"Total de cadastros unicos na tabela: {total_unique_profiles:,}")

null_ids = profile_bronze.filter(F.col("account_id").isNull()).count()
print(f"Número de cadastros nulos: {null_ids}")

null_dates = profile_bronze.filter(F.col("registered_on").isNull()).count()
print(f"Número de datas de cadastro nulos: {null_dates}")

date_range = profile_bronze.select(
    F.min("registered_on").alias("min_registration_date"),
    F.max("registered_on").alias("max_registration_date")
).show()

profile_bronze.describe("age").show()
profile_bronze.groupBy("gender").count().show()
profile_bronze.describe("credit_card_limit").show()

Total de cadastros na tabela: 17,000
Total de cadastros unicos na tabela: 17,000
Número de cadastros nulos: 0
Número de datas de cadastro nulos: 0
+---------------------+---------------------+
|min_registration_date|max_registration_date|
+---------------------+---------------------+
|           2013-07-29|           2018-07-26|
+---------------------+---------------------+

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|             17000|
|   mean| 62.53141176470588|
| stddev|26.738579945767242|
|    min|                18|
|    max|               118|
+-------+------------------+

+------+-----+
|gender|count|
+------+-----+
|  NULL| 2175|
|     M| 8484|
|     O|  212|
|     F| 6129|
+------+-----+

+-------+-----------------+
|summary|credit_card_limit|
+-------+-----------------+
|  count|            14825|
|   mean| 65404.9915682968|
| stddev|21598.29941022947|
|    min|          30000.0|
|    max|         120000.0|
+-------+------


### 2.1.2) Tabela `bronze/transactions`

In [0]:
(
    transactions_bronze
    .groupby('event')
    .agg(
        F.count('*').alias('total_events'),
        F.countDistinct('account_id').alias('total_customers'),
        F.min(F.col('time_since_test_start')).alias('min_time'),
        F.max(F.col('time_since_test_start')).alias('max_time'),
        F.min(F.col('amount')).alias('min_amount'),
        F.max(F.col('amount')).alias('max_amount'),
        F.avg(F.when(F.col('offer_id').isNull(), 1).otherwise(0)).alias('offer_id_nulls'),
        F.avg(F.when(F.col('offer id').isNull(), 1).otherwise(0)).alias('offer id_nulls'),
        F.avg(F.when(F.col('reward').isNull(), 1).otherwise(0)).alias('reward_nulls'),
        F.min(F.col('reward')).alias('min_reward'),
        F.max(F.col('reward')).alias('max_reward'),
    )
).show()

+---------------+------------+---------------+--------+--------+----------+----------+--------------+--------------+------------+----------+----------+
|          event|total_events|total_customers|min_time|max_time|min_amount|max_amount|offer_id_nulls|offer id_nulls|reward_nulls|min_reward|max_reward|
+---------------+------------+---------------+--------+--------+----------+----------+--------------+--------------+------------+----------+----------+
| offer received|       76277|          16994|     0.0|    24.0|      NULL|      NULL|           1.0|           0.0|         1.0|      NULL|      NULL|
|   offer viewed|       57725|          16834|     0.0|   29.75|      NULL|      NULL|           1.0|           0.0|         1.0|      NULL|      NULL|
|offer completed|       33579|          12774|     0.0|   29.75|      NULL|      NULL|           0.0|           1.0|         0.0|       2.0|      10.0|
|    transaction|      138953|          16578|     0.0|   29.75|      0.05|   1062.28|  


# 3) Enriquecimento e Regras de Negócio (*Bronze → Silver*)

O objetivo da camada Silver é transformar os dados limpos da camada Bronze em tabelas enriquecidas e prontas para a engenharia de features. Nesta fase, aplicamos as regras de negócio e as premissas levantadas durante a Análise Exploratória de Dados.

As principais transformações incluem:
- **Limpeza de Dados:** Tratamento de valores inconsistentes ou nulos com base em regras ou premissas predefinidas.
- **Engenharia de Features Básica:** Criação de novas colunas que agregam valor e significado de negócio aos dados brutos.
- **Reorganização Estrutural:** A tabela centralizada de eventos da camada Bronze é segmentada em tabelas especializadas para maior clareza e eficiência no processamento.

Ao final desta etapa, teremos cinco tabelas refinadas: `profile`, `offers`, `offers_received`, `offers_viewed` e `transactions`.

In [0]:
profile_bronze = spark.read.parquet(f"{PATH_VOLUME}/bronze/profile")
offers_bronze = spark.read.parquet(f"{PATH_VOLUME}/bronze/offers")
transactions_bronze = spark.read.parquet(f"{PATH_VOLUME}/bronze/transactions")


## 3.1) Tabela `silver/profile`

**Objetivo:** Aplicar as regras de limpeza identificadas na EDA e enriquecer os dados cadastrais do cliente.

**Premissas e Ações:**
- **Idade:** Com base na premissa de que a idade `118` é um valor inválido, ela é substituída por `None`. A coluna é renomeada para `age_at_registration` para maior clareza, já que não conseguiremos relacionar a idade do cliente no momento da transação por ser um tempo relativo.

- **Gênero:** Para simplificar a feature, os valores `O` e `null` são agrupados em uma única categoria, `Desconhecido`.

- **Criação de Feature (*Tenure*):** É criada a coluna `relative_customer_tenure_days`, que calcula o tempo de vida do cliente em dias (tenure) em relação à data de cadastro mais recente na base. Como todos os clientes estão na base de transações, faz sentido assumirmos que as transações iniciaram em algum momento **após** o cadastro do cliente mais recente. Então, apesar de não sabermos o tempo de vida do cliente no momento da transação, podemos ter uma ordenação fiel de quais clientes tem mais tempo de casa.

In [0]:
# Data de cadastro mais recente (o tenure é calculado em cima desta data)
max_reg_date = profile_bronze.select(F.max("registered_on")).first()[0]

profile_silver = (
    profile_bronze
    .withColumn('age', F.when(F.col('age') == 118, None).otherwise(F.col('age')))
    .withColumn('gender', F.when((F.col('gender') == 'O') | (F.col('gender').isNull()), 'Desconhecido').otherwise(F.col('gender')))
    .withColumn("relative_customer_tenure_days", F.datediff(F.lit(max_reg_date), F.col("registered_on")))
    .withColumnRenamed('age', 'age_at_registration')
    .select('account_id', 'age_at_registration', 'gender', 'credit_card_limit', 'relative_customer_tenure_days', 'execution_date')
)


## 3.2) Tabela `silver/offers`

**Objetivo:** Transformar os metadados das ofertas em features mais informativas e adequadas para o uso em modelos de machine learning.

**Premissas e Ações:**
- **Criação de Feature (Taxa de Incentivo):** É calculada a coluna `incentive_ratio` (`discount_value / min_value`) para quantificar a "atratividade" de uma oferta de desconto.

- **Categorização de Features:** São criadas colunas categóricas como `duration_range` (classificando a duração da oferta) e `incentive_type` (um resumo melhor do que a `incentive_ratio`). A premissa é que estas categorias têm maior poder preditivo do que os valores numéricos contínuos, além de gerar menos combinações possíveis na nossa simulação final.

- **One-Hot Encoding de Canais:** A coluna `channels`, que é um array, é desmembrada em colunas binárias (`channel_web`, `channel_email`, etc.). Isso transforma os canais em um formato numérico que pode ser diretamente utilizado pelo modelo.

In [0]:
offers_silver = (
    offers_bronze
    .withColumn('duration_range', F.when(F.col('duration') <= 5, 'Até 5 dias').otherwise('Acima de 5 dias'))
    .withColumn("channel_web", F.array_contains(F.col("channels"), "web").cast("integer"))
    .withColumn("channel_email", F.array_contains(F.col("channels"), "email").cast("integer"))
    .withColumn("channel_mobile", F.array_contains(F.col("channels"), "mobile").cast("integer"))
    .withColumn("channel_social", F.array_contains(F.col("channels"), "social").cast("integer"))
    .withColumn(
        "incentive_ratio",
        F.when(F.col("min_value") > 0, F.col("discount_value") / F.col("min_value")).otherwise(0)
    )
    .withColumn(
        'incentive_type',
        F.when(F.col('offer_type') == 'bogo', 'BOGO')
        .when(F.col('offer_type') == 'informational', 'INFO')
        .when(F.col('incentive_ratio') <= 0.25, 'Até 25%')
        .when(F.col('incentive_ratio') <= 0.5, 'Até 50%')
        .when(F.col('incentive_ratio') <= 0.75, 'Até 75%')
        .when(F.col('incentive_ratio') <= 1, 'Até 100%')
    )
    .select('offer_id', 'duration', 'duration_range', 'channel_web', 'channel_email', 'channel_mobile', 'channel_social', 'discount_value', 'min_value', 'offer_type', 'incentive_type', 'execution_date')
)


## 3.3) Tabelas `silver/offers_received` e `silver/offers_viewed`

**Objetivo:** Isolar os eventos de recebimento e visualização de ofertas em tabelas dedicadas para simplificar a análise de funil e a criação de features de comportamento.

**Premissa e Ação:**
- A premissa central é que segmentar a tabela de eventos original melhora a organização dos dados. A tabela `transactions_bronze` é filtrada para criar duas tabelas distintas:
    - `offers_received`: Registra o momento em que cada cliente **recebeu** uma oferta.
    - `offers_viewed`: Registra o momento em que cada cliente **visualizou** uma oferta.

**Importante notar que para ambos os eventos, não há preenchimento de `amount` nem `reward`, apenas o `offer id`.**

In [0]:
offers_received_silver = (
    transactions_bronze
    .filter(F.col('event') == 'offer received')
    .select('account_id', 'time_since_test_start', F.col('offer id').alias('offer_id'), 'execution_date')
)

In [0]:
offers_viewed_silver = (
    transactions_bronze
    .filter(F.col('event') == 'offer viewed')
    .select('account_id', 'time_since_test_start', F.col('offer id').alias('offer_id'), 'execution_date')
)


## 3.4) Tabela `silver/transactions`

**Objetivo:** Criar uma tabela consolidada de transações, vinculando de forma explícita as compras ao uso de ofertas.

**Premissa e Ação:**
- **Premissa Chave:** A solução se baseia na premissa validada na EDA de que os eventos `offer completed` e `transaction` coocorrem no tempo. Isso nos permite unificá-los em um único registro.

- **Ação:** O código agrupa os dados por `account_id` e `time_since_test_start`. Em seguida, utiliza funções de agregação para consolidar as informações, preenchendo o `offer_id` e o `reward` (do evento `offer completed`) na mesma linha do `amount` da transação. O resultado é uma visão clara de cada transação, indicando se ela foi ou não impulsionada por uma oferta.

**Na EDA também foi validado que não existe o evento `offer completed` sem uma `transaction` envolvida.**

In [0]:
transactions_silver = (
    transactions_bronze
    .filter(F.col('event').isin(['offer completed', 'transaction']))
    .groupby('account_id', 'time_since_test_start')
    .agg(
        F.max('amount').alias('amount'),
        F.first('offer_id', ignorenulls=True).alias('offer_id'),
        F.max('reward').alias('reward'),
        F.max('execution_date').alias('execution_date')
    )
)


## 3.5) Salvando bases na camada *Silver*

In [0]:
save_as_parquet(profile_silver, f"{PATH_VOLUME}/silver/profile")
save_as_parquet(offers_silver, f"{PATH_VOLUME}/silver/offers")
save_as_parquet(offers_received_silver, f"{PATH_VOLUME}/silver/offers_received")
save_as_parquet(offers_viewed_silver, f"{PATH_VOLUME}/silver/offers_viewed")
save_as_parquet(transactions_silver, f"{PATH_VOLUME}/silver/transactions")


# 4) Engenharia de Features (*Silver → Gold*)

Nesta etapa, transformamos os dados enriquecidos da camada Silver em uma *Feature Store* na camada Gold. O objetivo é criar um conjunto abrangente de variáveis que descrevam o comportamento histórico de cada cliente em um determinado dia.

A estratégia central se baseia na criação de uma tabela `spine` (`account_id`, `day`), que serve como base para a construção de features de janela móvel. Essas features agregarão o comportamento do cliente em diferentes janelas de tempo passadas (últimos 5, 7 e 10 dias), capturando padrões de Recência, Frequência e Valor (RFM), além de outras métricas comportamentais.

In [0]:
profile = spark.read.parquet(f"{PATH_VOLUME}/silver/profile")
offers = spark.read.parquet(f"{PATH_VOLUME}/silver/offers")
offers_received = spark.read.parquet(f"{PATH_VOLUME}/silver/offers_received")
offers_viewed = spark.read.parquet(f"{PATH_VOLUME}/silver/offers_viewed")
transactions = spark.read.parquet(f"{PATH_VOLUME}/silver/transactions")


## 4.1) Enriquecimento da Tabela de Transações

**Objetivo:** Consolidar informações relevantes em nível de transação para facilitar a criação de features agregadas.

**Ações Executadas:**
- **Cálculo do Tempo de Conversão:** Para cada transação com oferta, é calculada a feature `time_from_receive_to_use`, que mede o tempo decorrido desde o recebimento da oferta até a sua utilização. A lógica associa a transação à oferta correspondente mais recente recebida pelo cliente.

- **Criação do Valor Bruto do Pedido:** É criada a feature `gross_order_value`, que representa o valor total da compra antes do desconto (`amount` + `reward`).

- **Junção com Metadados da Oferta:** A tabela é enriquecida com os detalhes da oferta utilizada, juntando-se à tabela `offers_silver`.

In [0]:
used_offer_transactions = transactions.filter(F.col("offer_id").isNotNull())

t = used_offer_transactions.alias("t")
r = offers_received.alias("r")

matched_receive_to_use = t.join(
    r,
    on = (
        (t.account_id == r.account_id) &
        (t.offer_id == r.offer_id) &
        (t.time_since_test_start >= r.time_since_test_start)
    ),
    how = "left"
)

window_spec_rtu = Window.partitionBy(
    "t.account_id",
    "t.time_since_test_start",
    "t.offer_id"
).orderBy(F.desc("r.time_since_test_start"))

time_to_use_from_receive = (
    matched_receive_to_use
    .withColumn("rank", F.row_number().over(window_spec_rtu))
    .filter(F.col("rank") == 1)
    .withColumn(
        "time_from_receive_to_use",
        F.col("t.time_since_test_start") - F.col("r.time_since_test_start")
    )
    .select("t.account_id", "t.time_since_test_start", "time_from_receive_to_use")
)

transactions_enriched = (
    transactions
    .withColumn("gross_order_value", F.col("amount") + F.coalesce(F.col("reward"), F.lit(0)))
    .withColumn("used_offer", F.when(F.col("offer_id").isNotNull(), 1).otherwise(0))
    .join(
        time_to_use_from_receive,
        on = ['account_id', 'time_since_test_start'],
        how = "left"
    )
    .join(offers, on='offer_id', how="left")
    .drop('execution_date')
    .select('account_id', 'time_since_test_start', 'gross_order_value', 'amount', 'reward', 'used_offer', 'offer_id', 'duration', 'duration_range', 'channel_web', 'channel_email', 'channel_mobile', 'channel_social', 'discount_value', 'min_value', 'offer_type', 'incentive_type', 'time_from_receive_to_use')
    .withColumn('execution_date', F.current_date())
)


## 4.2) Criação da Tabela 'Spine'

**Objetivo:** Criar a estrutura de dados base para a engenharia de features temporais.

**Metodologia:**
- A tabela 'spine' é o resultado de um `crossJoin` entre todos os `account_id` únicos e uma série contínua de dias do período de análise.

- A sua função é garantir a existência de um registro para cada cliente em cada dia, mesmo em dias sem atividade. Isso é fundamental para o cálculo preciso de features de recência ("dias desde a última compra") e para o correto preenchimento de valores nulos em janelas de tempo.

- A base `transactions` vai até o dia 29.75, o que indica que o último dia disponível que teremos para criação de features será o 29, por isso a escolha.

In [0]:
TOTAL_DAYS = 30
distinct_accounts = profile.select("account_id").distinct()
days_df = spark.range(0, TOTAL_DAYS).toDF("day")

spine_df = distinct_accounts.crossJoin(days_df)


## 4.3) Features Históricas

Nesta seção, o foco é transformar o histórico de interações de cada cliente em um conjunto de features que descrevem seu comportamento ao longo do tempo. A estratégia consiste em analisar três tipos de comportamento principais: a **visualização de ofertas**, o **recebimento de ofertas** e as **transações realizadas**.

Para cada um desses comportamentos, serão criadas métricas que quantificam a **Frequência** (quantas vezes?), a **Recência** (há quanto tempo?) e a **Preferência/Valor** do cliente (quais tipos de interação e quanto foi gasto?).

Ao final, todas as features comportamentais geradas serão unificadas em uma única tabela (*Feature Store*). Esta tabela será enriquecida com os dados de perfil do cliente e com taxas de conversão do funil, resultando em um dataset completo e pronto para a etapa de modelagem.


In [0]:
WINDOWS = [5, 7, 10]


### 4.3.1) Visualização de Ofertas

**Exemplos de Features Criadas:**
- **Frequência:** Contagem total e de ofertas únicas visualizadas na janela (`count_offers_viewed_last_5d`).
- **Preferência:** Contagem de visualizações por tipo de oferta (duração, incentivo, etc.).
- **Recência:** Dias desde a última visualização (`days_since_last_view`).
- **Moda:** Tipo de oferta mais frequentemente visualizado na janela.

In [0]:
# Adiciona informações da oferta na tabela de ofertas visualizadas
offers_viewed_details = offers_viewed.join(offers, "offer_id", "left")

# Agregamos as informações por dia
daily_view_summary_df = (
    offers_viewed_details
    .withColumn("day", F.floor(F.col("time_since_test_start")).cast("integer"))
    .groupBy("account_id", "day")
    .agg(
        F.count("*").alias("offers_viewed_daily"),
        F.sum(F.when(F.col("offer_type") == "informational", 1).otherwise(0)).alias("info_offers_viewed_daily"),
        F.collect_set("offer_id").alias("unique_offers_viewed_daily"),
        F.sum(F.when(F.col("duration_range") == "Até 5 dias", 1).otherwise(0)).alias("short_offers_viewed_daily"),
        F.sum(F.when(F.col("duration_range") == "Acima de 5 dias", 1).otherwise(0)).alias("long_offers_viewed_daily"),
        F.sum(F.when(F.col("incentive_type") == "BOGO", 1).otherwise(0)).alias("bogo_offers_viewed_daily"),
        F.sum(F.when(F.col("incentive_type") == "Até 25%", 1).otherwise(0)).alias("low_inc_offers_viewed_daily"),
        F.sum(F.when(F.col("incentive_type") == "Até 50%", 1).otherwise(0)).alias("high_inc_offers_viewed_daily")
    )
)

# Pegamos a spine para considerar todas as datas, não só os dias onde houve visualização
features_df = spine_df.join(daily_view_summary_df, on=["account_id", "day"], how="left")

# Construção de features que olham para os dias anteriores
window_specs = {f"{w}d": Window.partitionBy("account_id").orderBy("day").rangeBetween(-w, -1) for w in WINDOWS}
for w_name, w_spec in window_specs.items():
    features_df = (
        features_df
        .withColumn(f"count_offers_viewed_last_{w_name}", F.sum("offers_viewed_daily").over(w_spec))
        .withColumn(f"count_info_offers_viewed_last_{w_name}", F.sum("info_offers_viewed_daily").over(w_spec))
        .withColumn(
            f"count_unique_offers_viewed_last_{w_name}",
            F.size(F.array_distinct(F.flatten(F.collect_list("unique_offers_viewed_daily").over(w_spec))))
        )
        .withColumn(f"count_short_offers_viewed_last_{w_name}", F.sum("short_offers_viewed_daily").over(w_spec))
        .withColumn(f"count_long_offers_viewed_last_{w_name}", F.sum("long_offers_viewed_daily").over(w_spec))
        .withColumn(f"count_bogo_offers_viewed_last_{w_name}", F.sum("bogo_offers_viewed_daily").over(w_spec))
        .withColumn(f"count_low_inc_offers_viewed_last_{w_name}", F.sum("low_inc_offers_viewed_daily").over(w_spec))
        .withColumn(f"count_high_inc_offers_viewed_last_{w_name}", F.sum("high_inc_offers_viewed_daily").over(w_spec))
    )

# Feature de recência
last_view_day_window = Window.partitionBy("account_id").orderBy("day").rowsBetween(Window.unboundedPreceding, -1)
features_df = (
    features_df
    .withColumn(
        "last_view_day",
        F.last(F.when(F.col("offers_viewed_daily") > 0, F.col("day")), ignorenulls=True).over(last_view_day_window)
    )
    .withColumn(
        "days_since_last_view",
        F.col("day") - F.col("last_view_day")
    )
)

# Features de moda
for w in WINDOWS:
    w_name = f"{w}d"
    features_df = (
        features_df
        .withColumn(
            f"most_frequent_duration_viewed_last_{w_name}",
            F.when(F.col(f"count_short_offers_viewed_last_{w_name}") > F.col(f"count_long_offers_viewed_last_{w_name}"), "Até 5 dias")
             .when(F.col(f"count_long_offers_viewed_last_{w_name}") > F.col(f"count_short_offers_viewed_last_{w_name}"), "Acima de 5 dias")
             .otherwise("Empate/Nenhum")
        )
        .withColumn(
            f"most_frequent_incentive_viewed_last_{w_name}",
            F.when((F.col(f"count_bogo_offers_viewed_last_{w_name}") >= F.col(f"count_low_inc_offers_viewed_last_{w_name}")) & (F.col(f"count_bogo_offers_viewed_last_{w_name}") >= F.col(f"count_high_inc_offers_viewed_last_{w_name}")), "BOGO")
             .when((F.col(f"count_low_inc_offers_viewed_last_{w_name}") > F.col(f"count_bogo_offers_viewed_last_{w_name}")) & (F.col(f"count_low_inc_offers_viewed_last_{w_name}") >= F.col(f"count_high_inc_offers_viewed_last_{w_name}")), "Até 25%")
             .when((F.col(f"count_high_inc_offers_viewed_last_{w_name}") > F.col(f"count_bogo_offers_viewed_last_{w_name}")) & (F.col(f"count_high_inc_offers_viewed_last_{w_name}") > F.col(f"count_low_inc_offers_viewed_last_{w_name}")), "Até 50%")
             .otherwise("Nenhum")
        )
    )

# Retirando features que não serão usadas na modelagem
interim_cols = [c for c in features_df.columns if "_daily" in c or "last_view_day" in c]
features_viewed_df = features_df.drop(*interim_cols)

# Features de contagem podem ter os nulos imputados com 0
count_cols = [c for c in features_viewed_df.columns if "count" in c]
features_viewed_df = features_viewed_df.fillna(0, subset=count_cols)


### 4.3.2) Recebimento de Ofertas

**Exemplos de Features Criadas:**
- **Frequência:** Contagem total e de ofertas únicas recebidas na janela (`count_offers_received_last_5d`).
- **Preferência:** Contagem de recebimentos por tipo de oferta (duração, incentivo, etc.).
- **Recência:** Dias desde o último recebimento (`days_since_last_offer_received`).
- **Moda:** Tipo de oferta mais frequentemente recebida na janela.

In [0]:
# Adiciona informações da oferta na tabela de ofertas recebidas
offers_received_details = offers_received.join(offers, on="offer_id", how="left")

# Agregamos as informações por dia
daily_received_summary_df = (
    offers_received_details
    .withColumn("day", F.floor(F.col("time_since_test_start")).cast("integer"))
    .groupBy("account_id", "day")
    .agg(
        F.count("*").alias("offers_received_on_day"),
        F.sum(F.when(F.col("offer_type") == "informational", 1).otherwise(0)).alias("info_offers_received_daily"),
        F.collect_set("offer_id").alias("unique_offers_received_daily"),
        F.sum(F.when(F.col("duration_range") == "Até 5 dias", 1).otherwise(0)).alias("short_offers_received_daily"),
        F.sum(F.when(F.col("duration_range") == "Acima de 5 dias", 1).otherwise(0)).alias("long_offers_received_daily"),
        F.sum(F.when(F.col("incentive_type") == "BOGO", 1).otherwise(0)).alias("bogo_offers_received_daily"),
        F.sum(F.when(F.col("incentive_type") == "Até 25%", 1).otherwise(0)).alias("low_inc_offers_received_daily"),
        F.sum(F.when(F.col("incentive_type") == "Até 50%", 1).otherwise(0)).alias("high_inc_offers_received_daily")
    )
)

# Pegamos a spine para considerar todas as datas, não só os dias onde houve recebimento
features_df = spine_df.join(daily_received_summary_df, on=["account_id", "day"], how="left")

# Construção de features que olham para os dias anteriores
window_specs = {f"{w}d": Window.partitionBy("account_id").orderBy("day").rangeBetween(-w, -1) for w in WINDOWS}
for w_name, w_spec in window_specs.items():
    features_df = (
        features_df
        .withColumn(f"count_offers_received_last_{w_name}", F.sum("offers_received_on_day").over(w_spec))
        .withColumn(f"count_info_offers_received_last_{w_name}", F.sum("info_offers_received_daily").over(w_spec))
        .withColumn(
            f"count_unique_offers_received_last_{w_name}",
            F.size(F.array_distinct(F.flatten(F.collect_list("unique_offers_received_daily").over(w_spec))))
        )
        .withColumn(f"count_short_offers_received_last_{w_name}", F.sum("short_offers_received_daily").over(w_spec))
        .withColumn(f"count_long_offers_received_last_{w_name}", F.sum("long_offers_received_daily").over(w_spec))
        .withColumn(f"count_bogo_offers_received_last_{w_name}", F.sum("bogo_offers_received_daily").over(w_spec))
        .withColumn(f"count_low_inc_offers_received_last_{w_name}", F.sum("low_inc_offers_received_daily").over(w_spec))
        .withColumn(f"count_high_inc_offers_received_last_{w_name}", F.sum("high_inc_offers_received_daily").over(w_spec))
    )

# Feature de recência
last_received_day_window = Window.partitionBy("account_id").orderBy("day").rowsBetween(Window.unboundedPreceding, -1)
features_df = (
    features_df
    .withColumn(
        "last_received_day",
        F.last(F.when(F.col("offers_received_on_day") > 0, F.col("day")), ignorenulls=True).over(last_received_day_window)
    )
    .withColumn(
        "days_since_last_offer_received",
        F.col("day") - F.col("last_received_day")
    )
)

# Features de moda
for w in WINDOWS:
    w_name = f"{w}d"
    features_df = (
        features_df
        .withColumn(
            f"most_frequent_duration_received_last_{w_name}",
            F.when(F.col(f"count_short_offers_received_last_{w_name}") > F.col(f"count_long_offers_received_last_{w_name}"), "Até 5 dias")
             .when(F.col(f"count_long_offers_received_last_{w_name}") > F.col(f"count_short_offers_received_last_{w_name}"), "Acima de 5 dias")
             .otherwise("Empate/Nenhum")
        )
        .withColumn(
            f"most_frequent_incentive_received_last_{w_name}",
            F.when((F.col(f"count_bogo_offers_received_last_{w_name}") >= F.col(f"count_low_inc_offers_received_last_{w_name}")) & (F.col(f"count_bogo_offers_received_last_{w_name}") >= F.col(f"count_high_inc_offers_received_last_{w_name}")), "BOGO")
             .when((F.col(f"count_low_inc_offers_received_last_{w_name}") > F.col(f"count_bogo_offers_received_last_{w_name}")) & (F.col(f"count_low_inc_offers_received_last_{w_name}") >= F.col(f"count_high_inc_offers_received_last_{w_name}")), "Até 25%")
             .when((F.col(f"count_high_inc_offers_received_last_{w_name}") > F.col(f"count_bogo_offers_received_last_{w_name}")) & (F.col(f"count_high_inc_offers_received_last_{w_name}") > F.col(f"count_low_inc_offers_received_last_{w_name}")), "Até 50%")
             .otherwise("Nenhum")
        )
    )

# Retirando features que não serão usadas na modelagem
interim_cols = [c for c in features_df.columns if "_daily" in c or "last_received_day" in c]
features_received_df = features_df.drop(*interim_cols)

# Features de contagem podem ter os nulos imputados com 0
count_cols = [c for c in features_received_df.columns if "count" in c]
features_received_df = features_received_df.fillna(0, subset=count_cols)


### 4.3.3) Transações Realizadas

**Exemplos de Features Criadas:**
- **Frequência:** Número de transações totais, com oferta e orgânicas.
- **Valor Monetário:** Soma do GOV (`gross_order_value`), soma dos descontos e ticket médio (`avg_ticket`).
- **Preferência de Oferta:** Proporção de transações por tipo de oferta utilizada (`share_bogo_transactions`).
- **Recência:** Dias desde a última transação (`days_since_last_transaction`).

In [0]:
# Agregamos as informações por dia
daily_transactions_summary_df = (
    transactions_enriched
    .withColumn("day", F.floor(F.col("time_since_test_start")).cast("integer"))
    .groupBy("account_id", "day")
    .agg(
        # Contagens
        F.count("*").alias("transaction_count_daily"),
        F.sum("used_offer").alias("offer_transaction_count_daily"),
        
        # Agregações de Valor
        F.sum("gross_order_value").alias("gov_sum_daily"),
        F.sum(F.when(F.col("used_offer") == 1, F.col("gross_order_value"))).alias("gov_offer_sum_daily"),
        F.sum("reward").alias("reward_sum_daily"),

        # Contagens por tipo de oferta usada
        F.sum(F.when(F.col("offer_type") == "bogo", 1).otherwise(0)).alias("bogo_transaction_count_daily"),
        F.sum(F.when(F.col("offer_type") == "discount", 1).otherwise(0)).alias("discount_transaction_count_daily"),
        
        # Agregados para a média de tempo de conversão
        F.max("time_from_receive_to_use").alias("max_time_to_completion_daily")
    )
)

# Pegamos a spine para considerar todas as datas, não só os dias onde houve transação
features_df = spine_df.join(daily_transactions_summary_df, on=["account_id", "day"], how="left")

# Construção de features que olham para os dias anteriores
window_specs = {f"{w}d": Window.partitionBy("account_id").orderBy("day").rangeBetween(-w, -1) for w in WINDOWS}
for w_name, w_spec in window_specs.items():
    features_df = (
        features_df
        # Contagens
        .withColumn(f"transaction_count_last_{w_name}", F.sum("transaction_count_daily").over(w_spec))
        .withColumn(f"offer_transaction_count_last_{w_name}", F.sum("offer_transaction_count_daily").over(w_spec))
        
        # Somas de Valor
        .withColumn(f"gov_sum_last_{w_name}", F.sum("gov_sum_daily").over(w_spec))
        .withColumn(f"reward_sum_last_{w_name}", F.sum("reward_sum_daily").over(w_spec))
        
        # Contagens por tipo
        .withColumn(f"bogo_transaction_count_last_{w_name}", F.sum("bogo_transaction_count_daily").over(w_spec))
        .withColumn(f"discount_transaction_count_last_{w_name}", F.sum("discount_transaction_count_daily").over(w_spec))
        
        # Agregados para tempo de conversão
        .withColumn(f"sum_time_to_completion_last_{w_name}", F.sum("max_time_to_completion_daily").over(w_spec))
    )

# Feature de recência
last_transaction_day_window = Window.partitionBy("account_id").orderBy("day").rowsBetween(Window.unboundedPreceding, -1)
features_df = (
    features_df
    .withColumn(
        "last_transaction_day",
        F.last(F.when(F.col("transaction_count_daily") > 0, F.col("day")), ignorenulls=True).over(last_transaction_day_window)
    )
    .withColumn(
        "days_since_last_transaction",
        F.col("day") - F.col("last_transaction_day")
    )
)

# Features de ticket médio e share
for w in WINDOWS:
    w_name = f"{w}d"
    features_df = (
        features_df
        .withColumn(
            f"organic_transaction_count_last_{w_name}",
            F.col(f"transaction_count_last_{w_name}") - F.col(f"offer_transaction_count_last_{w_name}")
        )
        .withColumn(
            f"avg_ticket_last_{w_name}",
            F.when(F.col(f"transaction_count_last_{w_name}") > 0, F.col(f"gov_sum_last_{w_name}") / F.col(f"transaction_count_last_{w_name}"))
        )
        .withColumn(
            f"avg_time_to_completion_last_{w_name}",
            F.when(F.col(f"offer_transaction_count_last_{w_name}") > 0, F.col(f"sum_time_to_completion_last_{w_name}") / F.col(f"offer_transaction_count_last_{w_name}"))
        )
        .withColumn(
            f"share_bogo_transactions_last_{w_name}",
            F.when(
                F.col(f"offer_transaction_count_last_{w_name}") > 0,
                F.coalesce(F.col(f"bogo_transaction_count_last_{w_name}"), F.lit(0)) / F.col(f"offer_transaction_count_last_{w_name}")
            )
        )
        .withColumn(
            f"share_discount_transactions_last_{w_name}",
            F.when(
                F.col(f"offer_transaction_count_last_{w_name}") > 0,
                F.coalesce(F.col(f"discount_transaction_count_last_{w_name}"), F.lit(0)) / F.col(f"offer_transaction_count_last_{w_name}")
             )
        )
    )

# Retirando features que não serão usadas na modelagem
interim_cols = [c for c in features_df.columns if "_daily" in c or "last_transaction_day" in c]
features_transactions_df = features_df.drop(*interim_cols)

# Features de contagem podem ter os nulos imputados com 0
count_sum_cols = [c for c in features_transactions_df.columns if ("count" in c or "sum" in c or "share" in c)]
features_transactions_df = features_transactions_df.fillna(0, subset=count_sum_cols)


### 4.3.4) Consolidação da *Feature Store*

**Objetivo:** Unificar todas as features criadas (comportamentais e de perfil) em uma única tabela final.

**Ações Executadas:**
- **Unificação das Features:** As tabelas de features (`transactions`, `received`, `viewed`) são unidas sobre a `spine`.

- **Criação de Taxas de Conversão:** São calculadas métricas de funil, como `view_rate` (visualizações / recebimentos) e `usage_rate` (usos / visualizações).

- **Adição dos Dados de Perfil:** As features estáticas da tabela `profile_silver` são adicionadas a cada registro diário.

- **Finalização:** O resultado é a tabela `customer_daily_features`, que constitui a Feature Store completa, pronta para ser utilizada na criação das tabelas de modelagem (ABTs).

In [0]:
# Junção de todas as features criadas
final_features_df = (
    features_transactions_df
    .join(features_received_df, on=["account_id", "day"], how="left")
    .join(features_viewed_df, on=["account_id", "day"], how="left")
)

# Criação de features derivadas como taxa de abertura
for w in WINDOWS:
    w_name = f"{w}d"
    
    received_col = f"count_offers_received_last_{w_name}"
    viewed_col = f"count_offers_viewed_last_{w_name}"
    used_col = f"offer_transaction_count_last_{w_name}"

    final_features_df = (
        final_features_df
        .withColumn(
            f"view_rate_last_{w_name}",
            F.when(
                F.col(received_col) > 0,
                F.coalesce(F.col(viewed_col), F.lit(0)) / F.col(received_col)
            )
        )
        .withColumn(
            f"usage_rate_from_received_last_{w_name}",
            F.when(
                F.col(received_col) > 0,
                F.coalesce(F.col(used_col), F.lit(0)) / F.col(received_col)
            )
        )
        .withColumn(
            f"usage_rate_from_viewed_last_{w_name}",
            F.when(
                F.col(viewed_col) > 0,
                F.coalesce(F.col(used_col), F.lit(0)) / F.col(viewed_col)
            )
        )
    )

# Adição de features cadastrais
customer_daily_features = final_features_df.join(profile.drop('execution_date'), on="account_id", how="left")

# Imputando com 0 os nulos em features de contagem
all_count_sum_cols = [c for c in customer_daily_features.columns if ("count" in c or "sum" in c)]
customer_daily_features = customer_daily_features.fillna(0, subset=all_count_sum_cols)

customer_daily_features = customer_daily_features.withColumn("execution_date", F.current_date())


### 4.3.5) Salvando a *Feature Store* na camada *Gold*

In [0]:
save_as_parquet(customer_daily_features, f"{PATH_VOLUME}/gold/customer_daily_features")


# 5) Definição e Construção da Variável Alvo (*Target*)
A escolha da variável alvo é uma das decisões mais críticas do projeto, pois ela define o que o modelo tentará prever e, consequentemente, como o sucesso de uma campanha de ofertas será medido.

### Discussão sobre a Escolha da Métrica
Uma métrica inicial considerada foi a conversão direta da oferta, ou seja, se o cliente **usou ou não o cupom** enviado. Embora seja uma métrica válida, ela oferece uma visão limitada do impacto real da campanha. O sucesso de uma estratégia de marketing não se resume a uma única transação, mas sim à sua capacidade de **influenciar positivamente o comportamento de compra do cliente a médio e longo prazo**.  
Por exemplo, um cliente pode não utilizar o cupom específico que recebeu, mas o estímulo da oferta pode tê-lo lembrado da marca, levando-o a realizar mais compras orgânicas nos dias seguintes. Este é um efeito positivo e desejável da campanha que a simples métrica de uso de cupom não capturaria.


### Variável Alvo Adotada: Gasto Futuro
Com base nessa perspectiva, a variável alvo adotada foi o **valor total que o cliente gasta (`gross_oder_value`) em uma janela de tempo futura**.  
**Como é calculada:** Para cada cliente e para cada dia no nosso dataset, nós "olhamos para o futuro" e somamos todo o valor que ele gastou nos próximos `X` dias.

In [0]:
TARGET_WINDOWS = [5, 10]

# Se o cliente recebeu oferta num dia específico (servirá para separar as ABTs)
received_offer_next_days_df = (
    features_received_df
    .select('account_id', 'day', 'offers_received_on_day')
)

# Total gasto no dia
daily_outcomes_summary_df = (
    transactions_enriched
    .withColumn("day", F.floor(F.col("time_since_test_start")).cast("integer"))
    .groupBy("account_id", "day")
    .agg(
        F.sum("gross_order_value").alias("gov_on_day")
    )
)

# Junção das informações acima
features_df = (
    spine_df
    .join(daily_outcomes_summary_df, on=["account_id", "day"], how="left")
    .join(received_offer_next_days_df, on=["account_id", "day"], how="left")
)

# Percorre a janela futura e calcula o gasto total e se recebeu oferta
for w in TARGET_WINDOWS:
    w_spec_future = Window.partitionBy("account_id").orderBy("day").rangeBetween(0, w)
    
    features_df = (
        features_df
        .withColumn(f"spend_next_{w}d", F.sum("gov_on_day").over(w_spec_future))
        .withColumn(f"received_offer_next_{w}d", F.sum("offers_received_on_day").over(w_spec_future))
    )

# Tabela final de target
daily_targets = (
    features_df
    .select(
        "account_id",
        "day",
        *[f"spend_next_{w}d" for w in TARGET_WINDOWS],
        *[f"received_offer_next_{w}d" for w in TARGET_WINDOWS],
    )
)

# Se não transacionou nos próximos X dias a target é 0
daily_targets = daily_targets.fillna(0)

In [0]:
save_as_parquet(daily_targets, f"{PATH_VOLUME}/gold/daily_targets")


# 6) Construção das ABTs

Para resolver o desafio de qual oferta enviar a cada cliente, nossa estratégia se baseia em comparar o resultado de duas situações possíveis. Em vez de construir um único modelo genérico, optamos por desenvolver **dois modelos preditivos especialistas**, cada um focado em um cenário de negócio distinto.

A questão fundamental que queremos responder é: **Para um determinado cliente, qual é o impacto financeiro esperado se enviarmos uma oferta versus se não fizermos nada?**

Para isso, precisamos prever o comportamento do cliente em cada um desses dois cenários.

### A Estratégia: Comparação de Cenários Preditivos

A abordagem consiste em:

1.  **Construir um modelo especialista em prever o gasto de um cliente quando ele recebe uma oferta.** Este modelo aprenderá como diferentes perfis de clientes reagem a diferentes tipos de incentivos.
2.  **Construir um segundo modelo especialista em prever o gasto "orgânico" de um cliente**, ou seja, seu comportamento de compra natural, sem a influência de uma campanha. Este modelo nos dará uma linha de base (baseline) do que esperar de cada cliente.

A decisão de qual oferta enviar (ou se não devemos enviar nenhuma) virá da comparação das predições desses dois modelos. Buscaremos a ação que maximize o retorno financeiro esperado.

`Impacto Financeiro Estimado = (Gasto Previsto COM Oferta) - (Gasto Previsto SEM Oferta)`

Para treinar esses modelos especialistas, precisamos de conjuntos de dados (ABTs) que representem fielmente cada um desses cenários.

---

### ABT 1: `abt_offer` (Dados para o Modelo de Cenário COM Oferta)

**Objetivo:** Criar um conjunto de dados para treinar um modelo que aprenda a prever o gasto de um cliente **após receber o estímulo de uma oferta específica**.

**Composição:**
- **População:** A tabela é formada por registros de clientes nos dias em que eles **receberam uma oferta**.
- **Features:** Inclui todas as features históricas do cliente no momento do recebimento, além das **características da própria oferta** (tipo, duração, valor, etc.). As features da oferta são cruciais aqui, pois o modelo precisa aprender seu impacto.
- **Target:** O gasto futuro do cliente (`spend_next_X_days`).

---

### ABT 2: `abt_baseline` (Dados para o Modelo de Cenário SEM Oferta)

**Objetivo:** Criar um conjunto de dados para treinar um modelo que preveja o comportamento de gasto **orgânico e natural** de um cliente.

**Composição:**
- **População:** A tabela é composta por registros de clientes em dias que eles **não receberam nenhuma oferta**.
- **Features:** Contém apenas as features históricas do cliente. Não há features de oferta, pois este é um cenário sem estímulos.
- **Target:** O gasto futuro do cliente (`spend_next_X_days`).

Com essa estrutura, cada modelo se torna um especialista em seu domínio. Ao combiná-los, criamos um sistema de decisão robusto que compara cenários futuros para tomar a melhor ação no presente, focando sempre em maximizar o valor gerado para o negócio.

In [0]:
# ABT Modelo 1: Prever gasto total dado que recebeu cupom
window_spec_first_offer = Window.partitionBy("account_id", "day").orderBy("time_since_test_start")

first_offer_of_the_day_df = (
    offers_received
    .withColumn("day", F.floor(F.col("time_since_test_start")).cast("integer"))
    .withColumn("rank", F.row_number().over(window_spec_first_offer))
    .filter(F.col("rank") == 1)
    .select("account_id", "day", "offer_id")
)

abt_offer = (
    first_offer_of_the_day_df
    .join(customer_daily_features, on=["account_id", "day"], how="inner")
    .join(daily_targets, on=["account_id", "day"], how="left")
    .join(offers, on="offer_id", how="left")
    .drop('execution_date')
    .withColumn('execution_date', F.current_date())
)

# ABT Modelo 2: Prever gasto total dado que não recebeu oferta naquele dia
SAMPLING_FRACTION = 0.2 # Pegar apenas uma amostra da população
abt_baseline = (
    customer_daily_features
    .filter(F.col("offers_received_on_day").isNull())
    .sample(withReplacement=False, fraction=SAMPLING_FRACTION, seed=42)
    .join(daily_targets, on=["account_id", "day"], how="left")
    .drop('execution_date')
    .withColumn('execution_date', F.current_date())
)

In [0]:
save_as_parquet(abt_offer, f"{PATH_VOLUME}/gold/abt_offer")
save_as_parquet(abt_baseline, f"{PATH_VOLUME}/gold/abt_baseline")


# 7) Discussão sobre Limitações e Melhorias Futuras
Como a análise foi feita a partir de dados observacionais, sem um ambiente experimental totalmente controlado, é importante reconhecer algumas limitações da abordagem que adotei e apontar como eu poderia evoluir a solução em versões futuras.


## 7.1) Ruído na Janela de Cálculo da Variável Alvo
A variável alvo (`spend_next_X_days`) foi calculada em um contexto de negócio real, onde os clientes podem ser impactados por outras campanhas ou fatores externos durante a janela de observação. Isso torna difícil isolar o efeito de uma única oferta.

Mesmo assim, optei por seguir com essa abordagem por ela refletir melhor o cenário real. Meu objetivo não era prever em um ambiente artificial de laboratório, mas entender se, **mesmo diante de múltiplas interações e ruídos**, o envio de uma oferta específica ainda poderia aumentar o gasto do cliente. Dessa forma, o modelo aprende a capturar padrões no ecossistema complexo em que o cliente está inserido.


## 7.2) Potencial Viés de Seleção no Envio de Ofertas
Ao comparar cenários "com" e "sem" oferta, existe uma suposição implícita de que os grupos são equivalentes. Porém, sei que o envio de ofertas não acontece de forma aleatória. Perfis específicos como novos clientes ou usuários em risco de churn têm maior chance de receber incentivos, o que gera um **viés de seleção**.

Minha ideia inicial era implementar o **Propensity Score Matching (PSM)** justamente para mitigar esse problema, criando grupos de comparação mais justos. Porém, por questão de tempo, deixei essa implementação planejada como um incremento futuro caso sobrasse tempo, o que não foi possível executar dentro do escopo deste case.


## 7.3) Possível Influência de Ofertas Anteriores na Baseline
A tabela `abt_baseline` foi construída considerando os dias em que o cliente não recebeu novas ofertas. No entanto, pode acontecer de uma oferta enviada anteriormente ainda estar influenciando o comportamento do cliente nesse período, o que afeta a noção de um dia totalmente "sem impacto".

Optei por seguir com essa definição mais simples de baseline, pois ela já captura o comportamento médio do cliente em dias sem estímulos recentes, mesmo que ainda exista alguma influência de interações passadas.


## 7.4) Segmentação de Clientes
Outra melhoria que eu considerei foi criar uma segmentação de clientes para permitir comparações entre diferentes perfis de comportamento. Isso poderia ser feito, por exemplo, a partir de uma análise **RFM** (Recência, Frequência e Valor Monetário) usando as features já construídas, ou ainda a partir das informações cadastrais disponíveis em `profile`.

Essa segmentação possibilitaria entender melhor como diferentes tipos de clientes reagem às ofertas, trazendo insights adicionais além da modelagem preditiva individual. No entanto, por questão de tempo, não consegui implementar essa etapa dentro do escopo atual.


## 7.5) Explorar mais *feature engineering*
Por limitação de tempo, não foi possível criar muitas variáveis, já que o aumento de features eleva o tempo de modelagem. No entanto, seria viável gerar variáveis adicionais, como *lags* (comportamento dos dias anteriores) e combinar features para criar indicadores de tendência, por exemplo: 

*ticket_medio_last_3d / ticket_medio_last_7d*, que indicaria se o ticket médio do cliente tem aumentado ou diminuído nos últimos dias.
