In [26]:
#pip install --upgrade pandas
#pip install pyspark==4.5.1

In [8]:
# bibliotecas necessárias
import os
import sys
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import date_format, col
from pyspark.sql import functions as F

# Parâmetros de conexão com PostgreSQL
host = "dpg-d2q93uv5r7bs73aesqlg-a.oregon-postgres.render.com"
port = 5432
database = "sicoop"
user = "admin"
password = "vnbygROdVav9q0C5gqF6S7zJjieM7hIr"

def create_spark_session():
    
    print("\nCriando SparkSession...")

    spark = SparkSession.builder \
        .appName("ETL_Sicoop_PostgreSQL") \
        .master("local[*]") \
        .config("spark.driver.extraClassPath", "drivers/postgresql-42.7.7.jar") \
        .config("spark.executor.extraClassPath", "drivers/postgresql-42.7.7.jar") \
        .config("spark.driver.extraJavaOptions", "-Djava.io.tmpdir=C:/temp") \
        .config("spark.executor.extraJavaOptions", "-Djava.io.tmpdir=C:/temp") \
        .config("spark.sql.adaptive.enabled", "false") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "false") \
        .getOrCreate()
        
    return spark

def configure_postgres_connection():
    
    print("\nConexão PostgreSQL...")
    
    jdbc_url = f"jdbc:postgresql://{host}:{port}/{database}"
    connection_properties = {
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver"
    }
        
    return jdbc_url, connection_properties

def read_table_from_postgres(spark, jdbc_url, connection_properties, table_name):
    
    print(f"Inicia Leitura do Script...")

    df = spark.read.jdbc(
        url=jdbc_url, 
        table=table_name, 
        properties=connection_properties
    )
    
    print(f"Número de linhas retornadas na tabela {table_name}: {df.count()}")
    
    return df


In [7]:
# Cria SparkSession
spark = create_spark_session()

# Configurar conexão PostgreSQL
jdbc_url, connection_properties = configure_postgres_connection()

# Leitura tabelas
df_associado = read_table_from_postgres(spark, jdbc_url, connection_properties, "associado")
df_conta     = read_table_from_postgres(spark, jdbc_url, connection_properties, "conta")
df_cartao    = read_table_from_postgres(spark, jdbc_url, connection_properties, "cartao")
df_movimento = read_table_from_postgres(spark, jdbc_url, connection_properties, "movimento")

df_query = df_movimento \
    .join(df_cartao.withColumnRenamed("data_criacao", "data_criacao_cartao"), "id_cartao") \
    .join(df_conta.withColumnRenamed("data_criacao", "data_criacao_conta"), "id_conta") \
    .join(df_associado, "id_associado")


Criando SparkSession...

Conexão PostgreSQL...
Inicia Leitura do Script...
Número de linhas retornadas na tabela associado: 100
Inicia Leitura do Script...
Número de linhas retornadas na tabela conta: 120
Inicia Leitura do Script...
Número de linhas retornadas na tabela cartao: 240
Inicia Leitura do Script...
Número de linhas retornadas na tabela movimento: 1000


In [11]:
df_result = df_query.groupBy(
    "nome", "sobrenome", "idade",
    "des_tranacao", "data_movimento",
    "num_cartao", "nom_impresso",
    "data_criacao_cartao", "tipo", "data_criacao_conta"
).agg(
    F.sum("vlr_transacao").alias("vlr_transacao_movimento")
)

df_result = df_result.select(
    F.col("nome").alias("nome_associado"),
    F.col("sobrenome").alias("sobrenome_associado"),
    F.col("idade").alias("idade_associado"),
    F.col("vlr_transacao_movimento"),
    F.col("des_tranacao").alias("des_tranacao_movimento"),
    F.col("data_movimento"),
    F.col("num_cartao").alias("numero_cartao"),
    F.col("nom_impresso").alias("nome_impresso_cartao"),
    F.col("data_criacao_cartao"),
    F.col("tipo").alias("tipo_conta"),
    F.col("data_criacao_conta")
)

In [12]:
# Converter timestamps para string no Spark
df_result_treated = df_result
for col_name in df_result.columns:
    if 'data' in col_name.lower():
        df_result_treated = df_result_treated.withColumn(col_name, date_format(col(col_name), 'yyyy-MM-dd HH:mm:ss'))
        
# Converter para Pandas
df_result_output = df_result_treated.toPandas()
        
# Salvar CSV
output_path = "outputs/movimento_final.csv"
df_result_output.to_csv(output_path, sep=';', index=False, header=True, encoding='utf-8')