In [1]:
import pandas as pd
import unicodedata
import re
import hashlib
from sqlalchemy import create_engine, inspect, text
import os
import time
import csv
import io
import numpy as np

In [2]:
df = pd.read_csv(
    "../Data Layer/silver/Viagens_2025_LIMPAS.csv",
    sep=";",
    encoding="latin1",
    decimal=",",
    dayfirst=True,
    parse_dates=[
        'Período - Data de início',
        'Período - Data de fim'
    ]
)

In [3]:
df.columns = (
    df.columns
      .str.normalize('NFKD')
      .str.encode('ascii', errors='ignore')
      .str.decode('utf-8')
      .str.lower()
      .str.replace(r'[^\w]+', '_', regex=True)
      .str.strip('_')
)
df = df.rename(columns={
    'periodo_data_de_inicio': 'data_inicio',
    'periodo_data_de_fim': 'data_fim',
    'codigo_do_orgao_superior' : 'codigo_orgao_superior',
    'nome_do_orgao_superior' : 'nome_orgao_superior'
})

In [4]:
df['data_inicio'] = pd.to_datetime(df['data_inicio'])
df['data_fim'] = pd.to_datetime(df['data_fim'])

In [5]:
df['duracao_viagem_dias'] = (df['data_fim'] - df['data_inicio']).dt.days + 1

In [6]:
df['custo_medio_diario'] = (
    df['total_gasto']
    .where(df['duracao_viagem_dias'] > 0)
    / df['duracao_viagem_dias']
).round(2)

In [7]:
DATABASE_URL = os.getenv('DATABASE_URL', 'postgresql://bi_bancos2:1q2w3e@db:5432/viagens_db')
engine = create_engine(DATABASE_URL)

In [8]:
def hash_to_bigint_unique(value: str, used_ids: set) -> int:
    salt = 0

    while True:
        raw = f"{value}|{salt}"
        h = hashlib.sha256(raw.encode()).hexdigest()
        candidate = int(h[:16], 16) & 0x7FFFFFFFFFFFFFFF

        if candidate not in used_ids:
            used_ids.add(candidate)
            return candidate

        salt += 1

In [9]:
def limpar_tabelas(engine):
    with engine.connect() as conn:
        conn.execute(text("TRUNCATE TABLE gold.fat_viagem, gold.dim_tempo, gold.dim_orgao_superior CASCADE;"))
        conn.commit()
        print("Tabelas limpas para nova carga.")

limpar_tabelas(engine)

Tabelas limpas para nova carga.


In [10]:
def sanitize(df):

    df = df.copy()

    for col in df.columns:
        if df[col].dtype == 'object' or pd.api.types.is_categorical_dtype(df[col]):
            
            df[col] = df[col].astype(str).replace(['None', 'nan', 'NaN', 'NAT'], np.nan).str.strip()
            
            df[col] = (df[col].str.normalize('NFKD')
                       .str.encode('ascii', errors='ignore')
                       .str.decode('utf-8'))
            
            df[col] = df[col].str.replace(r'[\x00-\x1F\x7F-\x9F]', '', regex=True)
            
            df[col] = df[col].str.replace(r'\s+', ' ', regex=True)

        elif pd.api.types.is_float_dtype(df[col]):
            if (df[col].dropna() % 1 == 0).all():
                df[col] = df[col].astype('Int64')

    return df

In [11]:
mapa_dias = {
    0: 'Segunda-feira',
    1: 'Terça-feira',
    2: 'Quarta-feira',
    3: 'Quinta-feira',
    4: 'Sexta-feira',
    5: 'Sábado',
    6: 'Domingo'
}

dim_tempo = (
    df[['data_inicio']]
    .drop_duplicates()
    .assign(
        ano=lambda x: x['data_inicio'].dt.year,
        mes_numero=lambda x: x['data_inicio'].dt.month,
        mes_nome=lambda x: x['data_inicio'].dt.month_name(),
        dia=lambda x: x['data_inicio'].dt.day,
        dia_semana_nome=lambda x: x['data_inicio'].dt.weekday.map(mapa_dias)
    )
    .reset_index(drop=True)
)

used_ids = set()

dim_tempo['tempo_id'] = dim_tempo['data_inicio'].astype(str).apply(
    lambda x: hash_to_bigint_unique(x, used_ids)
)

dim_tempo = dim_tempo[[
    'tempo_id',
    'data_inicio',
    'ano',
    'mes_numero',
    'mes_nome',
    'dia_semana_nome'
]]

In [12]:
dim_orgao_superior = df[['codigo_orgao_superior', 'nome_orgao_superior']].drop_duplicates().copy()

dim_orgao_superior['codigo_orgao_superior'] = dim_orgao_superior['codigo_orgao_superior'].astype('Int64')


dim_orgao_superior['orgao_superior_id'] = dim_orgao_superior.apply(
    lambda row: hash_to_bigint_unique(
        f"{row['codigo_orgao_superior']}-{row['nome_orgao_superior']}",
        used_ids
    ),
    axis=1
).astype('int64')

