# Pipeline ETL

## Extração


Em ETL/extração.py, os dados são extraídos dos arquivos .CSV da prefeitura que estão na pasta data/

Utilizando a bibilioteca Pandas, criamos um dataframe para cada arquivo entre 2006 e 2017, armazenamos todos em uma lista e depois os concatenamos e salvamos em um único grande arquivo .csv que será transformado posteriormente.


In [None]:
import pandas as pd
from pathlib import Path

pasta = Path(r"data")

# Usa glob() do objeto Path de pathlib para conseguir o path para todos os arquivos CSV entre 2006 e 2017
arquivos_csv = sorted(
    [arq for arq in pasta.glob("recife-dados-despesas-*.csv") 
     if "2006" <= arq.stem.split('-')[-1] <= "2017"]
) 

lista_dfs = []

# Tabelas são carregads como dataframes e armazenadas na lista
for arq in arquivos_csv:
    data_frame = pd.read_csv(arq, sep=';', encoding='utf-8') 
    lista_dfs.append(data_frame)

# Concatenação das tabelas na lista
df_final = pd.concat(lista_dfs) 

# Salva dataframe unificado para transformação
df_final.to_csv("despesas_recife.csv",index=False)


## Transformação


Em ETL/transformação.py os dados são transformados, garantindo consistência dos dados antes destes serem carregados no banco de dados Postgres.

Primeiramente carregamos o CSV unificado criado no primeiro passo e indentificamos as colunas cujos elementos devem ser valores inteiros(Colunas relacionados a anos, meses e códigos de identificação) e aquelas colunas cujos valores devem ser números decimais (Valores monetários)

In [None]:
import pandas as pd

# Carrega dataframe unificado
df = pd.read_csv("despesas_recife.csv", encoding='utf-8')


# Teste se existia alguma valor faltando em alguma coluna da tabela --- resultado 0 valores faltando nas colunas
# assim não foi preciso tratar valores faltantes
# print(df.isnull().sum())


COL_INT = ('empenho_ano', 'ano_movimentacao', 'mes_movimentacao', 'orgao_codigo',
            'grupo_despesa_codigo','modalidade_aplicacao_codigo','elemento_codigo',
            'subelemento_codigo','funcao_codigo','subfuncao_codigo','programa_codigo',
            'acao_codigo','fonte_recurso_codigo','empenho_numero','subempenho', 'credor_codigo',
            'modalidade_licitacao_codigo')

# Opta-se por numeric ao invés de float para evitar problemas com arredondamento e conseguir mais precisão
COL_NUMERIC = ('valor_empenhado', 'valor_liquidado', 'valor_pago')


Em seguida, efetuamos as transformações. Removemos espaços vazios e tornamos todos os caracteres de todas as colunas minúsculos para garantir que não haja uma repetição errônea de dados (Exemplo: Várias entradas para o mesmo Orgão só porque algumas dessas entradas têm uma quantidade diferente de espaços brancos.)

Então, convertemos as colunas previamente identificadas para int e numeric. No caso de numeric, como descobrimos uma inconsistência em como os valores monetários são armazenadas na tabela (Pré-2016: separação de decimais com . Pós-2016: separação de decimais com ,) primeiro trocamos ',' por '.' nas colunas de dinheiro pós-2016 e só então convertemos para numeric.

In [None]:
# Padronização das colunas
df.columns = (
    df.columns
    .str.strip()         # Remove espaços vazios no começo e fim
    .str.lower()         # Deixa tudo minúsculo
    .str.replace(" ", "_")  # Substitui espaços por underline
)

for coluna in COL_INT:
    df[coluna] = df[coluna].astype(int)

# Erro na transformação, a partir de 2016 'valor_empenhado', 'valor_liquidado', 'valor_pago' começaram a vir
# com , e não com . como nos anos anteriores
for coluna in COL_NUMERIC:
    # Para os anos a partir de 2016, troca vírgula por ponto antes de converter
    mask_2016 = df['ano_movimentacao'] >= 2016
    df.loc[mask_2016, coluna] = (
        df.loc[mask_2016, coluna]
        .astype(str)
        .str.replace('.', '', regex=False)   # Remove separador de milhar, se existir
        .str.replace(',', '.', regex=False)
    )

    # Aqui converte para numeric e errors='coerce' faz valores inválidos virarem NAN por precaução
    df[coluna] = pd.to_numeric(df[coluna],errors='coerce')


