# Processamento dos dados

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from pyspark.sql.window import Window

In [0]:
df_offers = spark.read.json('/Volumes/workspace/raw/dados_brutos/offers.json')
df_profile = spark.read.json('/Volumes/workspace/raw/dados_brutos/profile.json')
df_trans = spark.read.json('/Volumes/workspace/raw/dados_brutos/transactions.json')

## 

## Manipulação e limpeza dos dados

In [0]:
def verificar_nulos(df: DataFrame) -> DataFrame:

    """
    Função para verificar a quantidade de dados nulos por coluna. 
    Parâmetros:
        df: Spark DataFrame
    """

    df_nulos = df.select([
        F.sum(F.col(c).isNull().cast("int")).alias(c)
        for c in df.columns
    ])

    return df_nulos


def verificar_duplicatas(df: DataFrame, coluna: str = 'id') -> DataFrame:

    """
    Função para verificar a quantidade de duplicatas assumindo a granularidade id
    Parâmetros:
        df: Spark DataFrame
    """

    df_duplicatas = df.groupBy(coluna).count().filter(F.col('count') > 1).count()

    return df_duplicatas

### 1 - Offers

In [0]:
df_offers.limit(5).display()

In [0]:
df_offers.printSchema()

In [0]:
print(f"Quantidade de duplicatas: {verificar_duplicatas(df_offers)}")

In [0]:
verificar_nulos(df_offers).display()

In [0]:
df_offers.display()

### Criando novas colunas

In [0]:
df_offers.describe().display()

In [0]:
offers_transformado = (
    df_offers
    .withColumn(
        "categoria_duracao",
        F.when(F.col("duration") <= 3, "curta")
        .when((F.col("duration") > 3) & (F.col("duration") <= 5), "media")
        .otherwise("longa")
    )
)

offers_transformado.groupBy(F.col('categoria_duracao')).count().display()

In [0]:
offers_transformado = (
    offers_transformado
    .withColumn("num_channels", F.size(F.col("channels")))
)


Vamos iniciar transformando a coluna `channels`

In [0]:
df_explo = offers_transformado.withColumn("channel_explo", F.explode("channels"))

df_pivoted = df_explo.groupBy("id") \
                        .pivot("channel_explo") \
                        .agg(F.lit(1)) \
                        .na.fill(0)

df_offers_final = offers_transformado.join(df_pivoted, on="id", how="left")

### 2 - Profile

In [0]:
df_profile.limit(5).display()

In [0]:
df_profile.printSchema()

In [0]:
verificar_nulos(df_profile).display()

Vamos verificar se os indices dos valores presentes em `credit_card_limit` são os mesmos de `gender`. Olhando as 5 primeiras linhas, parece que há um padrão que toda vez que a idade é 118, vamos verificar isso. 

In [0]:
qtd_dados_nulos = df_profile.filter(
    (F.col("age") == 118) & 
    (F.col("gender").isNull()) & 
    (F.col("credit_card_limit").isNull())
).count()

print(f" Quantidade de de dados nulos para idade 118 e nulos para gender e credit_card_limit: {qtd_dados_nulos}")

Confirmou-se a suspeita de que todos os dados que aparecem como nulos em `credit_card_limit` e `gender` possuem idade igual a 118. Isso indica que esses registros não representam clientes "reais" com cadastro completo.

Podemos interpretar esse padrão de diferentes formas:

1. **Valores fictícios/teste**: a idade 118 sugere um valor sentinela usado pelo sistema.
2. **Cadastro incompleto**: esses usuários podem ter interrompido o processo antes de informar dados pessoais e financeiros.
3. **Tratamento de dados ausentes**: em vez de deixar a idade em branco, o sistema pode ter utilizado 118 como um “flag”.

Também existem duas possibilidades de tratamento para esses dados:

1. **Remover os registros**

Se o objetivo é a modelagem preditiva, é necessário trabalhar com dados limpos e realistas, evitando inserções artificiais como a idade 118. Como o percentual de nulos na base é de apenas 12,79%, a remoção desses usuários não comprometerá a amostra.
A grande vantagem dessa abordagem é gerar modelos mais consistentes, sem ruídos, além de evitar o enviesamento das estatísticas. A desvantagem, por outro lado, é a perda dessas informações.

2. **Tratar como um "grupo especial"**

Se a pergunta a ser respondida for algo como *“como se comportam os usuários sem informações cadastradas”*, ou se o objetivo for analisar a jornada de perfis incompletos, o melhor tratamento é considerar esses casos como um grupo específico.
A principal vantagem é manter a visibilidade desse segmento; já a desvantagem é que ele pode ser apenas “ruído de sistema”, sem representar clientes reais ou comportamentos fidedignos.

---

**Como nosso objetivo é a modelagem preditiva, seguiremos com a primeira abordagem.**

