## Extração

Durante a análise exploratória inicial dos arquivos CSV, identificamos uma anomalia na formatação: os arquivos originais possuem aspas duplas (`"`) envolvendo a linha inteira ou campos de forma inconsistente.

Ao utilizar o parser padrão do Pandas, isso causava erros de tokenização e deslocamento de colunas.

Para contornar isso, optamos por uma leitura "bruta" dos dados:
1.  Desativamos a interpretação automática de aspas (`quoting=csv.QUOTE_NONE`).
2.  Carregamos o conteúdo integralmente.
3.  Removemos as aspas residuais (`"`) manualmente das bordas (primeira e última coluna) e do cabeçalho via manipulação de string.

In [1]:
import pandas as pd
import numpy as np
import csv
import glob
import os
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

In [2]:
caminho_pasta = '../data'
arquivos_csv = glob.glob(os.path.join(caminho_pasta, '*.csv'))

lista_dataframes = []

print(f"Encontrados {len(arquivos_csv)} arquivos na pasta '{caminho_pasta}'.")

for arquivo in arquivos_csv:
    try:
        print(f"Processando: {os.path.basename(arquivo)}...")
        df = pd.read_csv(
            arquivo,
            sep=',',
            quoting=csv.QUOTE_NONE,
            engine='python',
            on_bad_lines='skip',
            encoding='utf-8',
            dtype=str
        )

        col_primeira = df.columns[0]
        col_ultima = df.columns[-1]

        df[col_primeira] = df[col_primeira].astype(str).str.replace('"', '')
        df[col_ultima] = df[col_ultima].astype(str).str.replace('"', '')
        df.columns = df.columns.str.replace('"', '')

        lista_dataframes.append(df)
    except Exception as e:
        print(f"Erro ao ler o arquivo {arquivo}: {e}")

if lista_dataframes:
    df_raw = pd.concat(lista_dataframes, ignore_index=True)
    print(f"\nTotal de linhas e colunas no df unificado: {df_raw.shape}")
else:
    print("\nNenhum dataframe foi carregado.")

Encontrados 3 arquivos na pasta '../data'.
Processando: func2019_completo.csv...
Processando: func2020_completo.csv...
Processando: func2021_completo.csv...

Total de linhas e colunas no df unificado: (1398332, 34)


## Transformação

Notamos que as colunas possuem nomes codificados (ex: nsalsenome). Para facilitar a análise exploratória, aplicamos um mapeamento inicial para termos de negócio.

In [3]:
map_colunas = {
    'csalsematr': 'matricula',
    'csalseccpf': 'cpf',
    'nsalsenome': 'nome',
    'esalsegenero': 'genero',
    'esalseinstrucao': 'grau_instrucao',
    'dslderadmissao': 'data_admissao',
    'dslserdesligamento': 'data_desligamento',
    'dsalseaposentadoria': 'data_aposentadoria',
    'eslserlotacao': 'lotacao_nome',
    'esalseunidade': 'unidade_nome',
    'nsalseempr': 'entidade_nome',
    'esalseadministracao': 'tipo_administracao',
    'nsalsecarg': 'cargo_nome',
    'nsalsefunc': 'funcao_nome',
    'nsalsecate': 'categoria_nome',
    'eselsesituacao': 'situacao_nome',
    'aslserjornadamensal': 'jornada_mensal',
    'tslserulat': 'data_atualizacao_sistema',
    'asalseanoo': 'ano_folha',
    'asalsemess': 'mes_folha',
    'vsalseprov': 'valor_remuneracao_bruta',
    'vsalseremu': 'valor_remuneracao_base',
    'vsalsecarg': 'valor_salario_base',
    'vsalsefunc': 'valor_funcao_gratificada',
    'vsalseoutr': 'valor_outras_remuneracoes',
    'vsalseferi': 'valor_ferias',
    'vsalsenatl': 'valor_13_salario',
    'vsalsedife': 'valor_diferenca_salarial',
    'vsalsedrrf': 'valor_irrf',
    'vsalsedprv': 'valor_previdencia',
    'vsalsedtot': 'valor_descontos_total',
    'vsalsedrst': 'valor_desconto_faltas',
    'vsalsedxcd': 'valor_descontos_diversos',
    'vsalseliqd': 'valor_liquido'
}

df = df_raw.rename(columns=map_colunas)
df = df[list(map_colunas.values())].copy()
df.head()

