In [0]:
#Definição de bibliotecas
import pandas as pd
import pyspark.sql.functions as f
import sys
sys.path.append("../src")
from funcoes_pyspark import analisar_dataframe, filtra_mais_recente

###1 - Reading and Understanding Databases

Nosso objetivo: 1.Analisar os dados históricos de transações, ofertas e clientes

#### 1.1 - Offers Database

In [0]:
#Vamos ler a base de ofertas, reestruturar a coluna channels e organizar as colunas
offers = spark.createDataFrame(pd.read_json("../data/raw/offers.json"))\
    .withColumn("web",     f.when(f.array_contains("channels", "web"), 1).otherwise(0)) \
    .withColumn("email",   f.when(f.array_contains("channels", "email"), 1).otherwise(0)) \
    .withColumn("mobile",  f.when(f.array_contains("channels", "mobile"), 1).otherwise(0)) \
    .withColumn("social",  f.when(f.array_contains("channels", "social"), 1).otherwise(0))\
    .withColumnRenamed("id","offer_id")\
    .select("offer_id","offer_type","discount_value","min_value","duration","web","email","mobile","social").orderBy("offer_type")

In [0]:
#verificando volumetria e duplicatas de Id
analisar_dataframe(offers, "offer_id")

In [0]:
#Verificar saída da base de ofertas
offers.limit(5).display()

#####1.1.1 Data understanding

In [0]:
# offer_id: identificador único da oferta, usado para relacionar com eventos 
#           como recebimento, uso e expiração. É a chave para cruzar com outras bases.

# offer_type: tipo da oferta.
#   - "bogo" → Buy One Get One: compre um e ganhe outro, ou crédito equivalente.
#   - "discount" → desconto direto no valor da compra.
#   - "informational" → comunicação sem desconto, apenas informativa.

# discount_value: valor do benefício.
#   - Para "discount", é o desconto em moeda (ex.: 5 = R$ 5 de desconto). não é percentual!
#   - Para "bogo", discount_value = 10 → Ao comprar 1 item, o cliente recebe outro item ou crédito equivalente a R$ 10.
#   - Para "informational", normalmente é 0, pois não há desconto.

# min_value: valor mínimo de compra para ativar a oferta.
#            Ex.: min_value = 20 significa que a compra deve ser de pelo menos R$ 20
#            para o desconto ou brinde ser aplicado.

# duration: número de dias de validade da oferta a partir do envio ao cliente.

# web, email, mobile, social: canais de veiculação (1 = ativo, 0 = inativo).
# Permitem enviar a mesma oferta por múltiplos canais e medir qual funciona melhor.

# Essa tabela provavelmente é o catálogo de ofertas da empresa.
# Ela define todas as regras e parâmetros de cada promoção e serve de base 
# para cruzar com dados de eventos (quem recebeu, usou, etc.).
# Também permite análises como:
# - desempenho por tipo de oferta
# - eficácia por canal de comunicação
# - impacto de diferentes valores de desconto


#### 1.2 - Profiles Database

In [0]:
profile = spark.createDataFrame(pd.read_json("../data/raw/profile.json")).withColumnRenamed("id","profile_id").select("profile_id","registered_on","age","gender","credit_card_limit")

In [0]:
#verificando volumetria e duplicatas de Id
analisar_dataframe(profile, "profile_id")

In [0]:
profile.limit(5).display()

In [0]:
#Podemos considerar que é uma base de 2018, uma vez que existem clientes entrantes todos os dias
profile.withColumn("safra_registro", f.substring(f.col("registered_on").cast("string"), 1, 4)).groupBy("safra_registro").count().orderBy("safra_registro").display()

#####1.2.1 Data understanding

In [0]:
# profile_id: identificador único do cliente.
#             Usado para cruzar o perfil com outras tabelas (ex.: ofertas recebidas, transações, histórico de uso).

# registered_on: data de criação da conta (formato AAAAMMDD).
#                Representa o momento em que o cliente entrou na base.
#                Útil para calcular "tempo de relacionamento" e criar coortes.

# age: idade do cliente na data de criação da conta.
#      Valores extremamente altos (ex.: 118) indicam dado faltante ou inválido, será que é usado como código de "sem idade"? veremos mais na análise exploratória.
#      Pode ser ajustado via imputação ou categorização especial ("idade não informada").

# gender: gênero informado pelo cliente no cadastro.
#         Possíveis valores: "M" (masculino), "F" (feminino) ou null (não informado).
#         Null pode indicar falta de informação ou que o cliente optou por não declarar.
#         Nao posso inferir esse tipo de dados, mas pode ser usado para segmentação por gênero.

# credit_card_limit: limite de crédito do cartão registrado pelo cliente (float).
#                    Null indica ausência de cartão registrado ou dado indisponível.
#                    Pode ser usado como proxy de poder de compra ou perfil de risco.

# Observações gerais sobre a base:
# - São cerca de 17k clientes cadastrados.
# - Há campos com valores nulos ou inválidos, exigindo tratamento antes de análises/modelagem.
# - Não sei se essa idade na base cadastral é do momento do cadastro, ou se é atualizada. se for a data no momento do cadastro. A combinação "registered_on" + "age". Poderia nos dar a idade atual da pessoa, porém a base contém o campo credit_card_limit que deve ser atualizado, então vou tratar Age como a idade atual.
# - "credit_card_limit" pode ajudar em segmentações de clientes por capacidade de consumo.
# - A base serve como tabela de perfis, enriquecendo dados de ofertas e transações.


#### 1.3 - Transactions Database

In [0]:
from pyspark.sql.types import StructType, StringType, ArrayType
#Define o schema que representa o conteúdo da coluna "value"
#A coluna value contem o id da oferta. Os outros valores ("amount","reward") sao nulos em todo dataset
#Account_id deve ser análogo a profile_id entao vou renomear

transactions = spark.createDataFrame(pd.read_json("../data/raw/transactions.json")).withColumn("amount", f.col("value.amount")).withColumnRenamed("account_id","profile_id") \
    .withColumn("offer_id", f.col("value.`offer id`")).select("profile_id","offer_id","event","time_since_test_start")

In [0]:
analisar_dataframe(transactions, "profile_id")

In [0]:
transactions.limit(5).display()

#####1.3.1 Data understanding

In [0]:
# profile_id: id unico do cliente - originalmente era account_id
#             Serve pra ligar o evento a um cliente especifico.

# offer_id: id da oferta relacionada ao evento.
#           Aqui no exemplo já está como coluna, mas na base original vinha dentro da coluna "value" (json).
#           Usado pra cruzar com a tabela de ofertas e saber detalhes dela.

# event: tipo de evento que aconteceu.
#   - "transaction": cliente fez uma compra (pode ou não ter relação com oferta)
#   - "offer received": cliente recebeu uma oferta
#   - "offer viewed": cliente abriu/visualizou a oferta
#   - "offer completed": cliente cumpriu a condição da oferta (ex.: gastou o mínimo e ganhou o desconto)

# time_since_test_start: tempo desde o inicio do teste até o momento do evento, em dias.
#                        No exemplo tudo tá 0, então parece ser o registro inicial.

# value: na base completa, essa coluna guarda um json com informações adicionais:
#   - offer_id (quando é evento de oferta)
#   - reward/desconto (pra saber o valor do beneficio)
#   - amount (valor da transação quando é evento de compra)

# Essa tabela é um log de cerca de 300 mil eventos de clientes dentro do teste/campanha.
# A gte consegue seguir o "funil" de conversão: 
#   received → viewed → completed
#   e também ver transações que aconteceram com ou sem oferta vinculada.

In [0]:
transactions.groupBy("event","offer_id").count().orderBy("event").display()
#Essa coluna de evento junto com a "time_since_test_start" é capaz de definir a jornada do cliente após receber uma oferta!
#mas seguimos com algumas observações interessantes:

# - As "offer completed" sempre tem offer_id null → evento de conclusão sem oferta vinculada (provavelmente teremos que inferir pela janela de transação)
# - "transaction" com offer_id null → compra feita sem ligação com oferta ()
# - Em geral, número de "received" é bem maior que "viewed" e "completed"
# - Isso ajuda a achar ofertas que geram engajamento (view) mas não convertem em compra (completed)

# Possíveis usos dessa base:
# - Calcular taxas de conversão por oferta.
# - Avaliar quais ofertas têm melhor desempenho e quais flopam.
# - Cruzar com canais da oferta (web, email, mobile, social) pra ver qual canal traz mais resultado.

In [0]:
#Observando um cliente para tentar entender a jornada do cliente para diversas ofertas:
# Possíveis comportamentos por oferta (a partir do funil de eventos):
 # 1 - Na primeira oferta ele recebe > vê  > compra (da pra notar que o time_since_test_start de transaction == offer completed)
 # 2 - Ele faz uma transação sem vínculo a uma oferta
 # 3 - Ele recebe a oferta 9837 > vê > realiza 3 transações sem vínculo a oferta > deixa a oferta rolando
 # 4 - recebe oferta ddfd > vê > deixa rolando > recebe oferta e20d > nao le a oferta e20d > 1.25 dias depois ele transaciona (offer completed no mesmo dia, porem duplicada, pq se fossem duas comprar teríamos 2 transacoes no mesmo dia) alem disso nao sabemos se offer completed foi com relação  a ddfd ou  a 9837 (devemos olhar o tempo de vigência da oferta!!)
 # 5 - por fim ele finalmente visualiza a oferta e20d 

transactions.filter(f.col("profile_id")=="78afa995795e4d85b5d9ceeca43f5fef").orderBy("time_since_test_start").display()

#####1.3.2 - Unifying Databases

In [0]:
#Agora que entendemos individualmente o conceito das bases, vamos realizar o processso de enriquecimento de informações na base transacional para podermos avançar na construção da solução
#transactions, profile, offers
#Tomamos os valores distintos pois existem duplicidades na base,por exemplo temos 1 transacao para 2 ofertas completadas (mesma data hora), mesmo que seja possível finalizar 2 ofertas com uma transaçao, nao vamos considerar esses casos na solução.

#criando a coluna ID que concatena profile_id e offer_id
transactions_enriquecida = transactions\
    .join(offers, on=["offer_id"], how="left") \
    .join(profile, on=["profile_id"], how="left").withColumn("ID", f.concat_ws("_", "profile_id", "offer_id")).distinct()


