In [31]:
import os
import sys

# Sobe o nível da pasta para a raiz do projeto, onde configs.py está localizado
PROJECT_ROOT = os.path.abspath(os.path.join(os.getcwd(), ".."))
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)

import configs
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

print("configs.py em:", configs.__file__)
print("BRONZE_PATH:", configs.BRONZE_PATH)
print("SILVER_PATH:", configs.SILVER_PATH)

spark = (
    SparkSession.builder
        .appName("sicredi-silver")
        .getOrCreate()
)

spark


configs.py em: c:\Users\viami\Documents\projetos\teste-tecnico-engenharia-de-dados-sicredi-pl\configs.py
BRONZE_PATH: c:\Users\viami\Documents\projetos\teste-tecnico-engenharia-de-dados-sicredi-pl\data\bronze
SILVER_PATH: c:\Users\viami\Documents\projetos\teste-tecnico-engenharia-de-dados-sicredi-pl\data\silver


In [None]:
# Ler camada Bronze

df_associado = (
    spark.read.parquet(os.path.join(configs.BRONZE_PATH, "associado"))
        .withColumnRenamed("id", "id_associado")
)

df_conta = (
    spark.read.parquet(os.path.join(configs.BRONZE_PATH, "conta"))
        .withColumnRenamed("id", "id_conta")
)

df_cartao = (
    spark.read.parquet(os.path.join(configs.BRONZE_PATH, "cartao"))
        .withColumnRenamed("id", "id_cartao")
)

df_movimento = (
    spark.read.parquet(os.path.join(configs.BRONZE_PATH, "movimento"))
        .withColumnRenamed("id", "id_movimento")
)

print("✔️ Bronze carregada:")
print("associado:", df_associado.count())
print("conta    :", df_conta.count())
print("cartao   :", df_cartao.count())
print("movimento:", df_movimento.count())


✔️ Bronze carregada:
associado: 700
conta    : 700
cartao   : 700
movimento: 3848


In [None]:
# criar df_movimento_flat
df_movimento_flat = (
    df_movimento
        .join(df_cartao,   df_movimento.id_cartao   == df_cartao.id_cartao,   "inner")
        .join(df_conta,    df_cartao.id_conta       == df_conta.id_conta,     "inner")
        .join(df_associado,df_cartao.id_associado   == df_associado.id_associado, "inner")
        .select(
            # ASSOCIADO
            F.col("nome").alias("nome_associado"),
            F.col("sobrenome").alias("sobrenome_associado"),
            F.col("idade").alias("idade_associado"),

            # MOVIMENTO
            F.col("vlr_transacao").alias("vlr_transacao_movimento"),
            F.col("des_transacao").alias("des_transacao_movimento"),
            F.col("data_movimento"),

            # CARTAO
            F.col("num_cartao").alias("numero_cartao"),
            F.col("nom_impresso").alias("nome_impresso_cartao"),

            # CONTA
            F.col("tipo").alias("tipo_conta"),
            F.col("data_criacao").alias("data_criacao_conta"),
        )
)

df_movimento_flat.printSchema()
df_movimento_flat.show(5, truncate=False)


root
 |-- nome_associado: string (nullable = true)
 |-- sobrenome_associado: string (nullable = true)
 |-- idade_associado: integer (nullable = true)
 |-- vlr_transacao_movimento: decimal(10,2) (nullable = true)
 |-- des_transacao_movimento: string (nullable = true)
 |-- data_movimento: timestamp (nullable = true)
 |-- numero_cartao: long (nullable = true)
 |-- nome_impresso_cartao: string (nullable = true)
 |-- tipo_conta: string (nullable = true)
 |-- data_criacao_conta: timestamp (nullable = true)

+--------------+-------------------+---------------+-----------------------+-----------------------+-------------------+----------------+--------------------+----------+-------------------+
|nome_associado|sobrenome_associado|idade_associado|vlr_transacao_movimento|des_transacao_movimento|data_movimento     |numero_cartao   |nome_impresso_cartao|tipo_conta|data_criacao_conta |
+--------------+-------------------+---------------+-----------------------+-----------------------+-------------

In [None]:
# ajustar tipos de dados
df_movimento_flat = df_movimento_flat.select(
    F.col("nome_associado").cast("string"),
    F.col("sobrenome_associado").cast("string"),
    F.col("idade_associado").cast("string"),

    F.col("vlr_transacao_movimento").cast("string"),
    F.col("des_transacao_movimento").cast("string"),
    F.col("data_movimento").cast("string"),

    F.col("numero_cartao").cast("string"),
    F.col("nome_impresso_cartao").cast("string"),

    F.col("tipo_conta").cast("string"),
    F.col("data_criacao_conta").cast("string"),
)


In [None]:
# remover duplicatas(boas praticas)
df_movimento_flat = df_movimento_flat.dropDuplicates()

In [None]:
# carregar Silver full
silver_base = configs.SILVER_PATH
print("Salvando Silver em:", silver_base)

os.makedirs(silver_base, exist_ok=True)

(
    df_movimento_flat
        .coalesce(1)              
        .write
        .mode("overwrite")        
        .option("header", True)
        .csv(silver_base)
)

print("Silver gerado com sucesso!")


Salvando Silver em: c:\Users\viami\Documents\projetos\teste-tecnico-engenharia-de-dados-sicredi-pl\data\silver
Silver gerado com sucesso!


In [None]:
# 5) Encerrar Spark

spark.stop()
