In [5]:
!pip install pdfplumber sqlalchemy psycopg2-binary pandas dotenv requests

Collecting requests
  Downloading requests-2.32.5-py3-none-any.whl.metadata (4.9 kB)
Collecting idna<4,>=2.5 (from requests)
  Downloading idna-3.11-py3-none-any.whl.metadata (8.4 kB)
Collecting urllib3<3,>=1.21.1 (from requests)
  Using cached urllib3-2.5.0-py3-none-any.whl.metadata (6.5 kB)
Collecting certifi>=2017.4.17 (from requests)
  Downloading certifi-2025.10.5-py3-none-any.whl.metadata (2.5 kB)
Downloading requests-2.32.5-py3-none-any.whl (64 kB)
   ---------------------------------------- 0.0/64.7 kB ? eta -:--:--
   ------------------ --------------------- 30.7/64.7 kB 660.6 kB/s eta 0:00:01
   ---------------------------------------- 64.7/64.7 kB 865.0 kB/s eta 0:00:00
Downloading certifi-2025.10.5-py3-none-any.whl (163 kB)
   ---------------------------------------- 0.0/163.3 kB ? eta -:--:--
   -------------------------------- ------- 133.1/163.3 kB 4.0 MB/s eta 0:00:01
   ---------------------------------------- 163.3/163.3 kB 3.3 MB/s eta 0:00:00
Downloading idna-3.11-p


[notice] A new release of pip is available: 24.0 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
import requests
import pandas as pd
import os
from dotenv import load_dotenv
from sqlalchemy import create_engine

# Carrega as variáveis do .env
load_dotenv()

# --- Carregue os componentes individuais ---
API_TOKEN = os.getenv('SOLIDES_API_TOKEN')
DB_USER = os.getenv('DB_USER')
DB_PASS = os.getenv('DB_PASS')
DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT')
DB_NAME = os.getenv('DB_NAME')

# (Opcional) Seu schema
DB_SCHEMA = os.getenv('DB_SCHEMA') 

# --- Construa a DB_URL aqui no Python ---
if not all([DB_USER, DB_PASS, DB_HOST, DB_PORT, DB_NAME]):
    print("Erro: Faltando uma ou mais variáveis (DB_USER, DB_PASS, etc) no .env")
    exit()

DB_URL = f"postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
# ------------------------------------------

# Configurações globais da API
# (A linha "DB_URL = DB_URL" era redundante e foi removida)
HEADERS = {
    "Authorization": f"Token token={API_TOKEN}",
    "Accept": "application/json"
}

# Cria a "engine" de conexão com o banco
try:
    # Opção para definir o schema padrão da conexão
    engine = create_engine(
        DB_URL,
        connect_args={'options': f'-csearch_path={DB_SCHEMA}'}
    )
    print(f"Conexão com PostgreSQL estabelecida com sucesso no schema '{DB_SCHEMA}'.")

except Exception as e:
    print(f"Erro ao conectar ao PostgreSQL: {e}")
    # Imprime a URL sem a senha para depuração
    print(f"String de conexão tentada: postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}")
    exit()

Conexão com PostgreSQL estabelecida com sucesso no schema 'FOPAG'.


# Fase 1: Pipelines das dimensões (Dados da API)

## Passo 1.A: Pipelie da ```dim_departamentos```

In [17]:
import requests
import pandas as pd
import os
from dotenv import load_dotenv
from sqlalchemy import create_engine, text, exc as sqlalchemy_exc
import sys
import numpy as np # Import numpy for NaN handling
import json # <-- Import mantido, mas o debug foi removido

# 1. CARREGAR VARIÁVEIS DE AMBIENTE
# -----------------------------------
print("Iniciando ETL...")
load_dotenv()

# Carrega o Token da API
API_TOKEN = os.getenv('SOLIDES_API_TOKEN')

# Carrega os componentes do Banco
DB_USER = os.getenv('DB_USER')
DB_PASS = os.getenv('DB_PASS')
DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT')
DB_NAME = os.getenv('DB_NAME')
DB_SCHEMA = os.getenv('DB_SCHEMA')

# Verifica se tudo foi carregado
if not all([API_TOKEN, DB_USER, DB_PASS, DB_HOST, DB_PORT, DB_NAME, DB_SCHEMA]):
    print("ERRO: Faltando uma ou mais variáveis no arquivo .env")
    print(f"API_TOKEN Carregado: {'Sim' if API_TOKEN else 'NÃO'}")
    print(f"DB_SCHEMA CarregADO: {DB_SCHEMA}")
    sys.exit() # Encerra o script se faltar configuração

# 2. CONFIGURAÇÕES GLOBAIS
# -----------------------------------
DB_URL = f"postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
BASE_URL = "https://app.solides.com/pt-BR/api/v1"
HEADERS = {
    "Authorization": f"Token token={API_TOKEN}",
    "Accept": "application/json"
}

# 3. CRIA A CONEXÃO E GARANTE O SCHEMA (COM ASPAS)
# ----------------------------------------------------
try:
    engine = create_engine(DB_URL)
    with engine.begin() as conn:
        # Mantemos o unaccent para o caso de ser útil no futuro
        conn.execute(text('CREATE EXTENSION IF NOT EXISTS unaccent;'))
        conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{DB_SCHEMA}"'))

    # Recria a engine, definindo o search_path
    engine = create_engine(
        DB_URL,
        connect_args={'options': f'-csearch_path="{DB_SCHEMA}"'}
    )

    print(f"Conexão com PostgreSQL estabelecida e schema '\"{DB_SCHEMA}\"' garantido.")

except Exception as e:
    print(f"Erro ao conectar ao PostgreSQL ou criar schema: {e}")
    sys.exit()


