In [19]:
import logging
from src import config
import polars as pl
import psycopg2
import s3fs

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

logger = logging.getLogger(__name__)


def run():
    print("--- Running Final Join and Load ETL ---")
    bancos = pl.read_parquet(f"{config.TRUSTED_DATA_PATH}/bancos/bancos.parquet", storage_options=config.STORAGE_OPTIONS)
    empregados = pl.read_parquet(f"{config.TRUSTED_DATA_PATH}/empregados/empregados.parquet", storage_options=config.STORAGE_OPTIONS)
    reclamacoes = pl.read_parquet(f"{config.TRUSTED_DATA_PATH}/reclamacoes/reclamacoes.parquet", storage_options=config.STORAGE_OPTIONS)

    df_bancos_empregados_1 = bancos.join(
        empregados,
        left_on="CNPJ",
        right_on="CNPJ",
        how="left"
    ).select(
        [
            'SEGMENTO', 
            'CNPJ', 
            'NOME', 
            'EMPLOYER_NAME', 
            'REVIEWS_COUNT', 
            'CULTURE_COUNT', 
            'SALARIES_COUNT', 
            'BENEFITS_COUNT', 
            'EMPLOYERWEBSITE', 
            'EMPLOYERHEADQUARTERS', 
            'EMPLOYERFOUNDED', 
            'EMPLOYERINDUSTRY',
            'EMPLOYERREVENUE', 
            'URL', 
            'GERAL', 
            'CULTURA_E_VALORES', 
            'DIVERSIDADE_E_INCLUSAO', 
            'QUALIDADE_DE_VIDA', 
            'ALTA_LIDERANCA', 
            'REMUNERACAO_E_BENEFICIOS', 
            'OPORTUNIDADES_DE_CARREIRA', 
            'RECOMENDAM_PARA_OUTRAS_PESSOAS', 
            'PERSPECTIVA_POSITIVA_DA_EMPRESA', 
            'MATCH_PERCENT'
        ]
    )

    df_bancos_empregados_2 = bancos.join(
        empregados,
        left_on=['SEGMENTO', 'NOME'],
        right_on=['SEGMENTO', 'NOME'],
        how="left"
    ).select(
        [
            'SEGMENTO', 
            'CNPJ', 
            'NOME', 
            'EMPLOYER_NAME', 
            'REVIEWS_COUNT', 
            'CULTURE_COUNT', 
            'SALARIES_COUNT', 
            'BENEFITS_COUNT', 
            'EMPLOYERWEBSITE', 
            'EMPLOYERHEADQUARTERS', 
            'EMPLOYERFOUNDED', 
            'EMPLOYERINDUSTRY',
            'EMPLOYERREVENUE', 
            'URL', 
            'GERAL', 
            'CULTURA_E_VALORES', 
            'DIVERSIDADE_E_INCLUSAO', 
            'QUALIDADE_DE_VIDA', 
            'ALTA_LIDERANCA', 
            'REMUNERACAO_E_BENEFICIOS', 
            'OPORTUNIDADES_DE_CARREIRA', 
            'RECOMENDAM_PARA_OUTRAS_PESSOAS', 
            'PERSPECTIVA_POSITIVA_DA_EMPRESA', 
            'MATCH_PERCENT'
        ]
    )

    df_bancos_empregados = pl.concat([df_bancos_empregados_1, df_bancos_empregados_2]).unique()

    df_reclamacoes = reclamacoes.filter(
        pl.col("CNPJ_IF") != pl.lit('')
    )
    
    print(len(df_reclamacoes))
    print(df_reclamacoes.head())

    df_reclamacoes.write_csv("reclamacoes.csv")

    df_final = df_bancos_empregados.join(
        df_reclamacoes,
        left_on="CNPJ",
        right_on="CNPJ_IF",
        how="left"
    ).select(
        [
            'SEGMENTO', 
            'CNPJ', 
            'NOME', 
            'EMPLOYER_NAME', 
            'REVIEWS_COUNT', 
            'CULTURE_COUNT', 
            'SALARIES_COUNT', 
            'BENEFITS_COUNT', 
            'EMPLOYERWEBSITE', 
            'EMPLOYERHEADQUARTERS', 
            'EMPLOYERFOUNDED', 
            'EMPLOYERINDUSTRY',
            'EMPLOYERREVENUE', 
            'URL', 
            'GERAL', 
            'CULTURA_E_VALORES', 
            'DIVERSIDADE_E_INCLUSAO', 
            'QUALIDADE_DE_VIDA', 
            'ALTA_LIDERANCA', 
            'REMUNERACAO_E_BENEFICIOS', 
            'OPORTUNIDADES_DE_CARREIRA', 
            'RECOMENDAM_PARA_OUTRAS_PESSOAS', 
            'PERSPECTIVA_POSITIVA_DA_EMPRESA', 
            'MATCH_PERCENT',
            'ANO', 
            'TRIMESTRE', 
            'CATEGORIA', 
            'TIPO', 
            'INSTITUICAO_FINANCEIRA', 
            'INDICE', 
            'QUANTIDADE_DE_RECLAMACOES_REGULADAS_PROCEDENTES', 
            'QUANTIDADE_DE_RECLAMACOES_REGULADAS_OUTRAS', 
            'QUANTIDADE_DE_RECLAMACOES_NAO_REGULADAS', 
            'QUANTIDADE_TOTAL_DE_RECLAMACOES', 
            'QUANTIDADE_TOTAL_DE_CLIENTES_CCS_E_SCR', 
            'QUANTIDADE_DE_CLIENTES_CCS', 
            'QUANTIDADE_DE_CLIENTES_SCR'
        ]
    )

    print(f"Total rows after join: {len(df_final)}")
    print(df_final.head())

    df_final.write_csv("final.csv")

    fs = s3fs.S3FileSystem(
        key=config.STORAGE_OPTIONS["aws_access_key_id"],
        secret=config.STORAGE_OPTIONS["aws_secret_access_key"],
        endpoint_url=config.STORAGE_OPTIONS["aws_endpoint_url"]
    )

    destination_path = f"{config.S3_BUCKET}/delivery-data/joined_data.parquet"

    with fs.open(destination_path, 'wb') as f:
        df_final.write_parquet(f)

    logging.info(f"Saved {len(df_final)} final rows to {destination_path}")

    # df_final.write_parquet(f"{config.DELIVERY_DATA_PATH}/relatorio_consolidado.parquet", storage_options=config.STORAGE_OPTIONS)
    # print(f"Saved final report to {config.DELIVERY_DATA_PATH}")

    # try:
    #     conn = psycopg2.connect(host=config.DB_HOST, dbname=config.DB_NAME, user=config.DB_USER, password=config.DB_PASSWORD, port=config.DB_PORT)
    #     # ... (your _load_to_postgres helper function logic here) ...
    #     print("Final data loaded to PostgreSQL.")
    # finally:
    #     if 'conn' in locals() and conn: conn.close()