Unnamed: 0,matricula,cpf,nome,genero,grau_instrucao,data_admissao,data_desligamento,data_aposentadoria,lotacao_nome,unidade_nome,...,valor_outras_remuneracoes,valor_ferias,valor_13_salario,valor_diferenca_salarial,valor_irrf,valor_previdencia,valor_descontos_total,valor_desconto_faltas,valor_descontos_diversos,valor_liquido
0,4480,***040714**,EVERALDO PADRE DA SILVA,Masculino,Mdio Completo,1991-08-30,2021-11-01,,CTTU,SETOR DE SERVICOS ADMINISTRATIVOS,...,444.6,0.0,0.0,0.0,0.0,131.5,131.5,0.0,0.0,1512.0
1,4529,***105614**,JOSE ARTUR SEABRA,Masculino,Mdio Completo,1991-09-05,2021-10-01,,CTTU,SETOR DE SERVICOS ADMINISTRATIVOS,...,949.0,0.0,0.0,0.0,0.0,183.9,235.9,52.0,0.0,1911.9
2,512222,***507834**,VICTOR FELIX SILVA MELO,Masculino,Mdio Incompleto,2019-01-16,2019-07-31,,CTTU,SETOR DE PROCESSAMENTO DE MULTAS,...,377.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,377.0
3,512230,***256834**,DAIANNA M C MAIA DE OLIVEIRA,Feminino,Mdio Incompleto,2019-02-01,2019-11-30,,CTTU,GESTAO DE UNID. DE MOB. SUSTENTAVEL,...,377.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,377.0
4,512249,***026244**,ENILTON RUAN M DOS SANTOS,Masculino,Mdio Incompleto,2019-02-01,2020-12-31,,CTTU,SETOR DE PROCESSAMENTO DE MULTAS,...,377.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,377.0


Como os dados foram carregados como strings, precisamos converter o tipo de algumas colunas.

In [4]:
print(df[['ano_folha', 'valor_remuneracao_bruta']].dtypes)

cols_data = ['data_admissao', 'data_desligamento', 'data_aposentadoria', 'data_atualizacao_sistema']
for col in cols_data:
    df[col] = pd.to_datetime(df[col], errors='coerce').dt.date

df['ano_folha'] = pd.to_numeric(df['ano_folha'], errors='coerce').fillna(0).astype(int)
df['mes_folha'] = pd.to_numeric(df['mes_folha'], errors='coerce').fillna(0).astype(int)

cols_financeiras = [c for c in df.columns if 'valor_' in c or 'jornada' in c]
for col in cols_financeiras:
    df[col] = df[col].astype(str).str.replace(',', '.').apply(pd.to_numeric, errors='coerce').fillna(0.0)

print("\nTipos corrigidos:")
print(df[['ano_folha', 'valor_remuneracao_bruta']].dtypes)

ano_folha                  object
valor_remuneracao_bruta    object
dtype: object

Tipos corrigidos:
ano_folha                    int64
valor_remuneracao_bruta    float64
dtype: object


Identificamos valores nulos em colunas críticas. No caso de genero e grau_instrucao, optamos por criar uma categoria 'NI' (Não Informado) para não perder registros.

In [5]:
df.isnull().sum()

matricula                          0
cpf                                0
nome                               0
genero                             2
grau_instrucao                  4275
data_admissao                     36
data_desligamento            1113112
data_aposentadoria           1053502
lotacao_nome                       0
unidade_nome                     441
entidade_nome                      0
tipo_administracao                 0
cargo_nome                         0
funcao_nome                        0
categoria_nome                     0
situacao_nome                      0
jornada_mensal                     0
data_atualizacao_sistema           0
ano_folha                          0
mes_folha                          0
valor_remuneracao_bruta            0
valor_remuneracao_base             0
valor_salario_base                 0
valor_funcao_gratificada           0
valor_outras_remuneracoes          0
valor_ferias                       0
valor_13_salario                   0
v

In [6]:
df['genero'] = df['genero'].fillna('NI')
df['grau_instrucao'] = df['grau_instrucao'].fillna('NAO INFORMADO')
df['unidade_nome'] = df['unidade_nome'].fillna('NAO INFORMADO')

cols_texto = ['nome', 'lotacao_nome', 'unidade_nome', 'entidade_nome',
              'cargo_nome', 'funcao_nome', 'categoria_nome', 'situacao_nome']

for col in cols_texto:
    df[col] = df[col].astype(str).str.strip().str.upper()
    df[col] = df[col].str.replace(r'\.+$', '', regex=True)
    df[col] = df[col].replace({'NAN': np.nan, 'NONE': np.nan, '': np.nan})

df.isnull().sum()