In [0]:
analisar_dataframe(transactions_enriquecida, "profile_id")

In [0]:
#vamos analisar o cliente com mais transacoes na base
transactions.groupBy("profile_id").count().orderBy("count", ascending=False).limit(5).display()


In [0]:
transactions_enriquecida.filter(f.col("profile_id")=="94de646f7b6041228ca7dec82adb97d2").orderBy("time_since_test_start").limit(5).display()

In [0]:
#Vamos fazer uma análise parecida mas agora na base já enriquecida com informações de oferta e perfil

transactions_enriquecida.filter(f.col("profile_id")=="94de646f7b6041228ca7dec82adb97d2").orderBy("time_since_test_start").display()

In [0]:
#Como esperado, esse tipo de oferta não tem desconto nem valor mínimo, mas sim uma informação
transactions_enriquecida.filter(f.col("offer_type")=="informational").groupBy("discount_value","min_value").count().display()

In [0]:
#Os números em geral parecem fazer sentido:

# Ofertas Recebidas: Um total de 76,277 ofertas foram enviadas aos clientes.

#Ofertas Visualizadas: Dessas, 57,725 ofertas foram visualizadas pelos clientes, representando uma taxa de visualização de 75.6%. Isso #mostra que uma grande parte das ofertas chamou a atenção do público.

#Ofertas Completadas: Dentre as ofertas visualizadas, 30,617 ofertas resultaram em uma ação completada pelos clientes, gerando uma taxa #de conversão de 40.1%. Isso indica que muitos dos clientes que visualizaram as ofertas decidiram aproveitar alguma delas.

#Transações Totais: Considerando todas as transações realizadas, um total de 138,953 transações foi registrado. Dentre essas, 30,617 #transações foram associadas a ofertas, o que representa 22.1% das transações totais.
transactions_enriquecida.groupBy("event").count().display()


In [0]:
transactions_enriquecida.groupBy("event","offer_type").count().display()


In [0]:
#descrever a tabela nos permite verificar alguns problemas como idade 118
#verificar o range dos testes (30 dias)
#Os tratamentos de outliers, missings e possíveis problemas conceituais serao identificados e tratados posteriormente
transactions_enriquecida.describe().display()

In [0]:
#Encontramos 2175 clientes com idade 118, isso sugere que 118  deve ter um significado como ("nao informado")
#Vamos tratar essa informacao na criaçao do book.
profile.groupBy("age").count().orderBy("age").display()

###2 - Plan, Audience and Target

Nosso objetivo: 2. Desenvolver uma técnica/modelo que auxilie na decisão de qual oferta enviar para cada cliente

####2.1 - Plan

Agora que temos uma base transacional enriquecida e já compreendemos as informações precisamos definir o que faremos para resolver o problema proposto "Qual oferta enviar para cada cliente?". <br><br>
Proponho então um **modelo supervisionado de target binário** que preveja a probabilidade de conversão.<br>

**Público**: Cada linha representará um cliente recebendo uma oferta específica, ou seja o mesmo cliente poderá<br>
aparecer diversas vezes para cada tipo de oferta.<br>

**Target**: mede o sucesso da oferta, houve conversao 1 (sim) 0 (não)?<br>

**Variáveis explicativas**: Todas as variáveis respeitam o corte “até o momento da oferta”.<br>Nenhuma coluna usa dados posteriores ao offer received.

**Treino**: Conjunto aleatório de cliente-oferta (70%)<br>
**Teste**:  Restante do conjunto (30%)
    
Na hora de escorar vamos simular todas as ofertas possíveis e ver qual tem maior probabilidade de converter o cliente.<br>

Transações sem vínculo de oferta serão descartadas.


####2.2 - Audience

In [0]:
transactions.limit(3).display()

In [0]:
#Vamos definir nosso público, composto na granularidade de clientes e oferta
#Clientes que receberam pelo menos uma opferta
publico = transactions.filter(f.col("offer_id").isNotNull()).filter(f.col("event")=="offer received").withColumn("ID", f.concat_ws("_", "profile_id", "offer_id")).select("ID","profile_id","offer_id").distinct()

In [0]:
publico.limit(5).display()

In [0]:
analisar_dataframe(publico, "ID")

In [0]:
#Salvar o público para uso futuro
publico.toPandas().to_csv("../data/processed/publico.csv", sep=",",header=True,index=False)

####2.2 - Target

In [0]:
#Vamos precisar criar uma regra para vincular a oferta a conversão pois são valores ausentes
#Regra 1: Uma conversão é elegível para ser vinculada a uma oferta se no instante t,  ti ≤ t ≤ ti + duration, existe uma transaction == offer_completed em t, assim consideramos conversao bem sucedida target 1 caso contrário 0.

#Ainda poderemos ter mais de uma oferta em aberto no intervalo proposto,neste caso vamos verificar qual a oferta tem offer_viewed mais recente
#Caso ainda continue o empate, vamos considerar a offer received mais recente 

#Se continuar o empate vou descartar a oferta, pois não temos como decidir qual oferta foi a mais relevante.

In [0]:
from pyspark.sql import functions as f, Window

# =========================
# 0) Normalização de schema
# =========================
# Esperado: transactions_enriquecida com colunas:
# ID, profile_id, offer_id, event, time_since_test_start, offer_type, discount_value,
# min_value, duration, web, email, mobile, social, registered_on, age, gender, credit_card_limit

events = (transactions_enriquecida
          .withColumnRenamed("profile_id", "cliente_id")
          .withColumn("t", f.col("time_since_test_start").cast("double"))
          .withColumn("event", f.lower(f.col("event"))))

# ==========================================
# 1) Ofertas recebidas (âncora + janela)
# ==========================================
recv = (events.filter(f.col("event") == "offer received")
        .select(
            "cliente_id", "offer_id", "offer_type", "discount_value", "min_value",
            "duration", "web", "email", "mobile", "social", "t"
        )
        .withColumnRenamed("t", "start_t")
        .withColumn("end_t", f.col("start_t") + f.col("duration").cast("double")))

# ==========================================
# 2) Eventos-chave (carregar IDs para rastreio)
# ==========================================
tx = (events.filter(f.col("event") == "transaction")
      .select(f.col("ID").alias("tx_id"), "cliente_id", f.col("t").alias("t_tx")))

cmplt = (events.filter(f.col("event") == "offer completed")
         .select(f.col("ID").alias("cmplt_id"), "cliente_id", f.col("t").alias("t_cmplt")))

view = (events.filter(f.col("event") == "offer viewed")
        .select(f.col("ID").alias("view_id"), "cliente_id", "offer_id", f.col("t").alias("t_view")))

# ==========================================================
# 3) Parear transaction e completed no MESMO instante (t exato)
#    (se precisar lidar com ruído numérico, troque == por |t_tx - t_cmplt| <= 1e-6)
# ==========================================================
pairs = (cmplt.alias("c")
         .join(tx.alias("x"),
               on=[f.col("c.cliente_id") == f.col("x.cliente_id"),
                   f.col("c.t_cmplt") == f.col("x.t_tx")],
               how="inner")
         .select(
             f.col("c.cliente_id").alias("cliente_id"),
             f.col("c.t_cmplt").alias("t"),
             "cmplt_id", "tx_id"
         ))

# ==========================================================
# 4) Atribuir a uma oferta elegível no instante t (start_t ≤ t ≤ end_t)
# ==========================================================
elig = (pairs.alias("p")
        .join(recv.alias("o"),
              on=[f.col("p.cliente_id") == f.col("o.cliente_id"),
                  f.col("p.t").between(f.col("o.start_t"), f.col("o.end_t"))],
              how="inner")
        .select(
            f.col("p.cliente_id").alias("cliente_id"),
            f.col("p.t").alias("t"),
            "cmplt_id", "tx_id",
            f.col("o.offer_id").alias("offer_id"),
            "offer_type", "discount_value", "min_value", "duration",
            "web", "email", "mobile", "social", "start_t", "end_t"
        ))

# ==========================================================
# 5) Desempate:
#    1) última VIEW da MESMA oferta antes de t (mais próxima)
#    2) offer received mais recente (start_t maior)
#    3) (quebra determinística) offer_id asc
# ==========================================================
elig_v = (elig.alias("e")
          .join(view.alias("v"),
                on=[f.col("e.cliente_id") == f.col("v.cliente_id"),
                    f.col("e.offer_id") == f.col("v.offer_id"),
                    f.col("v.t_view") <= f.col("e.t")],
                how="left")
          .select(
              f.col("e.cliente_id").alias("cliente_id"),
              f.col("e.t").alias("t"),
              "cmplt_id", "tx_id",
              f.col("e.offer_id").alias("offer_id"),
              "offer_type", "discount_value", "min_value", "duration",
              "web", "email", "mobile", "social",
              "start_t", "end_t",
              f.col("v.t_view").alias("t_view")
          )
          .withColumn("has_view", f.when(f.col("t_view").isNotNull(), f.lit(1)).otherwise(f.lit(0)))
          .withColumn("dview", f.when(f.col("t_view").isNotNull(), f.col("t") - f.col("t_view")).otherwise(f.lit(None))))

w_rank = Window.partitionBy("cliente_id", "t").orderBy(
    f.col("has_view").desc(),
    f.col("dview").asc_nulls_last(),
    f.col("start_t").desc(),
    f.col("offer_id").asc()
)

ranked = (elig_v
          .withColumn("rank_elig", f.row_number().over(w_rank))
          .withColumn(  # tuple do score para detectar empates reais
              "score_tuple",
              f.array(
                  f.col("has_view").cast("double"),
                  f.coalesce(f.col("dview"), f.lit(1e9)).cast("double"),
                  f.col("start_t").cast("double")
              )
          ))

dups = (ranked
        .groupBy("cliente_id", "t", "score_tuple")
        .agg(f.count("*").alias("n_same_score")))

ranked = (ranked.alias("l")
          .join(dups.alias("r"),
                on=[f.col("l.cliente_id") == f.col("r.cliente_id"),
                    f.col("l.t") == f.col("r.t"),
                    f.col("l.score_tuple") == f.col("r.score_tuple")],
                how="left")
          .select("l.*", f.col("r.n_same_score"))
          .withColumn("ambiguous", f.when(f.col("n_same_score") > 1, f.lit(1)).otherwise(f.lit(0))))