In [13]:
dim_orgao_solicitante = (
    df[['codigo_orgao_solicitante', 'nome_orgao_solicitante', 'codigo_orgao_superior']]
    .drop_duplicates()
    .reset_index(drop=True)
)

dim_orgao_solicitante['codigo_orgao_solicitante'] = (
    dim_orgao_solicitante['codigo_orgao_solicitante'].astype('Int64')
)
dim_orgao_solicitante['codigo_orgao_superior'] = (
    dim_orgao_solicitante['codigo_orgao_superior'].astype('Int64')
)

dim_orgao_solicitante['orgao_solicitante_id'] = dim_orgao_solicitante.apply(
    lambda row: hash_to_bigint_unique(
        f"{row['codigo_orgao_solicitante']}-"
        f"{row['nome_orgao_solicitante']}-"
        f"{row['codigo_orgao_superior']}",
        used_ids
    ),
    axis=1
).astype('int64')

dim_orgao_solicitante = dim_orgao_solicitante[[
    'orgao_solicitante_id',
    'codigo_orgao_solicitante',
    'nome_orgao_solicitante',
    'codigo_orgao_superior'
]]

In [14]:
dim_viajante = (
    df[['cpf_viajante', 'nome', 'cargo', 'descricao_funcao']]
    .fillna('')
    .drop_duplicates()
    .reset_index(drop=True)
)

dim_viajante['viajante_id'] = dim_viajante.apply(
    lambda row: hash_to_bigint_unique(
        f"{row['cpf_viajante']}-"
        f"{row['nome']}-"
        f"{row['cargo']}-"
        f"{row['descricao_funcao']}",
        used_ids
    ),
    axis=1
).astype('int64')

dim_viajante = dim_viajante[[
    'viajante_id',
    'cpf_viajante',
    'nome',
    'cargo',
    'descricao_funcao'
]]


In [15]:
dim_motivo = df.loc[:, ['motivo']].copy()

dim_motivo = sanitize(dim_motivo)

dim_motivo = (
    dim_motivo
    .fillna('')
    .drop_duplicates()
    .reset_index(drop=True)
)

dim_motivo['motivo_id'] = dim_motivo.apply(
    lambda row: hash_to_bigint_unique(
        row['motivo'],
        used_ids
    ),
    axis=1
).astype('int64')

dim_motivo = dim_motivo[[
    'motivo_id',
    'motivo'
]]

In [16]:
fat_viagens = (
    df
    .merge(dim_tempo, on='data_inicio', how='left')
    .merge(dim_orgao_superior, on='codigo_orgao_superior', how='left')
    .merge(dim_orgao_solicitante, on='codigo_orgao_solicitante', how='left')
    .merge(dim_viajante, on='cpf_viajante', how='left')
    .merge(dim_motivo, on='motivo', how='left')
    [[
        'tempo_id',
        'orgao_superior_id',
        'orgao_solicitante_id',
        'viajante_id',
        'motivo_id',
        'valor_diarias',
        'valor_passagens',
        'valor_outros_gastos',
        'valor_devolucao',
        'total_gasto',
        'duracao_viagem_dias',
        'custo_medio_diario'
    ]]
)

In [17]:
with engine.connect() as conn:
    conn.execute(text("SET search_path TO gold, public"))
    conn.commit() 

In [18]:
def psql_bulk_copy(df, table_name, engine, schema='gold'):
    output = io.StringIO()
    df.to_csv(output, sep='\t', header=False, index=False, quoting=csv.QUOTE_MINIMAL)
    output.seek(0)
    
    raw_conn = engine.raw_connection()
    try:
        cursor = raw_conn.cursor()
        
        full_table_name = f'{table_name}'
        
        columns = [f'{col}' for col in df.columns]
        
        cursor.copy_from(output, full_table_name, sep='\t', columns=columns, null="")
        raw_conn.commit()
        print(f"Sucesso: {len(df)} linhas inseridas em {full_table_name}")
    except Exception as e:
        raw_conn.rollback()
        raise e
    finally:
        cursor.close()
        raw_conn.close()

In [19]:
psql_bulk_copy(dim_tempo, 'dim_tempo', engine)
psql_bulk_copy(dim_orgao_superior, 'dim_orgao_superior', engine)
psql_bulk_copy(dim_viajante, 'dim_viajante', engine)

Sucesso: 362 linhas inseridas em dim_tempo
Sucesso: 34 linhas inseridas em dim_orgao_superior
Sucesso: 33214 linhas inseridas em dim_viajante


In [20]:
psql_bulk_copy(dim_motivo, 'dim_motivo', engine)

CharacterNotInRepertoire: invalid byte sequence for encoding "UTF8": 0x82
CONTEXT:  COPY dim_motivo, line 39489: "161773708426948204	Participar de reuniao com os reitores das universidades federais e das instituico..."


In [None]:
psql_bulk_copy(dim_orgao_solicitante, 'dim_orgao_solicitante', engine)

In [None]:
# 6. Tabela de Fatos
psql_bulk_copy(fat_viagens, 'fat_viagem', engine)