# --- FUNÇÃO HELPER PARA LIMPEZA ---
def limpar_salario_api(salario_str):
    """Limpa a string de salário vinda da API (ex: "R$ 8.200,00") para float."""
    if salario_str is None or pd.isna(salario_str):
        return np.nan
    try:
        # Remove 'R$', espaços, e usa '.' como separador de milhar
        salario_limpo = str(salario_str).replace('R$', '').replace(' ', '').replace('.', '')
        # Troca ',' por '.' para ser decimal
        salario_limpo = salario_limpo.replace(',', '.')
        return pd.to_numeric(salario_limpo, errors='coerce')
    except Exception:
        return np.nan


# --- FASE 1: PIPELINES DAS DIMENSÕES (API) ---

def pipeline_dim_colaboradores():
    """
    PUXA dados de Colaboradores da API (paginado) e carrega na dim_colaboradores.
    Esta é a "Dimensão Mestre" de Colaboradores, com dados cadastrais.
    
    *** ATUALIZAÇÃO ***:
    O pipeline agora é feito em 2 PASSOS:
    1. Busca a LISTA de colaboradores (/colaboradores) para obter os IDs.
    2. Itera sobre os IDs e busca os DADOS DETALHADOS de cada um (/colaboradores/{id})
       para obter os dados pessoais (endereço, email, etc.).
    """
    print("\n--- Iniciando Pipeline: dim_colaboradores ---")

    # 1. Extração (E) - PASSO 1: Obter IDs da lista
    all_colaboradores_lista = []
    page = 1
    page_size = 100
    ENDPOINT_LISTA = "/colaboradores" # [cite: openapi.json]

    print("Iniciando extração (Passo 1/2): Buscando lista de IDs de colaboradores...")
    while True:
        params = {'page': page, 'page_size': page_size, 'status': 'todos'} # [cite: openapi.json]
        try:
            response = requests.get(f"{BASE_URL}{ENDPOINT_LISTA}", headers=HEADERS, params=params)
            if response.status_code == 200:
                data = response.json()
                if not data:
                    print(f"Extração da lista concluída. Total de {len(all_colaboradores_lista)} colaboradores encontrados.")
                    break
                all_colaboradores_lista.extend(data) # Contém dicts {'id': 123, 'name': '...'}
                print(f"Página {page} da lista carregada...")
                page += 1
            else:
                print(f"Erro na API (Página {page}): {response.status_code} {response.text}")
                return False
        except Exception as e:
            print(f"Erro na extração de colaboradores (lista): {e}")
            return False

    if not all_colaboradores_lista:
        print("Nenhum colaborador encontrado.")
        return True
    
    print(f"Passo 1/2 concluído. {len(all_colaboradores_lista)} colaboradores encontrados.")

    # PASSO 2: Buscar detalhes de CADA colaborador
    all_colaboradores_detalhado = []
    total_colabs = len(all_colaboradores_lista)
    
    print(f"Iniciando extração (Passo 2/2): Buscando detalhes completos...")

    for i, colab_info in enumerate(all_colaboradores_lista):
        colab_id = colab_info.get('id')
        if not colab_id:
            continue
            
        print(f"  Buscando colaborador {i+1} de {total_colabs} (ID: {colab_id})...")
        ENDPOINT_DETALHE = f"/colaboradores/{colab_id}"
        
        try:
            # Adiciona um pequeno delay para não sobrecarregar a API
            # time.sleep(0.1) # Descomente se encontrar erros de "Too Many Requests"
            response_detalhe = requests.get(f"{BASE_URL}{ENDPOINT_DETALHE}", headers=HEADERS)
            
            if response_detalhe.status_code == 200:
                data_detalhe = response_detalhe.json()
                all_colaboradores_detalhado.append(data_detalhe)
            else:
                print(f"    ERRO ao buscar detalhes do ID {colab_id}: {response_detalhe.status_code}. Usando dados básicos da lista.")
                # Se falhar, usamos a info básica da lista para não perder o funcionário
                all_colaboradores_detalhado.append(colab_info) 
        except Exception as e:
            print(f"    EXCEÇÃO ao buscar detalhes do ID {colab_id}: {e}. Usando dados básicos da lista.")
            all_colaboradores_detalhado.append(colab_info) # Adiciona info básica

    print("Passo 2/2 concluído. Detalhes de todos os colaboradores buscados.")

    # 2. Transformação (T)
    # Agora usamos a lista 'all_colaboradores_detalhado' que contém os dados completos
    df = pd.json_normalize(all_colaboradores_detalhado)

    # --- Tentativa robusta de buscar o NOME DO DEPARTAMENTO ---
    # O JSON confirmou que a chave é 'departament.name' (com 'a')
    df['dept_name_temp'] = None
    if 'departament.name' in df.columns: # Tenta com 'a'
        print("Info: Departamento encontrado na chave 'departament.name'.")
        df['dept_name_temp'] = df['departament.name']
    elif 'department.name' in df.columns: 
        print("Info: Departamento encontrado na chave 'department.name'.")
        df['dept_name_temp'] = df['department.name']
    else:
        print("Aviso: Nenhuma chave de Departamento ('departament.name', 'department.name') foi encontrada.")

    # --- Tentativa robusta de buscar o NOME DO CARGO ---
    # O JSON confirmou que a chave é 'position.name'
    df['cargo_name_temp'] = None
    if 'position.name' in df.columns:
        print("Info: Cargo encontrado na chave 'position.name'.")
        df['cargo_name_temp'] = df['position.name']
    elif 'cargo.name' in df.columns: # Tenta 'cargo'
        print("Info: Cargo encontrado na chave 'cargo.name'.")
        df['cargo_name_temp'] = df['cargo.name']
    else:
        print("Aviso: Nenhuma chave de Cargo ('position.name', 'cargo.name') foi encontrada.")

    # --- CORREÇÃO: Tentativa robusta de buscar o NÍVEL EDUCACIONAL ---
    # O JSON confirmou que a chave é 'education'
    df['education_level_temp'] = None
    if 'education' in df.columns: # <-- A CHAVE CORRETA DO JSON
        print("Info: Nível Educacional encontrado na chave 'education'.")
        df['education_level_temp'] = df['education']
    elif 'educationLevel' in df.columns: 
        print("Info: Nível Educacional encontrado na chave 'educationLevel'.")
        df['education_level_temp'] = df['educationLevel']
    elif 'scholarship' in df.columns: # Tentativa comum em PT-BR
        print("Info: Nível Educacional encontrado na chave 'scholarship'.")
        df['education_level_temp'] = df['scholarship']
    elif 'schooling' in df.columns:
        print("Info: Nível Educacional encontrado na chave 'schooling'.")
        df['education_level_temp'] = df['schooling']
    elif 'escolaridade' in df.columns: # Tentativa direta em PT
        print("Info: Nível Educacional encontrado na chave 'escolaridade'.")
        df['education_level_temp'] = df['escolaridade']
    else:
        print("Aviso: Nenhuma chave de Nível Educacional ('education', 'educationLevel', 'scholarship', 'schooling', 'escolaridade') foi encontrada.")
    # --- FIM DA CORREÇÃO ---

    # --- CORREÇÃO: Tentativa robusta de buscar o CPF ---
    # O JSON confirmou que a chave é 'documents.idNumber'
    df['cpf_temp'] = None 
    
    if 'documents.idNumber' in df.columns: # <-- A CHAVE CORRETA
        print("Info: CPF encontrado na chave 'documents.idNumber'.")
        df['cpf_temp'] = df['documents.idNumber']
    elif 'documents.cpf' in df.columns:
        print("Info: CPF encontrado na chave 'documents.cpf'.")
        df['cpf_temp'] = df['documents.cpf']
    elif 'idNumber' in df.columns:
        print("Info: CPF encontrado na chave 'idNumber' (raiz).")
        df['cpf_temp'] = df['idNumber']
    elif 'cpf' in df.columns:
        print("Info: CPF encontrado na chave 'cpf' (raiz).")
        df['cpf_temp'] = df['cpf']
    elif 'document' in df.columns:
        print("Info: CPF encontrado na chave 'document' (raiz).")
        df['cpf_temp'] = df['document']
    else:
        print("Aviso: Nenhuma chave de CPF ('documents.idNumber', 'idNumber', 'cpf', 'document') foi encontrada.")

    # Limpa a coluna temporária (remove pontos, traços, etc.)
    if 'cpf_temp' in df.columns:
         df['cpf_temp'] = df['cpf_temp'].astype(str).str.replace(r'\D', '', regex=True)
         df['cpf_temp'] = df['cpf_temp'].replace(r'^\s*$', np.nan, regex=True).replace('None', np.nan).replace('nan', np.nan)
    else:
         df['cpf_temp'] = None
    # --- FIM DA CORREÇÃO ---

    # --- ADIÇÃO: Limpeza do Salário da API ---
    if 'salary' in df.columns:
        df['salario_api_temp'] = df['salary'].apply(limpar_salario_api)
    else:
        df['salario_api_temp'] = np.nan
    # --- FIM DA ADIÇÃO ---


    # --- ATUALIZAÇÃO: Renomeia todas as colunas (antigas e novas) ---
    df = df.rename(columns={
        'id': 'colaborador_id_solides',
        'name': 'nome_completo',
        'cpf_temp': 'cpf', 
        'birthDate': 'data_nascimento',
        'gender': 'genero',
        'dateAdmission': 'data_admissao',
        'dateDismissal': 'data_demissao',
        'active': 'ativo',
        'dept_name_temp': 'departamento_nome_api', 
        'cargo_name_temp': 'cargo_nome_api',      
        'email': 'email',
        'contact.phone': 'telefone_pessoal', # <-- Chave correta do JSON
        'contact.cellPhone': 'celular', # <-- Chave correta do JSON
        'nationality': 'nacionalidade',
        'education_level_temp': 'nivel_educacional', # <-- Chave correta
        'motherName': 'nome_mae',
        'fatherName': 'nome_pai',
        'address.streetName': 'logradouro', # <-- Chave correta do JSON
        'address.number': 'numero_endereco',
        'address.additionalInformation': 'complemento_endereco', # <-- Chave correta do JSON
        'address.neighborhood': 'bairro',
        'address.city.name': 'cidade', # <-- Chave correta do JSON
        'address.state.initials': 'estado', # <-- Chave correta do JSON
        'address.zipCode': 'cep',
        
        # --- NOVOS CAMPOS DE PEOPLE ANALYTICS ---
        'registration': 'matricula',
        'maritalStatus': 'estado_civil',
        'salario_api_temp': 'salario_api',
        'workShift': 'turno_trabalho',
        'typeContract': 'tipo_contrato',
        'course': 'curso_formacao',
        'hierarchicalLevel': 'nivel_hierarquico',
        'senior.name': 'nome_lider_imediato',
        'ethnicity': 'etnia',
        'unity.name': 'unidade_nome'
    })

    date_cols = ['data_nascimento', 'data_admissao', 'data_demissao']
    for col in date_cols:
        if col in df.columns:
            # Tenta formatar 'dd/mm/YYYY'. Se falhar, tenta 'YYYY-mm-dd' (visto em 'dispatchDate' do JSON)
            try:
                df[col] = pd.to_datetime(df[col], format='%d/%m/%Y', errors='raise')
            except ValueError:
                 df[col] = pd.to_datetime(df[col], format='%Y-%m-%d', errors='coerce')
        else:
             df[col] = pd.NaT

    # --- ATUALIZAÇÃO: Lista de colunas final (antigas e novas) ---
    colunas_staging = [
        'colaborador_id_solides', 'cpf', 'nome_completo', 'data_nascimento', 'genero',
        'data_admissao', 'data_demissao', 'ativo',
        'departamento_nome_api', 'cargo_nome_api',
        'email', 'telefone_pessoal', 'celular', 'nacionalidade', 'nivel_educacional',
        'nome_mae', 'nome_pai',
        'logradouro', 'numero_endereco', 'complemento_endereco', 'bairro', 'cidade', 'estado', 'cep',
        
        # --- NOVOS CAMPOS DE PEOPLE ANALYTICS ---
        'matricula', 'estado_civil', 'salario_api', 'turno_trabalho', 'tipo_contrato',
        'curso_formacao', 'nivel_hierarquico', 'nome_lider_imediato', 'etnia', 'unidade_nome'
    ]
    
    for col in colunas_staging:
        if col not in df.columns:
            df[col] = None 

    df_staging = df[colunas_staging].copy()
    print("Transformação de colaboradores concluída.")

    # 3. Carga (L)
    NOME_TABELA_FINAL = "dim_colaboradores"
    NOME_TABELA_STAGING = "staging_colaboradores"

    try:
        df_staging.to_sql(NOME_TABELA_STAGING, engine, if_exists='replace', index=False, schema=DB_SCHEMA)
        print("Carga na staging de colaboradores concluída.")

        # --- ATUALIZAÇÃO: SQL com todas as colunas novas ---
        sql = f"""
        CREATE TABLE IF NOT EXISTS "{DB_SCHEMA}".{NOME_TABELA_FINAL} (
            colaborador_sk SERIAL PRIMARY KEY,
            colaborador_id_solides INTEGER UNIQUE NOT NULL,
            cpf VARCHAR(11), 
            nome_completo VARCHAR(255),
            data_nascimento DATE,
            genero VARCHAR(50),
            data_admissao DATE,
            data_demissao DATE,
            ativo BOOLEAN,
            departamento_nome_api VARCHAR(255),
            cargo_nome_api VARCHAR(255),
            email VARCHAR(255),
            telefone_pessoal VARCHAR(50),
            celular VARCHAR(50),
            nacionalidade VARCHAR(100),
            nivel_educacional VARCHAR(100),
            nome_mae VARCHAR(255),
            nome_pai VARCHAR(255),
            logradouro VARCHAR(255),
            numero_endereco VARCHAR(50),
            complemento_endereco VARCHAR(100),
            bairro VARCHAR(100),
            cidade VARCHAR(100),
            estado VARCHAR(50),
            cep VARCHAR(20),
            
            -- NOVAS COLUNAS
            matricula VARCHAR(50),
            estado_civil VARCHAR(50),
            salario_api NUMERIC(12, 2),
            turno_trabalho VARCHAR(100),
            tipo_contrato VARCHAR(100),
            curso_formacao VARCHAR(255),
            nivel_hierarquico VARCHAR(100),
            nome_lider_imediato VARCHAR(255),
            etnia VARCHAR(50),
            unidade_nome VARCHAR(255),
            
            data_ultima_atualizacao TIMESTAMP DEFAULT current_timestamp
        );

        -- *** CORREÇÃO DO ERRO 'coluna "matricula" ... não existe' ***
        -- Este bloco ALTER TABLE garante que o schema da tabela existente
        -- seja atualizado ANTES de tentar inserir os dados.
        ALTER TABLE "{DB_SCHEMA}".{NOME_TABELA_FINAL}
            ADD COLUMN IF NOT EXISTS matricula VARCHAR(50),
            ADD COLUMN IF NOT EXISTS estado_civil VARCHAR(50),
            ADD COLUMN IF NOT EXISTS salario_api NUMERIC(12, 2),
            ADD COLUMN IF NOT EXISTS turno_trabalho VARCHAR(100),
            ADD COLUMN IF NOT EXISTS tipo_contrato VARCHAR(100),
            ADD COLUMN IF NOT EXISTS curso_formacao VARCHAR(255),
            ADD COLUMN IF NOT EXISTS nivel_hierarquico VARCHAR(100),
            ADD COLUMN IF NOT EXISTS nome_lider_imediato VARCHAR(255),
            ADD COLUMN IF NOT EXISTS etnia VARCHAR(50),
            ADD COLUMN IF NOT EXISTS unidade_nome VARCHAR(255);
        -- *** FIM DA CORREÇÃO ***

        INSERT INTO "{DB_SCHEMA}".{NOME_TABELA_FINAL} (
            colaborador_sk, colaborador_id_solides, cpf, nome_completo, departamento_nome_api, cargo_nome_api
        )
        VALUES (0, -1, 'N/A', 'Desconhecido', 'Desconhecido', 'Desconhecido')
        ON CONFLICT (colaborador_sk) DO NOTHING;

        INSERT INTO "{DB_SCHEMA}".{NOME_TABELA_FINAL} (
            colaborador_id_solides, cpf, nome_completo, data_nascimento, genero,
            data_admissao, data_demissao, ativo,
            departamento_nome_api, cargo_nome_api,
            email, telefone_pessoal, celular, nacionalidade, nivel_educacional,
            nome_mae, nome_pai,
            logradouro, numero_endereco, complemento_endereco, bairro, cidade, estado, cep,
            
            -- NOVAS COLUNAS
            matricula, estado_civil, salario_api, turno_trabalho, tipo_contrato,
            curso_formacao, nivel_hierarquico, nome_lider_imediato, etnia, unidade_nome,
            
            data_ultima_atualizacao
        )
        SELECT
            stg.colaborador_id_solides, 
            stg.cpf,
            stg.nome_completo, stg.data_nascimento, stg.genero,
            stg.data_admissao, stg.data_demissao, stg.ativo,
            stg.departamento_nome_api,
            stg.cargo_nome_api,
            stg.email, stg.telefone_pessoal, stg.celular, stg.nacionalidade, stg.nivel_educacional,
            stg.nome_mae, stg.nome_pai,
            stg.logradouro, stg.numero_endereco, stg.complemento_endereco, stg.bairro, stg.cidade, stg.estado, stg.cep,

            -- NOVAS COLUNAS
            stg.matricula, stg.estado_civil, stg.salario_api, stg.turno_trabalho, stg.tipo_contrato,
            stg.curso_formacao, stg.nivel_hierarquico, stg.nome_lider_imediato, stg.etnia, stg.unidade_nome,

            current_timestamp
        FROM
            "{DB_SCHEMA}".{NOME_TABELA_STAGING} AS stg

        ON CONFLICT (colaborador_id_solides) DO UPDATE SET
            cpf = EXCLUDED.cpf, 
            nome_completo = EXCLUDED.nome_completo,
            data_nascimento = EXCLUDED.data_nascimento,
            genero = EXCLUDED.genero,
            data_admissao = EXCLUDED.data_admissao,
            data_demissao = EXCLUDED.data_demissao,
            ativo = EXCLUDED.ativo,
            departamento_nome_api = EXCLUDED.departamento_nome_api,
            cargo_nome_api = EXCLUDED.cargo_nome_api,
            email = EXCLUDED.email,
            telefone_pessoal = EXCLUDED.telefone_pessoal,
            celular = EXCLUDED.celular,
            nacionalidade = EXCLUDED.nacionalidade,
            nivel_educacional = EXCLUDED.nivel_educacional,
            nome_mae = EXCLUDED.nome_mae,
            nome_pai = EXCLUDED.nome_pai,
            logradouro = EXCLUDED.logradouro,
            numero_endereco = EXCLUDED.numero_endereco,
            complemento_endereco = EXCLUDED.complemento_endereco,
            bairro = EXCLUDED.bairro,
            cidade = EXCLUDED.cidade,
            estado = EXCLUDED.estado,
            cep = EXCLUDED.cep,
            
            -- NOVAS COLUNAS
            matricula = EXCLUDED.matricula,
            estado_civil = EXCLUDED.estado_civil,
            salario_api = EXCLUDED.salario_api,
            turno_trabalho = EXCLUDED.turno_trabalho,
            tipo_contrato = EXCLUDED.tipo_contrato,
            curso_formacao = EXCLUDED.curso_formacao,
            nivel_hierarquico = EXCLUDED.nivel_hierarquico,
            nome_lider_imediato = EXCLUDED.nome_lider_imediato,
            etnia = EXCLUDED.etnia,
            unidade_nome = EXCLUDED.unidade_nome,
            
            data_ultima_atualizacao = current_timestamp;
        """

        with engine.begin() as conn:
            conn.execute(text(sql))

        print(f"Carga na {NOME_TABELA_FINAL} concluída com sucesso!")
        return True

    except Exception as e:
        print(f"Erro na carga de {NOME_TABELA_FINAL}: {e}")
        print(f"Detalhe do erro: {e}")
        return False