top_unique = (ranked
              .filter(f.col("rank_elig") == 1)
              .filter(f.col("ambiguous") == 0)
              .select(
                  "cliente_id", "t", "offer_id",
                  "cmplt_id", "tx_id"  # rastreabilidade do match
              ))

# ==========================================================
# 6) Target por (cliente, oferta)
# ==========================================================
labels = (top_unique
          .groupBy("cliente_id", "offer_id")
          .agg(f.count("*").alias("qtd_matches"))
          .withColumn("target_sucesso", f.when(f.col("qtd_matches") > 0, f.lit(1)).otherwise(f.lit(0))))

# ==========================================================
# 7) Aplicar ao PÚBLICO
#    publico_df esperado com colunas: ID (do público), profile_id, offer_id
# ==========================================================
publico = (publico
           .withColumnRenamed("profile_id", "cliente_id")
           .select("ID", "cliente_id", "offer_id"))

publico_com_target = (publico
                      .join(labels, ["cliente_id", "offer_id"], "left")
                      .na.fill({"qtd_matches": 0, "target_sucesso": 0}))

# (opcional) conferir
# publico_com_target.show(truncate=False)


In [0]:
publico_com_target.limit(5).display()

In [0]:
#qtd matches é simplesmente a contagem de conversões únicas que conseguimos vincular àquele par (cliente, oferta) seguindo todas as regras que definimos.
#Na grande maioria das vezes encontramos o par cliente-oferta convertendo uma única vez!

from pyspark.sql import functions as f

df_com_significado = (publico_com_target
    .groupBy("target_sucesso", "qtd_matches")
    .count()
    .withColumn(
        "significado",
        f.when((f.col("target_sucesso") == 0) & (f.col("qtd_matches") == 0),
               f.lit("Recebeu oferta mas não converteu"))
         .when((f.col("target_sucesso") == 1) & (f.col("qtd_matches") == 1),
               f.lit("1 conversão vinculada à oferta"))
         .when((f.col("target_sucesso") == 1) & (f.col("qtd_matches") == 2),
               f.lit("2 conversões para a mesma oferta"))
         .when((f.col("target_sucesso") == 1) & (f.col("qtd_matches") == 3),
               f.lit("3 conversões para a mesma oferta"))
         .when((f.col("target_sucesso") == 1) & (f.col("qtd_matches") == 4),
               f.lit("4 conversões para a mesma oferta"))
         .when((f.col("target_sucesso") == 1) & (f.col("qtd_matches") == 5),
               f.lit("5 conversões para a mesma oferta"))
         .otherwise(f.lit("Outro caso"))
    )
).orderBy("count",ascending=False)

df_com_significado.display()


In [0]:
publico_com_target.columns

In [0]:
publico_com_target.select("ID","cliente_id","offer_id","target_sucesso").toPandas().to_csv("../data/processed/publico_com_target.csv", sep=",",header=True,index=False)

###3 - Predictive Variables

Vamos criar books de variáveis com conceitos variados para tentar explicar o comportamento de conversao por oferta!

**Book Perfil**: Características fixas ou de baixa variabilidade, que ajudam a diferenciar padrões de resposta a ofertas.<br>
**Book Compra**: Variáveis que resumem o histórico de transações do cliente antes da oferta.<br>
**Book interacao**: Métricas de engajamento com ofertas no passado.<br>
**Book Oferta Atual**: Informações da própria oferta que está sendo enviada — tipo (bogo, discount,informational).<br>
**Book Recência e Frequeência**: Informações<br>
**Book Cliente-Oferta**: Variáveis gerais na granularidade cliente-oferta,taxa tipo, canais etc.<br>
**Book Comunicacao**: Preferências históricas de engajamento com base nos canais disponíveis.<br>


####3.1 - Book Perfil

In [0]:
# idade                -> Idade declarada no cadastro do cliente, característica fixa.
# genero               -> Gênero informado no cadastro; manter valores nulos, sem imputar nesta fase.
# limite_credito       -> Limite do cartão de crédito registrado no cadastro; proxy de poder aquisitivo.
# safra_registro       -> Mede o tempo de relacionamento relativo entre o cliente e o ifood (2013 mais antigo, 2018 mais novo)
# historico_conversao  -> Proporção de ofertas anteriores convertidas; engajamento histórico pré-oferta.

In [0]:
from pyspark.sql import functions as f, Window

# Normalização e flags
events = (transactions_enriquecida
    .withColumnRenamed("profile_id", "cliente_id")
    .withColumn("time_since_test_start", f.col("time_since_test_start").cast("double"))
    .withColumn("is_offer_received", f.when(f.col("event")=="offer received", 1).otherwise(0))
    .withColumn("is_offer_completed", f.when(f.col("event")=="offer completed", 1).otherwise(0))
)

# Janela cumulativa por cliente até antes do evento atual
w_cli_time = (Window.partitionBy("cliente_id")
              .orderBy(f.col("time_since_test_start"))
              .rowsBetween(Window.unboundedPreceding, -1))

events_cum = (events
    .withColumn("cum_ofertas_recebidas", f.sum("is_offer_received").over(w_cli_time))
    .withColumn("cum_ofertas_completas", f.sum("is_offer_completed").over(w_cli_time))
)

# Selecionar apenas ofertas recebidas (ponto de corte para features)
ofertas_base = (events_cum
    .filter(f.col("event")=="offer received")
    .select(
        "ID", "cliente_id", "offer_id", "time_since_test_start",
        "age", "gender", "credit_card_limit", "registered_on",
        (f.col("cum_ofertas_recebidas") - 1).alias("qtd_ofertas_anteriores"),
        f.col("cum_ofertas_completas").alias("qtd_conv_anteriores")
    )
)

# Construir book_perfil
book_perfil = (ofertas_base
    .withColumn("idade", f.when(f.col("age") == 118, None).otherwise(f.col("age"))) #idade 118 se tornou null.
    .withColumn("genero", f.col("gender"))
    .withColumn("limite_credito", f.col("credit_card_limit"))
    .withColumn("safra_registro", f.substring(f.col("registered_on").cast("string"), 1, 4))  # de 2013 a 2018 clientes mais antigos
    .withColumn("historico_conversao",
                f.when(f.col("qtd_ofertas_anteriores") > 0,
                       f.col("qtd_conv_anteriores") / f.col("qtd_ofertas_anteriores"))
                 .otherwise(f.lit(0.0)))
    .select("ID", "cliente_id", "offer_id","time_since_test_start",
            "idade","genero","limite_credito","safra_registro",
            "historico_conversao")
)

book_perfil = filtra_mais_recente(book_perfil)

In [0]:
analisar_dataframe(filtra_mais_recente(book_perfil),"ID")

In [0]:
book_perfil.limit(5).display()

In [0]:
book_perfil.toPandas().to_csv("../data/processed/book_perfil.csv", sep=",",header=True,index=False)

####3.2 - Book Compras

In [0]:
# qtd_transacoes_anteriores    -> Número total de transações realizadas pelo cliente antes da oferta; indica histórico de consumo.
# dias_desde_ultima_transacao  -> Intervalo de dias desde a última transação até o envio da oferta; mede recência de compra.
# media_tempo_entre_transacoes -> Média de dias entre transações anteriores; indica regularidade no padrão de compra.
# transacoes_distintas_dias    -> Número de dias distintos em que o cliente fez transações antes da oferta; mede frequência de uso.

In [0]:
from pyspark.sql import functions as f, Window

# Criar flags para identificar transações
events_tx = (transactions_enriquecida
    .withColumnRenamed("profile_id", "cliente_id")
    .withColumn("time_since_test_start", f.col("time_since_test_start").cast("double"))
    .withColumn("is_transaction", f.when(f.col("event")=="transaction", 1).otherwise(0))
)

# Janela cumulativa até antes do evento atual, por cliente
w_cli_time = (Window.partitionBy("cliente_id")
              .orderBy(f.col("time_since_test_start"))
              .rowsBetween(Window.unboundedPreceding, -1))

# Janela de cliente para cálculo de diferença entre transações
w_cli_order = (Window.partitionBy("cliente_id")
               .orderBy(f.col("time_since_test_start")))

# Calcular acumulados e métricas temporais
events_cum = (events_tx
    .withColumn("qtd_transacoes_anteriores", f.sum("is_transaction").over(w_cli_time))
    .withColumn("ultima_tx_t",
                f.max(f.when(f.col("is_transaction")==1, f.col("time_since_test_start"))).over(w_cli_time))
    .withColumn("lag_t", f.lag("time_since_test_start").over(w_cli_order))
)

# Isolar linhas "offer received" (momento âncora)
ofertas_base = (events_cum
    .filter(f.col("event")=="offer received")
    .select(
        "ID", "cliente_id", "offer_id", "time_since_test_start",
        "qtd_transacoes_anteriores",
        # dias desde última transação
        (f.col("time_since_test_start") - f.col("ultima_tx_t")).alias("dias_desde_ultima_transacao")
    )
)

# Calcular média de tempo entre transações e dias distintos de compra
# Primeiro, todas as transações do cliente
tx_historico = (events_tx
    .filter(f.col("is_transaction")==1)
    .withColumn("lag_tx_t", f.lag("time_since_test_start").over(w_cli_order))
    .withColumn("tempo_entre_tx", f.col("time_since_test_start") - f.col("lag_tx_t"))
)

# Média de tempo entre transações e contagem de dias distintos (antes da oferta)
w_cli_before_offer = (Window.partitionBy("cliente_id","offer_id","ID"))

book_compra = (ofertas_base
    .join(
        tx_historico.groupBy("cliente_id")
            .agg(f.avg("tempo_entre_tx").alias("media_tempo_entre_transacoes"),
                 f.countDistinct(f.floor("time_since_test_start")).alias("transacoes_distintas_dias")),
        on="cliente_id",
        how="left"
    )
    .select("ID", "cliente_id", "offer_id","time_since_test_start",
            "qtd_transacoes_anteriores",
            "dias_desde_ultima_transacao",
            "media_tempo_entre_transacoes",
            "transacoes_distintas_dias")
)

