In [None]:
!pip install pyspark boto3

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, avg, max, min, count
import os

In [None]:
# Load key/value pairs from a local .env file into the process environment.
# NOTE(review): presumably this is where the AWS credentials for the S3A
# reads/writes below come from — confirm which variables the .env defines.
from dotenv import load_dotenv
load_dotenv()

In [None]:
# Build (or reuse) the Spark session shared by every cell below.
# The settings register the S3A filesystem implementation, select the
# credentials provider class, and ask Spark to fetch the hadoop-aws and
# AWS SDK bundle jars at startup.
# NOTE(review): no access/secret key is configured explicitly here —
# presumably they are picked up from the environment; confirm.
session_builder = SparkSession.builder.appName("TesouroDirectoETL")
for conf_key, conf_value in (
    ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    ("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"),
    ("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1,com.amazonaws:aws-java-sdk-bundle:1.12.262"),
):
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()

print("Spark Session criada com sucesso!")

In [None]:
# Base S3A locations: bronze input prefix and gold output bucket.
bronze_bucket = "s3a://bronze-tesouro-lakehouse/raw-data/kafka/"
gold_bucket = "s3a://gold-tesouro-lakehouse/"

# One input folder per dataset under the bronze prefix.
ipca_path = f"{bronze_bucket}postgres-dadostesouroipca/"
pre_path = f"{bronze_bucket}postgres-dadostesouropre/"

In [None]:
# Read both bronze datasets as JSON (no explicit schema is supplied).
df_ipca_bronze, df_pre_bronze = (
    spark.read.json(source_path) for source_path in (ipca_path, pre_path)
)

# Report how many records each bronze dataset holds.
ipca_bronze_rows = df_ipca_bronze.count()
print(f"IPCA Bronze: {ipca_bronze_rows} registros")
pre_bronze_rows = df_pre_bronze.count()
print(f"Pre-fixados Bronze: {pre_bronze_rows} registros")

# Inspect the IPCA dataset only: its schema and the first five rows.
df_ipca_bronze.printSchema()
df_ipca_bronze.show(5)

In [None]:
def _to_silver(df_bronze):
    """Project a bronze DataFrame onto the typed silver column set.

    The same transformation applies to both the IPCA and the pre-fixados
    datasets, so it lives in one place instead of two copies.

    Args:
        df_bronze: DataFrame exposing the columns CompraManha, VendaManha,
            PUCompraManha, PUVendaManha, PUBaseManha, Data_Vencimento,
            Data_Base, Tipo and dt_update.

    Returns:
        A DataFrame with the five *Manha columns cast to double, the two
        Data_* columns cast to date, Tipo unchanged and dt_update cast to
        timestamp, keeping only rows where both CompraManha and VendaManha
        are non-null.
    """
    double_columns = (
        "CompraManha",
        "VendaManha",
        "PUCompraManha",
        "PUVendaManha",
        "PUBaseManha",
    )
    date_columns = ("Data_Vencimento", "Data_Base")

    typed = df_bronze.select(
        *[col(name).cast("double") for name in double_columns],
        *[col(name).cast("date") for name in date_columns],
        col("Tipo"),
        col("dt_update").cast("timestamp"),
    )
    # The filters run after the select, so they apply to the selected
    # (cast) columns.
    return (
        typed
        .filter(col("CompraManha").isNotNull())
        .filter(col("VendaManha").isNotNull())
    )


df_ipca_silver = _to_silver(df_ipca_bronze)
df_pre_silver = _to_silver(df_pre_bronze)

print(f"IPCA Silver: {df_ipca_silver.count()} registros")
print(f"Pre-fixados Silver: {df_pre_silver.count()} registros")

In [None]:
# Persist both silver tables as Parquet, partitioned by Tipo, overwriting
# whatever is already at the target path.
# NOTE(review): the silver output is written under gold_bucket's "silver/"
# prefix — confirm this location is intended.
for silver_df, silver_key in (
    (df_ipca_silver, "silver/ipca"),
    (df_pre_silver, "silver/pre-fixados"),
):
    silver_writer = silver_df.write.mode("overwrite").partitionBy("Tipo")
    silver_writer.parquet(gold_bucket + silver_key)

print("Camada Silver gravada com sucesso!")

In [None]:
def _to_gold(df_silver):
    """Aggregate a silver DataFrame per (ano_vencimento, mes_base, Tipo).

    The same aggregation applies to both the IPCA and the pre-fixados
    datasets, so it lives in one place instead of two copies.

    Args:
        df_silver: DataFrame exposing Data_Vencimento, Data_Base, Tipo,
            CompraManha, VendaManha and PUCompraManha.

    Returns:
        One row per group with the mean of CompraManha and VendaManha,
        the min and max of CompraManha, the mean of PUCompraManha, and
        the row count of the group.
    """
    # NOTE(review): mes_base is the month number of Data_Base only, so the
    # same calendar month from different years falls into one group —
    # confirm this pooling is intended.
    keyed = (
        df_silver
        .withColumn("ano_vencimento", year(col("Data_Vencimento")))
        .withColumn("mes_base", month(col("Data_Base")))
    )
    # `min` and `max` here are the pyspark.sql.functions imported at the top
    # of the notebook, which shadow the Python builtins.
    return keyed.groupBy("ano_vencimento", "mes_base", "Tipo").agg(
        avg("CompraManha").alias("taxa_compra_media"),
        avg("VendaManha").alias("taxa_venda_media"),
        min("CompraManha").alias("taxa_compra_minima"),
        max("CompraManha").alias("taxa_compra_maxima"),
        avg("PUCompraManha").alias("pu_compra_medio"),
        count("*").alias("total_registros"),
    )


df_ipca_gold = _to_gold(df_ipca_silver)
df_pre_gold = _to_gold(df_pre_silver)

print("Agregacoes Gold calculadas!")
df_ipca_gold.show(10)
df_pre_gold.show(10)

In [None]:
# Persist both gold aggregates as Parquet, partitioned by ano_vencimento,
# overwriting whatever is already at the target path.
for gold_df, gold_key in (
    (df_ipca_gold, "gold/ipca_agregado"),
    (df_pre_gold, "gold/pre_agregado"),
):
    gold_writer = gold_df.write.mode("overwrite").partitionBy("ano_vencimento")
    gold_writer.parquet(gold_bucket + gold_key)

print("Camada Gold gravada com sucesso!")

In [None]:
# Stop the Spark session now that all writes are done; any later cell that
# uses `spark` needs the session cell to be run again first.
spark.stop()
print("ETL Spark finalizado com sucesso!")