matricula                          0
cpf                                0
nome                               0
genero                             0
grau_instrucao                     0
data_admissao                     36
data_desligamento            1113112
data_aposentadoria           1053502
lotacao_nome                       0
unidade_nome                       0
entidade_nome                      0
tipo_administracao                 0
cargo_nome                         0
funcao_nome                        0
categoria_nome                     0
situacao_nome                      0
jornada_mensal                     0
data_atualizacao_sistema           0
ano_folha                          0
mes_folha                          0
valor_remuneracao_bruta            0
valor_remuneracao_base             0
valor_salario_base                 0
valor_funcao_gratificada           0
valor_outras_remuneracoes          0
valor_ferias                       0
valor_13_salario                   0
v

Ao verificarmos os nulos remanescentes, notamos dois cenários distintos:
1. Erro de Cadastro: A coluna 'data_admissao' possui 36 registros vazios.
   Como esta data é fundamental para o cálculo de tempo de serviço, não podemos deixá-la vazia.
   Adotaremos a data padrão '1900-01-01' (padrão de sistemas legados) para indicar "Data Desconhecida",
   mantendo a consistência com a regra definida no modelo SQL (stg_servidores_unificados).

2. Regra de Negócio: As colunas 'data_desligamento' e 'data_aposentadoria' possuem alto volume de nulos.
   Isso é esperado e correto, pois indica que o servidor está ATIVO na folha.
   Manteremos como NaT (Not a Time) para que o banco de dados entenda como NULL.

In [7]:
data_padrao = pd.to_datetime('1900-01-01').date()
df['data_admissao'] = df['data_admissao'].fillna(data_padrao)

nulos_finais = df[['data_admissao', 'data_desligamento', 'data_aposentadoria']].isnull().sum()
print("Status final dos nulos de data:")
print(nulos_finais)

# Explicando o resultado para quem lê o notebook:
print("\nConclusão:")
print(f"- Admissão: {nulos_finais['data_admissao']} nulos (Corrigido).")
print(f"- Desligamento: {nulos_finais['data_desligamento']} nulos (Representam servidores ATIVOS).")
print(f"- Aposentadoria: {nulos_finais['data_aposentadoria']} nulos (Representam servidores ATIVOS).")

Status final dos nulos de data:
data_admissao               0
data_desligamento     1113112
data_aposentadoria    1053502
dtype: int64

Conclusão:
- Admissão: 0 nulos (Corrigido).
- Desligamento: 1113112 nulos (Representam servidores ATIVOS).
- Aposentadoria: 1053502 nulos (Representam servidores ATIVOS).


RESULTADO FINAL

In [8]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1398332 entries, 0 to 1398331
Data columns (total 34 columns):
 #   Column                     Non-Null Count    Dtype  
---  ------                     --------------    -----  
 0   matricula                  1398332 non-null  object 
 1   cpf                        1398332 non-null  object 
 2   nome                       1398332 non-null  object 
 3   genero                     1398332 non-null  object 
 4   grau_instrucao             1398332 non-null  object 
 5   data_admissao              1398332 non-null  object 
 6   data_desligamento          285220 non-null   object 
 7   data_aposentadoria         344830 non-null   object 
 8   lotacao_nome               1398332 non-null  object 
 9   unidade_nome               1398332 non-null  object 
 10  entidade_nome              1398332 non-null  object 
 11  tipo_administracao         1398332 non-null  object 
 12  cargo_nome                 1398332 non-null  object 
 13  funcao_nome 

Com os dados limpos e tipados, avançamos para a modelagem dimensional. O objetivo aqui é sair de um tabela única para um Esquema Estrela, que otimiza o desempenho analítico e garante a integridade dos dados.

As principais decisões de modelagem foram:

- Normalização: Removemos a redundância de textos repetitivos (cargos, lotações, situações) movendo-os para tabelas de Dimensão.

- Deduplicação de Servidores: Um servidor pode aparecer múltiplas vezes na base original (uma vez por mês/ano). Na `dim_servidor`, garantimos um registro único por matrícula, consolidando sua data de admissão mais antiga.

- Chaves Substitutas: Criamos IDs numéricos (`id_cargo`, `id_servidor`) para isolar nossa estrutura interna de mudanças nos sistemas de origem.

In [9]:
dim_tempo = df[['ano_folha', 'mes_folha']].drop_duplicates().sort_values(['ano_folha', 'mes_folha']).reset_index(drop=True)

dim_tempo['id_tempo'] = dim_tempo['ano_folha'] * 100 + dim_tempo['mes_folha']

