# MRO - MODELO DE RECOMENDAÇÃO DE OFERTA
## DATA PROCESSING



## Bibliotecas

In [8]:
# Manipulação de dados
import pandas as pd
import numpy as np
import os


# Datas
import calendar
from datetime import datetime

## Funções

In [10]:
# Função para ajustar datas de registro no perfil ---
def ajustar_data(data_str):
    ano = int(data_str[:4])
    dia = int(data_str[6:8])
    mes_atual = datetime.now().month

    # Corrigir dia para o último dia válido do mês atual
    _, ultimo_dia = calendar.monthrange(ano, mes_atual)
    dia_corrigido = min(dia, ultimo_dia)

    return datetime(year=ano, month=mes_atual, day=dia_corrigido)

# Função para calcular diferença em meses entre duas datas
def meses_de_diferenca(inicio, fim):
    return (fim.year - inicio.year) * 12 + (fim.month - inicio.month)

## Engenharia de Dados

In [11]:
# --- 1. Definindo caminho base para os arquivos JSON ---
caminho_entrada = os.path.join('..', 'data', 'raw')
caminho_saida = os.path.join('..', 'data', 'processed')

# --- 2. Carga de arquivos JSON em DataFrames ---
offers = pd.read_json(os.path.join(caminho_entrada, 'offers.json'))
profile = pd.read_json(os.path.join(caminho_entrada, 'profile.json'))
transactions = pd.read_json(os.path.join(caminho_entrada, 'transactions.json'))

# --- 3. Normalizando nomes das colunas para consistência entre DataFrames ---
offers.rename(columns={'id': 'offer_id'}, inplace=True)
profile.rename(columns={'id': 'account_id'}, inplace=True)

# --- 4. Expandir coluna 'value' no DataFrame transactions em múltiplas colunas ---
value_df = transactions['value'].apply(pd.Series)

# --- 5. Processar transactions ---
# 5.1 Substituir coluna 'value' pelas colunas expandidas
transactions_v2 = pd.concat([transactions.drop(columns=['value']), value_df], axis=1)

# 5.2 Corrigir inconsistência: quando 'offer_id' estiver vazio, usar 'offer id'
transactions_v2['offer_id'] = transactions_v2['offer id'].combine_first(transactions_v2['offer_id'])

# 5.3 Remover a coluna antiga 'offer id'
transactions_v2.drop(columns=['offer id'], inplace=True)

# --- 6. Processar canais (channels) no DataFrame offers ---
# 6.1 Converter string para lista, se necessário
offers['channels'] = offers['channels'].apply(
    lambda x: x.split(',') if isinstance(x, str) else x
)

# 6.2 Explodir a lista em várias linhas
offers_expanded = offers.explode('channels')

# 6.3 Criar variáveis dummies (colunas binárias) para cada canal
channels_dummies = pd.get_dummies(offers_expanded['channels'])

# 6.4 Agrupar por oferta e somar as dummies
channels_summary = channels_dummies.groupby(offers_expanded.index).sum().clip(upper=1)

# 6.5 Anexar variáveis dummy ao DataFrame original
offers = offers.join(channels_summary)

# --- 7. Calcular tempo de casa dos usuários (profile) ---
# 7.1 Garantir que 'registered_on' seja string
profile['registered_on'] = profile['registered_on'].astype(str)

# 7.2 Obter data atual
mes_atual = datetime.now().month
ano_atual = datetime.now().year
hoje = datetime.now()

# 7.3 Aplicar função para ajustar data de registro
profile['data_ajustada'] = profile['registered_on'].apply(ajustar_data)

# 7.4 Calcular tempo de casa em meses
profile['tempo_de_casa'] = profile['data_ajustada'].apply(lambda x: meses_de_diferenca(x, hoje))

# 7.5 Visualização parcial
#print(profile[['registered_on', 'data_ajustada', 'tempo_de_casa']].head())

# --- 8. Selecionar eventos relevantes em transactions ---
# 8.1 Transações (compras feitas)
transacoes = transactions_v2.query("event == 'transaction'")[['account_id', 'time_since_test_start', 'amount']].copy()

# 8.2 Ofertas recebidas
ofertas_recebidas = transactions_v2.query("event == 'offer received'")[['account_id', 'offer_id', 'time_since_test_start']].copy()

# 8.3 Ofertas completadas
ofertas_completas = transactions_v2.query("event == 'offer completed'")[['account_id', 'offer_id', 'time_since_test_start']].copy()

# --- 9. Relacionar transações com ofertas recebidas e verificar se foram utilizadas ---
# 9.1 Filtrar apenas clientes que receberam ofertas
clientes_com_oferta = ofertas_recebidas['account_id'].unique()