book_compra = filtra_mais_recente(book_compra)


In [0]:
analisar_dataframe(filtra_mais_recente(book_compra),"ID")

In [0]:
book_compra.limit(5).display()

In [0]:
book_compra.toPandas().to_csv("../data/processed/book_compra.csv", sep=",",header=True,index=False)

####3.3 - Book Interação

In [0]:
# qtd_ofertas_anteriores        -> Número de ofertas recebidas antes da oferta atual; mede exposição prévia a campanhas.
# qtd_ofertas_visualizadas      -> Número de ofertas visualizadas antes da oferta atual; indica interesse em abrir ofertas.
# taxa_visualizacao             -> Proporção de ofertas recebidas que foram visualizadas antes da oferta atual; engajamento inicial.
# tempo_medio_visualizacao      -> Tempo médio (dias) entre receber e visualizar ofertas anteriores; rapidez na interação.
# qtd_ofertas_completas_validas -> Número de ofertas com conversão válida antes da oferta atual; medida conservadora de sucesso prévio.
# taxa_conversao                -> Proporção de ofertas anteriores com conversão válida; engajamento efetivo.
# tempo_medio_conversao         -> Tempo médio (dias) entre visualizar e converter (válido) ofertas anteriores; rapidez para concluir.

In [0]:
# ============================================
# BOOK 3 — Histórico de interação com ofertas
# ============================================
# Saída: book_interacao com as colunas:
# - qtd_ofertas_anteriores, qtd_ofertas_visualizadas, taxa_visualizacao
# - tempo_medio_visualizacao, tempo_medio_conversao
# - qtd_ofertas_completas_validas, taxa_conversao
#
# Observação:
# - Conversão válida = (transaction == offer completed no mesmo t) + oferta elegível na janela
# - Tempos médios calculados apenas a partir de instâncias anteriores (sem olhar o futuro)
# - Não usamos 'amount' (não existe nesta base)

from pyspark.sql import functions as f, Window

# =========================
# 0) Normalização base
# =========================
ev = (transactions_enriquecida
      .withColumnRenamed("profile_id","cliente_id")
      .withColumn("time_since_test_start", f.col("time_since_test_start").cast("double"))
      .withColumn("event", f.lower(f.col("event"))))

# Tabelas por tipo de evento
recv = (ev.filter(f.col("event")=="offer received")
          .select("ID","cliente_id","offer_id","time_since_test_start",
                  "offer_type","discount_value","min_value","duration","web","email","mobile","social")
          .withColumnRenamed("time_since_test_start","start_t")
          .withColumn("end_t", f.col("start_t") + f.col("duration").cast("double")))

view = (ev.filter(f.col("event")=="offer viewed")
          .select("ID","cliente_id","offer_id", f.col("time_since_test_start").alias("t_view")))

tx   = (ev.filter(f.col("event")=="transaction")
          .select("cliente_id", f.col("time_since_test_start").alias("t_tx")))

cmpl = (ev.filter(f.col("event")=="offer completed")
          .select("cliente_id", f.col("time_since_test_start").alias("t_cmplt")))

# ======================================================
# 1) Parear transaction e completed no MESMO instante t
#    (prova de conversão) — estrito, sem epsilon
# ======================================================
pairs = (cmpl.alias("c")
         .join(tx.alias("x"),
               on=[f.col("c.cliente_id")==f.col("x.cliente_id"),
                   f.col("c.t_cmplt")==f.col("x.t_tx")],
               how="inner")
         .select(
             f.col("c.cliente_id").alias("cliente_id"),
             f.col("c.t_cmplt").alias("t")
         ))

# ===========================================
# 2) Ofertas elegíveis no instante t (janela)
# ===========================================
elig = (pairs.alias("p")
        .join(recv.alias("o"),
              on=[f.col("p.cliente_id")==f.col("o.cliente_id"),
                  f.col("p.t").between(f.col("o.start_t"), f.col("o.end_t"))],
              how="inner")
        .select(
            f.col("p.cliente_id").alias("cliente_id"),
            f.col("p.t").alias("t"),
            # Instância da oferta (ID do received) — importante p/ acumular históricos por instância
            f.col("o.ID").alias("offer_instance_id"),
            f.col("o.offer_id").alias("offer_id"),
            f.col("o.start_t").alias("start_t"),
            f.col("o.end_t").alias("end_t")
        ))

# =========================================================
# 3) Desempate por VIEW da MESMA oferta antes de t, depois
#    recência do received (start_t) e offer_id determinístico
# =========================================================
elig_v = (elig.alias("e")
          .join(view.alias("v"),
                on=[f.col("e.cliente_id")==f.col("v.cliente_id"),
                    f.col("e.offer_id")==f.col("v.offer_id"),
                    f.col("v.t_view") <= f.col("e.t")],
                how="left")
          .select(
              f.col("e.cliente_id").alias("cliente_id"),
              f.col("e.t").alias("t"),
              f.col("e.offer_instance_id").alias("offer_instance_id"),
              f.col("e.offer_id").alias("offer_id"),
              f.col("e.start_t").alias("start_t"),
              f.col("e.end_t").alias("end_t"),
              f.col("v.t_view").alias("t_view")
          )
          .withColumn("has_view", f.when(f.col("t_view").isNotNull(), f.lit(1)).otherwise(f.lit(0)))
          .withColumn("dview", f.when(f.col("t_view").isNotNull(),
                                      f.col("t") - f.col("t_view")).otherwise(f.lit(None))))

w_rank = Window.partitionBy("cliente_id","t").orderBy(
    f.col("has_view").desc(),
    f.col("dview").asc_nulls_last(),
    f.col("start_t").desc(),
    f.col("offer_id").asc(),
    f.col("offer_instance_id").asc()
)

ranked = (elig_v
          .withColumn("rank_elig", f.row_number().over(w_rank))
          .withColumn("score_tuple",
                      f.array(
                          f.col("has_view").cast("double"),
                          f.coalesce(f.col("dview"), f.lit(1e9)).cast("double"),
                          f.col("start_t").cast("double")
                      )))

dups = (ranked.groupBy("cliente_id","t","score_tuple")
        .agg(f.count("*").alias("n_same_score")))

conv_valid = (ranked.alias("l")
              .join(dups.alias("r"),
                    on=[f.col("l.cliente_id")==f.col("r.cliente_id"),
                        f.col("l.t")==f.col("r.t"),
                        f.col("l.score_tuple")==f.col("r.score_tuple")],
                    how="left")
              .filter(f.col("l.rank_elig")==1)
              .filter((f.col("r.n_same_score").isNull()) | (f.col("r.n_same_score")==1))
              .select(
                  f.col("l.cliente_id").alias("cliente_id"),
                  f.col("l.t").alias("t_conv_valid"),
                  f.col("l.offer_id").alias("offer_id"),
                  f.col("l.offer_instance_id").alias("offer_instance_id")
              ))

# =========================================================
# 4) Tempos por instância anterior (received/viewed/conv válida)
#    - delta_view = first_t_view - start_t
#    - delta_conv = first_t_conv_valid - first_t_view
# =========================================================
# a) primeiro view por instância (se houver)
first_view_by_instance = (view.groupBy("ID")
                          .agg(f.min("t_view").alias("first_t_view")))

# b) primeiro t de conversão válida por instância (se houver)
first_conv_by_instance = (conv_valid.groupBy("offer_instance_id")
                          .agg(f.min("t_conv_valid").alias("first_t_conv_valid"))
                          .withColumnRenamed("offer_instance_id","ID"))

# c) tabela de instâncias (received) com start_t
instances = recv.select("ID","cliente_id","offer_id","start_t")

# d) calcular deltas por instância
instances_with_times = (instances
    .join(first_view_by_instance, on="ID", how="left")
    .join(first_conv_by_instance, on="ID", how="left")
    .withColumn("delta_view",
                f.when(f.col("first_t_view").isNotNull(),
                       f.col("first_t_view") - f.col("start_t"))
                 .otherwise(f.lit(None)))
    .withColumn("delta_conv",
                f.when(f.col("first_t_conv_valid").isNotNull() & f.col("first_t_view").isNotNull(),
                       f.col("first_t_conv_valid") - f.col("first_t_view"))
                 .otherwise(f.lit(None)))
)

# =========================================================
# 5) Acumulados por cliente até ANTES da oferta atual
#    (médias históricas de delta_view/delta_conv)
# =========================================================
w_cum_delta = Window.partitionBy("cliente_id").orderBy("start_t") \
                    .rowsBetween(Window.unboundedPreceding, -1)

inst_agg_prior = (instances_with_times
    .withColumn("cum_sum_delta_view",
                f.sum(f.when(f.col("delta_view").isNotNull(), f.col("delta_view"))
                       .otherwise(f.lit(0.0))).over(w_cum_delta))
    .withColumn("cum_cnt_delta_view",
                f.sum(f.when(f.col("delta_view").isNotNull(), f.lit(1))
                       .otherwise(f.lit(0))).over(w_cum_delta))
    .withColumn("cum_sum_delta_conv",
                f.sum(f.when(f.col("delta_conv").isNotNull(), f.col("delta_conv"))
                       .otherwise(f.lit(0.0))).over(w_cum_delta))
    .withColumn("cum_cnt_delta_conv",
                f.sum(f.when(f.col("delta_conv").isNotNull(), f.lit(1))
                       .otherwise(f.lit(0))).over(w_cum_delta))
    .withColumn("tempo_medio_visualizacao",
                f.when(f.col("cum_cnt_delta_view")>0,
                       f.col("cum_sum_delta_view")/f.col("cum_cnt_delta_view"))
                 .otherwise(f.lit(None)))
    .withColumn("tempo_medio_conversao",
                f.when(f.col("cum_cnt_delta_conv")>0,
                       f.col("cum_sum_delta_conv")/f.col("cum_cnt_delta_conv"))
                 .otherwise(f.lit(None)))
    .select("ID","cliente_id","start_t","tempo_medio_visualizacao","tempo_medio_conversao")
)

# =========================================================
# 6) Cumulativos "simples" de oferta/view até ANTES da oferta atual
# =========================================================
ev_flags = (ev
    .withColumn("is_offer_received", f.when(f.col("event")=="offer received", 1).otherwise(0))
    .withColumn("is_offer_viewed",   f.when(f.col("event")=="offer viewed", 1).otherwise(0))
)

