In [1]:
pip install "flask<2.3,>=2.2" python-dotenv pyspark boto3

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os

from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_unixtime, avg, count

# Start (or reuse) the Spark session. spark.jars.packages lists the Maven
# coordinates of the hadoop-aws connector and the AWS SDK bundle to put on the
# classpath.
# Fix: the application name literal was missing its closing quote, which made
# the whole cell a SyntaxError (so none of the imports above were executed).
spark = (
    SparkSession.builder
    .appName("XP ETL Pipeline")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262")
    .getOrCreate()
)

In [3]:
# Load variables from a .env file (when one is found) into the process environment.
load_dotenv()

aws_access_key = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
aws_region = os.getenv("S3_REGION")

# Fail fast on missing settings: os.getenv returns None for an unset variable,
# which would otherwise surface later as an "s3.None.amazonaws.com" endpoint or
# a null value handed to the Hadoop configuration.
_missing_env = [
    name
    for name, value in {
        "AWS_ACCESS_KEY_ID": aws_access_key,
        "AWS_SECRET_ACCESS_KEY": aws_secret_key,
        "S3_REGION": aws_region,
    }.items()
    if not value
]
if _missing_env:
    raise EnvironmentError(
        "Missing required environment variable(s): " + ", ".join(_missing_env)
    )

In [4]:
# Paths of the two connector jars, looked up in the current working directory.
current_dir = os.getcwd()
hadoop_aws_jar = os.path.join(current_dir, "hadoop-aws-3.3.4.jar")
aws_sdk_jar = os.path.join(current_dir, "aws-java-sdk-bundle-1.12.262.jar")

# spark.jars takes a comma-separated list of jar paths.
jars_path = ",".join((hadoop_aws_jar, aws_sdk_jar))

# NOTE(review): if a SparkSession is already active in this kernel,
# getOrCreate() returns that session instead of building a new one, so the
# builder options below may not all take effect — confirm.
spark = (
    SparkSession.builder
    .appName("XP ETL Pipeline - S3 Integration")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.jars", jars_path)
    .getOrCreate()
)

# Apply the S3A credentials and endpoint options on the session's Hadoop
# configuration, in the same order as before.
hadoop_conf = spark._jsc.hadoopConfiguration()
for option_name, option_value in (
    ("fs.s3a.access.key", aws_access_key),
    ("fs.s3a.secret.key", aws_secret_key),
    ("fs.s3a.endpoint", f"s3.{aws_region}.amazonaws.com"),
    ("fs.s3a.connection.ssl.enabled", "true"),
    ("fs.s3a.path.style.access", "true"),
):
    hadoop_conf.set(option_name, option_value)

In [None]:
bronze_path = "s3a://xp-etl-pipeline/raw/kafka/ipca/postgres-postg_ipca/partition=0/"

# Bronze layer: read the JSON files under bronze_path, preview them, and
# persist the frame as Parquet.
try:
    df_bronze = spark.read.json(bronze_path)
    print("Bronze!")
    df_bronze.show()
    # Fix: write the frame that was just read. The original wrote df_silver,
    # which is not defined yet on a fresh kernel (and holds stale silver data
    # on a re-run), so the bronze output was never the bronze data.
    df_bronze.write.mode("overwrite").parquet("s3a://xp-etl-pipeline/processed-data/ipca/1 - bronze/")
except Exception as e:
    print(f"Error accessing S3: {e}")
    # Re-raise so execution stops here; swallowing the error would leave
    # df_bronze undefined and make the following cells fail with an unrelated
    # NameError.
    raise


In [None]:
# Silver layer: flatten, de-duplicate, format timestamps, fill nulls, persist.

# Expand the fields of the `payload` struct into top-level columns.
# Fix: the result gets its own name instead of overwriting df_bronze, so this
# cell can be re-run (a second run used to fail because the overwritten
# df_bronze no longer had a `payload` column).
df_payload = df_bronze.select("payload.*")

df_silver = df_payload.dropDuplicates()

# The three columns are divided by 1000 and passed to from_unixtime, which
# formats a count of seconds since the epoch as a string.
# NOTE(review): this assumes the source values are epoch milliseconds — confirm
# against the upstream schema.
df_silver = (
    df_silver
    .withColumn("Data_Vencimento", from_unixtime(col("Data_Vencimento") / 1000, "yyyy-MM-dd"))
    .withColumn("Data_Base", from_unixtime(col("Data_Base") / 1000, "yyyy-MM-dd"))
    .withColumn("dt_update", from_unixtime(col("dt_update") / 1000, "yyyy-MM-dd HH:mm:ss"))
)

# Replace nulls in the three PU*Manha columns with 0.
df_silver = df_silver.fillna({
    "PUCompraManha": 0,
    "PUVendaManha": 0,
    "PUBaseManha": 0
})

print("Silver!")
df_silver.show(truncate=False)

silver_path = "s3a://xp-etl-pipeline/processed-data/ipca/2 - silver/"
df_silver.write.mode("overwrite").parquet(silver_path)

In [None]:
# Gold layer: for each value of "Tipo", compute the mean of PUCompraManha and
# PUVendaManha together with the number of rows in the group.
gold_aggregations = [
    avg("PUCompraManha").alias("Media_PUCompraManha"),
    avg("PUVendaManha").alias("Media_PUVendaManha"),
    count("*").alias("Total_Registros"),
]
df_gold = df_silver.groupBy("Tipo").agg(*gold_aggregations)

print("Gold!")
df_gold.show(truncate=False)

# Persist the aggregated frame as Parquet, replacing any previous output.
gold_path = "s3a://xp-etl-pipeline/processed-data/ipca/3 - gold/"
gold_writer = df_gold.write.mode("overwrite")
gold_writer.parquet(gold_path)

In [None]:
# Stop the Spark session at the end of the pipeline.
spark.stop()