# Silver to Gold Transformation

This notebook runs the transformation from the **Silver** layer to the **Gold** layer.

## Objectives:
1. Set up the PySpark environment
2. Connect to the PostgreSQL Data Warehouse
3. Read data from the `silver.bop_clean` table
4. Apply aggregation and enrichment transformations
5. Save the data to the Gold layer

---

## 1. Path Configuration and Imports

### 1.1. Environment Variable Configuration

This cell sets the environment variables needed to connect to PostgreSQL when the notebook is run **manually** (outside Airflow).

**In Airflow**: uses the Docker settings (host=`postgres`, port=`5432`)  
**Manually**: uses localhost with the mapped port (host=`localhost`, port=`5433`)

In [None]:
import os

# Set the PostgreSQL connection environment variables when running manually
if 'AIRFLOW_HOME' not in os.environ:
    print("🔧 Configuring environment variables for manual execution...")

    # Data Warehouse settings (local connection to the Docker container)
    os.environ['POSTGRES_DW_DB'] = 'data_warehouse'
    os.environ['POSTGRES_DW_USER'] = 'dw_user'
    os.environ['POSTGRES_DW_PASSWORD'] = 'dw_password'
    os.environ['POSTGRES_HOST'] = 'localhost'  # Outside Docker, use localhost
    os.environ['POSTGRES_PORT'] = '5433'       # Port mapped in docker-compose

    print(f"   ✅ Database: {os.environ['POSTGRES_DW_DB']}")
    print(f"   ✅ Host: {os.environ['POSTGRES_HOST']}")
    print(f"   ✅ Port: {os.environ['POSTGRES_PORT']}")
else:
    print("✅ Running in Airflow - using the Docker environment settings")

🔧 Configuring environment variables for manual execution...
   ✅ Database: data_warehouse
   ✅ Host: localhost
   ✅ Port: 5433
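
The host and port switch described in section 1.1 can also be written as a small pure function, which makes the two cases easy to check in isolation. This is only a sketch: `resolve_dw_settings` is a hypothetical helper, not part of the project's `postgres_helper` module, and it returns the values listed above instead of writing to `os.environ`.

```python
import os
from typing import Mapping, Optional


def resolve_dw_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Return the Data Warehouse host and port for Airflow or manual runs.

    Hypothetical helper: the defaults mirror the values stated in section 1.1.
    """
    env = os.environ if env is None else env
    if "AIRFLOW_HOME" in env:
        # Inside Docker: service name and container port
        defaults = {"POSTGRES_HOST": "postgres", "POSTGRES_PORT": "5432"}
    else:
        # Outside Docker: localhost and the port mapped in docker-compose
        defaults = {"POSTGRES_HOST": "localhost", "POSTGRES_PORT": "5433"}
    # Values already present in the environment take precedence over the defaults
    return {key: env.get(key, value) for key, value in defaults.items()}


manual = resolve_dw_settings({})
airflow = resolve_dw_settings({"AIRFLOW_HOME": "/opt/airflow"})
print(manual)
print(airflow)
```

Unlike the cell above, which overwrites the variables unconditionally, this sketch lets an existing `POSTGRES_HOST` or `POSTGRES_PORT` win. That is a design choice, and it may or may not suit this project.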


In [31]:
import sys
import os
from pathlib import Path

# Resolve the project root for the current environment
if 'AIRFLOW_HOME' in os.environ:
    # Running in Airflow
    BASE_PATH = Path('/opt/airflow')
else:
    # Running manually: the project root is two levels above the working directory
    BASE_PATH = Path.cwd().parent.parent

# Paths derived from the project root
SPARK_CONFIG_PATH = BASE_PATH / 'spark_config'
POSTGRES_HELPER_PATH = BASE_PATH / 'postgres' / 'helpers'
POSTGRES_PLUGINS_PATH = BASE_PATH / 'postgres' / 'plugins'

# Add paths to system path so the helper modules can be imported
sys.path.insert(0, str(SPARK_CONFIG_PATH))
sys.path.insert(0, str(POSTGRES_HELPER_PATH))
sys.path.insert(0, str(POSTGRES_PLUGINS_PATH))

print(f"✅ Base Path: {BASE_PATH}")
print(f"✅ Spark Config Path: {SPARK_CONFIG_PATH}")
print(f"✅ Postgres Helper Path: {POSTGRES_HELPER_PATH}")
print(f"✅ Postgres Plugins Path: {POSTGRES_PLUGINS_PATH}")

✅ Base Path: /home/davi/Área de Trabalho/bancos4/TrabalhoSBD2
✅ Spark Config Path: /home/davi/Área de Trabalho/bancos4/TrabalhoSBD2/spark_config
✅ Postgres Helper Path: /home/davi/Área de Trabalho/bancos4/TrabalhoSBD2/postgres/helpers
✅ Postgres Plugins Path: /home/davi/Área de Trabalho/bancos4/TrabalhoSBD2/postgres/plugins


## 2. Initialize PySpark

In [32]:
from config import SparkConfig, DataSchemas
from pyspark.sql import functions as F

# Create Spark session
spark_config = SparkConfig(app_name="Silver_Gold_Transformation")
spark = spark_config.create_spark_session()
spark_config.configure_for_banking_data()

print(f"✅ Spark Version: {spark.version}")
print(f"✅ Spark UI: {spark.sparkContext.uiWebUrl}")

INFO:config:Spark Session criada: Silver_Gold_Transformation
INFO:config:Spark UI disponível em: http://192.168.0.12:4040


✅ Spark Version: 3.5.1
✅ Spark UI: http://192.168.0.12:4040


## 3. Configure the PostgreSQL Connection

In [33]:
from postgres_helper import get_postgres_conn
from cliente_postgres import ClientPostgresDB

# Connect to the Data Warehouse using the postgres_dw connection
conn_str = get_postgres_conn(conn_id="postgres_dw")
client = ClientPostgresDB(conn_str)

print("✅ PostgreSQL connection established")
print("   Conn ID: postgres_dw (Data Warehouse)")

INFO:root:[postgres_helpers] Using manual PostgreSQL connection (conn_id=postgres_dw): dbname=data_warehouse, user=dw_user, host=localhost, port=5433
INFO:root:[cliente_postgres.py] Initialized ClientPostgresDB with conn_str: dbname=data_warehouse user=dw_user password=dw_password host=localhost port=5433


✅ PostgreSQL connection established
   Conn ID: postgres_dw (Data Warehouse)
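
The `cliente_postgres.py` log line above prints the connection string with the password in plain text. One way to avoid this, sketched here under the assumption that the string uses the `key=value` form shown in the log, is to mask the password before logging. `mask_password` is a hypothetical helper and is not part of `ClientPostgresDB`.

```python
import re


def mask_password(conn_str: str) -> str:
    """Replace the value of password=... with *** (hypothetical helper)."""
    # \S+ matches the password up to the next whitespace separator
    return re.sub(r"(password=)\S+", r"\1***", conn_str)


example = (
    "dbname=data_warehouse user=dw_user password=dw_password "
    "host=localhost port=5433"
)
masked = mask_password(example)
print(masked)
```

This pattern does not handle quoted passwords that contain spaces, so a real implementation would need a fuller parser.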


## 4. Read Data from the Silver Table

We read the data from the `silver.bop_clean` table, which was created by the Bronze → Silver process.

In [34]:
# Query to read all rows from silver.bop_clean
query = """
SELECT *
FROM silver.bop_clean
"""

print("📊 Running query against PostgreSQL...")
print("   Schema: silver")
print("   Table: bop_clean")

📊 Running query against PostgreSQL...
   Schema: silver
   Table: bop_clean


In [35]:
# Run the query and fetch the results
result = client.execute_query(query)

if result:
    print("✅ Query executed successfully!")
    print(f"   Rows returned: {len(result):,}")
else:
    print("⚠️  No rows found")

INFO:root:[cliente_postgres.py] Executing query: 
SELECT *
FROM silver.bop_clean

INFO:root:[cliente_postgres.py] Query executed successfully, fetched 5615 rows


✅ Query executed successfully!
   Rows returned: 5,615


## 5. Convert to a PySpark DataFrame

We convert the rows fetched from PostgreSQL into a PySpark DataFrame so that transformations can be applied.

In [36]:
# Convert the fetched rows to a Spark DataFrame.
# No schema is passed, so Spark assigns positional column names (_1, _2, ...).
if result:
    df_bop_clean = spark.createDataFrame(result)

    print("✅ PySpark DataFrame created")
    print(f"   Total rows: {df_bop_clean.count():,}")
    print(f"   Total columns: {len(df_bop_clean.columns)}")
else:
    print("❌ Could not create the DataFrame - no data returned")

✅ PySpark DataFrame created


25/11/06 23:38:14 WARN TaskSetManager: Stage 4 contains a task of very large size (1041 KiB). The maximum recommended task size is 1000 KiB.

   Total rows: 5,615
   Total columns: 106
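
Because the rows are passed to `createDataFrame` without a schema, the source column names are lost and the DataFrame ends up with `_1` … `_106`. A minimal sketch of one way to keep the names follows. It assumes the column names can be obtained separately, for example from a DB-API cursor's `description`; whether `ClientPostgresDB` exposes them is not shown in this notebook. `to_named_dataframe` is a hypothetical helper.

```python
from typing import Any, Sequence


def to_named_dataframe(spark: Any, rows: Sequence[tuple], columns: Sequence[str]):
    """Create a Spark DataFrame that keeps the source column names.

    Hypothetical helper: `columns` must be supplied by the caller.
    """
    if not rows:
        raise ValueError("no rows to convert")
    width = len(rows[0])
    if width != len(columns):
        raise ValueError(f"rows have {width} fields but {len(columns)} names were given")
    # Passing a list of names as `schema` lets Spark infer types but keep the names
    return spark.createDataFrame(rows, schema=list(columns))
```

With a live session this would be called as `to_named_dataframe(spark, result, column_names)`, where `column_names` is the assumed list of names for `silver.bop_clean`.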


## 6. Explore the Data - First Rows

In [37]:
# Show the first 5 rows
print("📋 First 5 rows of the DataFrame:")
print("="*80)
df_bop_clean.show(5, truncate=False, vertical=False)

📋 First 5 rows of the DataFrame:
+---+-------+---+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+------+-----------------+------+-----------------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+
|_1 |_2     |_3 |_4  |_5 |_6 |_7 |_8 |_9 |_10|_11|_12|_13|_14|_15|_16|_17|_18|_19|_20|_21|_22|_23|_24|_25|_26|_27|_28|_29|_30|_31|_32|_33|_34|_35|_36|_37|_38|_39|_40|_41|_42|_43|_44   |_45              |_46   |_47              |_48 |_49|_50|_51|_52|_53|_54|_55|_56|_57|_58|_59|_60|_61|_62|_63|_64|_65|_66|_67|_68|_69|_70|_71|_72|_73|_74|_75|_76 |_77|_78|_79|_80|_81|_82|_83|_84|_85|_86|_87|_88|_89|_90|_91|_92|_93|_94|_95|_96|_97|_98|_99|_100|_101|_102|_103|_104|_105|_106|
+---+-------+-

## 7. Schema Information

In [38]:
# Display the DataFrame schema
print("📊 DataFrame schema:")
print("="*80)
df_bop_clean.printSchema()

📊 DataFrame schema:
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)
 |-- _4: string (nullable = true)
 |-- _5: string (nullable = true)
 |-- _6: string (nullable = true)
 |-- _7: string (nullable = true)
 |-- _8: string (nullable = true)
 |-- _9: string (nullable = true)
 |-- _10: string (nullable = true)
 |-- _11: string (nullable = true)
 |-- _12: string (nullable = true)
 |-- _13: string (nullable = true)
 |-- _14: string (nullable = true)
 |-- _15: string (nullable = true)
 |-- _16: string (nullable = true)
 |-- _17: string (nullable = true)
 |-- _18: string (nullable = true)
 |-- _19: string (nullable = true)
 |-- _20: string (nullable = true)
 |-- _21: string (nullable = true)
 |-- _22: string (nullable = true)
 |-- _23: string (nullable = true)
 |-- _24: string (nullable = true)
 |-- _25: string (nullable = true)
 |-- _26: string (nullable = true)
 |-- _27: string (nullable = true)
 |-- _28: string (nullable = true
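
Every column in the schema above is typed as `string`, so any numeric column would need a cast before it can be aggregated. The sketch below builds the cast expressions as Spark SQL strings that could be passed to `DataFrame.selectExpr`. Which columns are numeric is not stated in this notebook, so the mapping in the example is purely illustrative, and `build_cast_exprs` is a hypothetical helper.

```python
from typing import List, Mapping, Sequence


def build_cast_exprs(columns: Sequence[str], type_map: Mapping[str, str]) -> List[str]:
    """Build one Spark SQL select expression per column (hypothetical helper).

    Columns listed in `type_map` are cast to the given type; the rest pass through.
    """
    exprs = []
    for name in columns:
        target = type_map.get(name)
        if target is None:
            exprs.append(f"`{name}`")
        else:
            exprs.append(f"CAST(`{name}` AS {target}) AS `{name}`")
    return exprs


# Illustrative mapping only: the real numeric columns are an assumption
exprs = build_cast_exprs(["_1", "_2", "_3"], {"_2": "DOUBLE"})
print(exprs)
```

With a live session, the result could be applied as `df_typed = df_bop_clean.selectExpr(*exprs)`.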

In [29]:
# Stop the Spark session
spark.stop()

print("\n" + "="*80)
print("✅ Silver → Gold notebook executed successfully!")
print("="*80)


✅ Silver → Gold notebook executed successfully!