w_hist = Window.partitionBy("cliente_id").orderBy("time_since_test_start") \
               .rowsBetween(Window.unboundedPreceding, -1)

cum_before = (ev_flags
    .withColumn("qtd_ofertas_anteriores",   f.sum("is_offer_received").over(w_hist))
    .withColumn("qtd_ofertas_visualizadas", f.sum("is_offer_viewed").over(w_hist))
)

# Âncoras (ofertas atuais) com cumulativos antes
anchors = (cum_before.filter(f.col("event")=="offer received")
           .select(
               f.col("ID").alias("ID"),
               f.col("cliente_id").alias("cliente_id"),
               f.col("offer_id").alias("offer_id"),
               f.col("time_since_test_start").alias("t_oferta"),
               f.col("qtd_ofertas_anteriores").alias("qtd_ofertas_anteriores"),
               f.col("qtd_ofertas_visualizadas").alias("qtd_ofertas_visualizadas")
           ))

# =========================================================
# 7) Juntar âncoras com históricos anteriores e calcular métricas
# =========================================================
# (a) tempos médios históricos até antes da oferta atual
prior = inst_agg_prior.select("cliente_id","start_t","tempo_medio_visualizacao","tempo_medio_conversao")

left1 = (anchors.alias("A")
         .join(prior.alias("H"),
               (f.col("A.cliente_id")==f.col("H.cliente_id")) & (f.col("H.start_t") < f.col("A.t_oferta")),
               "left"))

agg1 = (left1
        .groupBy("A.ID","A.cliente_id","A.offer_id","A.t_oferta",
                 "A.qtd_ofertas_anteriores","A.qtd_ofertas_visualizadas")
        .agg(
            f.max("H.tempo_medio_visualizacao").alias("tempo_medio_visualizacao"),
            f.max("H.tempo_medio_conversao").alias("tempo_medio_conversao")
        ))

# (b) conversões válidas acumuladas até antes da oferta atual
cv_prior = conv_valid.select("cliente_id","t_conv_valid","offer_instance_id")

left2 = (agg1.alias("A")
         .join(cv_prior.alias("CV"),
               (f.col("A.cliente_id")==f.col("CV.cliente_id")) & (f.col("CV.t_conv_valid") < f.col("A.t_oferta")),
               "left"))

book_interacao = (left2
    .groupBy("A.ID","A.cliente_id","A.offer_id","A.t_oferta",
             "A.qtd_ofertas_anteriores","A.qtd_ofertas_visualizadas",
             "A.tempo_medio_visualizacao","A.tempo_medio_conversao")
    .agg(f.count("CV.offer_instance_id").alias("qtd_ofertas_completas_validas"))
    .withColumn("taxa_visualizacao",
                f.when(f.col("A.qtd_ofertas_anteriores")>0,
                       f.col("A.qtd_ofertas_visualizadas")/f.col("A.qtd_ofertas_anteriores"))
                 .otherwise(f.lit(0.0)))
    .withColumn("taxa_conversao",
                f.when(f.col("A.qtd_ofertas_anteriores")>0,
                       f.col("qtd_ofertas_completas_validas")/f.col("A.qtd_ofertas_anteriores"))
                 .otherwise(f.lit(0.0)))
    .select(
        f.col("A.ID").alias("ID"),
        f.col("A.cliente_id").alias("cliente_id"),
        f.col("A.t_oferta").alias("time_since_test_start"),
        f.col("A.offer_id").alias("offer_id"),
        f.col("A.qtd_ofertas_anteriores").alias("qtd_ofertas_anteriores"),
        f.col("A.qtd_ofertas_visualizadas").alias("qtd_ofertas_visualizadas"),
        "tempo_medio_visualizacao","tempo_medio_conversao",
        "qtd_ofertas_completas_validas",
        "taxa_visualizacao","taxa_conversao"
    )
)

book_interacao = filtra_mais_recente(book_interacao)

In [0]:
analisar_dataframe(filtra_mais_recente(book_interacao),"ID")

In [0]:
# sanidade da base
print("anchors:", anchors.count(),
      "| inst_agg_prior:", inst_agg_prior.count(),
      "| conv_valid:", conv_valid.count())
book_interacao.limit(10).display()

In [0]:
book_interacao.toPandas().to_csv("../data/processed/book_interacao.csv", sep=",",header=True,index=False)

####3.4 - Book Oferta Atual

In [0]:
# offer_type        -> Tipo da oferta enviada (bogo, discount, informational); define a mecânica do incentivo.
# discount_value    -> Valor do desconto ou crédito oferecido; proxy de atratividade da oferta.
# min_value         -> Valor mínimo de compra para ativar a oferta; afeta a barreira de conversão.
# duration          -> Duração total da oferta em dias, a partir do recebimento.
# web               -> Flag indicando se a oferta foi enviada pelo canal web; impacto no alcance.
# email             -> Flag indicando se a oferta foi enviada por e-mail; impacto no alcance.
# mobile            -> Flag indicando se a oferta foi enviada pelo canal mobile; impacto no alcance.
# social            -> Flag indicando se a oferta foi enviada via mídias sociais; impacto no alcance.

In [0]:
from pyspark.sql import functions as f

# Partimos de transactions_enriquecida filtrando apenas eventos 'offer received'
book_oferta = (transactions_enriquecida
    .withColumnRenamed("profile_id", "cliente_id")
    .withColumn("event", f.lower(f.col("event")))
    .filter(f.col("event") == "offer received")
    .select(
        "ID", "cliente_id", "offer_id", "time_since_test_start",
        "offer_type", "discount_value", "min_value", "duration",
        "web", "email", "mobile", "social"
    )
)

book_oferta = filtra_mais_recente(book_oferta)

In [0]:
analisar_dataframe(filtra_mais_recente(book_oferta),"ID")

In [0]:
book_oferta.toPandas().to_csv("../data/processed/book_oferta.csv", sep=",",header=True,index=False)

####3.5 - Book Recência e Frequência

In [0]:
# dias_desde_ultima_transacao        -> Dias desde a última transação até o envio da oferta; mede recência de compra.
# dias_desde_ultima_oferta_recebida  -> Dias desde a última oferta recebida (antes da atual); mede cadência de comunicação.
# dias_desde_ultima_oferta_visualizada -> Dias desde a última visualização de oferta; sinal de engajamento recente.
# dias_desde_ultima_conversao_valida -> Dias desde a última conversão válida (transaction==completed no mesmo t); sucesso recente.
# n_transacoes_7d                    -> Nº de transações nos 7 dias anteriores ao envio da oferta; frequência recente de compra.
# n_transacoes_14d                   -> Nº de transações nos 14 dias anteriores; frequência recente mais ampla.
# n_ofertas_recebidas_14d             -> Nº de ofertas recebidas nos 14 dias anteriores; saturação recente de comunicação.
# n_ofertas_visualizadas_14d          -> Nº de ofertas visualizadas nos 14 dias anteriores; engajamento recente com promoções.
# n_conversoes_validas_14d            -> Nº de conversões válidas nos 14 dias anteriores; sucesso recente com promoções.

In [0]:
from pyspark.sql import functions as f, Window

# =========================
# 0) Normalização base
# =========================
ev = (transactions_enriquecida
      .withColumnRenamed("profile_id","cliente_id")
      .withColumn("time_since_test_start", f.col("time_since_test_start").cast("double"))
      .withColumn("event", f.lower(f.col("event"))))

recv = (ev.filter(f.col("event")=="offer received")
          .select("ID","cliente_id","offer_id","time_since_test_start",
                  "offer_type","discount_value","min_value","duration",
                  "web","email","mobile","social")
          .withColumnRenamed("time_since_test_start","start_t")
          .withColumn("end_t", f.col("start_t") + f.col("duration").cast("double")))

view = (ev.filter(f.col("event")=="offer viewed")
          .select("ID","cliente_id","offer_id", f.col("time_since_test_start").alias("t_view")))

tx   = ev.filter(f.col("event")=="transaction") \
         .select("cliente_id", f.col("time_since_test_start").alias("t_tx"))

cmpl = ev.filter(f.col("event")=="offer completed") \
          .select("cliente_id", f.col("time_since_test_start").alias("t_cmplt"))

# =========================
# 1) Conversão VÁLIDA (recriada)
# =========================
pairs = (cmpl.alias("c")
         .join(tx.alias("x"),
               on=[f.col("c.cliente_id")==f.col("x.cliente_id"),
                   f.col("c.t_cmplt")==f.col("x.t_tx")],
               how="inner")
         .select(f.col("c.cliente_id").alias("cliente_id"),
                 f.col("c.t_cmplt").alias("t")))

elig = (pairs.alias("p")
        .join(recv.alias("o"),
              on=[f.col("p.cliente_id")==f.col("o.cliente_id"),
                  f.col("p.t").between(f.col("o.start_t"), f.col("o.end_t"))],
              how="inner")
        .select(
            f.col("p.cliente_id").alias("cliente_id"),
            f.col("p.t").alias("t"),
            f.col("o.ID").alias("offer_instance_id"),
            f.col("o.offer_id").alias("offer_id"),
            f.col("o.start_t").alias("start_t")
        ))

elig_v = (elig.alias("e")
          .join(view.alias("v"),
                on=[f.col("e.cliente_id")==f.col("v.cliente_id"),
                    f.col("e.offer_id")==f.col("v.offer_id"),
                    f.col("v.t_view") <= f.col("e.t")],
                how="left")
          .select(
              f.col("e.cliente_id").alias("cliente_id"),
              f.col("e.t").alias("t"),
              f.col("e.offer_instance_id").alias("offer_instance_id"),
              f.col("e.offer_id").alias("offer_id"),
              f.col("e.start_t").alias("start_t"),
              f.col("v.t_view").alias("t_view")
          )
          .withColumn("has_view", f.when(f.col("t_view").isNotNull(), 1).otherwise(0))
          .withColumn("dview", f.when(f.col("t_view").isNotNull(),
                                      f.col("t") - f.col("t_view")).otherwise(f.lit(None))))