# 9.2 Filtrar transações apenas desses clientes
transacoes_com_oferta = transacoes[transacoes['account_id'].isin(clientes_com_oferta)].copy()

# 9.3 Adicionar duração da oferta recebida
ofertas_recebidas = ofertas_recebidas.merge(
    offers[['offer_id', 'duration']],
    on='offer_id',
    how='left'
)

# 9.4 Calcular validade da oferta
ofertas_recebidas.rename(columns={'time_since_test_start': 'offer_start'}, inplace=True)
ofertas_recebidas['offer_end'] = ofertas_recebidas['offer_start'] + ofertas_recebidas['duration']

# 9.5 Juntar transações com as ofertas recebidas por cliente
trans_com_oferta = transacoes_com_oferta.merge(
    ofertas_recebidas,
    on='account_id',
    how='left'
)

# 9.6 Manter apenas transações dentro do período de validade da oferta
trans_com_oferta = trans_com_oferta[
    (trans_com_oferta['time_since_test_start'] >= trans_com_oferta['offer_start']) &
    (trans_com_oferta['time_since_test_start'] <= trans_com_oferta['offer_end'])
]

# 9.7 Renomear coluna de conclusão da oferta
ofertas_completas.rename(columns={'time_since_test_start': 'completion_time'}, inplace=True)

# 9.8 Verificar se a transação está associada a uma oferta completada
trans_com_oferta = trans_com_oferta.merge(
    ofertas_completas,
    on=['account_id', 'offer_id'],
    how='left'
)

# 9.9 Criar coluna indicando se a oferta foi utilizada
trans_com_oferta['oferta_utilizada'] = (
    trans_com_oferta['completion_time'].notna() &
    (trans_com_oferta['time_since_test_start'] <= trans_com_oferta['completion_time'])
)

# 9.10 Preencher valores nulos com False (não completadas)
trans_com_oferta['oferta_utilizada'] = trans_com_oferta['oferta_utilizada'].fillna(False)

# 9.11 Selecionar colunas finais
transacoes_final = trans_com_oferta[[
    'account_id',
    'time_since_test_start',
    'amount',
    'offer_id',
    'oferta_utilizada'
]].copy()

# --- 10. Cruzar com dados do perfil dos usuários ---
base_final_filtrada = transacoes_final.merge(
    profile,
    on='account_id',
    how='left'  # Use 'inner' para manter apenas quem tem perfil
)

# --- 11. Exportar para json ---
#base_final_filtrada.to_csv(caminho_saida + 'data_processed.csv', index=False, sep=';')
base_final_filtrada.to_json(os.path.join(caminho_saida, 'data_processed.json'), orient='records', lines=True)
transactions_v2.to_json(os.path.join(caminho_saida, 'transactions_v2.json'), orient='records', lines=True)
ofertas_recebidas.to_json(os.path.join(caminho_saida, 'ofertas_recebidas.json'), orient='records', lines=True)
# --- 12. Visualização final (opcional) ---
# print(base_final_filtrada.head())

  registered_on data_ajustada  tempo_de_casa
0      20170212    2017-09-12             96
1      20170715    2017-09-15             96
2      20180712    2018-09-12             84
3      20170509    2017-09-09             96
4      20170804    2017-09-04             96


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode, expr, when, lit, udf, concat_ws
from pyspark.sql.types import IntegerType, DateType, BooleanType
from datetime import datetime
import calendar

spark = SparkSession.builder.appName("ConversaoPandasParaPySpark").getOrCreate()

# --- 1. Caminhos dos arquivos ---
caminho_entrada = '../data/raw'
caminho_saida = '../data/processed'

# --- 2. Leitura dos JSON em DataFrames PySpark ---
offers = spark.read.json(f"{caminho_entrada}/offers.json")
profile = spark.read.json(f"{caminho_entrada}/profile.json")
transactions = spark.read.json(f"{caminho_entrada}/transactions.json")

# --- 3. Renomear colunas ---
offers = offers.withColumnRenamed("id", "offer_id")
profile = profile.withColumnRenamed("id", "account_id")

# --- 4. Expandir coluna 'value' em múltiplas colunas ---
# 'value' é um struct? Supondo que sim:
value_cols = transactions.select("value.*").columns
for c in value_cols:
    transactions = transactions.withColumn(c, col(f"value.{c}"))
transactions = transactions.drop("value")

# --- 5. Corrigir inconsistência entre 'offer_id' e 'offer id' ---
transactions = transactions.withColumn(
    "offer_id",
    when(col("offer_id").isNull(), col("offer id")).otherwise(col("offer_id"))
).drop("offer id")