---

In [0]:

print(f"Percentual da base de usuários que é nula: {qtd_dados_nulos/df_profile.count()*100:.2f}%")

In [0]:
df_profile_sn = df_profile.na.drop(subset=['credit_card_limit', 'gender'])

In [0]:
verificar_nulos(df_profile_sn).display()

In [0]:
df_profile_sn = df_profile_sn.withColumn("registered_on", F.to_date(F.col("registered_on"), "yyyyMMdd"))

In [0]:
print(f"Quantidade de duplicatas: {verificar_duplicatas(df_profile_sn)}")

### Criando novas colunas

Vamos criar uma coluna que represente o engajamento com a plataforma. A coluna que vamos criar é o **tempo de registro** do usuário.

In [0]:
df_profile_transformado = df_profile_sn.withColumn(
    "tempo_de_registro",
    F.datediff(F.current_date(), F.col("registered_on"))
)


### 3 - Transactions

In [0]:
df_trans.limit(5).display()

In [0]:
df_trans.printSchema()

In [0]:
colunas_selecionadas = ['account_id', 'event', 'time_since_test_start']

df_trans = df_trans.select(
    *colunas_selecionadas,
    "value.*"
)

In [0]:
verificar_nulos(df_trans).display()

Temos duas colunas que parecem carregar o mesmo significado, sendo elas `offer id` e `offer_id`, porém percebe-se que a quantidade de dados nulos em cada uma diferente. Vamos verificar essas colunas com mais calma. 


In [0]:
eh_offerid_nulo = F.col("offer id").isNull()
eh_offer_id_nulo = F.col("offer_id").isNull()

resumo = df_trans.agg(
    F.count('*').alias('total_linhas'),
    F.sum(F.when(eh_offerid_nulo & eh_offer_id_nulo, 1).otherwise(0)).alias('ambos_nulos'),
    F.sum(F.when(~eh_offerid_nulo & eh_offer_id_nulo, 1).otherwise(0)).alias('apenas_underscore_nulo'),
    F.sum(F.when(eh_offerid_nulo & ~eh_offer_id_nulo, 1).otherwise(0)).alias('apenas_espaco_nulo'),
    F.sum(F.when(~eh_offerid_nulo & ~eh_offer_id_nulo, 1).otherwise(0)).alias('ambos_nao_nulos')
    

).first()

total_linhas = resumo['total_linhas']
ambos_nulos = resumo['ambos_nulos']
apenas_underscore_nulo = resumo['apenas_underscore_nulo']
apenas_espaco_nulo = resumo['apenas_espaco_nulo']
ambos_n_nulos = resumo['ambos_nao_nulos']

if total_linhas > 0:
    print(
        f"Ambos nulos: {ambos_nulos} / {total_linhas} = {ambos_nulos/total_linhas*100:.2f}%"
    )
    print(
        f"Apenas 'offer id' nulo: {apenas_espaco_nulo} / {total_linhas} = {apenas_espaco_nulo/total_linhas*100:.2f}%"
    )
    print(
        f"Apenas 'offer_id' nulo: {apenas_underscore_nulo} / {total_linhas} = {apenas_underscore_nulo/total_linhas*100:.2f}%"
    )

    print(
        f"Ambos não nulos: {ambos_n_nulos} / {total_linhas} = {ambos_n_nulos/total_linhas*100:.2f}%"
    )
else:
    print("O DataFrame está vazio.")


Observa-se que quase metade dos registros não tem nenhuma informação de oferta associada, esses registros provavelmente são referentes a **eventos de transação normal**, ou seja, compras feitas sem cupons ou ofertas. 

Era esperado que as colunas `offer id` e `offer_id` fossem duplicadas, mas a análise mostra que não são. 

* Há casos em que só `offer id` está preenchido (~11%)

* E casos em que só `offer_id` está preenchido (~44%)

Indicando que os dados de oferta estão espalhados entre as duas colunas. 

Outra informação que obtemos é que ambos não nulo é 0%, indicando que nunca houve uma linha com os dois preenchidos ao mesmo tempo, uma ou a outra é usada, mas nunca as duas. Vamos unificar essas colunas posteriormente.

Outro detalhe que percebemos é que `reward` possui a mesma quantidade de dados nulos que a coluna `offer_id`, indicando que essas duas colunas possuem certa relação. Vamos analisar essa relação. 



In [0]:
df_trans.select(
    F.when(F.col("offer id").isNotNull(), "tem_offer_id_espaco")
     .when(F.col("offer_id").isNotNull(), "tem_offer_id_underscore")
     .otherwise("sem_offer").alias("origem_offer"),
    F.when(F.col("reward").isNotNull(), 1).otherwise(0).alias("tem_reward")
).groupBy("origem_offer").agg(
    F.count("*").alias("total"),
    F.sum("tem_reward").alias("com_reward")
).display()