w_rank = Window.partitionBy("cliente_id","t").orderBy(
    f.col("has_view").desc(),
    f.col("dview").asc_nulls_last(),
    f.col("start_t").desc(),
    f.col("offer_id").asc(),
    f.col("offer_instance_id").asc()
)

ranked = (elig_v
          .withColumn("rank_elig", f.row_number().over(w_rank))
          .withColumn("score_tuple", f.array(
              f.col("has_view").cast("double"),
              f.coalesce(f.col("dview"), f.lit(1e9)).cast("double"),
              f.col("start_t").cast("double")
          )))

dups = (ranked.groupBy("cliente_id","t","score_tuple")
        .agg(f.count("*").alias("n_same_score")))

conv_valid = (ranked.alias("l")
              .join(dups.alias("r"),
                    on=[f.col("l.cliente_id")==f.col("r.cliente_id"),
                        f.col("l.t")==f.col("r.t"),
                        f.col("l.score_tuple")==f.col("r.score_tuple")],
                    how="left")
              .filter(f.col("l.rank_elig")==1)
              .filter((f.col("r.n_same_score").isNull()) | (f.col("r.n_same_score")==1))
              .select(
                  f.col("l.cliente_id").alias("cliente_id"),
                  f.col("l.t").alias("t_conv_valid"),
                  f.col("l.offer_id").alias("offer_id"),
                  f.col("l.offer_instance_id").alias("offer_instance_id")
              ))

# =========================
# 2) Timeline com flags
# =========================
ev_flags = (ev
    .withColumn("is_offer_received", f.when(f.col("event")=="offer received", 1).otherwise(0))
    .withColumn("is_offer_viewed",   f.when(f.col("event")=="offer viewed",   1).otherwise(0))
    .withColumn("is_transaction",    f.when(f.col("event")=="transaction",    1).otherwise(0))
    .select("ID","cliente_id","offer_id","time_since_test_start","event",
            "is_offer_received","is_offer_viewed","is_transaction")
)

conv_events = (conv_valid
    .select(f.col("cliente_id"),
            f.col("t_conv_valid").alias("time_since_test_start"))
    .withColumn("is_conv_valid", f.lit(1)))

timeline = (ev_flags
    .join(conv_events, on=["cliente_id","time_since_test_start"], how="left")
    .na.fill({"is_conv_valid": 0})
)

# =========================
# 3) Janelas
# =========================
w_hist = Window.partitionBy("cliente_id").orderBy("time_since_test_start") \
               .rowsBetween(Window.unboundedPreceding, -1)

# janelas móveis (excluindo o instante atual)
t_sec = (f.col("time_since_test_start") * f.lit(86400)).cast("long")

w7  = Window.partitionBy("cliente_id").orderBy(t_sec).rangeBetween(-7*86400, -1)
w14 = Window.partitionBy("cliente_id").orderBy(t_sec).rangeBetween(-14*86400, -1)

# =========================
# 4) Métricas
# =========================
timeline_fe = (timeline
    .withColumn("last_tx_t",   f.max(f.when(f.col("is_transaction")==1,    f.col("time_since_test_start"))).over(w_hist))
    .withColumn("last_rcv_t",  f.max(f.when(f.col("is_offer_received")==1, f.col("time_since_test_start"))).over(w_hist))
    .withColumn("last_view_t", f.max(f.when(f.col("is_offer_viewed")==1,   f.col("time_since_test_start"))).over(w_hist))
    .withColumn("last_conv_t", f.max(f.when(f.col("is_conv_valid")==1,     f.col("time_since_test_start"))).over(w_hist))
    .withColumn("qtd_transacoes_7d",              f.sum(f.col("is_transaction")).over(w7))
    .withColumn("qtd_transacoes_14d",             f.sum(f.col("is_transaction")).over(w14))
    .withColumn("qtd_ofertas_recebidas_14d",      f.sum(f.col("is_offer_received")).over(w14))
    .withColumn("qtd_ofertas_visualizadas_14d",   f.sum(f.col("is_offer_viewed")).over(w14))
    .withColumn("qtd_conversoes_validas_14d",     f.sum(f.col("is_conv_valid")).over(w14))
)

# =========================
# 5) Saída final no instante da oferta
# =========================
book_recencia_freq = (timeline_fe
    .filter(f.col("event")=="offer received")
    .select(
        "ID","cliente_id","offer_id","time_since_test_start",
        (f.col("time_since_test_start") - f.col("last_tx_t")).alias("dias_desde_ultima_transacao"),
        (f.col("time_since_test_start") - f.col("last_rcv_t")).alias("dias_desde_ultima_oferta_recebida"),
        (f.col("time_since_test_start") - f.col("last_view_t")).alias("dias_desde_ultima_oferta_visualizada"),
        (f.col("time_since_test_start") - f.col("last_conv_t")).alias("dias_desde_ultima_conversao_valida"),
        "qtd_transacoes_7d","qtd_transacoes_14d","qtd_ofertas_recebidas_14d",
        "qtd_ofertas_visualizadas_14d","qtd_conversoes_validas_14d"
    )
)

book_recencia_freq = filtra_mais_recente(book_recencia_freq)

In [0]:
analisar_dataframe(filtra_mais_recente(book_recencia_freq),"ID")

In [0]:
book_recencia_freq.limit(5).display()

In [0]:
book_recencia_freq.toPandas().to_csv("../data/processed/book_recencia_freq.csv", sep=",",header=True,index=False)

####3.6 - Book Comunicação

In [0]:
# canais_recebidos_total   -> Quantidade total de canais distintos pelos quais a oferta foi recebida (web, email, mobile, social); indica variedade de contato.
# recebeu_web              -> Flag (0/1) indicando se a oferta foi enviada via canal web.
# recebeu_email            -> Flag (0/1) indicando se a oferta foi enviada via canal email.
# recebeu_mobile           -> Flag (0/1) indicando se a oferta foi enviada via canal mobile.
# recebeu_social           -> Flag (0/1) indicando se a oferta foi enviada via canal social.
# canais_preferidos        -> Quantidade de canais que o cliente já recebeu historicamente (antes da oferta atual); indica hábito de recepção.
# web_ratio_historico      -> Proporção de ofertas anteriores enviadas via web.
# email_ratio_historico    -> Proporção de ofertas anteriores enviadas via email.
# mobile_ratio_historico   -> Proporção de ofertas anteriores enviadas via mobile.
# social_ratio_historico   -> Proporção de ofertas anteriores enviadas via social.

In [0]:
from pyspark.sql import functions as f, Window

# Normalização inicial
events = (transactions_enriquecida
    .withColumnRenamed("profile_id", "cliente_id")
    .withColumn("time_since_test_start", f.col("time_since_test_start").cast("double"))
)

# Janela acumulada por cliente antes do evento atual
w_cli_time = (Window.partitionBy("cliente_id")
              .orderBy(f.col("time_since_test_start"))
              .rowsBetween(Window.unboundedPreceding, -1))

# Adiciona cumulativos de canais recebidos historicamente
events_cum = (events
    .withColumn("hist_web", f.sum(f.col("web")).over(w_cli_time))
    .withColumn("hist_email", f.sum(f.col("email")).over(w_cli_time))
    .withColumn("hist_mobile", f.sum(f.col("mobile")).over(w_cli_time))
    .withColumn("hist_social", f.sum(f.col("social")).over(w_cli_time))
)

# Seleciona apenas a oferta recebida (âncora para as variáveis)
ofertas_base = (events_cum
    .filter(f.col("event")=="offer received")
    .select(
        "ID", "cliente_id", "offer_id", "time_since_test_start",
        "web", "email", "mobile", "social",
        (f.col("hist_web") - f.col("web")).alias("hist_web_prev"),
        (f.col("hist_email") - f.col("email")).alias("hist_email_prev"),
        (f.col("hist_mobile") - f.col("mobile")).alias("hist_mobile_prev"),
        (f.col("hist_social") - f.col("social")).alias("hist_social_prev")
    )
)

# Monta book de comunicação
book_comunicacao = (ofertas_base
    # Flags do envio atual
    .withColumn("recebeu_web", f.col("web"))
    .withColumn("recebeu_email", f.col("email"))
    .withColumn("recebeu_mobile", f.col("mobile"))
    .withColumn("recebeu_social", f.col("social"))
    .withColumn("canais_recebidos_total", f.expr("web + email + mobile + social"))
    # Histórico antes da oferta atual
    .withColumn("canais_preferidos", 
                (f.col("hist_web_prev")>0).cast("int") +
                (f.col("hist_email_prev")>0).cast("int") +
                (f.col("hist_mobile_prev")>0).cast("int") +
                (f.col("hist_social_prev")>0).cast("int"))
    .withColumn("web_ratio_historico", 
                f.when((f.col("hist_web_prev")+f.col("hist_email_prev")+f.col("hist_mobile_prev")+f.col("hist_social_prev"))>0,
                       f.col("hist_web_prev") / (f.col("hist_web_prev")+f.col("hist_email_prev")+f.col("hist_mobile_prev")+f.col("hist_social_prev")))
                 .otherwise(f.lit(0.0)))
    .withColumn("email_ratio_historico", 
                f.when((f.col("hist_web_prev")+f.col("hist_email_prev")+f.col("hist_mobile_prev")+f.col("hist_social_prev"))>0,
                       f.col("hist_email_prev") / (f.col("hist_web_prev")+f.col("hist_email_prev")+f.col("hist_mobile_prev")+f.col("hist_social_prev")))
                 .otherwise(f.lit(0.0)))
    .withColumn("mobile_ratio_historico", 
                f.when((f.col("hist_web_prev")+f.col("hist_email_prev")+f.col("hist_mobile_prev")+f.col("hist_social_prev"))>0,
                       f.col("hist_mobile_prev") / (f.col("hist_web_prev")+f.col("hist_email_prev")+f.col("hist_mobile_prev")+f.col("hist_social_prev")))
                 .otherwise(f.lit(0.0)))
    .withColumn("social_ratio_historico", 
                f.when((f.col("hist_web_prev")+f.col("hist_email_prev")+f.col("hist_mobile_prev")+f.col("hist_social_prev"))>0,
                       f.col("hist_social_prev") / (f.col("hist_web_prev")+f.col("hist_email_prev")+f.col("hist_mobile_prev")+f.col("hist_social_prev")))
                 .otherwise(f.lit(0.0)))
    .select(
        "ID","cliente_id","offer_id","time_since_test_start",
        "canais_recebidos_total","recebeu_web","recebeu_email","recebeu_mobile","recebeu_social",
        "canais_preferidos","web_ratio_historico","email_ratio_historico","mobile_ratio_historico","social_ratio_historico"
    )
)

