In [None]:
# Make Spark SQL resolve identifiers case-sensitively. A later cell derives a
# `Data` column from a source column named `data`; the two differ only by case.
spark.conf.set("spark.sql.caseSensitive", True)

In [None]:
# Libraries
from pyspark.sql.functions import col, expr, round as spark_round
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType
import requests, json
from datetime import datetime
from notebookutils import mssparkutils

In [None]:
%%sql
-- Label lookup table: above IPCA (0), below IPCA (1), first purchase (2).
-- CREATE OR REPLACE keeps this cell re-runnable: a plain CREATE TABLE fails once
-- the table exists, and repeating the INSERT on a surviving table would
-- duplicate the three rows.
CREATE OR REPLACE TABLE Flag (
    Flag STRING,
    FlagID INTEGER
) USING DELTA;

INSERT INTO Flag (Flag, FlagID)
VALUES 
    ('🡥 acima IPCA', 0),
    ('🡧 abaixo IPCA', 1),
    ('Primeira compra', 2);

In [None]:
%%sql
-- Single-column Delta table that this notebook never populates
-- (presumably a placeholder to host model measures - confirm with the report owner).
-- IF NOT EXISTS keeps the cell re-runnable instead of failing when the table is already there.
CREATE TABLE IF NOT EXISTS Medidas (
    Value INTEGER
) USING DELTA;

### Get IPCA 

In [None]:
# Date window sent to the API, formatted as dd/mm/yyyy
start_date = "01/01/2020"
end_date = datetime.today().strftime("%d/%m/%Y")

# API endpoint (SGS series 433) and query parameters
url = "https://api.bcb.gov.br/dados/serie/bcdata.sgs.433/dados"
params = {
    "formato": "json",
    "dataInicial": start_date,
    "dataFinal": end_date
}

# Request. The timeout stops the cell from hanging forever on a stalled
# connection, and raise_for_status() aborts on an HTTP error so that an error
# payload is never written to the Lakehouse as if it were data.
response = requests.get(url, params=params, timeout=60)
response.raise_for_status()
json_data = response.json()

# Serialize the parsed payload back to a pretty-printed JSON string
# (ensure_ascii=False keeps non-ASCII characters unescaped)
json_str = json.dumps(json_data, ensure_ascii=False, indent=2)

# Destination path inside the Lakehouse
full_path = "Files/Raw/ipca.json"

# Write to the Lakehouse, replacing any previous download
mssparkutils.fs.put(full_path, json_str, overwrite=True)


In [None]:
# Load the raw IPCA file written by the previous cell. The multiline option is
# needed because the file was saved pretty-printed (indent=2), not one record per line.
ipca_reader = spark.read.option("multiline", "true")
df = ipca_reader.json("Files/Raw/ipca.json")

display(df)

In [None]:
# Shape the raw payload into the final IPCA table:
#   Data      <- source column `data` parsed as dd/MM/yyyy (to_date already
#                returns a date, so no extra cast is needed)
#   VarMensal <- source column `valor` scaled by 0.01, as double, rounded to 4 decimals
# `Data` and `data` coexist until the select because the session is case-sensitive.
df_transformado = (
    df
    .withColumn("Data", expr("to_date(data, 'dd/MM/yyyy')"))
    .withColumn("VarMensal", spark_round(expr("valor * 0.01").cast("double"), 4))
    .select("Data", "VarMensal")
)

# Preview the first rows
df_transformado.show(10)

# Save as a managed table. overwriteSchema matches the Item and Compras writes
# and lets a re-run replace a table left behind with a different schema.
(
    df_transformado.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("IPCA")
)

### Get Item

In [None]:
# Source CSV for the Item table
url = "https://raw.githubusercontent.com/alisonpezzott/preco-versus-ipca/refs/heads/main/dados/item.csv"

# Download the file. raise_for_status() aborts on an HTTP error so an error page
# is never saved and then loaded as CSV; the timeout avoids an indefinite hang.
response = requests.get(url, timeout=60)
response.raise_for_status()

# Persist the raw bytes through the Lakehouse local mount
with open("/lakehouse/default/Files/Raw/item.csv", "wb") as f:
    f.write(response.content)

# Explicit schema: both columns nullable. Parenthesized so that no dangling
# line continuation depends on the blank line that follows.
schema = (
    StructType()
    .add("ItemID", IntegerType(), True)
    .add("Item", StringType(), True)
)

# Read the saved CSV (first line is the header) using the schema above
df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load("Files/Raw/item.csv")
)

# Replace the Item table, schema included
(
    df.write.mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("Item")
)

### Get Compras

In [None]:
# Source CSV for the Compras table
url = "https://raw.githubusercontent.com/alisonpezzott/preco-versus-ipca/refs/heads/main/dados/compras.csv"

# Download the file. raise_for_status() aborts on an HTTP error so an error page
# is never saved and then loaded as CSV; the timeout avoids an indefinite hang.
response = requests.get(url, timeout=60)
response.raise_for_status()

# Persist the raw bytes through the Lakehouse local mount
with open("/lakehouse/default/Files/Raw/compras.csv", "wb") as f:
    f.write(response.content)

# Explicit schema: every column nullable.
# NOTE(review): no dateFormat option is set, so DataPedido is parsed with
# Spark's default date pattern - confirm the CSV matches it.
schema = (
    StructType()
    .add("PedidoID", IntegerType(), True)
    .add("DataPedido", DateType(), True)
    .add("ItemID", IntegerType(), True)
    .add("QtdPedido", IntegerType(), True)
    .add("PrecoUnitario", DoubleType(), True)
)

# Read the saved CSV (first line is the header) using the schema above
df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load("Files/Raw/compras.csv")
)

# Replace the Compras table, schema included
(
    df.write.mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("Compras")
)