### Importação das bibliotecas utilizadas

In [0]:
import requests
from datetime import datetime
from dateutil.relativedelta import relativedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date
from pyspark.sql.types import DateType, DoubleType


#### Criação do Database "_db_analytics_", local onde será alocado os datasets 

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS db_analytics;

#### Extração dos dados de IPCA da API do Banco Central do Brasil.

In [0]:
# Criação da Variável com a data de início da série histórica, filtrando os últimos 20 anos.
start_date = ((datetime.today() - relativedelta(years=20)).replace(day=1, month=1)).strftime("%d/%m/%Y")

response = requests.get(
        f'https://api.bcb.gov.br/dados/serie/bcdata.sgs.10844/dados?formato=json&dataInicial={start_date}'
    )

# Validando o status code da API
assert response.status_code == 200, f'Error: {response.status_code}, Response: {response.text}'

ipca_data = response.json()

# Criação do Dataframe Spark
spark = SparkSession.builder.getOrCreate()
df_ipca = spark.createDataFrame(ipca_data)

### Tratamento e visualização da qualidade dos dados 

In [0]:
# Ajustando o data type dos Dados
df_ipca = (
    df_ipca
    .withColumn("valor", df_ipca["valor"].cast(DoubleType()))
    .withColumn("data", to_date(df_ipca["data"], "dd/MM/yyyy").cast(DateType()))
)

# Visualizando o dataframe e as estatístcas dos valores
df_ipca.show()
df_ipca.describe().show()
print(f'Registros: {len(df_ipca.collect())}')

In [0]:
# Salvando os dados em uma tabela delta no Database "db_analytics.dataset_ipca"
df_ipca.write.format("delta").mode("overwrite").saveAsTable("db_analytics.dataset_ipca")

#### Visualização da tabela no Database "db_analytics"

In [0]:
%sql
SELECT * FROM db_analytics.dataset_ipca;