book_comunicacao = filtra_mais_recente(book_comunicacao)


In [0]:
analisar_dataframe(filtra_mais_recente(book_comunicacao),"ID")

In [0]:
book_comunicacao.toPandas().to_csv("../data/processed/book_comunicacao.csv", sep=",",header=True,index=False)

####3.7 - Book Cliente-Oferta

In [0]:
# taxa_visualizacao_historica_tipo       -> % de ofertas do MESMO tipo (bogo/discount/...) visualizadas pelo cliente ANTES desta oferta.
# taxa_conversao_historica_tipo         -> % de ofertas do MESMO tipo convertidas (válidas) pelo cliente ANTES desta oferta.
# taxa_visualizacao_historica_oferta    -> % de instâncias anteriores desta MESMA offer_id visualizadas pelo cliente.
# taxa_conversao_historica_oferta       -> % de instâncias anteriores desta MESMA offer_id convertidas (válidas) pelo cliente.
# taxa_conversao_historica_canal_web    -> % de ofertas anteriores com canal web que o cliente converteu (válidas).
# taxa_conversao_historica_canal_email  -> % de ofertas anteriores com canal email que o cliente converteu (válidas).
# taxa_conversao_historica_canal_mobile -> % de ofertas anteriores com canal mobile que o cliente converteu (válidas).
# taxa_conversao_historica_canal_social -> % de ofertas anteriores com canal social que o cliente converteu (válidas).
# afinidade_canais_conv                 -> Média das taxas de conversão históricas dos CANAIS ATIVOS desta oferta (ex.: média entre web/email se ambos=1).
# taxa_conversao_historica_bucket_duracao   -> % de ofertas anteriores no MESMO bucket de duração convertidas (válidas).
# taxa_conversao_historica_bucket_desconto  -> % de ofertas anteriores no MESMO bucket de desconto convertidas (válidas).
# taxa_conversao_historica_bucket_minimo    -> % de ofertas anteriores no MESMO bucket de valor mínimo convertidas (válidas).
# n_instancias_anteriores_mesma_oferta      -> Nº de vezes que o cliente já recebeu esta MESMA offer_id antes.
# n_conversoes_validas_anteriores_mesma     -> Nº de conversões válidas anteriores nesta MESMA offer_id.
# dias_desde_ultima_oferta_mesma            -> Dias desde a última vez que recebeu esta MESMA offer_id.
# dias_desde_ultima_conversao_valida_mesma  -> Dias desde a última conversão válida desta MESMA offer_id.


In [0]:
from pyspark.sql import functions as f, Window

# ============= 0) Normalização base =============
ev = (transactions_enriquecida
      .withColumnRenamed("profile_id","cliente_id")
      .withColumn("time_since_test_start", f.col("time_since_test_start").cast("double"))
      .withColumn("event", f.lower(f.col("event"))))

recv = (ev.filter(f.col("event")=="offer received")
          .select("ID","cliente_id","offer_id","offer_type","discount_value","min_value","duration",
                  "web","email","mobile","social", f.col("time_since_test_start").alias("start_t"))
          .withColumn("end_t", f.col("start_t") + f.col("duration").cast("double")))

view = ev.filter(f.col("event")=="offer viewed") \
         .select("ID","cliente_id","offer_id", f.col("time_since_test_start").alias("t_view"))

tx   = ev.filter(f.col("event")=="transaction") \
         .select("cliente_id", f.col("time_since_test_start").alias("t_tx"))

cmpl = ev.filter(f.col("event")=="offer completed") \
          .select("cliente_id", f.col("time_since_test_start").alias("t_cmplt"))

# ============= 1) Conversão VÁLIDA (mesma regra do target) =============
# (transaction == completed no mesmo t) + oferta elegível + desempate por view e recência
pairs = (cmpl.alias("c")
         .join(tx.alias("x"),
               on=[f.col("c.cliente_id")==f.col("x.cliente_id"),
                   f.col("c.t_cmplt")==f.col("x.t_tx")],
               how="inner")
         .select(f.col("c.cliente_id").alias("cliente_id"),
                 f.col("c.t_cmplt").alias("t")))

elig = (pairs.alias("p")
        .join(recv.alias("o"),
              on=[f.col("p.cliente_id")==f.col("o.cliente_id"),
                  f.col("p.t").between(f.col("o.start_t"), f.col("o.end_t"))],
              how="inner")
        .select("p.cliente_id", f.col("p.t").alias("t"),
                f.col("o.ID").alias("offer_instance_id"),
                "o.offer_id","o.start_t"))

elig_v = (elig.alias("e")
          .join(view.alias("v"),
                on=[f.col("e.cliente_id")==f.col("v.cliente_id"),
                    f.col("e.offer_id")==f.col("v.offer_id"),
                    f.col("v.t_view") <= f.col("e.t")],
                how="left")
          .select(
              f.col("e.cliente_id").alias("cliente_id"),
              f.col("e.t").alias("t"),
              f.col("e.offer_instance_id").alias("offer_instance_id"),
              f.col("e.offer_id").alias("offer_id"),
              f.col("e.start_t").alias("start_t"),
              f.col("v.t_view").alias("t_view")
          )
          .withColumn("has_view", f.when(f.col("t_view").isNotNull(), 1).otherwise(0))
          .withColumn("dview", f.when(f.col("t_view").isNotNull(), f.col("t")-f.col("t_view")).otherwise(f.lit(None))))

w_rank = Window.partitionBy("cliente_id","t").orderBy(
    f.col("has_view").desc(),
    f.col("dview").asc_nulls_last(),
    f.col("start_t").desc(),
    f.col("offer_id").asc(),
    f.col("offer_instance_id").asc()
)

ranked = (elig_v
          .withColumn("rank_elig", f.row_number().over(w_rank))
          .withColumn("score_tuple", f.array(
              f.col("has_view").cast("double"),
              f.coalesce(f.col("dview"), f.lit(1e9)).cast("double"),
              f.col("start_t").cast("double")
          )))

dups = (ranked.groupBy("cliente_id","t","score_tuple")
        .agg(f.count("*").alias("n_same_score")))

conv_valid = (ranked.alias("l")
              .join(dups.alias("r"),
                    on=[f.col("l.cliente_id")==f.col("r.cliente_id"),
                        f.col("l.t")==f.col("r.t"),
                        f.col("l.score_tuple")==f.col("r.score_tuple")],
                    how="left")
              .filter(f.col("l.rank_elig")==1)
              .filter((f.col("r.n_same_score").isNull()) | (f.col("r.n_same_score")==1))
              .select("l.cliente_id",
                      f.col("l.offer_id").alias("offer_id"),
                      f.col("l.offer_instance_id").alias("offer_instance_id"),
                      f.col("l.t").alias("t_conv_valid"))
)

# ============= 2) Instâncias com flags de view/conv por instância =============
# a) view por instância (se houve alguma view)
view_by_instance = (view.groupBy("ID").agg(f.min("t_view").alias("first_t_view")))
# b) conversão válida por instância
conv_by_instance = (conv_valid.groupBy("offer_instance_id")
                    .agg(f.min("t_conv_valid").alias("first_t_conv_valid"))
                    .withColumnRenamed("offer_instance_id","ID"))

instances = (recv
    .select("ID","cliente_id","offer_id","offer_type","discount_value","min_value","duration",
            "web","email","mobile","social","start_t")
    .join(view_by_instance, on="ID", how="left")
    .join(conv_by_instance, on="ID", how="left")
    .withColumn("view_flag", f.when(f.col("first_t_view").isNotNull(), 1).otherwise(0))
    .withColumn("conv_flag", f.when(f.col("first_t_conv_valid").isNotNull(), 1).otherwise(0))
)

# Buckets (simples e estáveis para o mês)
instances = (instances
    .withColumn("bucket_duracao",
                f.when(f.col("duration")<=5, f.lit("short"))
                 .when((f.col("duration")>=6) & (f.col("duration")<=7), f.lit("mid"))
                 .otherwise(f.lit("long")))
    .withColumn("bucket_desconto",
                f.when(f.col("discount_value")<=5, f.lit("low"))
                 .when(f.col("discount_value")<=10, f.lit("mid"))
                 .otherwise(f.lit("high")))
    .withColumn("bucket_minimo",
                f.when(f.col("min_value")<=5, f.lit("low"))
                 .when(f.col("min_value")<=10, f.lit("mid"))
                 .otherwise(f.lit("high")))
)

# ============= 3) Cumulativos por cliente e chave (só HISTÓRICO anterior) =============
# Janela por cliente ordenada por start_t, até a linha anterior
w_prev = Window.partitionBy("cliente_id").orderBy("start_t").rowsBetween(Window.unboundedPreceding, -1)
w_prev_type   = Window.partitionBy("cliente_id","offer_type").orderBy("start_t").rowsBetween(Window.unboundedPreceding, -1)
w_prev_offer  = Window.partitionBy("cliente_id","offer_id").orderBy("start_t").rowsBetween(Window.unboundedPreceding, -1)
w_prev_dur    = Window.partitionBy("cliente_id","bucket_duracao").orderBy("start_t").rowsBetween(Window.unboundedPreceding, -1)
w_prev_disc   = Window.partitionBy("cliente_id","bucket_desconto").orderBy("start_t").rowsBetween(Window.unboundedPreceding, -1)
w_prev_min    = Window.partitionBy("cliente_id","bucket_minimo").orderBy("start_t").rowsBetween(Window.unboundedPreceding, -1)

