In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("Atividade2PraticaSpark").getOrCreate()



In [0]:
def add_tabela_bronze(nome_arquivo,nome_tabela):
    try:
        landing_path = f"/Volumes/medalhao/landing_zone/landing/{nome_arquivo}"

        df = spark.read.csv(landing_path, header=True, inferSchema=True)

        if df.count() == 0:
            raise ValueError(f"O arquivo {nome_arquivo} está vazio ou não pôde ser lido.")
        
        df_timestamp = df.withColumn("ingestion_timestamp", F.current_timestamp())

        df_timestamp.write.format("delta").mode("overwrite").saveAsTable(f"medalhao.bronze.{nome_tabela}")

        print(f"Tabela bronze.{nome_tabela} criada com sucesso!\n")

    except Exception as e:
        print(f"Erro ao criar bronze.{nome_tabela}: {str(e)}")

tabelas=[
    ['olist_customers_dataset.csv', 'ft_consumidores'],
    ['olist_geolocation_dataset.csv', 'ft_geolocalizacao'],
    ['olist_order_items_dataset.csv', 'ft_itens_pedidos'],
    ['olist_order_payments_dataset.csv', 'ft_pagamentos_pedidos'],
    ['olist_order_reviews_dataset.csv', 'ft_avaliacoes_pedidos'],
    ['olist_orders_dataset.csv', 'ft_pedidos'],
    ['olist_products_dataset.csv', 'ft_produtos'],
    ['olist_sellers_dataset.csv', 'ft_vendedores'],
    ['product_category_name_translation.csv', 'dm_categoria_produtos_traducao']
]
for t in tabelas: add_tabela_bronze(*t)


Tabela bronze.ft_consumidores criada com sucesso!

Tabela bronze.ft_geolocalizacao criada com sucesso!

Tabela bronze.ft_itens_pedidos criada com sucesso!

Tabela bronze.ft_pagamentos_pedidos criada com sucesso!

Tabela bronze.ft_avaliacoes_pedidos criada com sucesso!

Tabela bronze.ft_pedidos criada com sucesso!

Tabela bronze.ft_produtos criada com sucesso!

Tabela bronze.ft_vendedores criada com sucesso!

Tabela bronze.dm_categoria_produtos_traducao criada com sucesso!



In [0]:
import requests

df_pedidos = spark.table("medalhao.bronze.ft_pedidos")
min_max_timestamp = df_pedidos.select(
    F.min('order_purchase_timestamp').alias('min'),
    F.max('order_purchase_timestamp').alias('max')
)

data_inicio_formatada = min_max_timestamp.first()['min'].strftime("%m-%d-%Y")
data_fim_formatada = min_max_timestamp.first()['max'].strftime("%m-%d-%Y")

url = f"https://olinda.bcb.gov.br/olinda/servico/PTAX/versao/v1/odata/CotacaoDolarPeriodo(dataInicial=@dataInicial,dataFinalCotacao=@dataFinalCotacao)?@dataInicial='{data_inicio_formatada}'&@dataFinalCotacao='{data_fim_formatada}'&$select=dataHoraCotacao,cotacaoCompra&$format=json"

req = requests.get(url)
cotacoes = req.json()['value']
cotacoes_df = spark.createDataFrame(cotacoes).withColumn("ingestion_timestamp", F.current_timestamp())
cotacoes_df.write.format("delta").mode("overwrite").saveAsTable(f"medalhao.bronze.dm_cotacao_dolar")

Data Inicio: 09-04-2016 | Data Fim: 10-17-2018
<class 'str'> <class 'str'>
