In [0]:
# importações necessárias
import requests
import pandas as pd
from io import StringIO
from pyspark.sql import SparkSession
from datetime import date

# parâmetros da URL
codigo_serie = 10844  
data_inicial = "01/01/2005"
data_final = date.today().strftime("%d/%m/%Y")

# construir URL para acessar API
url = f"https://api.bcb.gov.br/dados/serie/bcdata.sgs.{codigo_serie}/dados?formato=csv&dataInicial={data_inicial}&dataFinal={data_final}"

# requisição
response = requests.get(url)

In [0]:
# verificar se a requisição funcionou
if response.status_code == 200:
    # ler dados CSV com Pandas
    df_pandas = pd.read_csv(StringIO(response.text), sep=";", encoding="latin1")
    
    # converter "valor" para o formato numérico
    df_pandas['valor'] = df_pandas['valor'].str.replace(',', '.')
    df_pandas.valor = pd.to_numeric(df_pandas.valor, errors='coerce')

    # converter Pandas para Spark
    df_spark = spark.createDataFrame(df_pandas)

    # exibir dados
    df_spark.show()

    # salvar tabela temporária no Databricks
    df_spark.createOrReplaceTempView("dados_ipca")

else:
    print(f"erro ao acessar a API: {response.status_code}")

+----------+-----+
|      data|valor|
+----------+-----+
|01/01/2005| 0.46|
|01/02/2005| 1.82|
|01/03/2005| 0.51|
|01/04/2005| 0.35|
|01/05/2005| 0.28|
|01/06/2005|  0.5|
|01/07/2005| 0.59|
|01/08/2005| 0.44|
|01/09/2005| 0.44|
|01/10/2005| 0.56|
|01/11/2005| 0.41|
|01/12/2005| 0.22|
|01/01/2006| 0.68|
|01/02/2006| 1.54|
|01/03/2006|  0.4|
|01/04/2006| 0.33|
|01/05/2006| 0.33|
|01/06/2006| 0.17|
|01/07/2006| 0.36|
|01/08/2006| 0.36|
+----------+-----+
only showing top 20 rows


In [0]:
# criar base a partir do cálculo IPCA acumulado jan-dez por ano
df_spark_acumulado = spark.sql("""
SELECT 
    EXTRACT(YEAR FROM TO_DATE(data, 'dd/MM/yyyy')) AS ano, 
    ROUND(EXP(SUM(LOG(1 + (valor / 100)))) - 1, 4) * 100 AS ipca_acumulado_jan_dez
FROM dados_ipca
GROUP BY all
ORDER BY ano ASC
""")

# exibir dados
df_spark_acumulado.show()

+----+----------------------+
| ano|ipca_acumulado_jan_dez|
+----+----------------------+
|2005|                  6.77|
|2006|     5.489999999999999|
|2007|                  5.17|
|2008|                  6.38|
|2009|     6.370000000000001|
|2010|                  7.62|
|2011|                  9.01|
|2012|                  8.75|
|2013|                  8.75|
|2014|                  8.33|
|2015|                  8.09|
|2016|    6.4799999999999995|
|2017|                  4.53|
|2018|                  3.35|
|2019|                  3.52|
|2020|                  1.73|
|2021|                  4.75|
|2022|                  7.57|
|2023|                  6.22|
|2024|                  4.74|
+----+----------------------+
only showing top 20 rows


In [0]:
# criar base a partir do cálculo médio do IPCA por mes
df_spark_acumulado_mes = spark.sql("""
SELECT 
    EXTRACT(MONTH FROM TO_DATE(data, 'dd/MM/yyyy')) AS mes, 
    avg(valor) AS ipca_acumulado_mes
FROM dados_ipca
GROUP BY all
ORDER BY mes ASC
""")

# exibir dados
df_spark_acumulado_mes.show()

+---+-------------------+
|mes| ipca_acumulado_mes|
+---+-------------------+
|  1| 0.5390476190476191|
|  2|  1.235238095238095|
|  3|              0.372|
|  4|0.41550000000000004|
|  5|0.26549999999999996|
|  6|0.43149999999999994|
|  7|              0.449|
|  8|             0.3125|
|  9|0.42750000000000005|
| 10|0.46749999999999997|
| 11|             0.4335|
| 12|             0.6465|
+---+-------------------+



In [0]:
# salvar dados Delta Lake no Databricks
df_spark.write.format("delta").mode("overwrite").saveAsTable("dados_ipca")
df_spark_acumulado.write.format("delta").mode("overwrite").saveAsTable("ipca_acumulado")
df_spark_acumulado_mes.write.format("delta").mode("overwrite").saveAsTable("ipca_acumulado_mes")

print("Dados armazenados como tabelas Delta no catálogo do Databricks")

Dados armazenados como tabelas Delta no catálogo do Databricks


In [0]:
# verificar se as tabelas foram criadas no Databricks
display(spark.sql("SELECT * FROM default.dados_ipca LIMIT 30"))
display(spark.sql("SELECT * FROM default.ipca_acumulado ORDER BY ano ASC LIMIT 30"))
display(spark.sql("SELECT * FROM default.ipca_acumulado_mes ORDER BY mes ASC LIMIT 30"))

data,valor
01/01/2005,0.46
01/02/2005,1.82
01/03/2005,0.51
01/04/2005,0.35
01/05/2005,0.28
01/06/2005,0.5
01/07/2005,0.59
01/08/2005,0.44
01/09/2005,0.44
01/10/2005,0.56


ano,ipca_acumulado_jan_dez
2005,6.77
2006,5.489999999999999
2007,5.17
2008,6.38
2009,6.370000000000001
2010,7.62
2011,9.01
2012,8.75
2013,8.75
2014,8.33


mes,ipca_acumulado_mes
1,0.5390476190476191
2,1.235238095238095
3,0.372
4,0.4155
5,0.2654999999999999
6,0.4314999999999999
7,0.449
8,0.3125
9,0.4275
10,0.4674999999999999
