#Camada Bronze 

## Ingestão bruta dos dados do kaggle:
https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce

In [0]:
# Definição do caminho do schema Bronze
catalogo = "medalhao"
bronze_db_name = "bronze"

In [0]:
spark.sql("DROP TABLE IF EXISTS bronze.ft_consumidores")
spark.sql("DROP TABLE IF EXISTS bronze.ft_geolocalizacao")
spark.sql("DROP TABLE IF EXISTS bronze.ft_itens_pedidos")
spark.sql("DROP TABLE IF EXISTS bronze.ft_pagamentos_pedidos")
spark.sql("DROP TABLE IF EXISTS bronze.ft_avaliacoes_pedidos")
spark.sql("DROP TABLE IF EXISTS bronze.ft_pedidos")
spark.sql("DROP TABLE IF EXISTS bronze.ft_produtos")
spark.sql("DROP TABLE IF EXISTS bronze.ft_vendedores")
spark.sql("DROP TABLE IF EXISTS bronze.dm_categoria_produtos_traducao")



DataFrame[]

In [0]:
from pyspark.sql import functions as F

def ingest_csv(nome_arquivo, nome_tabela):
   
    try:
        table_name = nome_tabela
        landing_path = f"/Volumes/medalhao/default/landing/{nome_arquivo}"

        # Leitura do arquivo CSV
        df = spark.read.csv(landing_path, header=True, inferSchema=True)

        # Validação: arquivo vazio
        if df.count() == 0:
            raise ValueError(f"O arquivo {nome_arquivo} está vazio ou não pôde ser lido.")

        # Adiciona timestamp de ingestão
        df_with_metadata = df.withColumn("ingestion_timestamp", F.current_timestamp())

        # Escrita no formato Delta
        df_with_metadata.write.format("delta").mode("overwrite").saveAsTable(f"{catalogo}.{bronze_db_name}.{table_name}")

        print(f"Tabela {nome_tabela} criada com sucesso!\n")

    except Exception as e:
        print(f"Erro ao processar {nome_tabela}: {str(e)}")

In [0]:
# Ingestão de todas as tabelas da camada Bronze
ingest_csv("olist_customers_dataset.csv", "ft_consumidores")
ingest_csv("olist_geolocation_dataset.csv", "ft_geolocalizacao")
ingest_csv("olist_order_items_dataset.csv", "ft_itens_pedidos")
ingest_csv("olist_order_payments_dataset.csv", "ft_pagamentos_pedidos")
ingest_csv("olist_order_reviews_dataset.csv", "ft_avaliacoes_pedidos")
ingest_csv("olist_orders_dataset.csv", "ft_pedidos")
ingest_csv("olist_products_dataset.csv", "ft_produtos")
ingest_csv("olist_sellers_dataset.csv", "ft_vendedores")
ingest_csv("product_category_name_translation.csv", "dm_categoria_produtos_traducao")

print("Ingestão da camada Bronze concluída!")


Tabela ft_consumidores criada com sucesso!

Tabela ft_geolocalizacao criada com sucesso!

Tabela ft_itens_pedidos criada com sucesso!

Tabela ft_pagamentos_pedidos criada com sucesso!

Tabela ft_avaliacoes_pedidos criada com sucesso!

Tabela ft_pedidos criada com sucesso!

Tabela ft_produtos criada com sucesso!

Tabela ft_vendedores criada com sucesso!

Tabela dm_categoria_produtos_traducao criada com sucesso!

Ingestão da camada Bronze concluída!


## Ingestão da cotação do banco central

In [0]:
# Definindo data de início e fim do período
dbutils.widgets.text("data_inicio", '09-03-2016')
dbutils.widgets.text("data_fim", '10-16-2018')


In [0]:
from datetime import datetime
import requests
import json

# Definindo período 
dbutils.widgets.text("data_inicio", "09-03-2016")
dbutils.widgets.text("data_fim", "10-16-2018")

data_inicio = dbutils.widgets.get("data_inicio")
data_fim = dbutils.widgets.get("data_fim")

# Construção da URL (tudo em uma única chamada)
url = (
    "https://olinda.bcb.gov.br/olinda/servico/PTAX/versao/v1/odata/"
    "CotacaoDolarPeriodo(dataInicial=@dataInicial,dataFinalCotacao=@dataFinalCotacao)"
    f"?@dataInicial='{data_inicio}'"
    f"&@dataFinalCotacao='{data_fim}'"
    "&$top=10000"  # aumenta o limite padrão de 100 registros
    "&$format=json"
)

response = requests.get(url)
data = response.json()

# Converte para DataFrame Spark
df_cotacao = spark.createDataFrame(data["value"])
display(df_cotacao.limit(5))


cotacaoCompra,cotacaoVenda,dataHoraCotacao
3.2715,3.2721,2016-09-05 13:09:55.659
3.2446,3.2452,2016-09-06 13:02:39.984
3.1928,3.1934,2016-09-08 13:03:53.968
3.2632,3.2638,2016-09-09 13:14:00.885
3.2848,3.2854,2016-09-12 13:08:01.541


In [0]:
from pyspark.sql import functions as F

# Adicionar coluna de ingestão
df_cotacao = (
    df_cotacao
    .withColumn("ingestion_timestamp", F.current_timestamp())
)

# Salvar tabela das cotações no formato Delta
df_cotacao.write.format("delta").mode("overwrite").saveAsTable(f"{catalogo}.{bronze_db_name}.dm_cotacao_dolar")

print("Tabela bronze.dm_cotacao_dolar criada com sucesso!")


Tabela bronze.dm_cotacao_dolar criada com sucesso!
