# ETL Spark – Bronze → Silver → Gold (S3)

In [21]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_unixtime, avg, count
from dotenv import load_dotenv

# Load AWS_* variables from a local .env file into the process environment.
load_dotenv()

# Resolve the S3 settings *before* starting Spark so that a missing variable
# fails fast with a readable Python error; passing None to the Hadoop
# configuration below would otherwise surface as an opaque JVM-side exception.
aws_access_key = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
# `or` (rather than a getenv default) also covers AWS_REGION being set but
# empty, which would produce the malformed endpoint "s3..amazonaws.com".
aws_region = os.getenv("AWS_REGION") or "us-east-2"

missing_vars = [
    var_name
    for var_name, var_value in (
        ("AWS_ACCESS_KEY_ID", aws_access_key),
        ("AWS_SECRET_ACCESS_KEY", aws_secret_key),
    )
    if not var_value
]
if missing_vars:
    raise RuntimeError(
        "Missing required environment variable(s): "
        + ", ".join(missing_vars)
        + ". Define them in the environment or in the .env file."
    )

# Local Spark session with the S3A connector jars on the classpath.
spark = (
    SparkSession.builder
    .appName("S3 Access Example")
    .master("local[*]")
    .config("spark.jars", "/opt/spark/jars/hadoop-aws-3.3.4.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.EnvironmentVariableCredentialsProvider")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "true")
    .getOrCreate()
)

# Push credentials and the regional endpoint into the live Hadoop configuration.
# NOTE(review): the credentials provider configured above reads the AWS_*
# environment variables itself, so the explicit access/secret keys may be
# redundant - confirm before removing them.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", aws_access_key)
hadoop_conf.set("fs.s3a.secret.key", aws_secret_key)
hadoop_conf.set("fs.s3a.endpoint", f"s3.{aws_region}.amazonaws.com")

print("Spark session ready")

Spark session ready


In [22]:
# Root of the data lake bucket; every layer path below is derived from it so
# the bucket only has to be changed in one place.
BUCKET_URI = "s3a://ferdelfino-xpe"

# Bronze layer (read as JSON by this notebook).
bronze_ipca = f"{BUCKET_URI}/bronze/ipca/"
bronze_pre = f"{BUCKET_URI}/bronze/pre/"

# Silver layer (written as Parquet by this notebook).
silver_ipca = f"{BUCKET_URI}/processed-data/ipca/silver/"
silver_pre = f"{BUCKET_URI}/processed-data/pre/silver/"

# Gold layer (written as Parquet by this notebook).
gold_ipca = f"{BUCKET_URI}/analytics/ipca/gold/"
gold_pre = f"{BUCKET_URI}/analytics/pre/gold/"

# Echo the resolved locations so a reader can verify them at a glance.
for layer_path in (bronze_ipca, bronze_pre, silver_ipca, silver_pre, gold_ipca, gold_pre):
    print(layer_path)

s3a://ferdelfino-xpe/bronze/ipca/
s3a://ferdelfino-xpe/bronze/pre/
s3a://ferdelfino-xpe/processed-data/ipca/silver/
s3a://ferdelfino-xpe/processed-data/pre/silver/
s3a://ferdelfino-xpe/analytics/ipca/gold/
s3a://ferdelfino-xpe/analytics/pre/gold/


In [23]:
# Read the Bronze layer: one JSON dataset per bond family.
df_bronze_ipca, df_bronze_pre = (
    spark.read.json(bronze_path) for bronze_path in (bronze_ipca, bronze_pre)
)

# Row counts as a quick sanity check that both reads returned data.
for count_label, bronze_frame in (
    ("Bronze IPCA count:", df_bronze_ipca),
    ("Bronze PRE count:", df_bronze_pre),
):
    print(count_label, bronze_frame.count())

Bronze IPCA count: 560
Bronze PRE count: 512


In [24]:
# Silver: de-duplicate, convert timestamps, handle nulls.
def to_silver(df):
    """Clean a Bronze DataFrame into its Silver form.

    Steps, in order:
      1. Drop rows that are exact duplicates across all columns.
      2. Replace ``Data_Vencimento``, ``Data_Base`` and ``dt_update`` with
         formatted strings produced by ``from_unixtime`` after dividing the
         stored value by 1000 (i.e. the inputs are treated as epoch
         milliseconds).
      3. Fill nulls in the three ``PU*Manha`` columns with 0.

    Args:
        df: Spark DataFrame containing the columns listed above.

    Returns:
        A new DataFrame; the input is not modified.
    """
    # Column -> output pattern, applied in this order.
    # NOTE(review): from_unixtime formats using the Spark session time zone,
    # so date-only values could shift by a day outside UTC - confirm.
    epoch_ms_patterns = {
        "Data_Vencimento": "yyyy-MM-dd",
        "Data_Base": "yyyy-MM-dd",
        "dt_update": "yyyy-MM-dd HH:mm:ss",
    }
    # NOTE(review): zero-filled values will be included in any later average
    # over these columns - confirm this is the intended null handling.
    null_replacements = {"PUCompraManha": 0, "PUVendaManha": 0, "PUBaseManha": 0}

    cleaned = df.dropDuplicates()
    for column_name, pattern in epoch_ms_patterns.items():
        seconds = col(column_name) / 1000
        cleaned = cleaned.withColumn(column_name, from_unixtime(seconds, pattern))
    return cleaned.fillna(null_replacements)

# Build the Silver DataFrames from their Bronze counterparts.
silver_ipca_df, silver_pre_df = to_silver(df_bronze_ipca), to_silver(df_bronze_pre)

# Persist each Silver dataset as Parquet, replacing any previous run's output.
for silver_frame, silver_target in (
    (silver_ipca_df, silver_ipca),
    (silver_pre_df, silver_pre),
):
    silver_frame.write.mode("overwrite").parquet(silver_target)
print("Silver gravado")

Silver gravado


In [25]:
# Gold: aggregations (means and row count per "Tipo").
def to_gold(df):
    """Aggregate a Silver DataFrame by ``Tipo``.

    Args:
        df: Spark DataFrame with ``Tipo``, ``PUCompraManha`` and
            ``PUVendaManha`` columns.

    Returns:
        One row per ``Tipo`` with the mean of ``PUCompraManha``
        (``Media_PUCompraManha``), the mean of ``PUVendaManha``
        (``Media_PUVendaManha``) and the row count (``Total_Registros``).
    """
    return df.groupBy("Tipo").agg(
        avg("PUCompraManha").alias("Media_PUCompraManha"),
        avg("PUVendaManha").alias("Media_PUVendaManha"),
        count("*").alias("Total_Registros"),
    )


gold_ipca_df = to_gold(silver_ipca_df)
gold_pre_df = to_gold(silver_pre_df)

# Persist each Gold dataset as Parquet, replacing any previous run's output.
gold_ipca_df.write.mode("overwrite").parquet(gold_ipca)
gold_pre_df.write.mode("overwrite").parquet(gold_pre)
print("Gold gravado")

Gold gravado


In [None]:
# Stop the Spark session at the end of the notebook.
spark.stop()