dim_tempo['semestre'] = dim_tempo['mes_folha'].apply(lambda x: 1 if x <= 6 else 2)
dim_tempo['trimestre'] = dim_tempo['mes_folha'].apply(lambda x: (x - 1) // 3 + 1)

mapa_meses = {
    1:'JANEIRO', 2:'FEVEREIRO', 3:'MARCO', 4:'ABRIL', 5:'MAIO', 6:'JUNHO',
    7:'JULHO', 8:'AGOSTO', 9:'SETEMBRO', 10:'OUTUBRO', 11:'NOVEMBRO', 12:'DEZEMBRO'
}
dim_tempo['nome_mes'] = dim_tempo['mes_folha'].map(mapa_meses)

print(f"Dimensão Tempo criada: {len(dim_tempo)} registros.")

Dimensão Tempo criada: 36 registros.


In [10]:
cols_cargo = ['cargo_nome', 'funcao_nome', 'categoria_nome']
dim_cargo = df[cols_cargo].drop_duplicates().sort_values(cols_cargo).reset_index(drop=True)
dim_cargo['id_cargo'] = dim_cargo.index + 1

print(f"Dimensão Cargo criada: {len(dim_cargo)} registros.")

Dimensão Cargo criada: 1869 registros.


In [11]:
dim_situacao = df[['situacao_nome']].drop_duplicates().sort_values('situacao_nome').reset_index(drop=True)
dim_situacao['id_situacao'] = dim_situacao.index + 1
dim_situacao['is_ativo'] = ~dim_situacao['situacao_nome'].str.contains('DESLIGADO/EXONERADO|APOSENTADO', na=False, case=False)

print(f"Dimensão Situação criada: {len(dim_situacao)} registros.")

Dimensão Situação criada: 3 registros.


In [12]:
cols_lotacao = ['lotacao_nome', 'unidade_nome', 'entidade_nome', 'tipo_administracao']

dim_lotacao = df[cols_lotacao].drop_duplicates()
dim_lotacao = dim_lotacao.sort_values(['entidade_nome', 'unidade_nome', 'lotacao_nome']).reset_index(drop=True)
dim_lotacao['id_lotacao'] = dim_lotacao.index + 1

print(f"Dimensão Lotação criada: {len(dim_lotacao)} registros.")

Dimensão Lotação criada: 3024 registros.


In [13]:
cols_datas = ['data_admissao', 'data_desligamento', 'data_aposentadoria']
for col in cols_datas:
    df[col] = pd.to_datetime(df[col], errors='coerce')

cols_chave_servidor = ['matricula', 'entidade_nome', 'cpf']

dim_servidor = df.groupby(cols_chave_servidor).agg({
    'nome': 'max',
    'genero': 'max',
    'grau_instrucao': 'max',
    'data_admissao': 'min',
    'data_desligamento': 'max',
    'data_aposentadoria': 'max'
}).reset_index()

for col in cols_datas:
    dim_servidor[col] = dim_servidor[col].dt.date

dim_servidor['id_servidor'] = dim_servidor.index + 1

print(f"Dimensão Servidor criada: {len(dim_servidor)} registros únicos.")

Dimensão Servidor criada: 49966 registros únicos.


Realizamos o cruzamento do dataset principal com as dimensões criadas anteriormente. O objetivo é substituir os textos descritivos pelos IDs numéricos (Foreign Keys) e manter na tabela de fatos apenas as colunas de métricas financeiras e horas trabalhadas.

In [14]:
fato_folha = df.copy()

fato_folha['id_tempo'] = fato_folha['ano_folha'] * 100 + fato_folha['mes_folha']

fato_folha = pd.merge(fato_folha, dim_cargo, on=['cargo_nome', 'funcao_nome', 'categoria_nome'], how='left')
fato_folha = pd.merge(fato_folha, dim_situacao, on='situacao_nome', how='left')
fato_folha = pd.merge(fato_folha, dim_lotacao, on=['lotacao_nome', 'unidade_nome', 'entidade_nome', 'tipo_administracao'], how='left')
fato_folha = pd.merge(fato_folha, dim_servidor[['matricula', 'entidade_nome', 'cpf', 'id_servidor']],
                      on=['matricula', 'entidade_nome', 'cpf'], how='left')

cols_finais = [
    'id_servidor', 'id_cargo', 'id_lotacao', 'id_tempo', 'id_situacao',
    'jornada_mensal',
    'valor_salario_base', 'valor_remuneracao_bruta', 'valor_liquido',
    'valor_previdencia', 'valor_irrf', 'valor_descontos_total',
    'valor_ferias', 'valor_13_salario'
]

fato_folha_final = fato_folha[cols_finais]

print(f"Fato Folha montada: {fato_folha_final.shape}")
display(fato_folha_final.head())

Fato Folha montada: (1398332, 14)


Unnamed: 0,id_servidor,id_cargo,id_lotacao,id_tempo,id_situacao,jornada_mensal,valor_salario_base,valor_remuneracao_bruta,valor_liquido,valor_previdencia,valor_irrf,valor_descontos_total,valor_ferias,valor_13_salario
0,25471,1866,297,201904,3,220.0,1198.8,1643.4,1512.0,131.5,0.0,131.5,0.0,0.0
1,25576,1866,297,201904,3,220.0,1198.8,2147.8,1911.9,183.9,0.0,235.9,0.0,0.0
2,26617,1007,283,201904,3,110.0,0.0,377.0,377.0,0.0,0.0,0.0,0.0,0.0
3,26618,1007,179,201904,3,110.0,0.0,377.0,377.0,0.0,0.0,0.0,0.0,0.0
4,26619,1007,283,201904,3,110.0,0.0,377.0,377.0,0.0,0.0,0.0,0.0,0.0


## Carga no Data Warehouse

Por fim, enviamos as tabelas modeladas (`dim_` e `fato_`) para o banco de dados PostgreSQL, utilizando SQLALchemy. As variáveis de ambiente são definidas no arquivo `.env`.

In [15]:
load_dotenv()

db_user = os.getenv('POSTGRES_USER')
db_password = os.getenv('POSTGRES_PASSWORD')
db_host = os.getenv('POSTGRES_HOST')
db_port = os.getenv('POSTGRES_PORT')
db_name = os.getenv('POSTGRES_DB')

DATABASE_URL = f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"
engine = create_engine(DATABASE_URL)

print("Engine de banco de dados criada.")

Engine de banco de dados criada.


In [16]:
def carregar_tabela(df, nome_tabela, engine, schema='public'):
    print(f"Carregando tabela {nome_tabela}...")
    df.to_sql(nome_tabela, con=engine, if_exists='replace', index=False, schema=schema)
    print(f"Tabela {nome_tabela} carregada com sucesso!")

schema_destino = 'servidores_etl'

with engine.begin() as connection:
    connection.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_destino};"))

