In [0]:
%load_ext autoreload
%autoreload 2


In [0]:
# carregamento de bibliotecas
import sys
import os
import requests
from datetime import date, timedelta
from io import StringIO
import pandas as pd
root = os.path.abspath(os.path.join(os.getcwd(), "../../"))

if root not in sys.path:
    sys.path.append(root)


from src.features.utils import *




In [0]:
# VARIÁVEIS DE DESTINO
TABELA_INCREMENTAL_RAW = "`api-forecasting`.bronze.dolar_ptax_incremental_raw"

In [0]:
# construção da tabela histórica de cotação do dolar ptax
API_BC_URL = "https://olinda.bcb.gov.br/olinda/servico/PTAX/versao/v1/odata/"

# intervalo de data
DATA_INICIAL = (date.today() - timedelta(days=1)).strftime("%m-%d-%Y")
DATA_FINAL = (date.today() - timedelta(days=1)).strftime("%m-%d-%Y")

endpoint = (
    "CotacaoDolarPeriodo("
    "dataInicial=@dataInicial,"
    "dataFinalCotacao=@dataFinalCotacao)?"
    f"@dataInicial='{DATA_INICIAL}'&"
    f"@dataFinalCotacao='{DATA_FINAL}'&"
    "$format=json"
)

url = API_BC_URL + endpoint

response = requests.get(url, timeout=60)

print("Status:", response.status_code)

if response.status_code != 200:
    print(response.text)
    raise Exception(f"Erro {response.status_code} na API do BC")

data = response.json()["value"]
df = pd.DataFrame(data)

df.head()

In [0]:
# sanitização da tabela para persistência
df = df.rename(columns={'dataHoraCotacao': 'Data'})
df['Data'] = pd.to_datetime(df['Data'], format='mixed')
df.info()


In [0]:
# conversão do dataframe no formato spark
spark = SparkSession.builder.appName("Ingestion").getOrCreate()
df = spark.createDataFrame(df)

In [0]:
# persistência da tabela na camada bronze
salvar_tabela_delta_spark(df=df, 
                          mode='overwrite',
                          merge_schema=False,
                          partitionby=['Data'], 
                          path=TABELA_INCREMENTAL_RAW)

In [0]:
%sql
-- TABELA CATALOGADA NA CAMADA BRONZE
SELECT * 
FROM `api-forecasting`.bronze.precos_dolar_ptax_incremental_raw
LIMIT 10