In [0]:
%run "/Workspace/ETL_Arquitetura_Medalhão/00.Configuração/configuração"

Catalogo: workspace
Schemas: bronze silver gold


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import to_date, regexp_replace, col, greatest

In [0]:
# Load both bronze-layer sources; CATALOG and BRONZE come from the config
# notebook executed at the top of this notebook.
ipca = spark.table(f"{CATALOG}.{BRONZE}.ipca")
boi = spark.table(f"{CATALOG}.{BRONZE}.boi_gordo")

# Print each schema under its own header so the two are easy to compare.
for header, frame in (("Schema IPCA:", ipca), ("Schema BOI:", boi)):
    print(header)
    frame.printSchema()

Schema IPCA:
root
 |-- data: date (nullable = true)
 |-- IPCA: double (nullable = true)
 |-- data_ingestao: timestamp (nullable = true)

Schema BOI:
root
 |-- Indicador_Boi_Gordo: string (nullable = true)
 |-- Valor: string (nullable = true)
 |-- data_ingestao: timestamp (nullable = true)



In [0]:
# Inspect the raw bronze rows: the preview shows non-data rows ("Nota",
# "Fonte", "Data") ahead of the monthly "MM/yyyy" values.
display(boi)

Indicador_Boi_Gordo,Valor,data_ingestao
Nota,"por arroba, descontado o Prazo de Pagamento pela taxa CDI/CETIP",2025-12-27T15:34:11.368Z
Fonte,Cepea,2025-12-27T15:34:11.368Z
Data,Valor,2025-12-27T15:34:11.368Z
01/2025,32495,2025-12-27T15:34:11.368Z
02/2025,31921,2025-12-27T15:34:11.368Z
03/2025,31247,2025-12-27T15:34:11.368Z
04/2025,32396,2025-12-27T15:34:11.368Z
05/2025,30815,2025-12-27T15:34:11.368Z
06/2025,31351,2025-12-27T15:34:11.368Z
07/2025,29997,2025-12-27T15:34:11.368Z


In [0]:
from pyspark.sql.functions import col

# Keep only rows whose first column is a two-digit month followed by "/2025";
# this drops the "Nota", "Fonte" and "Data" rows seen in the raw preview.
padrao_mes_2025 = "^[0-9]{2}/2025$"
boi_filtrado = boi.where(col("Indicador_Boi_Gordo").rlike(padrao_mes_2025))

display(boi_filtrado)

Indicador_Boi_Gordo,Valor,data_ingestao
01/2025,32495,2025-12-27T15:34:11.368Z
02/2025,31921,2025-12-27T15:34:11.368Z
03/2025,31247,2025-12-27T15:34:11.368Z
04/2025,32396,2025-12-27T15:34:11.368Z
05/2025,30815,2025-12-27T15:34:11.368Z
06/2025,31351,2025-12-27T15:34:11.368Z
07/2025,29997,2025-12-27T15:34:11.368Z
08/2025,30725,2025-12-27T15:34:11.368Z
09/2025,30787,2025-12-27T15:34:11.368Z
10/2025,31051,2025-12-27T15:34:11.368Z


In [0]:
# Rename the month-label column to "data", the name the IPCA table uses for
# its date column. The values are still strings at this point.
boi_step1 = boi_filtrado.withColumnRenamed(
    existing="Indicador_Boi_Gordo",
    new="data",
)

boi_step1.printSchema()
display(boi_step1)

root
 |-- data: string (nullable = true)
 |-- Valor: string (nullable = true)
 |-- data_ingestao: timestamp (nullable = true)



data,Valor,data_ingestao
01/2025,32495,2025-12-27T15:34:11.368Z
02/2025,31921,2025-12-27T15:34:11.368Z
03/2025,31247,2025-12-27T15:34:11.368Z
04/2025,32396,2025-12-27T15:34:11.368Z
05/2025,30815,2025-12-27T15:34:11.368Z
06/2025,31351,2025-12-27T15:34:11.368Z
07/2025,29997,2025-12-27T15:34:11.368Z
08/2025,30725,2025-12-27T15:34:11.368Z
09/2025,30787,2025-12-27T15:34:11.368Z
10/2025,31051,2025-12-27T15:34:11.368Z