carregar_tabela(dim_tempo, 'dim_tempo', engine, schema_destino)
carregar_tabela(dim_cargo, 'dim_cargo', engine, schema_destino)
carregar_tabela(dim_situacao, 'dim_situacao', engine, schema_destino)
carregar_tabela(dim_lotacao, 'dim_lotacao', engine, schema_destino)
carregar_tabela(dim_servidor, 'dim_servidor', engine, schema_destino)
carregar_tabela(fato_folha_final, 'fato_folha', engine, schema_destino)

print("ETL concluído com sucesso!")

Carregando tabela dim_tempo...
Tabela dim_tempo carregada com sucesso!
Carregando tabela dim_cargo...
Tabela dim_cargo carregada com sucesso!
Carregando tabela dim_situacao...
Tabela dim_situacao carregada com sucesso!
Carregando tabela dim_lotacao...
Tabela dim_lotacao carregada com sucesso!
Carregando tabela dim_servidor...
Tabela dim_servidor carregada com sucesso!
Carregando tabela fato_folha...
Tabela fato_folha carregada com sucesso!
ETL concluído com sucesso!


## Resumo
Neste notebook, implementamos um pipeline de dados completo, reproduzindo a lógica de engenharia de dados moderna (ETL) utilizando Python e Pandas. As etapas percorridas foram:
1. **Extract (Extração):**

      * Ingestão de dados brutos de múltiplos anos (CSV).
      * Tratamento de arquivos com formatação inconsistente de aspas (`csv.QUOTE_NONE`), uma característica comum em dumps de sistemas legados.

2. **Transform (Transformação):**

      * Padronização de nomes de colunas, tipagem (conversão de strings para inteiros/datas) e tratamento de nulos (`NAO INFORMADO`, datas padrão `1900-01-01`).
      * Criação de um Esquema Estrela, separando entidades (Servidor, Cargo, Lotação) de eventos (Pagamentos na Fato), facilitando análises futuras.

3. **Load (Carga):**

      * Carga dos dados modelados em um banco de dados (PostgreSQL - Schema `servidores_etl`), prontos para consumo por ferramentas de BI ou comparação de qualidade com o pipeline ELT (dbt).