# --- 6. Processar canais (channels) no offers ---
# 6.1 Converter string para array, se necessário
offers = offers.withColumn(
    "channels",
    when(col("channels").isNull(), lit([]))
    .when(col("channels").startswith("["), expr("channels"))  # já array json?
    .otherwise(split(col("channels"), ","))
)

# 6.2 Explodir lista de canais
offers_expanded = offers.select(
    "*", explode("channels").alias("channel")
)

# 6.3 Criar dummies (colunas binárias) para canais
channels_list = offers_expanded.select("channel").distinct().rdd.flatMap(lambda x: x).collect()

for ch in channels_list:
    offers_expanded = offers_expanded.withColumn(
        ch,
        when(col("channel") == ch, lit(1)).otherwise(lit(0))
    )

# 6.4 Agrupar por offer_id e somar as dummies, limitando a 1
agg_exprs = [expr(f"min({ch}) as {ch}") for ch in channels_list]
channels_summary = offers_expanded.groupBy("offer_id").agg(*agg_exprs)

# 6.5 Juntar dummies ao DataFrame original offers
offers = offers.drop("channels").join(channels_summary, "offer_id", "left")

# --- 7. Ajustar datas e calcular tempo de casa ---

# Função para ajustar data (UDF)
def ajustar_data_udf(data_str):
    if not data_str or len(data_str) < 8:
        return None
    ano = int(data_str[:4])
    dia = int(data_str[6:8])
    mes_atual = datetime.now().month
    _, ultimo_dia = calendar.monthrange(ano, mes_atual)
    dia_corrigido = min(dia, ultimo_dia)
    return datetime(year=ano, month=mes_atual, day=dia_corrigido)

ajustar_data = udf(ajustar_data_udf, DateType())

# Função para calcular meses de diferença (UDF)
def meses_de_diferenca_udf(inicio, fim=datetime.now()):
    if inicio is None:
        return None
    return (fim.year - inicio.year) * 12 + (fim.month - inicio.month)

meses_de_diferenca = udf(meses_de_diferenca_udf, IntegerType())

profile = profile.withColumn("registered_on_str", col("registered_on").cast("string"))
profile = profile.withColumn("data_ajustada", ajustar_data(col("registered_on_str")))

hoje = datetime.now()
profile = profile.withColumn("tempo_de_casa", meses_de_diferenca(col("data_ajustada")))

# --- 8. Selecionar eventos relevantes em transactions ---

transacoes = transactions.filter(col("event") == "transaction") \
    .select("account_id", "time_since_test_start", "amount")

ofertas_recebidas = transactions.filter(col("event") == "offer received") \
    .select("account_id", "offer_id", "time_since_test_start")

ofertas_completas = transactions.filter(col("event") == "offer completed") \
    .select("account_id", "offer_id", "time_since_test_start")

# --- 9. Relacionar transações com ofertas recebidas ---

clientes_com_oferta = ofertas_recebidas.select("account_id").distinct()

transacoes_com_oferta = transacoes.join(clientes_com_oferta, "account_id")

ofertas_recebidas = ofertas_recebidas.join(
    offers.select("offer_id", "duration"),
    "offer_id",
    "left"
).withColumnRenamed("time_since_test_start", "offer_start")

ofertas_recebidas = ofertas_recebidas.withColumn(
    "offer_end",
    col("offer_start") + col("duration")
)

trans_com_oferta = transacoes_com_oferta.join(
    ofertas_recebidas,
    "account_id",
    "left"
).filter(
    (col("time_since_test_start") >= col("offer_start")) &
    (col("time_since_test_start") <= col("offer_end"))
)

ofertas_completas = ofertas_completas.withColumnRenamed("time_since_test_start", "completion_time")

trans_com_oferta = trans_com_oferta.join(
    ofertas_completas,
    ["account_id", "offer_id"],
    "left"
)

# --- 9.9 Criar coluna oferta_utilizada ---
trans_com_oferta = trans_com_oferta.withColumn(
    "oferta_utilizada",
    (col("completion_time").isNotNull()) & (col("time_since_test_start") <= col("completion_time"))
).fillna({"oferta_utilizada": False})

# --- 9.11 Selecionar colunas finais ---
transacoes_final = trans_com_oferta.select(
    "account_id", "time_since_test_start", "amount", "offer_id", "oferta_utilizada"
)

# --- 10. Cruzar com dados do perfil ---
base_final_filtrada = transacoes_final.join(profile, "account_id", "left")

# --- 11. Exportar para JSON ---
base_final_filtrada.write.json(f"{caminho_saida}/data_processed.json", mode="overwrite", lineSep="\n")

# --- 12. (Opcional) Mostrar resultado ---
base_final_filtrada.show(5)