In [0]:
from pyspark.sql.functions import col, concat, lit, to_date

# Convert the "MM/yyyy" label into a real date (first day of that month).
# The year is parsed from the value itself instead of being hard-coded to
# 2025, so a label from another year can never be silently stamped as 2025.
# For the 2025-only rows that reach this step the result is unchanged
# (e.g. "01/2025" -> 2025-01-01).
boi_step2 = boi_step1.withColumn("data", to_date(col("data"), "MM/yyyy"))

boi_step2.printSchema()
display(boi_step2)

root
 |-- data: date (nullable = true)
 |-- Valor: string (nullable = true)
 |-- data_ingestao: timestamp (nullable = true)



data,Valor,data_ingestao
2025-01-01,32495,2025-12-27T15:34:11.368Z
2025-02-01,31921,2025-12-27T15:34:11.368Z
2025-03-01,31247,2025-12-27T15:34:11.368Z
2025-04-01,32396,2025-12-27T15:34:11.368Z
2025-05-01,30815,2025-12-27T15:34:11.368Z
2025-06-01,31351,2025-12-27T15:34:11.368Z
2025-07-01,29997,2025-12-27T15:34:11.368Z
2025-08-01,30725,2025-12-27T15:34:11.368Z
2025-09-01,30787,2025-12-27T15:34:11.368Z
2025-10-01,31051,2025-12-27T15:34:11.368Z


In [0]:
from pyspark.sql.functions import regexp_replace, col

# Swap "," for "." in the price string so it can be cast to double.
valor_com_ponto = regexp_replace(col("valor"), ",", ".")
boi_step3 = boi_step2.withColumn("valor", valor_com_ponto.cast("double"))

boi_step3.printSchema()
display(boi_step3)

root
 |-- data: date (nullable = true)
 |-- valor: double (nullable = true)
 |-- data_ingestao: timestamp (nullable = true)



data,valor,data_ingestao
2025-01-01,324.95,2025-12-27T15:34:11.368Z
2025-02-01,319.21,2025-12-27T15:34:11.368Z
2025-03-01,312.47,2025-12-27T15:34:11.368Z
2025-04-01,323.96,2025-12-27T15:34:11.368Z
2025-05-01,308.15,2025-12-27T15:34:11.368Z
2025-06-01,313.51,2025-12-27T15:34:11.368Z
2025-07-01,299.97,2025-12-27T15:34:11.368Z
2025-08-01,307.25,2025-12-27T15:34:11.368Z
2025-09-01,307.87,2025-12-27T15:34:11.368Z
2025-10-01,310.51,2025-12-27T15:34:11.368Z


In [0]:
# Rename the price column to "boi_gordo". From this cell onward boi_step3 has
# no "valor" column, so later cells must reference "boi_gordo".
boi_step3 = boi_step3.withColumnRenamed("valor", "boi_gordo")

In [0]:
from pyspark.sql.functions import col

# Sanity check: count rows whose price is null after the string -> double cast.
# The price column was renamed to "boi_gordo" in the previous cell, so the
# check must use that name; "valor" no longer exists and cannot be resolved
# when the notebook is run top to bottom on a fresh kernel.
boi_step3.filter(col("boi_gordo").isNull()).count()


0

In [0]:
from pyspark.sql.functions import avg

# Show the mean price over all retained months as a one-row table.
display(boi_step3.agg(avg("boi_gordo").alias("media_valor")))


media_valor
314.1991666666666


In [0]:
# Preview the IPCA rows that will be joined with the boi_gordo series.
display(ipca)

# Print the IPCA schema again; its "data" column is the join key used below.
print("Schema IPCA:")
ipca.printSchema()

