In [0]:
%sql
create schema if not exists ecommerce.silver

In [0]:
# Importando as funções necessárias
from pyspark.sql.functions import col, sum, avg, count, min, max, first

In [0]:
# Lendo tabelas na camada silver
clientes_df = spark.read.table("ecommerce.silver.clientes")
pedidos_df = spark.read.table("ecommerce.silver.pedidos")
itens_pedidos_df = spark.read.table("ecommerce.silver.itens_pedido")
produtos_df = (
    spark.read.table("ecommerce.silver.produtos")
    .withColumnRenamed("ativo", "produto_ativo")
    .withColumnRenamed("data_criacao", "produto_data_criacao")
)
categorias_df = (
    spark.read.table("ecommerce.silver.categorias")
    .withColumnRenamed("ativo", "categoria_ativo")
    .withColumnRenamed("data_criacao", "categoria_data_criacao")
)
pagamentos_df = spark.read.table("ecommerce.silver.pagamentos")
avaliacoes_df = spark.read.table("ecommerce.silver.avaliacoes")

In [0]:
# Agrega pagamentos por pedido
pagamentos_agg = (
    pagamentos_df
    .groupBy("pedido_id")
    .agg(
        sum("valor").alias("total_pago"),
        first("metodo").alias("metodo_pagamento"),
        first("status").alias("status_pagamento")
    )
)

# Agrega avaliações por produto
avaliacoes_agg = (
    avaliacoes_df
    .groupBy("produto_id")
    .agg(
        avg("nota").alias("media_avaliacao"),
        count("*").alias("qtd_avaliacoes")
    )
)

# Métricas por cliente
metricas_cliente = (
    pedidos_df
    .groupBy("cliente_id")
    .agg(
        sum("valor_liquido").alias("lifetime_value"),
        count("pedido_id").alias("qtd_pedidos"),
        avg("valor_total").alias("ticket_medio"),
        min("data_pedido").alias("primeiro_pedido"),
        max("data_pedido").alias("ultimo_pedido")
    )
)

In [0]:
# Construção da OBT
obt_df = (
    itens_pedidos_df.alias("i")
    .join(pedidos_df.alias("p"),       "pedido_id",   "left")
    .join(clientes_df.alias("c"),      "cliente_id",  "left")
    .join(produtos_df.alias("pr"),     "produto_id",  "left")
    .join(categorias_df.alias("cat"),  "categoria_id","left")
    .join(pagamentos_agg.alias("pg"),  "pedido_id",   "left")
    .join(avaliacoes_agg.alias("av"),  "produto_id",  "left")
    .join(metricas_cliente.alias("mc"),"cliente_id",  "left")
    .select(
        # Chaves
        col("i.item_pedido_id"),
        col("p.pedido_id"),
        col("c.cliente_id"),
        col("pr.produto_id"),

        # Pedido
        col("p.data_pedido"),
        col("p.status").alias("status_pedido"),
        col("p.valor_total"),
        col("p.valor_liquido"),
        col("p.frete"),
        col("p.desconto"),
        col("p.data_entrega_prevista"),
        col("p.data_entrega_realizada"),
        col("p.ano"),
        col("p.trimestre"),
        col("p.ticket_classificacao"),

        # Cliente
        col("c.nome").alias("nome_cliente"),
        col("c.cidade"),
        col("c.estado"),
        col("c.regiao"),
        col("c.data_cadastro"),

        # Produto
        col("pr.nome").alias("nome_produto"),
        col("pr.marca"),
        col("pr.preco"),
        col("pr.faixa_preco"),
        col("pr.produto_ativo"),

        # Categoria
        col("cat.nome").alias("nome_categoria"),
        col("cat.categoria_ativo"),

        # Item
        col("i.quantidade"),
        col("i.preco_unitario"),
        (col("i.quantidade") * col("i.preco_unitario")).alias("valor_item"),

        # Pagamento
        col("pg.total_pago"),
        col("pg.metodo_pagamento"),
        col("pg.status_pagamento"),

        # Avaliação
        col("av.media_avaliacao"),
        col("av.qtd_avaliacoes"),

        # Métricas Cliente
        col("mc.lifetime_value"),
        col("mc.qtd_pedidos"),
        col("mc.ticket_medio"),
        col("mc.primeiro_pedido"),
        col("mc.ultimo_pedido")
    )
)

In [0]:
(
    obt_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("ecommerce.silver.obt_ecommerce")
)

print("OBT salva com sucesso!")
display(obt_df.limit(5))