# Objetivo:

- Recuperação dos dados das tabelas 'stg_empresas_bronze' e 'stg_socios_bronze';
- Filtro de colunas;
- Tipagem dos Dados;
- Sanitização da Base;
- Salvar saída das tabelas no banco de dados 'stg_empresas_silver' e 'stg_empresas_silver'.

### Import das Bibliotecas

In [None]:
import polars as pl
import os
import psycopg2
import gc
from sqlalchemy import create_engine
from dotenv import load_dotenv
from psycopg2 import sql
from io import StringIO

### Definição dos Diretórios

In [None]:
DATA_DIR = '../data'

SILVER_DIR = os.path.join(DATA_DIR, 'silver')

# Garante que os diretórios existam
os.makedirs(SILVER_DIR, exist_ok=True)

### Ler as tabelas no Postgres

In [None]:
# Carrega variáveis de ambiente (.env)
load_dotenv(dotenv_path="../.env")

DB_USER = os.getenv("DB_USER")
DB_PASS = os.getenv("DB_PASSWORD")
DB_NAME = os.getenv("DB_NAME")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")

In [None]:
# Conexão com o PostgreSQL
conn = psycopg2.connect(
    dbname=DB_NAME,
    user=DB_USER,
    password=DB_PASS,
    host=DB_HOST,
    port=DB_PORT
)

### Tabela Empresas

#### Leitura da Raw e Tratamento dos Dados

In [None]:
# Path para salvar o arquivo
csv_empresas_temp = os.path.join(SILVER_DIR, "silver_empresas_temp.csv")

# Se o arquivo já existir, remove antes de começar
if os.path.exists(csv_empresas_temp):
    os.remove(csv_empresas_temp)

# --- Ler dados em chunks e salvar no CSV ---
chunk_size = 500_000  # ajuste conforme memória disponível
first_chunk = True

with conn.cursor(name="server_side_cursor") as cur:
    # Server-side cursor para evitar carregar tudo na memória
    cur.itersize = chunk_size
    cur.execute("SELECT * FROM stg_empresas_bronze;")
    
    for rows in iter(lambda: cur.fetchmany(chunk_size), []):
        # Converte para Polars
        df_chunk = pl.DataFrame(
            rows,
            schema=[desc[0] for desc in cur.description],
            orient="row"
        )
        
        # --- TRATAMENTOS ---
        df_chunk = df_chunk.select([
            df_chunk.columns[0],
            df_chunk.columns[1],
            df_chunk.columns[2],
            df_chunk.columns[3],
            df_chunk.columns[4],
            df_chunk.columns[5]
        ]).rename({
            df_chunk.columns[0]: "cnpj",
            df_chunk.columns[1]: "razao_social",
            df_chunk.columns[2]: "natureza_juridica",
            df_chunk.columns[3]: "qualificacao_responsavel",
            df_chunk.columns[4]: "capital_social",
            df_chunk.columns[5]: "cod_porte"
        })

        # Conversão de tipos e limpeza
        df_chunk = df_chunk.with_columns([
            pl.col("natureza_juridica").cast(pl.Int32, strict=False).fill_null(-1),
            pl.col("qualificacao_responsavel").cast(pl.Int32, strict=False).fill_null(-1),
            pl.col("capital_social").str.replace(",", ".").cast(pl.Float64, strict=False).fill_null(0.0),
            pl.col("cnpj").cast(pl.Utf8),
            pl.col("razao_social").cast(pl.Utf8).str.strip_chars(),
            pl.col("cod_porte").cast(pl.Int32, strict=False).fill_null(-1)
        ])

        # Remove duplicatas dentro do chunk
        df_chunk = df_chunk.unique()

        # --- SALVAR CSV incremental ---
        with open(csv_empresas_temp, "a" if not first_chunk else "w", encoding="utf-8") as f:
            df_chunk.write_csv(
                file=f,
                separator="~",
                include_header=first_chunk,
                quote_style="necessary"
            )
                
        first_chunk = False

        # --- LIBERAR MEMÓRIA ---
        del df_chunk
        gc.collect()

print("CSV final limpo gerado no ", csv_empresas_temp)

### Tabela Sócios

#### Leitura Raw e Tratamento de Dados

In [None]:
# Path para salvar o arquivo
csv_socios_temp = os.path.join(SILVER_DIR, "silver_socios_temp.csv")

# Se o arquivo já existir, remove antes de começar
if os.path.exists(csv_socios_temp):
    os.remove(csv_socios_temp)

# --- Ler dados em chunks e salvar no CSV ---
chunk_size = 500_000  # ajuste conforme memória disponível
first_chunk = True

