In [0]:
import sys
import os

# 1. Put the repository root (the parent of the notebook's working directory)
# on sys.path so the local `src` package can be imported.
# os.path.dirname is separator-aware, unlike splitting on '/' by hand.
repo_root = os.path.dirname(os.getcwd())
if repo_root not in sys.path:
    # Prepend (rather than append) so the repo's `src` package takes precedence
    # over any same-named package already installed in the environment.
    sys.path.insert(0, repo_root)

In [0]:
%pip install -r ../requirements.txt

In [0]:
from pyspark.sql import functions as F
from datetime import timedelta
from src.utility.environment import Environment
from src.infrastructure.data_manager import DataManager
from src.domain.feature_engineering import RFMTransformer

In [0]:
manager = DataManager(spark)

# 1. Read the raw Olist CSV extracts (Bronze layer) from the catalog prefix.
print("Reading Bronze Layer...")
path_prefix = Environment.DATABRICKS_CATALOG

# File names in the same order as the DataFrame names they are unpacked into.
bronze_files = (
    "olist_orders_dataset.csv",
    "olist_order_payments_dataset.csv",
    "olist_customers_dataset.csv",
    "olist_order_items_dataset.csv",
    "olist_order_reviews_dataset.csv",
)
df_orders, df_payments, df_customer, df_order_items, df_order_reviews = (
    manager.read_csv(f"{path_prefix}/{file_name}") for file_name in bronze_files
)



In [0]:
# 2.1 Clean the raw data and aggregate each child table to one row per order_id.
# NOTE: this cell rebinds the Bronze DataFrame names to their aggregated
# versions, so it is not idempotent. Re-run the Bronze read cell before
# re-running this one.
print("Processing Bronze -> Silver...")
# Orders without a purchase timestamp or id cannot be placed on the timeline or joined.
df_orders = df_orders.dropna(subset=['order_purchase_timestamp', 'order_id'])

df_payments = df_payments.groupBy('order_id').agg(
    F.sum("payment_value").alias("total_order_payment"),
    F.max("payment_installments").alias("max_installments"),
    # Number of payment rows for the order (not the number of distinct payment types).
    F.count("payment_sequential").alias("payment_method_count"),
    # F.first() is non-deterministic after a shuffle. Take the payment_type of
    # the lowest payment_sequential instead (min over a struct orders by its
    # first field), so reruns yield the same value.
    # NOTE(review): assumes payment_sequential is read as a numeric column — confirm.
    F.min(F.struct("payment_sequential", "payment_type"))
    .getField("payment_type")
    .alias("primary_payment_type")
)

df_order_items = df_order_items.groupBy('order_id').agg(
    F.sum("price").alias("total_price"),
    F.sum("freight_value").alias("total_freight"),
    F.count("product_id").alias("total_items"),
    # Deterministic replacement for F.first(): the earliest shipping limit of the order's items.
    F.min("shipping_limit_date").alias("shipping_limit_date")
)

df_order_reviews = df_order_reviews.groupBy("order_id").agg(
    F.avg("review_score").alias("Avg_review_score"),
    F.min("review_score").alias("Min_review_score"),
    F.count("review_id").alias("Total_Review_Number")
)

In [0]:
# 2.2 Unify the data: every customer's orders (inner join), enriched with the
# per-order payment, item and review aggregates. The left joins keep orders
# that have no row in a given aggregate.
df_master = (
    df_customer
    .join(df_orders, on="customer_id", how="inner")
    .join(df_payments, on="order_id", how="left")
    .join(df_order_items, on="order_id", how="left")
    .join(df_order_reviews, on="order_id", how="left")
)


In [0]:
# 3.1: Temporal split setup.
# The dataset end is the latest purchase. The last `churn_window_days` before
# it form the prediction window, preceded by FEATURE_HISTORY_DAYS of history.
FEATURE_HISTORY_DAYS = 365

max_date = df_orders.select(F.max("order_purchase_timestamp")).collect()[0][0]
if max_date is None:
    raise ValueError(
        "df_orders has no order_purchase_timestamp values; cannot compute the temporal split."
    )
# NOTE(review): assumes read_csv yields a timestamp-typed column. If it is a
# string, the timedelta arithmetic below raises TypeError — confirm schema inference.
split_date = max_date - timedelta(days=Environment.churn_window_days)
start_date = split_date - timedelta(days=FEATURE_HISTORY_DAYS)

print(f"Dataset End: {max_date}")
print(f"Split Point (Prediction Start): {split_date}")
print(f"Feature History Start: {start_date}")

# 3.2: Apply the filter, keeping orders from start_date through the dataset end
# INCLUSIVE. max_date is the maximum of the same orders, so a strict
# `< max_date` bound would only ever drop the most recent order(s) from the
# prediction window. `between` is inclusive on both ends.
df_master = df_master.filter(
    F.col("order_purchase_timestamp").between(start_date, max_date)
)

In [0]:
# 4. Transformation (domain layer): build the Gold RFM feature table from the
# filtered Silver order-level data, anchored at the prediction split date.
print("Processing Silver -> Gold (RFM)...")
transformer = RFMTransformer(df_master)
df_gold = transformer.transform(split_date)

# Rich display of the resulting Gold table for inspection.
display(df_gold)

In [0]:
# 5. Write the Gold features via DataManager.save_delta under the
# feature-store "churn" path.
print("Saving to Feature Store...")
churn_feature_path = f"{Environment.feature_store_path}/churn"
manager.save_delta(df_gold, churn_feature_path)