inst_hist = (instances
    # por TIPO
    .withColumn("rcv_tipo_prev",   f.count("*").over(w_prev_type))
    .withColumn("view_tipo_prev",  f.sum("view_flag").over(w_prev_type))
    .withColumn("conv_tipo_prev",  f.sum("conv_flag").over(w_prev_type))
    # por OFFER_ID
    .withColumn("rcv_offer_prev",  f.count("*").over(w_prev_offer))
    .withColumn("view_offer_prev", f.sum("view_flag").over(w_prev_offer))
    .withColumn("conv_offer_prev", f.sum("conv_flag").over(w_prev_offer))
    # por CANAL (condicional ao canal ativo)
    .withColumn("rcv_web_prev",    f.sum(f.when(f.col("web")==1, 1).otherwise(0)).over(w_prev))
    .withColumn("conv_web_prev",   f.sum(f.when((f.col("web")==1) & (f.col("conv_flag")==1), 1).otherwise(0)).over(w_prev))
    .withColumn("rcv_email_prev",  f.sum(f.when(f.col("email")==1, 1).otherwise(0)).over(w_prev))
    .withColumn("conv_email_prev", f.sum(f.when((f.col("email")==1) & (f.col("conv_flag")==1), 1).otherwise(0)).over(w_prev))
    .withColumn("rcv_mobile_prev", f.sum(f.when(f.col("mobile")==1, 1).otherwise(0)).over(w_prev))
    .withColumn("conv_mobile_prev",f.sum(f.when((f.col("mobile")==1) & (f.col("conv_flag")==1), 1).otherwise(0)).over(w_prev))
    .withColumn("rcv_social_prev", f.sum(f.when(f.col("social")==1, 1).otherwise(0)).over(w_prev))
    .withColumn("conv_social_prev",f.sum(f.when((f.col("social")==1) & (f.col("conv_flag")==1), 1).otherwise(0)).over(w_prev))
    # por BUCKETS
    .withColumn("rcv_dur_prev",    f.count("*").over(w_prev_dur))
    .withColumn("conv_dur_prev",   f.sum("conv_flag").over(w_prev_dur))
    .withColumn("rcv_disc_prev",   f.count("*").over(w_prev_disc))
    .withColumn("conv_disc_prev",  f.sum("conv_flag").over(w_prev_disc))
    .withColumn("rcv_min_prev",    f.count("*").over(w_prev_min))
    .withColumn("conv_min_prev",   f.sum("conv_flag").over(w_prev_min))
)

# ============= 4) Métricas finais na ÁNCORA (linha offer received atual) =============
# Afinidade de canais: média das taxas de conversão por canal entre os canais ATIVOS desta oferta
def safe_rate(num, den):
    return f.when(den>0, num/den).otherwise(f.lit(0.0))

book_cliente_oferta = (inst_hist
    # taxas por tipo/offer/buckets (histórico anterior)
    .withColumn("taxa_visualizacao_historica_tipo",  safe_rate(f.col("view_tipo_prev"),  f.col("rcv_tipo_prev")))
    .withColumn("taxa_conversao_historica_tipo",     safe_rate(f.col("conv_tipo_prev"),  f.col("rcv_tipo_prev")))
    .withColumn("taxa_visualizacao_historica_oferta",safe_rate(f.col("view_offer_prev"), f.col("rcv_offer_prev")))
    .withColumn("taxa_conversao_historica_oferta",   safe_rate(f.col("conv_offer_prev"), f.col("rcv_offer_prev")))
    .withColumn("taxa_conversao_historica_canal_web",   safe_rate(f.col("conv_web_prev"),   f.col("rcv_web_prev")))
    .withColumn("taxa_conversao_historica_canal_email", safe_rate(f.col("conv_email_prev"), f.col("rcv_email_prev")))
    .withColumn("taxa_conversao_historica_canal_mobile",safe_rate(f.col("conv_mobile_prev"),f.col("rcv_mobile_prev")))
    .withColumn("taxa_conversao_historica_canal_social",safe_rate(f.col("conv_social_prev"),f.col("rcv_social_prev")))
    .withColumn("taxa_conversao_historica_bucket_duracao", safe_rate(f.col("conv_dur_prev"),  f.col("rcv_dur_prev")))
    .withColumn("taxa_conversao_historica_bucket_desconto",safe_rate(f.col("conv_disc_prev"), f.col("rcv_disc_prev")))
    .withColumn("taxa_conversao_historica_bucket_minimo",  safe_rate(f.col("conv_min_prev"),  f.col("rcv_min_prev")))
    # afinidade canais da oferta atual (média das taxas dos canais ativos)
    .withColumn("soma_taxas_canais",
                f.coalesce(f.when(f.col("web")==1,    f.col("taxa_conversao_historica_canal_web")), f.lit(0.0)) +
                f.coalesce(f.when(f.col("email")==1,  f.col("taxa_conversao_historica_canal_email")), f.lit(0.0)) +
                f.coalesce(f.when(f.col("mobile")==1, f.col("taxa_conversao_historica_canal_mobile")), f.lit(0.0)) +
                f.coalesce(f.when(f.col("social")==1, f.col("taxa_conversao_historica_canal_social")), f.lit(0.0)))
    .withColumn("qtd_canais_ativos", f.col("web")+f.col("email")+f.col("mobile")+f.col("social"))
    .withColumn("afinidade_canais_conv",
                f.when(f.col("qtd_canais_ativos")>0, f.col("soma_taxas_canais")/f.col("qtd_canais_ativos")).otherwise(f.lit(0.0)))
    # métricas de repetição da MESMA oferta_id
    .withColumn("n_instancias_anteriores_mesma_oferta", f.col("rcv_offer_prev"))
    .withColumn("n_conversoes_validas_anteriores_mesma", f.col("conv_offer_prev"))
    # recências específicas da mesma offer_id
    .withColumn("last_start_same_offer",
                f.max("start_t").over(Window.partitionBy("cliente_id","offer_id").orderBy("start_t").rowsBetween(Window.unboundedPreceding, -1)))
    .withColumn("dias_desde_ultima_oferta_mesma", f.col("start_t") - f.col("last_start_same_offer"))
    # última conversão válida da mesma offer_id (via conv_valid)
    .join(conv_valid.select("cliente_id","offer_id", f.col("t_conv_valid")).alias("cv"),
          on=["cliente_id","offer_id"], how="left")
    .withColumn("last_t_conv_same_offer",
                f.max("t_conv_valid").over(Window.partitionBy("cliente_id","offer_id","start_t")
                                           .orderBy("t_conv_valid").rowsBetween(Window.unboundedPreceding, -1)))
    .withColumn("dias_desde_ultima_conversao_valida_mesma", f.col("start_t") - f.col("last_t_conv_same_offer"))
    # saída final apenas na âncora (offer received atual)
    .select(
        "ID","cliente_id","offer_id",
        f.col("start_t").alias("time_since_test_start"),
        "taxa_visualizacao_historica_tipo","taxa_conversao_historica_tipo",
        "taxa_visualizacao_historica_oferta","taxa_conversao_historica_oferta",
        "taxa_conversao_historica_canal_web","taxa_conversao_historica_canal_email",
        "taxa_conversao_historica_canal_mobile","taxa_conversao_historica_canal_social",
        "afinidade_canais_conv",
        "taxa_conversao_historica_bucket_duracao",
        "taxa_conversao_historica_bucket_desconto",
        "taxa_conversao_historica_bucket_minimo",
        "n_instancias_anteriores_mesma_oferta",
        "n_conversoes_validas_anteriores_mesma",
        "dias_desde_ultima_oferta_mesma",
        "dias_desde_ultima_conversao_valida_mesma"
    )
)
book_cliente_oferta = filtra_mais_recente(book_cliente_oferta)


In [0]:
analisar_dataframe(filtra_mais_recente(book_cliente_oferta),"ID")

In [0]:
book_cliente_oferta.limit(5).display()

In [0]:
book_cliente_oferta.toPandas().to_csv("../data/processed/book_cliente_oferta.csv", sep=",",header=True,index=False)

####3.8 - Book Full - Síntese

In [0]:
book_perfil = spark.createDataFrame(pd.read_csv("../data/processed/book_perfil.csv", sep=","))
book_compra = spark.createDataFrame(pd.read_csv("../data/processed/book_compra.csv", sep=","))
book_interacao = spark.createDataFrame(pd.read_csv("../data/processed/book_interacao.csv", sep=","))
book_oferta = spark.createDataFrame(pd.read_csv("../data/processed/book_oferta.csv", sep=","))
book_recencia_freq = spark.createDataFrame(pd.read_csv("../data/processed/book_recencia_freq.csv", sep=","))
book_comunicacao = spark.createDataFrame(pd.read_csv("../data/processed/book_comunicacao.csv", sep=","))
book_cliente_oferta = spark.createDataFrame(pd.read_csv("../data/processed/book_cliente_oferta.csv", sep=","))

In [0]:

book_full = book_perfil\
    .join(book_compra.drop("cliente_id","offer_id","time_since_test_start"), "ID", "left")\
    .join(book_interacao.drop("cliente_id","offer_id","time_since_test_start"), "ID", "left")\
    .join(book_oferta.drop("cliente_id","offer_id","time_since_test_start"), "ID", "left")\
    .join(book_recencia_freq.drop("cliente_id","offer_id","time_since_test_start"), "ID", "left")\
    .join(book_comunicacao.drop("cliente_id","offer_id","time_since_test_start"), "ID", "left")\
    .join(book_cliente_oferta.drop("cliente_id","offer_id","time_since_test_start"), "ID", "left")


In [0]:
#3 IDs ID,cliente_id e offer_id
#time_since_test_start - representa a linha do tempo
#59 variáveis explicativas
analisar_dataframe(filtra_mais_recente(book_full),"ID")

In [0]:
book_full.limit(5).display()

In [0]:
book_full.toPandas().to_csv("../data/processed/book_full.csv", sep=",",header=True,index=False)

###4 - Modeling Dataset

In [0]:
#Vamos juntar nosso publico + Book + target

In [0]:
publico_com_target = spark.createDataFrame(pd.read_csv("../data/processed/publico_com_target.csv", sep=",")).drop("cliente_id","offer_id")
book_full = spark.createDataFrame(pd.read_csv("../data/processed/book_full.csv", sep=","))

#Base de modelagem
base_modelagem = publico_com_target.join(book_full, "ID", "left")


In [0]:
analisar_dataframe(publico_com_target,"ID")

In [0]:
analisar_dataframe(book_full,"ID")

In [0]:
analisar_dataframe(base_modelagem,"ID")

In [0]:
base_modelagem.toPandas().to_csv("../data/processed/base_modelagem.csv", sep=",",header=True,index=False)