# --- NOVA DIMENSÃO: CALENDÁRIO ---
def pipeline_dim_calendario():
    """Gera ou atualiza a dimensão de calendário (dim_calendario)."""
    print("\n--- Iniciando Pipeline: dim_calendario ---")
    
    NOME_TABELA_FINAL = "dim_calendario"
    
    # SQL para criar a tabela e popular com datas
    # Gera datas de 2023 (início dos dados FOPAG) até 2030
    sql = f"""
    CREATE TABLE IF NOT EXISTS "{DB_SCHEMA}".{NOME_TABELA_FINAL} (
        data DATE PRIMARY KEY,
        ano INTEGER,
        mes INTEGER,
        dia INTEGER,
        trimestre INTEGER,
        semestre INTEGER,
        dia_da_semana INTEGER, -- 0=Dom, 1=Seg, ... 6=Sab
        nome_dia_da_semana VARCHAR(20),
        nome_mes VARCHAR(20),
        nome_mes_abrev CHAR(3),
        ano_mes VARCHAR(7), -- YYYY-MM
        dia_do_ano INTEGER,
        semana_do_ano INTEGER
    );

    -- Define o início e o fim do calendário
    DO $$
    DECLARE
        data_inicio DATE := '2023-01-01'; -- Baseado nos dados da FOPAG
        data_fim DATE := '2030-12-31';
    BEGIN
        -- Tenta definir o locale para Português para esta transação
        -- Se 'pt_BR.UTF-8' não estiver disponível no servidor, 'pt_BR' é uma alternativa.
        BEGIN
            SET LOCAL lc_time = 'pt_BR.UTF-8';
        EXCEPTION WHEN OTHERS THEN
            BEGIN
                SET LOCAL lc_time = 'pt_BR';
            EXCEPTION WHEN OTHERS THEN
                -- Se ambos falharem, continua com o padrão do DB (provavelmente inglês)
                RAISE NOTICE 'Não foi possível definir o locale pt_BR. Nomes de mês/dia podem ficar em inglês.';
            END;
        END;

        INSERT INTO "{DB_SCHEMA}".{NOME_TABELA_FINAL} (
            data,
            ano,
            mes,
            dia,
            trimestre,
            semestre,
            dia_da_semana,
            nome_dia_da_semana,
            nome_mes,
            nome_mes_abrev,
            ano_mes,
            dia_do_ano,
            semana_do_ano
        )
        SELECT
            d AS data,
            EXTRACT(YEAR FROM d) AS ano,
            EXTRACT(MONTH FROM d) AS mes,
            EXTRACT(DAY FROM d) AS dia,
            EXTRACT(QUARTER FROM d) AS trimestre,
            CASE WHEN EXTRACT(MONTH FROM d) <= 6 THEN 1 ELSE 2 END AS semestre,
            EXTRACT(DOW FROM d) AS dia_da_semana, -- 0=Domingo, 6=Sábado
            to_char(d, 'TMDay') AS nome_dia_da_semana,
            to_char(d, 'TMMonth') AS nome_mes,
            to_char(d, 'TMMon') AS nome_mes_abrev,
            to_char(d, 'YYYY-MM') AS ano_mes,
            EXTRACT(DOY FROM d) AS dia_do_ano,
            EXTRACT(WEEK FROM d) AS semana_do_ano
        FROM generate_series(data_inicio, data_fim, '1 day'::interval) d
        ON CONFLICT (data) DO NOTHING; -- Torna a carga idempotente (não insere duplicados)
    END $$;
    """
    
    try:
        with engine.begin() as conn:
            conn.execute(text(sql))
        print(f"Carga na {NOME_TABELA_FINAL} concluída com sucesso!")
        return True
    except Exception as e:
        print(f"Erro na carga de {NOME_TABELA_FINAL}: {e}")
        print(f"Detalhe do erro: {e}")
        return False