1. **`tem_offer_id_underscore` (33.579 eventos)**

   * Todos esses eventos têm **`offer_id`**.
   * E **todos eles têm `reward` associado** (33.579/33.579).
     Isso significa que **nessa versão do schema, o campo `reward` foi registrado corretamente**.

2. **`tem_offer_id_espaco` (134.002 eventos)**

   * Todos têm uma oferta vinculada (`offer id`).
   * Mas **nenhum deles tem `reward` preenchido**.
     Isso indica que quando a ingestão usou a chave `offer id`, a informação de recompensa não veio junto. **Ou seja, há uma perda de informação.**

3. **`sem_offer` (138.953 eventos)**

   * Eventos que não têm nenhuma oferta associada.
   * Naturalmente não têm `reward`.
     Faz sentido, pois são **transações normais**, sem cupom.

Isso indica que o campo `reward` só é confiável para as ofertas registras em `offer_id`. As ofertas que vieram de `offer id` (atenção para o nome da coluna sem _) ficaram sem `reward`, mesmo quando provavelmente havia uma recompensa. Isso indica uma inconsistência nos dados. Aparentemente duas versões do mesmo campo foram mescladas, mas só uma preservou o reward. 

In [0]:
df_trans_final = df_trans.withColumn(
    "offer_id_final",
    F.coalesce(F.col("offer_id"), F.col("offer id"))
).drop("offer_id", "offer id")

A última coluna que faz sentido analisarmos o motivo de vir nulos é `amount`. Na tabela **transactions**, o campo `value` é um dicionário que varia conforme o `event`. Ele pode guardar o valor da compra (`amount`) ou dados da oferta, como o ID da oferta ou o `reward`, que é o valor do desconto obtido.

Mas a resposta do motivo de vindo nulo é a própria configuração dos dados. `amount` só faz sentido quando o `event` é *transaction*. Para eventos como *offer received*, *offer viewed* e *offer completed*, `value` trará informações da oferta e não o valor pago; por esse motivo, `amount` vem nulo. Logo, isso não é uma anomalia dos dados, mas um nulo “estrutural”. Assim, manteremos esses nulos.


In [0]:
df_trans_final.select('event').distinct().display()

### Transformando a base de transações

A base de dados `df_trans_final` está estruturada de uma forma que conta a história de um determinado usuário quando recebe uma oferta. Por exemplo o caso abaixo:

| account_id                       | event          | time_since_test_start | amount | reward | offer_id_final                     |
|----------------------------------|----------------|-----------------------|--------|--------|------------------------------------|
| 533342f427824eb59adc4aa9949fe666 | offer received | 0                     | null   | null   | 2906b810c7d4411798c6938adc9daaa5   |
| 533342f427824eb59adc4aa9949fe666 | offer viewed   | 1.25                  | null   | null   | 2906b810c7d4411798c6938adc9daaa5   |
| 533342f427824eb59adc4aa9949fe666 | transaction    | 2.75                  | 12.36  | null   | null                               |
| 533342f427824eb59adc4aa9949fe666 | offer completed| 2.75                  | null   | 2      | 2906b810c7d4411798c6938adc9daaa5   |

Esse exemplo, extraído da base de dados, mostra que o usuário recebeu uma oferta, visualizou-a após 1 dia e 6 horas do início do teste e, depois de 2 dias e 18 horas, realizou uma transação que completou os requisitos para receber as recompensas da oferta. Ou seja, ele efetuou transações que atingiram os critérios definidos.

Portanto, a base de dados está nos contando uma história. Como nosso objetivo final é desenvolver um modelo que auxilie na decisão de qual oferta enviar para cada cliente, precisamos transformar esses dados em uma estrutura mais adequada para a aplicação dos modelos, além de criar novas variáveis por meio de feature engineering. É exatamente isso que vamos fazer nesta seção.


In [0]:
df_trans_final.select('event').distinct().show()

In [0]:
# Cada jornada é uma oferta recebida
df_jornada = df_trans_final.filter(F.col("event") == "offer received") \
    .select(
        F.col("account_id"),
        F.col("offer_id_final"),
        F.col("time_since_test_start").alias("receive_time")
    )

In [0]:
janela = Window.partitionBy("account_id").orderBy("receive_time")
df_jor_jane = df_jornada.withColumn(
    "fim_jornada",
    F.lead("receive_time").over(janela)
)

# Encontra todas as jornadas cujo fim_jornada é nulo, ou seja, as últimas jornadas de cada cliente.
df_jor_jane = df_jor_jane.na.fill({'fim_jornada': float('inf')})

In [0]:
df_transacoes = df_trans_final.filter(F.col("event") == "transaction")

