In [0]:
from pyspark.sql.functions import current_timestamp, input_file_name

# 1. Configura√ß√£o dos caminhos
# Baseado na sua imagem, este √© o local dos seus CSVs
caminho_origem = "/Volumes/workspace/default/arquivos_pipe/"

# Vamos listar os arquivos para garantir que o Databricks est√° vendo tudo
arquivos = dbutils.fs.ls(caminho_origem)

display(arquivos)

In [0]:
from pyspark.sql.functions import current_timestamp, col

# 1. Configura√ß√£o dos caminhos
caminho_origem = "/Volumes/workspace/default/arquivos_pipe/"

# 2. Fun√ß√£o de Ingest√£o Gen√©rica (Corrigida para Unity Catalog)
def ingerir_tabela(nome_arquivo, nome_tabela):
    print(f"Iniciando ingest√£o de: {nome_tabela}...")
    
    # Lendo o CSV
    df = (spark.read
          .format("csv")
          .option("header", "true")
          .option("inferSchema", "true")
          .option("delimiter", ",")
          .load(f"{caminho_origem}{nome_arquivo}")
    )
    
    # CORRE√á√ÉO AQUI: Usando _metadata.file_path ao inv√©s de input_file_name()
    # O asterisco "*" garante que todas as colunas originais sejam mantidas
    df_bronze = df.select(
        "*", 
        col("_metadata.file_path").alias("arquivo_origem")
    ).withColumn("data_ingestao", current_timestamp())
    
    # Salvando como tabela Delta
    df_bronze.write \
        .format("delta") \
        .mode("overwrite") \
        .saveAsTable(f"default.bronze_{nome_tabela}")
        
    print(f"Tabela bronze_{nome_tabela} salva com sucesso!")

# 3. Executando para todos os arquivos
tabelas_para_criar = {
    "olist_customers_dataset.csv": "clientes",
    "olist_geolocation_dataset.csv": "geolocalizacao",
    "olist_order_items_dataset.csv": "itens_pedido",
    "olist_order_payments_dataset.csv": "pagamentos",
    "olist_order_reviews_dataset.csv": "avaliacoes",
    "olist_orders_dataset.csv": "pedidos",
    "olist_products_dataset.csv": "produtos",
    "olist_sellers_dataset.csv": "vendedores",
    "product_category_name_translation.csv": "traducao_categorias"
}

# Loop de execu√ß√£o
for arquivo_csv, nome_tabela in tabelas_para_criar.items():
    ingerir_tabela(arquivo_csv, nome_tabela)

In [0]:
from pyspark.sql.functions import col, to_timestamp

# --- 1. Criando a SILVER_PEDIDOS ---
print("Criando tabela: SILVER_PEDIDOS...")
df_pedidos = spark.read.table("default.bronze_pedidos")

df_silver_pedidos = df_pedidos.select(
    col("order_id").alias("id_pedido"),
    col("customer_id").alias("id_cliente"),
    col("order_status").alias("status_pedido"),
    # Convertendo Strings para Timestamps
    to_timestamp(col("order_purchase_timestamp")).alias("data_compra"),
    to_timestamp(col("order_approved_at")).alias("data_aprovacao"),
    to_timestamp(col("order_delivered_carrier_date")).alias("data_envio_transportadora"),
    to_timestamp(col("order_delivered_customer_date")).alias("data_entrega_cliente"),
    to_timestamp(col("order_estimated_delivery_date")).alias("data_estimada_entrega")
)

df_silver_pedidos.write.format("delta").mode("overwrite").saveAsTable("default.silver_pedidos")
print("‚úÖ Tabela silver_pedidos criada.")

# --- 2. Criando a SILVER_ITENS_PEDIDO ---
print("Criando tabela: SILVER_ITENS_PEDIDO...")
df_itens = spark.read.table("default.bronze_itens_pedido")

df_silver_itens = df_itens.select(
    col("order_id").alias("id_pedido"),
    col("order_item_id").alias("id_item_pedido"),
    col("product_id").alias("id_produto"),
    col("seller_id").alias("id_vendedor"),
    # Convertendo pre√ßos para n√∫meros decimais
    col("price").cast("double").alias("vl_preco"),
    col("freight_value").cast("double").alias("vl_frete")
)

df_silver_itens.write.format("delta").mode("overwrite").saveAsTable("default.silver_itens_pedido")
print("‚úÖ Tabela silver_itens_pedido criada.")

In [0]:
from pyspark.sql.functions import col, to_date, sum, round

# 1. Leitura das tabelas Silver (Que agora existem!)
df_pedidos = spark.read.table("default.silver_pedidos")
df_itens = spark.read.table("default.silver_itens_pedido")

# 2. O JOIN
df_join = df_pedidos.join(df_itens, on="id_pedido", how="inner")

# 3. Agrega√ß√£o (Vendas por Dia)
df_gold = df_join.select(
    to_date(col("data_compra")).alias("data_venda"),
    col("vl_preco"),
    col("vl_frete")
).groupBy("data_venda") \
 .agg(
     round(sum("vl_preco"), 2).alias("total_vendas"),
     round(sum("vl_frete"), 2).alias("total_frete")
 ).orderBy("data_venda")

# 4. Salvando a Gold
df_gold.write.format("delta").mode("overwrite").saveAsTable("default.gold_vendas_diarias")

display(df_gold)

In [0]:
from pyspark.sql.functions import col

print("Iniciando tratamento: SILVER_PRODUTOS...")

# 1. Leitura da Bronze
df_produtos = spark.read.table("default.bronze_produtos")

# 2. Sele√ß√£o e Renomea√ß√£o (O cora√ß√£o da camada Silver)
df_silver_produtos = df_produtos.select(
    col("product_id").alias("id_produto"),
    col("product_category_name").alias("categoria")
)

# 3. Escrita na Silver
df_silver_produtos.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("default.silver_produtos")

print("‚úÖ Tabela silver_produtos criada com sucesso!")
display(df_silver_produtos.limit(5))

In [0]:
from pyspark.sql.functions import col, to_date, sum, round, desc

print("üìä Gerando Relat√≥rio: Campe√µes da Black Friday 2017...")

# 1. Leitura das 3 Tabelas Silver
df_pedidos = spark.read.table("default.silver_pedidos")
df_itens = spark.read.table("default.silver_itens_pedido")
df_produtos = spark.read.table("default.silver_produtos")

# 2. O Join Triplo (Cascata)
# Primeiro juntamos Pedidos com Itens...
df_join_1 = df_pedidos.join(df_itens, on="id_pedido", how="inner")

# ...e o resultado juntamos com Produtos
df_completo = df_join_1.join(df_produtos, on="id_produto", how="inner")

# 3. A Pergunta de Neg√≥cio: "O que mais vendeu em 24/11/2017?"
df_relatorio = df_completo \
    .filter(to_date(col("data_compra")) == "2017-11-24") \
    .groupBy("categoria") \
    .agg(
        round(sum("vl_preco"), 2).alias("receita_total"),
        sum("vl_preco").alias("check_ordenacao") # Auxiliar
    ) \
    .orderBy(col("check_ordenacao").desc()) \
    .drop("check_ordenacao") # Removemos a auxiliar para ficar limpo

# 4. Exibindo o Top 10
display(df_relatorio.limit(10))