In [13]:
from pyspark.sql.functions import col, isnan, lit
from pyspark.sql import SparkSession
from pyspark.sql.types import NumericType


In [None]:
# Iniciar Spark
spark = SparkSession.builder \
        .appName("Read from Postgres") \
        .config("spark.jars.packages", "org.postgresql:postgresql:42.2.18") \
        .getOrCreate()

    # Configurações do Postgres
pg_url = "jdbc:postgresql://localhost:5432/postgres"
pg_properties = {
        "user": "postgres",
        "password": "postgres",
        "driver": "org.postgresql.Driver"
    }

# Nome da tabela que você quer ler
table_name = "dadostesouroipca"

# Ler os dados do Postgres
df = spark.read.jdbc(
        url=pg_url,
        table=table_name,
        properties=pg_properties
    )

# Mostrar os dados
df.show()


+-----------+----------+-------------+------------+-----------+-------------------+-------------------+----+--------------------+
|CompraManha|VendaManha|PUCompraManha|PUVendaManha|PUBaseManha|    Data_Vencimento|          Data_Base|Tipo|           dt_update|
+-----------+----------+-------------+------------+-----------+-------------------+-------------------+----+--------------------+
|       6.68|      6.72|      1403.66|     1401.08|    1400.62|2015-05-15 00:00:00|2010-06-17 00:00:00|IPCA|2025-05-16 14:48:...|
|       6.27|      6.35|       816.77|      808.14|     807.88|2024-08-15 00:00:00|2010-06-17 00:00:00|IPCA|2025-05-16 14:48:...|
|       6.26|      6.36|       426.82|      416.97|     416.84|2035-05-15 00:00:00|2010-06-17 00:00:00|IPCA|2025-05-16 14:48:...|
|       6.27|      6.37|       425.69|      415.86|     415.73|2035-05-15 00:00:00|2010-06-16 00:00:00|IPCA|2025-05-16 14:48:...|
|       6.74|      6.78|      1399.33|     1396.76|    1396.29|2015-05-15 00:00:00|2010-06

In [7]:
df.printSchema()

root
 |-- CompraManha: double (nullable = true)
 |-- VendaManha: double (nullable = true)
 |-- PUCompraManha: double (nullable = true)
 |-- PUVendaManha: double (nullable = true)
 |-- PUBaseManha: double (nullable = true)
 |-- Data_Vencimento: timestamp (nullable = true)
 |-- Data_Base: timestamp (nullable = true)
 |-- Tipo: string (nullable = true)
 |-- dt_update: timestamp (nullable = true)



In [None]:
# Remover duplicatas
df_no_duplicates = df.dropDuplicates()

# Tratamento de nulos
colunas_numericas = [f.name for f in df_no_duplicates.schema.fields if isinstance(f.dataType, NumericType)]
colunas_nao_numericas = [f.name for f in df_no_duplicates.schema.fields if f.name not in colunas_numericas]

# Preencher colunas numéricas com 0
df_tratado = df_no_duplicates.fillna({coluna: 0 for coluna in colunas_numericas})

# Remover linhas com nulos em colunas não numéricas
df_tratado = df_tratado.dropna(subset=colunas_nao_numericas)

df_tratado.show()

+-----------+----------+-------------+------------+-----------+-------------------+-------------------+----+--------------------+
|CompraManha|VendaManha|PUCompraManha|PUVendaManha|PUBaseManha|    Data_Vencimento|          Data_Base|Tipo|           dt_update|
+-----------+----------+-------------+------------+-----------+-------------------+-------------------+----+--------------------+
|       9.04|       9.1|       751.18|      747.58|     747.28|2015-05-15 00:00:00|2006-08-14 00:00:00|IPCA|2025-05-16 14:48:...|
|       5.78|      5.88|       493.09|      481.84|     481.41|2035-05-15 00:00:00|2010-11-05 00:00:00|IPCA|2025-05-16 14:49:...|
|       5.63|      5.73|       527.98|      516.02|     515.51|2035-05-15 00:00:00|2011-01-28 00:00:00|IPCA|2025-05-16 14:49:...|
|       6.16|       6.2|      1469.19|     1466.64|    1466.07|2015-05-15 00:00:00|2010-10-05 00:00:00|IPCA|2025-05-16 14:49:...|
|       5.79|      5.87|       924.55|      915.14|     914.33|2024-08-15 00:00:00|2011-01