data,IPCA,data_ingestao
2025-01-01,0.16,2025-12-27T18:24:24.338Z
2025-02-01,1.31,2025-12-27T18:24:24.338Z
2025-03-01,0.56,2025-12-27T18:24:24.338Z
2025-04-01,0.43,2025-12-27T18:24:24.338Z
2025-05-01,0.26,2025-12-27T18:24:24.338Z
2025-06-01,0.24,2025-12-27T18:24:24.338Z
2025-07-01,0.26,2025-12-27T18:24:24.338Z
2025-08-01,-0.11,2025-12-27T18:24:24.338Z
2025-09-01,0.48,2025-12-27T18:24:24.338Z
2025-10-01,0.09,2025-12-27T18:24:24.338Z


Schema IPCA:
root
 |-- data: date (nullable = true)
 |-- IPCA: double (nullable = true)
 |-- data_ingestao: timestamp (nullable = true)



In [0]:
# First version of the join: match both series on the shared "data" column and
# keep only the date and the two indicators.
# NOTE: df_join is reassigned by the alias-based join in a later cell, which
# also carries data_ingestao.
df_join = (
    ipca
    .join(boi_step3, on=["data"], how="inner")
    .select("data", "IPCA", "boi_gordo")
)

display(df_join)

data,IPCA,boi_gordo
2025-01-01,0.16,324.95
2025-02-01,1.31,319.21
2025-03-01,0.56,312.47
2025-04-01,0.43,323.96
2025-05-01,0.26,308.15
2025-06-01,0.24,313.51
2025-07-01,0.26,299.97
2025-08-01,-0.11,307.25
2025-09-01,0.48,307.87
2025-10-01,0.09,310.51


In [0]:
# Alias both inputs so same-named columns (data, data_ingestao) can be
# referenced without ambiguity.
ip = ipca.alias("ip")
bo = boi_step3.alias("bo")

# Inner join on matching month dates.
mesma_data = col("ip.data") == col("bo.data")

# Output projection: date, both indicators, and the ingestion timestamp taken
# from the IPCA side.
colunas_saida = [
    col("ip.data").alias("data"),
    col("ip.IPCA").alias("IPCA"),
    col("bo.boi_gordo").alias("boi_gordo"),
    col("ip.data_ingestao").alias("data_ingestao"),
]

df_join = ip.join(bo, mesma_data, "inner").select(*colunas_saida)

display(df_join)

data,IPCA,boi_gordo,data_ingestao
2025-01-01,0.16,324.95,2025-12-27T18:24:24.338Z
2025-02-01,1.31,319.21,2025-12-27T18:24:24.338Z
2025-03-01,0.56,312.47,2025-12-27T18:24:24.338Z
2025-04-01,0.43,323.96,2025-12-27T18:24:24.338Z
2025-05-01,0.26,308.15,2025-12-27T18:24:24.338Z
2025-06-01,0.24,313.51,2025-12-27T18:24:24.338Z
2025-07-01,0.26,299.97,2025-12-27T18:24:24.338Z
2025-08-01,-0.11,307.25,2025-12-27T18:24:24.338Z
2025-09-01,0.48,307.87,2025-12-27T18:24:24.338Z
2025-10-01,0.09,310.51,2025-12-27T18:24:24.338Z


In [0]:
# Print the column names and types of the joined frame that is written to the
# silver table in the next cell.
df_join.printSchema()

root
 |-- data: date (nullable = true)
 |-- IPCA: double (nullable = true)
 |-- boi_gordo: double (nullable = true)
 |-- data_ingestao: timestamp (nullable = true)



In [0]:
# Persist the consolidated frame as a Delta table in the silver schema,
# replacing any previous contents. The option key must be spelled
# "overwriteSchema" for the table schema to be replaced together with the
# data; the previous misspelling meant schema overwrite was never enabled.
(
    df_join.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{CATALOG}.{SILVER}.economia_consolidada")
)