In [20]:
run()

  df_final.write_parquet(f)
2025-08-06 01:19:22,481 INFO Saved 1737 final rows to data-ingestion-edb-ex2/delivery-data/joined_data.parquet


--- Running Final Join and Load ETL ---
437
shape: (5, 15)
┌──────┬───────────┬────────────┬────────────┬───┬────────────┬────────────┬───────────┬───────────┐
│ ANO  ┆ TRIMESTRE ┆ CATEGORIA  ┆ TIPO       ┆ … ┆ QUANTIDADE ┆ QUANTIDADE ┆ QUANTIDAD ┆ UNNAMED_1 │
│ ---  ┆ ---       ┆ ---        ┆ ---        ┆   ┆ _TOTAL_DE_ ┆ _DE_CLIENT ┆ E_DE_CLIE ┆ 4         │
│ str  ┆ str       ┆ str        ┆ str        ┆   ┆ CLIENTES_C ┆ ES_CCS     ┆ NTES_SCR  ┆ ---       │
│      ┆           ┆            ┆            ┆   ┆ …          ┆ ---        ┆ ---       ┆ null      │
│      ┆           ┆            ┆            ┆   ┆ ---        ┆ str        ┆ str       ┆           │
│      ┆           ┆            ┆            ┆   ┆ str        ┆            ┆           ┆           │
╞══════╪═══════════╪════════════╪════════════╪═══╪════════════╪════════════╪═══════════╪═══════════╡
│ 2021 ┆ 1º        ┆ Grupo      ┆ Banco/fina ┆ … ┆ 420692     ┆ 129        ┆ 420563    ┆ null      │
│      ┆           ┆ Secundário 