# --- FASE 2: PIPELINES DAS FATOS (CSV) ---

def converter_para_numero(coluna):
    """Limpa coluna de texto (ex: '10,000.50' ou '9677.42') para um número."""
    if coluna is None or coluna.empty:
        return pd.Series(index=coluna.index, dtype=float) if isinstance(coluna, pd.Series) else None
    
    if pd.api.types.is_numeric_dtype(coluna) and not pd.api.types.is_object_dtype(coluna):
        return coluna
    
    coluna_str = coluna.astype(str)

    # Assume formato Padrão/EUA (ex: 10,000.50 ou 9677.42)
    # 1. Remove vírgulas (que seriam separador de milhar)
    try:
        coluna_str = coluna_str.str.replace(r',', '', regex=True)
    except AttributeError:
        pass
    
    resultado = pd.to_numeric(coluna_str, errors='coerce')
    return resultado


def pipeline_fato_folha_consolidada():
    print("\n--- Iniciando Pipeline: fato_folha_consolidada ---")

    CSV_FILE = 'BASE_FOPAG_CONSOLIDADA_TOTAIS.csv'
    NOME_TABELA_STAGING = 'staging_folha_consolidada'
    NOME_TABELA_FINAL = 'fato_folha_consolidada'

    try:
        try:
            dtype_map = {}
            
            try:
                header_df = pd.read_csv(CSV_FILE, sep=';', nrows=0)
                if 'cargo' in header_df.columns:
                    dtype_map['cargo'] = str
                if 'departamento' in header_df.columns: # 'departamento' aqui é o Centro de Custo
                   dtype_map['departamento'] = str
                if 'nome_funcionario' in header_df.columns:
                   dtype_map['nome_funcionario'] = str
            except Exception:
                pass 

            df_csv = pd.read_csv(CSV_FILE, sep=';', dtype=dtype_map)
        except Exception as read_err:
            print(f"Erro ao ler CSV {CSV_FILE}: {read_err}. Tentando leitura simples.")
            df_csv = pd.read_csv(CSV_FILE, sep=';')


        # 2. Transformação (T)
        if 'cpf' in df_csv.columns:
            df_csv['cpf'] = df_csv['cpf'].astype(str).str.replace(r'\D', '', regex=True)
        else:
            print(f"Aviso: Coluna 'cpf' não encontrada no CSV {CSV_FILE}.")
            df_csv['cpf'] = None

        colunas_metricas = [
            'salario_contratual', 'total_proventos', 'total_descontos',
            'valor_liquido', 'base_inss', 'base_fgts', 'valor_fgts', 'base_irrf'
        ]
        print("Convertendo colunas de métricas...")
        for col in colunas_metricas:
            if col in df_csv.columns:
                df_csv[col] = converter_para_numero(df_csv[col])
            else:
                df_csv[col] = np.nan

        if 'departamento' not in df_csv.columns:
             print(f"Aviso: Coluna 'departamento' (Centro de Custo) não encontrada no CSV {CSV_FILE}.")
             df_csv['departamento'] = None
        if 'cargo' not in df_csv.columns:
             print(f"Aviso: Coluna 'cargo' (do CSV) não encontrada.")
             df_csv['cargo'] = None
        if 'nome_funcionario' not in df_csv.columns:
             print(f"Aviso: Coluna 'nome_funcionario' (do CSV) não encontrada.")
             df_csv['nome_funcionario'] = None


        print(f"CSV '{CSV_FILE}' lido e transformado.")


        # 3. Carga (L)
        df_csv.to_sql(NOME_TABELA_STAGING, engine, if_exists='replace', index=False, schema=DB_SCHEMA)
        print(f"CSV carregado para {NOME_TABELA_STAGING}.")

        # *** Arquitetura Mantida: Sem JOIN no ETL ***
        sql = f"""
        CREATE TABLE IF NOT EXISTS "{DB_SCHEMA}".{NOME_TABELA_FINAL} (
            fato_folha_id SERIAL PRIMARY KEY,
            competencia DATE,
            nome_funcionario_csv VARCHAR(255), -- Nome do CSV
            centro_de_custo VARCHAR(255), -- Coluna para o 'departamento' do CSV
            cargo_nome_csv VARCHAR(255),  -- Coluna para o 'cargo' do CSV
            
            -- *** Adicionada a coluna CPF do CSV ***
            cpf_csv VARCHAR(11),

            salario_contratual NUMERIC(12, 2),
            total_proventos NUMERIC(12, 2),
            total_descontos NUMERIC(12, 2),
            valor_liquido NUMERIC(12, 2),
            base_inss NUMERIC(12, 2),
            base_fgts NUMERIC(12, 2),
            valor_fgts NUMERIC(12, 2),
            base_irrf NUMERIC(12, 2)
        );

        -- Carga Incremental: Deleta apenas as competências do arquivo
        DELETE FROM "{DB_SCHEMA}".{NOME_TABELA_FINAL}
        WHERE competencia IN (
            SELECT DISTINCT
                CASE
                    WHEN stg.competencia ~ '^\\d{{4}}-\\d{{2}}-\\d{{2}}$'
                    THEN to_date(stg.competencia, 'YYYY-MM-DD')
                    ELSE NULL
                END
            FROM "{DB_SCHEMA}".{NOME_TABELA_STAGING} AS stg
            WHERE stg.competencia ~ '^\\d{{4}}-\\d{{2}}-\\d{{2}}$'
        );

        INSERT INTO "{DB_SCHEMA}".{NOME_TABELA_FINAL} (
            competencia,
            nome_funcionario_csv, centro_de_custo, cargo_nome_csv, cpf_csv,
            salario_contratual, total_proventos, total_descontos, valor_liquido,
            base_inss, base_fgts, valor_fgts, base_irrf
        )
        SELECT
            CASE
                WHEN stg.competencia ~ '^\\d{{4}}-\\d{{2}}-\\d{{2}}$'
                THEN to_date(stg.competencia, 'YYYY-MM-DD')
                ELSE NULL
            END AS competencia,

            stg.nome_funcionario AS nome_funcionario_csv, -- Salva o nome do CSV
            stg.departamento AS centro_de_custo, -- Salva o 'departamento' do CSV
            stg.cargo AS cargo_nome_csv,          -- Salva o 'cargo' do CSV
            stg.cpf AS cpf_csv,                   -- Salva o CPF do CSV

            stg.salario_contratual, stg.total_proventos, stg.total_descontos, stg.valor_liquido,
            stg.base_inss, stg.base_fgts, stg.valor_fgts, stg.base_irrf
        FROM
            "{DB_SCHEMA}".{NOME_TABELA_STAGING} AS stg
        ;
        """

        with engine.begin() as conn:
            conn.execute(text(sql))
        print(f"Carga na {NOME_TABELA_FINAL} concluída com sucesso!")
        return True

    except sqlalchemy_exc.SQLAlchemyError as e: # Captura erros do SQLAlchemy
        print(f"Falha no pipeline {NOME_TABELA_FINAL} (SQLAlchemyError): {e}")
        if hasattr(e, 'orig') and e.orig:
             print(f"  Erro original (psycopg2): {e.orig}")
        return False
    except pd.errors.ParserError as e: # Captura erros de leitura do CSV
       print(f"Falha ao ler o CSV {CSV_FILE}: {e}")
       return False
    except Exception as e: # Captura outros erros
        print(f"Falha no pipeline {NOME_TABELA_FINAL} (Erro genérico): {e}")
        return False