with conn.cursor(name="server_side_cursor") as cur:
    # Server-side cursor para evitar carregar tudo na memória
    cur.itersize = chunk_size
    cur.execute("SELECT * FROM stg_socios_bronze;")
    
    for rows in iter(lambda: cur.fetchmany(chunk_size), []):
        # Converte para Polars
        df_chunk = pl.DataFrame(rows, schema=[desc[0] for desc in cur.description], orient="row")
        
        # --- TRATAMENTOS ---
        df_chunk = df_chunk.select([
            df_chunk.columns[0],
            df_chunk.columns[1],
            df_chunk.columns[2],
            df_chunk.columns[3],
            df_chunk.columns[4]
        ])
        
        df_chunk = df_chunk.rename({
            df_chunk.columns[0]: "cnpj",
            df_chunk.columns[1]: "tipo_socio",
            df_chunk.columns[2]: "nome_socio",
            df_chunk.columns[3]: "documento_socio",
            df_chunk.columns[4]: "codigo_qualificacao_socio"
        })

        # Conversão de tipos
        df_chunk = df_chunk.with_columns([
            pl.col("tipo_socio").cast(pl.Int32, strict=False).fill_null(-1),
            pl.col("cnpj").cast(pl.Utf8),
            pl.col("nome_socio").cast(pl.Utf8),
            pl.col("documento_socio").cast(pl.Utf8),
            pl.col("codigo_qualificacao_socio").cast(pl.Utf8)
        ])
        
        # Limpeza de strings
        df_chunk = df_chunk.with_columns([
            pl.col("nome_socio").str.strip_chars()
        ])
        
        # Remove duplicatas
        df_chunk = df_chunk.unique()
        
        # --- SALVAR CSV incremental ---
        with open(csv_socios_temp, "a" if not first_chunk else "w", encoding="utf-8") as f:
            df_chunk.write_csv(
                file=f,
                separator="~",
                include_header=first_chunk,
                quote_style="necessary"
            )
                
        first_chunk = False

        # --- LIBERAR MEMÓRIA ---
        del df_chunk
        gc.collect()

print("CSV final limpo gerado no ", csv_socios_temp)

### Salvar no Postgres

#### Mapeamento dos tipos do Polars para o Postgres

In [None]:
def inferir_tipo_postgres(dtype: pl.datatypes.DataType) -> str:
    if dtype == pl.Int64 or dtype == pl.Int32 or dtype == pl.UInt32 or dtype == pl.UInt64:
        return "BIGINT"
    elif dtype == pl.Float64 or dtype == pl.Float32:
        return "DOUBLE PRECISION"
    elif dtype == pl.Boolean:
        return "BOOLEAN"
    elif dtype == pl.Datetime or dtype == pl.Date:
        return "TIMESTAMP"
    else:
        return "TEXT"

#### Função escrever no banco de Dados

In [None]:
def escrever_banco(csv_temp, nome_tabela,
                   dbname, user, password, host, port):
    
    # Lê apenas 1000 linhas para inferir o schema
    df_sample = pl.read_csv(
        csv_temp,
        separator="~",
        encoding="utf-8",
        has_header=True,  # use True se seu CSV tem header
        infer_schema_length=1000,
        n_rows=1000,
        truncate_ragged_lines=True,
        ignore_errors=True
    )

    # Monta schema dinâmico
    colunas = []
    for col, dtype in zip(df_sample.columns, df_sample.dtypes):
        pg_tipo = inferir_tipo_postgres(dtype)
        colunas.append(f"{col} {pg_tipo}")

    schema_sql = ",\n    ".join(colunas)

    # Conecta no Postgres
    conn = psycopg2.connect(
        dbname=dbname,
        user=user,
        password=password,
        host=host,
        port=port
    )
    cur = conn.cursor()

    # Cria tabela com nome dinâmico
    cur.execute(sql.SQL("""
        DROP TABLE IF EXISTS {tabela};
        CREATE TABLE {tabela} (
            {schema}
        );
    """).format(
        tabela=sql.Identifier(nome_tabela),
        schema=sql.SQL(schema_sql)
    ))

    # COPY direto, sem abrir no Python
    with open(csv_temp, "r", encoding="utf-8") as f:
        cur.copy_expert(
            sql.SQL("COPY {tabela} FROM STDIN WITH CSV HEADER DELIMITER '~'").format(
                tabela=sql.Identifier(nome_tabela)
            ),
            f
        )

    conn.commit()
    cur.close()
    conn.close()

    print(f"Tabela '{nome_tabela}' salva no Postgres via COPY")

    # Remove CSV temporário
    if os.path.exists(csv_temp):
        os.remove(csv_temp)
        print("Arquivo temporário removido:", csv_temp)

#### Chamar funções

In [None]:
escrever_banco(
    csv_temp=csv_empresas_temp,
    nome_tabela="stg_empresas_silver",
    dbname=DB_NAME,
    user=DB_USER,
    password=DB_PASS,
    host=DB_HOST,
    port=DB_PORT
)

In [None]:
escrever_banco(
    csv_temp=csv_socios_temp,
    nome_tabela="stg_socios_silver",
    dbname=DB_NAME,
    user=DB_USER,
    password=DB_PASS,
    host=DB_HOST,
    port=DB_PORT
)