Por fim, asseguramos que todas as demais colunas sejam string e armazenamos o dateframe transformado em outro arquivo csv para o carregamento no banco de dados.

In [None]:
# Pega o conjunto de todas as colunas
todas_colunas = set(df.columns)

# Subtrai as colunas numéricas e inteiras
colunas_string = todas_colunas - set(COL_INT) - set(COL_NUMERIC)

# Converte as colunas restantes para string
for coluna in colunas_string:
    df[coluna] = df[coluna].astype(str)


# Salvando tratamento em um novo arquivo csv
df.to_csv("despesas_recife_tratadas.csv", index=False, encoding='utf-8')


## Carregamento

Inicialmente configuramos a conexão com o banco de dados postgres utilizando SQLAlchemy e váriaveis definidas no arquivo .env

In [None]:
from sqlalchemy import create_engine
from dotenv import load_dotenv
import os

load_dotenv()

DATABASE_URL = f"postgresql+psycopg2://{os.getenv('DATABASE_USER')}:{os.getenv('DATABASE_PASSWORD')}@{os.getenv('DATABASE_HOST')}:{os.getenv('DATABASE_PORT')}/{os.getenv('DATABASE_NAME')}"
engine = create_engine(DATABASE_URL)

Em ETL/carregamento.py realizamos o carregamento dos dados tratados para o postgres. Primeiramente definimos o tipo que cada coluna deve ter no banco de dados e então utilizamos o método .to_sql da biblioteca pandas em conjunta com a conexão ao banco de dados Postgres feita com SQLAlchemy para realizar o upload da tabela transformada.

In [None]:
from postgres.engine import engine
from sqlalchemy.types import Integer, Numeric, String
import pandas as pd

# Carrega o CSV JÁ TRATADO
df = pd.read_csv("despesas_recife_tratadas.csv", encoding='utf-8')

# Apenas mapeia tipos do Pandas para o PostgreSQL evitando que ele interprete sozinho e converta os valores que eu já havia transformado
tipos_colunas_sqlalchemy = {
    # Colunas inteiras
    'ano_movimentacao': Integer(),
    'mes_movimentacao': Integer(),
    'orgao_codigo': Integer(),
    'grupo_despesa_codigo': Integer(),
    'modalidade_aplicacao_codigo': Integer(),
    'elemento_codigo': Integer(),
    'subelemento_codigo': Integer(),
    'funcao_codigo': Integer(),
    'subfuncao_codigo': Integer(),
    'programa_codigo': Integer(),
    'acao_codigo': Integer(),
    'fonte_recurso_codigo': Integer(),
    'empenho_ano': Integer(),
    'empenho_numero': Integer(),
    'subempenho': Integer(),
    'credor_codigo': Integer(),
    'modalidade_licitacao_codigo': Integer(),

    # Colunas numéricas
    'valor_empenhado': Numeric(18, 2),
    'valor_liquidado': Numeric(18, 2),
    'valor_pago': Numeric(18, 2),

    #  colunas que são strings
    'orgao_nome': String(255),
    'unidade_codigo': String(50),  # Códigos não numéricos são strings
    'unidade_nome': String(255),
    'categoria_economica_codigo': String(50),
    'categoria_economica_nome': String(255),
    'grupo_despesa_nome': String(255),
    'modalidade_aplicacao_nome': String(255),
    'elemento_nome': String(255),
    'subelemento_nome': String(255),
    'funcao_nome': String(255),
    'subfuncao_nome': String(255),
    'programa_nome': String(255),
    'acao_nome': String(255),
    'fonte_recurso_nome': String(255),
    'empenho_modalidade_nome': String(255),
    'empenho_modalidade_codigo': String(50),
    'indicador_subempenho': String(50),
    'credor_nome': String(255),
    'modalidade_licitacao_nome': String(255)
}

# Envia ao banco (os dados já estão tratados)
print("Carregamento de dados iniciado")
df.to_sql(
    name="despesas_recife",
    con=engine,
    if_exists="replace",
    index=False,
    dtype=tipos_colunas_sqlalchemy
)

print("Carregamento de dados finalizado")