def pipeline_fato_folha_detalhada():
    print("\n--- Iniciando Pipeline: fato_folha_detalhada ---")

    CSV_FILE = 'BASE_FOPAG_DETALHADA_RUBRICAS.csv'
    NOME_TABELA_STAGING = 'staging_folha_detalhada'
    NOME_TABELA_FINAL = 'fato_folha_detalhada'

    try:
        try:
            dtype_map = {}
            try:
                header_df = pd.read_csv(CSV_FILE, sep=';', nrows=0)
                if 'departamento' in header_df.columns: 
                    dtype_map['departamento'] = str
                if 'nome_funcionario' in header_df.columns:
                   dtype_map['nome_funcionario'] = str
            except Exception:
                pass
            df_csv = pd.read_csv(CSV_FILE, sep=';', dtype=dtype_map)
        except Exception as read_err:
            print(f"Erro ao ler CSV {CSV_FILE}: {read_err}. Tentando leitura simples.")
            df_csv = pd.read_csv(CSV_FILE, sep=';') # Fallback


        # 2. Transformação (T)
        if 'cpf' in df_csv.columns:
            df_csv['cpf'] = df_csv['cpf'].astype(str).str.replace(r'\D', '', regex=True)
        else:
             print(f"Aviso: Coluna 'cpf' não encontrada no CSV {CSV_FILE}.")
             df_csv['cpf'] = None

        if 'valor_rubrica' in df_csv.columns:
            df_csv['valor_rubrica'] = converter_para_numero(df_csv['valor_rubrica'])
        else:
            df_csv['valor_rubrica'] = np.nan 

        if 'departamento' not in df_csv.columns:
             print(f"Aviso: Coluna 'departamento' (Centro de Custo) não encontrada no CSV {CSV_FILE}. Será preenchida com None.")
             df_csv['departamento'] = None
        if 'nome_funcionario' not in df_csv.columns:
             print(f"Aviso: Coluna 'nome_funcionario' (do CSV) não encontrada.")
             df_csv['nome_funcionario'] = None


        print(f"CSV '{CSV_FILE}' lido e transformado.")

        # 3. Carga (L)
        df_csv.to_sql(NOME_TABELA_STAGING, engine, if_exists='replace', index=False, schema=DB_SCHEMA)
        print(f"CSV carregado para {NOME_TABELA_STAGING}.")

        sql = f"""
        CREATE TABLE IF NOT EXISTS "{DB_SCHEMA}".{NOME_TABELA_FINAL} (
            fato_rubrica_id SERIAL PRIMARY KEY,
            competencia DATE,
            nome_funcionario_csv VARCHAR(255), -- Nome do CSV
            centro_de_custo VARCHAR(255), -- Coluna para o 'departamento' do CSV
            
            -- *** Adicionada a coluna CPF do CSV ***
            cpf_csv VARCHAR(11),

            codigo_rubrica VARCHAR(100),
            nome_rubrica VARCHAR(255),
            tipo_rubrica VARCHAR(100),
            valor_rubrica NUMERIC(12, 2)
        );

        -- Carga Incremental: Deleta apenas as competências do arquivo
        DELETE FROM "{DB_SCHEMA}".{NOME_TABELA_FINAL}
        WHERE competencia IN (
            SELECT DISTINCT
                CASE
                    WHEN stg.competencia ~ '^\\d{{4}}-\\d{{2}}-\\d{{2}}$'
                    THEN to_date(stg.competencia, 'YYYY-MM-DD')
                    ELSE NULL
                END
            FROM "{DB_SCHEMA}".{NOME_TABELA_STAGING} AS stg
            WHERE stg.competencia ~ '^\\d{{4}}-\\d{{2}}-\\d{{2}}$'
        );

        INSERT INTO "{DB_SCHEMA}".{NOME_TABELA_FINAL} (
            competencia,
            nome_funcionario_csv, centro_de_custo, cpf_csv,
            codigo_rubrica, nome_rubrica, tipo_rubrica, valor_rubrica
        )
        SELECT
             CASE
                 WHEN stg.competencia ~ '^\\d{{4}}-\\d{{2}}-\\d{{2}}$'
                 THEN to_date(stg.competencia, 'YYYY-MM-DD')
                 ELSE NULL
             END AS competencia,
            
            stg.nome_funcionario AS nome_funcionario_csv, -- Salva o nome do CSV
            stg.departamento AS centro_de_custo, -- Salva o 'departamento' do CSV
            stg.cpf AS cpf_csv,                   -- Salva o CPF do CSV

            stg.codigo_rubrica, stg.nome_rubrica, stg.tipo_rubrica, stg.valor_rubrica
        FROM
            "{DB_SCHEMA}".{NOME_TABELA_STAGING} AS stg
        ;
        """

        with engine.begin() as conn:
            conn.execute(text(sql))
        print(f"Carga na {NOME_TABELA_FINAL} concluída com sucesso!")
        return True

    except sqlalchemy_exc.SQLAlchemyError as e: # Captura erros do SQLAlchemy
        print(f"Falha no pipeline {NOME_TABELA_FINAL} (SQLAlchemyError): {e}")
        if hasattr(e, 'orig') and e.orig:
             print(f"  Erro original (psycopg2): {e.orig}")
        return False
    except pd.errors.ParserError as e: # Captura erros de leitura do CSV
       print(f"Falha ao ler o CSV {CSV_FILE}: {e}")
       return False
    except Exception as e: # Captura outros erros
        print(f"Falha no pipeline {NOME_TABELA_FINAL} (Erro genérico): {e}")
        return False