agg_features_jorn = df_jor_jane.alias("j") \
    .join(
        df_transacoes.alias("t"),
        on=[
            F.col("j.account_id") == F.col("t.account_id"),
            (F.col("t.time_since_test_start") >= F.col("j.receive_time")) &
            (F.col("t.time_since_test_start") < F.col("j.fim_jornada"))
        ],
        how="left"
    ) \
    .groupBy("j.account_id", "j.offer_id_final", "j.receive_time") \
    .agg(
        F.sum("t.amount").alias("valor_gasto_na_jornada"),
        F.count("t.amount").alias("qtd_transacoes_na_jornada"),
        F.avg("t.amount").alias("ticket_medio_na_jornada")
    )

In [0]:
df_eventos_esp = df_trans_final.filter(F.col("event").isin(["offer viewed", "offer completed"]))

eventos_candidatos = df_eventos_esp.alias("e") \
    .join(
        df_jornada.alias("j"),
        on=[
            F.col("e.account_id") == F.col("j.account_id"),
            F.col("e.offer_id_final") == F.col("j.offer_id_final"),
            F.col("e.time_since_test_start") >= F.col("j.receive_time") # O evento deve ser depois do início da jornada
        ],
        how="inner"
    )

# Encontrando a jornada correta, que será considerada a mais recente antes do evento
janela_evento = Window.partitionBy(
    "e.account_id", "e.offer_id_final", "e.time_since_test_start"
).orderBy(F.desc("j.receive_time"))

# Rankeando e filtrando pela jornada correta, que no caso é com rank=1
df_att_correta = eventos_candidatos.withColumn("rank", F.rank().over(janela_evento)) \
    .filter(F.col("rank") == 1) \
    .select(
        F.col("j.account_id"),
        F.col("j.offer_id_final"),
        F.col("j.receive_time"),
        F.col("e.event")
    )

# Pivotando os eventos para criar as flags
flags_eventos = df_att_correta.groupBy("account_id", "offer_id_final", "receive_time") \
    .pivot("event", ["offer viewed", "offer completed"]) \
    .agg(F.lit(1)) \
    .na.fill(0) \
    .withColumnRenamed("offer completed", "target") \
    .withColumnRenamed("offer viewed", "offer_viewed")


In [0]:
df_final = df_jornada \
    .join(
        flags_eventos,
        on=["account_id", "offer_id_final", "receive_time"],
        how="left"
    ) \
    .join(
        agg_features_jorn,
        on=["account_id", "offer_id_final", "receive_time"],
        how="left"
    )

df_final = df_final \
    .withColumn("offer_received", F.lit(1)) \
    .na.fill(0) \
    .orderBy("account_id", "receive_time")

df_final = df_final.select(
    'account_id', 'offer_id_final', 'receive_time', 'target',
    'offer_received', 'offer_viewed', 'valor_gasto_na_jornada',
    'qtd_transacoes_na_jornada', 'ticket_medio_na_jornada'
)

In [0]:
%sql
select * from workspace.raw.transacoes
where account_id = '3001246acc004275b4a08a37fa8785fd'

In [0]:
df_final.filter(F.col('account_id')=='3001246acc004275b4a08a37fa8785fd').display()

In [0]:
%sql
select * from workspace.raw.transacoes
where account_id = 'dca5e3cb4b4a4a399933444bc2a1fbd5'

In [0]:
df_final.filter(F.col('account_id')=='dca5e3cb4b4a4a399933444bc2a1fbd5').display()

As transformações acima já englobam algumas particularidades da base de dados. Por exemplo, há casos em que uma transação se sobrepõe a outra, da seguinte forma:  

* Cliente recebeu a oferta 1 -> Cliente visualizou a oferta 1 -> Cliente recebeu a oferta 2 -> Cliente fez uma transação -> Cliente completou a oferta 1  

Nesses casos, é considerada a oferta recebida mais recente como início da jornada e para onde vão os valores das transações.  

Há também situações em que o cliente recebeu a oferta, não a visualizou e mesmo assim a completou. Esses casos já estão englobados nas transformações, pois consideramos o início da jornada dessa transação independentemente de ele ter visualizado ou não.  

A base de dados também contempla todas as ofertas recebidas, pois consideramos o tempo de início da oferta recebida. Dessa forma, é possível visualizar todo o processo da oferta.  


## Preparação de dataset unificado

In [0]:
df_join = (
    df_final
    .join(df_profile_transformado, df_final.account_id == df_profile_transformado.id, "inner")
    .join(df_offers_final, df_final.offer_id_final == df_offers_final.id, "inner")
    .drop(df_profile_transformado.id)
    .drop(df_offers_final.id)
)

In [0]:
df_join.write.format("delta") \
  .option("mergeSchema", "true") \
  .mode("overwrite") \
  .saveAsTable("workspace.processed.base_processada")