In [133]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper , to_date
from pyspark.sql.types import DateType
from dotenv import load_dotenv

# Carregar variáveis de ambiente do caminho dentro do container
load_dotenv('../.env_kafka_connect')

# Config AWS e Postgres (variáveis de ambiente)
aws_access_key = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
aws_region = "us-east-1"

pg_host = os.getenv("POSTGRES_HOST")
pg_port = os.getenv("POSTGRES_PORT", "5432")
pg_db = os.getenv("POSTGRES_DB")
pg_user = os.getenv("POSTGRES_USER")
pg_password = os.getenv("POSTGRES_PASSWORD")

# JDBC URL e propriedades de conexão
#jdbc_url = f"jdbc:postgresql://{pg_host}:{pg_port}/{pg_db}"
jdbc_url = f"jdbc:postgresql://host.docker.internal:5432/postgres"
jdbc_properties = {
    "user": pg_user,
    "password": pg_password,
    "driver": "org.postgresql.Driver"
}

print(jdbc_url)
print(jdbc_properties)


jdbc:postgresql://host.docker.internal:5432/postgres
{'user': 'postgres', 'password': 'postgres', 'driver': 'org.postgresql.Driver'}


In [134]:
# Caminho dos jars para Spark (Postgres + AWS)
hadoop_aws_jar = "../jars/hadoop-aws-3.3.4.jar"
aws_sdk_jar = "../jars/aws-java-sdk-bundle-1.12.262.jar"
postgres_jdbc_jar = "../jars/postgresql-42.6.2.jar"
jars_path = f"{hadoop_aws_jar},{aws_sdk_jar},{postgres_jdbc_jar}"

print(f"jars_path: {jars_path}")

# Criar SparkSession com configurações AWS + JDBC
spark = SparkSession.builder \
    .appName("Pipeline - Silver") \
    .config("spark.jars", jars_path) \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret_key) \
    .config("spark.hadoop.fs.s3a.endpoint", f"s3.{aws_region}.amazonaws.com") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "true") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .getOrCreate()

# Colunas obrigatórias para limpeza
colunas_obrigatorias = ["CompraManha", "VendaManha", "PUCompraManha", "PUVendaManha", "PUBaseManha", "Data_Vencimento", "Data_Base", "Tipo"]

jars_path: ../jars/hadoop-aws-3.3.4.jar,../jars/aws-java-sdk-bundle-1.12.262.jar,../jars/postgresql-42.6.2.jar


In [135]:
# Ler dados Bronze do Postgres
df_pre = spark.read.jdbc(url=jdbc_url, table="public.dadostesouropre", properties=jdbc_properties)
df_ipca = spark.read.jdbc(url=jdbc_url, table="public.dadostesouroipca", properties=jdbc_properties)
df_pre.show()

+-----------+----------+-------------+------------+-----------+-----------+--------------------+
|CompraManha|VendaManha|PUCompraManha|PUVendaManha|PUBaseManha|       Tipo|           dt_update|
+-----------+----------+-------------+------------+-----------+-----------+--------------------+
|      18.93|     18.99|       851.89|      851.49|      850.9|PRE-FIXADOS|2025-08-31 19:09:...|
|      18.75|     18.79|        971.1|      971.04|     970.38|PRE-FIXADOS|2025-08-31 19:09:...|
|      18.46|     18.54|       786.63|      785.88|     785.35|PRE-FIXADOS|2025-08-31 19:09:...|
|      19.13|     19.17|       929.01|      928.88|     928.23|PRE-FIXADOS|2025-08-31 19:09:...|
|      19.13|     19.19|        888.0|      887.69|     887.08|PRE-FIXADOS|2025-08-31 19:09:...|
|      18.74|     18.78|       970.45|      970.39|     969.73|PRE-FIXADOS|2025-08-31 19:09:...|
|      19.12|     19.16|        928.4|      928.27|     927.62|PRE-FIXADOS|2025-08-31 19:09:...|
|      19.12|     19.18|      

In [136]:
df_pre.describe()

DataFrame[summary: string, CompraManha: string, VendaManha: string, PUCompraManha: string, PUVendaManha: string, PUBaseManha: string, Tipo: string]

In [None]:
# Drop duplicatas
df_ipca = df_ipca.dropDuplicates()
df_pre = df_pre.dropDuplicates()

# Ajuste de tipos para colunas de data
df_ipca = df_ipca.withColumn("Data_Base", col("Data_Base").cast("date"))
df_ipca = df_ipca.withColumn("Data_Vencimento", col("Data_Vencimento").cast("date"))
df_pre = df_pre.withColumn("Data_Base", col("Data_Base").cast("date"))
df_pre = df_pre.withColumn("Data_Vencimento", col("Data_Vencimento").cast("date"))

# Drop nulos nas colunas importantes
df_ipca = df_ipca.dropna(subset=colunas_obrigatorias)
df_pre = df_pre.dropna(subset=colunas_obrigatorias)

# Padronizar coluna Tipo para maiusculo
df_ipca = df_ipca.withColumn("Tipo", upper(col("Tipo")))
df_pre = df_pre.withColumn("Tipo", upper(col("Tipo")))

# Gravar os dados tratados na Silver
df_ipca.write.jdbc(url=jdbc_url, table="public.dadostesouroipca_prata", mode="overwrite", properties=jdbc_properties)
df_pre.write.jdbc(url=jdbc_url, table="public.dadostesouropre_prata", mode="overwrite", properties=jdbc_properties)

print("Pipeline Silver executado com sucesso!")

{"ts": "2025-08-31 20:15:02.296", "level": "ERROR", "logger": "DataFrameQueryContextLogger", "msg": "[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `Data_Base` cannot be resolved. Did you mean one of the following? [`dt_update`, `Tipo`, `VendaManha`, `CompraManha`, `PUBaseManha`]. SQLSTATE: 42703", "context": {"file": "line 6 in cell [137]", "line": "", "fragment": "col", "errorClass": "UNRESOLVED_COLUMN.WITH_SUGGESTION"}, "exception": {"class": "Py4JJavaError", "msg": "An error occurred while calling o460.withColumn.\n: org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `Data_Base` cannot be resolved. Did you mean one of the following? [`dt_update`, `Tipo`, `VendaManha`, `CompraManha`, `PUBaseManha`]. SQLSTATE: 42703;\n'Project [CompraManha#526, VendaManha#527, PUCompraManha#528, PUVendaManha#529, PUBaseManha#530, Tipo#531, dt_update#532, cast('Data_Base as date) AS Da

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `Data_Base` cannot be resolved. Did you mean one of the following? [`dt_update`, `Tipo`, `VendaManha`, `CompraManha`, `PUBaseManha`]. SQLSTATE: 42703;
'Project [CompraManha#526, VendaManha#527, PUCompraManha#528, PUVendaManha#529, PUBaseManha#530, Tipo#531, dt_update#532, cast('Data_Base as date) AS Data_Base#655]
+- Deduplicate [PUVendaManha#529, PUBaseManha#530, VendaManha#527, dt_update#532, PUCompraManha#528, CompraManha#526, Tipo#531]
   +- Relation [CompraManha#526,VendaManha#527,PUCompraManha#528,PUVendaManha#529,PUBaseManha#530,Tipo#531,dt_update#532] JDBCRelation(public.dadostesouroipca) [numPartitions=1]