# --- PONTO DE EXECUÇÃO PRINCIPAL ---
if __name__ == "__main__":

    # Ordem de execution é crucial

    # 1. Dimensões independentes
    sucesso_colab = pipeline_dim_colaboradores()
    sucesso_calendario = pipeline_dim_calendario() # <-- NOVA DIMENSÃO

    # 2. Fatos (dependem apenas de Colaboradores)
    # As fatos também dependem do calendário, mas a FK é feita no BI.
    if sucesso_colab and sucesso_calendario:
        sucesso_fato_cons = pipeline_fato_folha_consolidada()
        sucesso_fato_det = pipeline_fato_folha_detalhada()
        if not sucesso_fato_cons or not sucesso_fato_det:
             print("\n!!! Atenção: Pelo menos um pipeline de FATO falhou. Verifique os logs acima. !!!")
        else:
             print("\n--- Pipeline ETL Concluído com Sucesso! ---")

    else:
        if not sucesso_colab:
             print("\nFalha ao carregar dim_colaboradores. Abortando pipelines de Fatos.")
        if not sucesso_calendario:
             print("\nFalha ao carregar dim_calendario. Abortando pipelines de Fatos.")
        sys.exit() # Encerra se as dimensões falharem


    # Fecha a conexão com o banco
    engine.dispose()



Iniciando ETL...
Conexão com PostgreSQL estabelecida e schema '"FOPAG"' garantido.

--- Iniciando Pipeline: dim_colaboradores ---
Iniciando extração (Passo 1/2): Buscando lista de IDs de colaboradores...
Página 1 da lista carregada...
Página 2 da lista carregada...
Extração da lista concluída. Total de 138 colaboradores encontrados.
Passo 1/2 concluído. 138 colaboradores encontrados.
Iniciando extração (Passo 2/2): Buscando detalhes completos...
  Buscando colaborador 1 de 138 (ID: 3382492)...
  Buscando colaborador 2 de 138 (ID: 805340)...
  Buscando colaborador 3 de 138 (ID: 588289)...
  Buscando colaborador 4 de 138 (ID: 1440231)...
  Buscando colaborador 5 de 138 (ID: 588290)...
  Buscando colaborador 6 de 138 (ID: 2301298)...
  Buscando colaborador 7 de 138 (ID: 2048031)...
  Buscando colaborador 8 de 138 (ID: 2050250)...
  Buscando colaborador 9 de 138 (ID: 2284524)...
  Buscando colaborador 10 de 138 (ID: 805341)...
  Buscando colaborador 11 de 138 (ID: 2274831)...
  Buscando co