# EXERCÍCIO 2 - HABILIDADES DE ENGENHARIA DE DADOS 
Objetivo do Exercício 
Construir um pipeline completo de engenharia de dados que: 
1. Faça a ingestão dos dados brutos (estruturados e semi-estruturados). 
2. Realize a transformação e tratamento dos dados, assegurando qualidade e 
consistência. 
3. Modele um Data Warehouse simplificado ou crie estruturas de Data Lake 
otimizadas. 
4. Disponibilize dados limpos e prontos para o time de modelagem e analytics. 

# Parte 1:  Ingestão de Dados

In [1]:
# imporntação das bibliotecas

import pandas as pd
import numpy as np

# Leitura dos dados
df_clientes = pd.read_csv('clientes.csv')
df_contratos = pd.read_json('contratos_credito.json')
df_transacoes = pd.read_csv('transacoes_financeiras.csv')

# Visualização rápida
print("Clientes")
display(df_clientes.head())

print("Contratos de Crédito")
display(df_contratos.head())

print("Transações Financeiras")
display(df_transacoes.head())


Clientes


Unnamed: 0,id_cliente,nome,cpf,data_nascimento,renda_mensal,regiao
0,1,Kaique Novaes,127.435.896-55,2004-09-29,8116.26,Sul
1,2,Nicole Farias,170.946.583-20,1960-07-04,4485.26,Sul
2,3,Dr. Henrique Costela,756.801.293-02,1969-08-10,12340.15,Nordeste
3,4,Raquel Costa,961.730.458-93,1986-09-07,3963.9,Centro-Oeste
4,5,Augusto Mendonça,359.871.406-84,1989-01-20,9725.73,Sul


Contratos de Crédito


Unnamed: 0,id_contrato,id_cliente,valor_contrato,saldo_devedor,data_contratacao,prazo_meses,status_contrato
0,1,284,64082.95,48802.23,2024-02-24,36,liquidado
1,2,200,23469.5,21652.34,2020-06-26,60,inadimplente
2,3,581,74005.94,48737.01,2020-07-20,60,ativo
3,4,200,9394.41,7528.93,2020-07-11,60,inadimplente
4,5,408,11431.39,3714.05,2024-06-11,48,ativo


Transações Financeiras


Unnamed: 0,id_transacao,id_cliente,data_transacao,valor_transacao,tipo_transacao
0,1,438,2024-12-15,9498.66,DOC
1,2,814,2024-12-02,9909.42,DOC
2,3,912,2024-03-06,416.96,saque
3,4,974,2023-04-30,1734.11,pagamento
4,5,573,2024-10-29,1915.23,saque


In [2]:
# Checar duplicidades de chaves primárias
print("Duplicados em id_cliente:", df_clientes['id_cliente'].duplicated().sum())
print("Duplicados em id_contrato:", df_contratos['id_contrato'].duplicated().sum())
print("Duplicados em id_transacao:", df_transacoes['id_transacao'].duplicated().sum())


Duplicados em id_cliente: 0
Duplicados em id_contrato: 0
Duplicados em id_transacao: 0


In [3]:
# Clientes referenciados em contratos mas que não existem na base de clientes
contratos_orfaos = df_contratos[~df_contratos['id_cliente'].isin(df_clientes['id_cliente'])]
print("Contratos com clientes inexistentes:", len(contratos_orfaos))

# Clientes referenciados em transações mas que não existem na base de clientes
transacoes_orfas = df_transacoes[~df_transacoes['id_cliente'].isin(df_clientes['id_cliente'])]
print("Transações com clientes inexistentes:", len(transacoes_orfas))


Contratos com clientes inexistentes: 0
Transações com clientes inexistentes: 0


### Aparentemente nenhum dado duplicado por id, ou dados orfãos. Achei diferente e dupliquei alguns dados para testar ver se tava lendo certo e sim tá lendo certo(Print no Readme.md).

# Parte 2: Qualidade e Tratamento de Dados 

In [4]:
# Verificando dados ausentes
print("Clientes:")
display(df_clientes.isnull().sum())

print("Contratos:")
display(df_contratos.isnull().sum())

print("Transações:")
display(df_transacoes.isnull().sum())


Clientes:


id_cliente         0
nome               0
cpf                0
data_nascimento    0
renda_mensal       0
regiao             0
dtype: int64

Contratos:


id_contrato         0
id_cliente          0
valor_contrato      0
saldo_devedor       0
data_contratacao    0
prazo_meses         0
status_contrato     0
dtype: int64

Transações:


id_transacao       0
id_cliente         0
data_transacao     0
valor_transacao    0
tipo_transacao     0
dtype: int64

### não tem nenhum dado ausente, fiz testes manuais para ver se tava indeficando certinho conforme evidencias no readme.md

In [5]:
# importação da biblioteca
import re

def validar_cpf(cpf):
    cpf = re.sub(r'\D', '', str(cpf))  # Remove não dígitos
    if len(cpf) != 11 or cpf == cpf[0] * 11:
        return False
    return True

df_clientes['cpf_valido'] = df_clientes['cpf'].apply(validar_cpf)
df_clientes_invalidos = df_clientes[~df_clientes['cpf_valido']]
print("CPFs inválidos encontrados:", len(df_clientes_invalidos))

# Remover ou marcar como inválidos
df_clientes = df_clientes[df_clientes['cpf_valido']].drop(columns=['cpf_valido'])


CPFs inválidos encontrados: 0


In [6]:

# Conversão das datas
df_clientes['data_nascimento'] = pd.to_datetime(df_clientes['data_nascimento'], errors='coerce')
df_contratos['data_contratacao'] = pd.to_datetime(df_contratos['data_contratacao'], errors='coerce')
df_transacoes['data_transacao'] = pd.to_datetime(df_transacoes['data_transacao'], errors='coerce')

# Verificação de datas inválidas (NaT)
print("Clientes com datas de nascimento inválidas:", df_clientes['data_nascimento'].isna().sum())
print("Contratos com datas inválidas:", df_contratos['data_contratacao'].isna().sum())
print("Transações com datas inválidas:", df_transacoes['data_transacao'].isna().sum())



Clientes com datas de nascimento inválidas: 0
Contratos com datas inválidas: 0
Transações com datas inválidas: 0


In [7]:
# Padronização de nomes para Nome Próprio
df_clientes['nome'] = df_clientes['nome'].str.title().str.strip()

# Normalização de regiões
df_clientes['regiao'] = df_clientes['regiao'].str.lower().str.strip()


In [8]:
# Verificando os Resultados Parciais
print("Clientes tratados:")
display(df_clientes.head())

print("Contratos tratados:")
display(df_contratos.head())

print("Transações tratadas:")
display(df_transacoes.head())


Clientes tratados:


Unnamed: 0,id_cliente,nome,cpf,data_nascimento,renda_mensal,regiao
0,1,Kaique Novaes,127.435.896-55,2004-09-29,8116.26,sul
1,2,Nicole Farias,170.946.583-20,1960-07-04,4485.26,sul
2,3,Dr. Henrique Costela,756.801.293-02,1969-08-10,12340.15,nordeste
3,4,Raquel Costa,961.730.458-93,1986-09-07,3963.9,centro-oeste
4,5,Augusto Mendonça,359.871.406-84,1989-01-20,9725.73,sul


Contratos tratados:


Unnamed: 0,id_contrato,id_cliente,valor_contrato,saldo_devedor,data_contratacao,prazo_meses,status_contrato
0,1,284,64082.95,48802.23,2024-02-24,36,liquidado
1,2,200,23469.5,21652.34,2020-06-26,60,inadimplente
2,3,581,74005.94,48737.01,2020-07-20,60,ativo
3,4,200,9394.41,7528.93,2020-07-11,60,inadimplente
4,5,408,11431.39,3714.05,2024-06-11,48,ativo


Transações tratadas:


Unnamed: 0,id_transacao,id_cliente,data_transacao,valor_transacao,tipo_transacao
0,1,438,2024-12-15,9498.66,DOC
1,2,814,2024-12-02,9909.42,DOC
2,3,912,2024-03-06,416.96,saque
3,4,974,2023-04-30,1734.11,pagamento
4,5,573,2024-10-29,1915.23,saque


### deixei os nomes de região minisculo, nomes de pessoas em padrão de nomes próprio, tratei as datas e cpf. O tipo de transção tem DOC e TED em caixa alta e os demais em caixa baixa não vi a nescessidade de deixar tudo caixa alta ou baixa. Mas se mais para a frente precisar eu volto e normalizo

# Parte 3 Transformação e Modelagem 

In [9]:
import os

# Criação das pastas do Data Lake, caso não exista
pastas_lake = [
    'dados/bronze',
    'dados/silver',
    'dados/gold'
]

for pasta in pastas_lake:
    os.makedirs(pasta, exist_ok=True)
    print(f'Pasta criada/verificada: {pasta}')


Pasta criada/verificada: dados/bronze
Pasta criada/verificada: dados/silver
Pasta criada/verificada: dados/gold


In [10]:
# Padronização dos dados para Bronze
clientes_bronze = pd.read_csv('dados/raw/clientes.csv').rename(columns=str.lower)
contratos_bronze = pd.read_json('dados/raw/contratos_credito.json').rename(columns=str.lower)
transacoes_bronze = pd.read_csv('dados/raw/transacoes_financeiras.csv').rename(columns=str.lower)

# Salvando em Bronze
clientes_bronze.to_csv('dados/bronze/clientes_bronze.csv', index=False)
contratos_bronze.to_csv('dados/bronze/contratos_bronze.csv', index=False)
transacoes_bronze.to_csv('dados/bronze/transacoes_bronze.csv', index=False)


In [11]:
# tratamento - clientes
clientes_silver = clientes_bronze.copy()

# Remover caracteres não numéricos do CPF
clientes_silver['cpf'] = clientes_silver['cpf'].astype(str).str.replace(r'\D', '', regex=True)

# Validar CPF: 11 dígitos e não repetido
clientes_silver = clientes_silver[clientes_silver['cpf'].apply(lambda x: len(x) == 11 and x != x[0]*11)]

# Padronizar nome e região
clientes_silver['nome'] = clientes_silver['nome'].str.title().str.strip()
clientes_silver['regiao'] = clientes_silver['regiao'].str.upper().str.strip()

# Datas e renda
clientes_silver['data_nascimento'] = pd.to_datetime(clientes_silver['data_nascimento'], errors='coerce')
clientes_silver['renda_mensal'] = clientes_silver['renda_mensal'].fillna(clientes_silver['renda_mensal'].median())


In [12]:
# tratamento - contratos
contratos_silver = contratos_bronze.copy()

# Conversão de data
contratos_silver['data_contratacao'] = pd.to_datetime(contratos_silver['data_contratacao'], errors='coerce')

# Validação de FK (id_cliente existente)
contratos_silver = contratos_silver[contratos_silver['id_cliente'].isin(clientes_silver['id_cliente'])]

# Tipagem e nulos
contratos_silver['valor_contrato'] = pd.to_numeric(contratos_silver['valor_contrato'], errors='coerce')
contratos_silver['saldo_devedor'] = pd.to_numeric(contratos_silver['saldo_devedor'], errors='coerce')
contratos_silver = contratos_silver.dropna(subset=['valor_contrato', 'saldo_devedor', 'prazo_meses', 'status_contrato'])

# Garantir tipo inteiro no prazo
contratos_silver['prazo_meses'] = contratos_silver['prazo_meses'].astype(int)


In [13]:
# tratamento transações
transacoes_silver = transacoes_bronze.copy()

# Conversão de data
transacoes_silver['data_transacao'] = pd.to_datetime(transacoes_silver['data_transacao'], errors='coerce')

# Validar FK (id_cliente existente)
transacoes_silver = transacoes_silver[transacoes_silver['id_cliente'].isin(clientes_silver['id_cliente'])]

# Remover registros incompletos
transacoes_silver = transacoes_silver.dropna(subset=['data_transacao', 'valor_transacao', 'tipo_transacao'])

# Tipagem e padronização
transacoes_silver['valor_transacao'] = pd.to_numeric(transacoes_silver['valor_transacao'], errors='coerce')
transacoes_silver['tipo_transacao'] = transacoes_silver['tipo_transacao'].str.lower().str.strip()


In [14]:
# salvando os aquivos na pasta silver
clientes_silver.to_csv('dados/silver/clientes_silver.csv', index=False)
contratos_silver.to_csv('dados/silver/contratos_silver.csv', index=False)
transacoes_silver.to_csv('dados/silver/transacoes_silver.csv', index=False)

print("Dados salvos com sucesso na camada Silver!")


Dados salvos com sucesso na camada Silver!


In [15]:
# Leitura da camada Silver
clientes_silver = pd.read_csv('dados/silver/clientes_silver.csv', parse_dates=['data_nascimento'])
contratos_silver = pd.read_csv('dados/silver/contratos_silver.csv', parse_dates=['data_contratacao'])
transacoes_silver = pd.read_csv('dados/silver/transacoes_silver.csv', parse_dates=['data_transacao'])

### Criação dos fatos e dimensões

In [16]:

dim_clientes = clientes_silver.copy()

# Adiciona campo de controle
dim_clientes['data_carga'] = pd.Timestamp.today()

# Salvando na pasta gold
dim_clientes.to_csv('dados/gold/dim_clientes.csv', index=False)


In [17]:
fato_contratos = contratos_silver.copy()

# Adiciona campo de controle
fato_contratos['data_carga'] = pd.Timestamp.today()

# Salvar
fato_contratos.to_csv('dados/gold/fato_contratos.csv', index=False)


In [18]:
# Criar coluna ano-mês (agregação)
transacoes_silver['ano_mes'] = transacoes_silver['data_transacao'].dt.to_period('M').dt.to_timestamp()

# Agregar
fato_transacoes = transacoes_silver.groupby(['id_cliente', 'ano_mes', 'tipo_transacao']) \
    .agg(valor_total=('valor_transacao', 'sum'),
         qtd_transacoes=('valor_transacao', 'count')) \
    .reset_index()

# Adicionar data da carga
fato_transacoes['data_carga'] = pd.Timestamp.today()

# Salvar
fato_transacoes.to_csv('dados/gold/fato_transacoes.csv', index=False)


In [21]:
print("Dimensão Clientes:")
display(dim_clientes.head())

print("Fato Contratos:")
display(fato_contratos.head())

print("Fato Transações (cliente/mês):")
display(fato_transacoes.head())


Dimensão Clientes:


Unnamed: 0,id_cliente,nome,cpf,data_nascimento,renda_mensal,regiao,data_carga
0,1,Kaique Novaes,12743589655,2004-09-29,8116.26,SUL,2025-03-28 16:57:48.352982
1,2,Nicole Farias,17094658320,1960-07-04,4485.26,SUL,2025-03-28 16:57:48.352982
2,3,Dr. Henrique Costela,75680129302,1969-08-10,12340.15,NORDESTE,2025-03-28 16:57:48.352982
3,4,Raquel Costa,96173045893,1986-09-07,3963.9,CENTRO-OESTE,2025-03-28 16:57:48.352982
4,5,Augusto Mendonça,35987140684,1989-01-20,9725.73,SUL,2025-03-28 16:57:48.352982


Fato Contratos:


Unnamed: 0,id_contrato,id_cliente,valor_contrato,saldo_devedor,data_contratacao,prazo_meses,status_contrato,data_carga
0,1,284,64082.95,48802.23,2024-02-24,36,liquidado,2025-03-28 16:57:50.265422
1,2,200,23469.5,21652.34,2020-06-26,60,inadimplente,2025-03-28 16:57:50.265422
2,3,581,74005.94,48737.01,2020-07-20,60,ativo,2025-03-28 16:57:50.265422
3,4,200,9394.41,7528.93,2020-07-11,60,inadimplente,2025-03-28 16:57:50.265422
4,5,408,11431.39,3714.05,2024-06-11,48,ativo,2025-03-28 16:57:50.265422


Fato Transações (cliente/mês):


Unnamed: 0,id_cliente,ano_mes,tipo_transacao,valor_total,qtd_transacoes,data_carga
0,1,2023-06-01,ted,4054.0,1,2025-03-28 16:57:52.140795
1,1,2023-08-01,doc,2170.21,1,2025-03-28 16:57:52.140795
2,1,2023-09-01,pagamento,2490.85,1,2025-03-28 16:57:52.140795
3,1,2023-09-01,ted,6615.52,1,2025-03-28 16:57:52.140795
4,1,2023-11-01,pagamento,940.03,1,2025-03-28 16:57:52.140795


In [19]:
# Parte 4
from pathlib import Path
import pandas as pd

def carregar_transacoes_incrementais(pasta_raw: str) -> pd.DataFrame:
    arquivos = sorted(Path(pasta_raw).glob("transacoes_*.csv"))
    lista_dfs = []

    for arquivo in arquivos:
        print(f"Lendo: {arquivo.name}")
        df = pd.read_csv(arquivo)
        df['data_transacao'] = pd.to_datetime(df['data_transacao'], errors='coerce')
        lista_dfs.append(df)

    return pd.concat(lista_dfs, ignore_index=True)

# Simulação de carga
transacoes_novas = carregar_transacoes_incrementais("dados/raw/")


Lendo: transacoes_2025-03-27.csv
Lendo: transacoes_2025-03-28.csv
Lendo: transacoes_financeiras.csv


In [51]:
def atualizar_fato_transacoes_scd2(transacoes_novas: pd.DataFrame, fato_antigo: pd.DataFrame = None):
    # 1. Agregação por cliente/mês/tipo
    transacoes_novas['ano_mes'] = transacoes_novas['data_transacao'].dt.to_period('M').dt.to_timestamp()

    fato_novo = transacoes_novas.groupby(['id_cliente', 'ano_mes', 'tipo_transacao']).agg(
        valor_total=('valor_transacao', 'sum'),
        qtd_transacoes=('valor_transacao', 'count')
    ).reset_index()

    # 2. Se não há fato anterior, é a primeira carga
    if fato_antigo is None or fato_antigo.empty:
        fato_novo['validade_inicio'] = pd.Timestamp.today()
        fato_novo['validade_fim'] = pd.NaT
        fato_novo['versao'] = 1
        return fato_novo

    # 3. Seleciona apenas as versões atuais (sem validade_fim)
    fato_atual = fato_antigo[fato_antigo['validade_fim'].isna()].copy()

    # 4. Faz merge com dados novos
    fato_merge = pd.merge(
        fato_novo,
        fato_atual,
        on=['id_cliente', 'ano_mes', 'tipo_transacao'],
        how='left',
        suffixes=('_novo', '_atual')
    )

    # 5. Identifica registros alterados ou novos
    registros_alterados = fato_merge[
        (fato_merge['valor_total_novo'] != fato_merge['valor_total_atual']) |
        (fato_merge['qtd_transacoes_novo'] != fato_merge['qtd_transacoes_atual']) |
        (fato_merge['versao'].isna())  # novos registros
    ]

    if registros_alterados.empty:
        print("Nenhuma alteração detectada. Nada a versionar.")
        return fato_antigo

    # 6. Fecha validade dos registros antigos que mudaram
    chaves_para_atualizar = registros_alterados[['id_cliente', 'ano_mes', 'tipo_transacao']]
    fato_antigo_atualizado = fato_antigo.copy()
    idxs_a_fechar = fato_antigo_atualizado.merge(
        chaves_para_atualizar,
        on=['id_cliente', 'ano_mes', 'tipo_transacao'],
        how='inner'
    ).index

    fato_antigo_atualizado.loc[idxs_a_fechar, 'validade_fim'] = pd.Timestamp.today()

    # 7. Cria novas versões
    novas_linhas = registros_alterados[['id_cliente', 'ano_mes', 'tipo_transacao', 'valor_total_novo', 'qtd_transacoes_novo']].copy()
    novas_linhas.columns = ['id_cliente', 'ano_mes', 'tipo_transacao', 'valor_total', 'qtd_transacoes']
    novas_linhas['validade_inicio'] = pd.Timestamp.today()
    novas_linhas['validade_fim'] = pd.NaT
    novas_linhas['versao'] = fato_antigo['versao'].max() + 1

    # 8. Retorna fato final com histórico
    fato_final = pd.concat([fato_antigo_atualizado, novas_linhas], ignore_index=True)
    return fato_final


In [52]:
# 1ª carga
fato_transacoes_scd2 = atualizar_fato_transacoes_scd2(transacoes_novas)

# Salvar após a 1ª carga
fato_transacoes_scd2.to_csv('dados/gold/fato_transacoes_historico.csv', index=False)

# 2ª carga (com o histórico anterior)
# Recarrega o histórico anterior
fato_anterior = pd.read_csv('dados/gold/fato_transacoes_historico.csv', parse_dates=['ano_mes', 'validade_inicio', 'validade_fim'])

# Nova simulação de carga (pode ser idêntica ou com mudança)
fato_transacoes_scd2_atualizada = atualizar_fato_transacoes_scd2(transacoes_novas, fato_anterior)

# Salvar nova versão
fato_transacoes_scd2_atualizada.to_csv('dados/gold/fato_transacoes_historico.csv', index=False)


### Fazendo mais simulações

### Vou criar dados fictícios mais completos e variados para simular uma carga realista com:

- 3 clientes

- 2 dias de transações

- Alterações reais para simular SCD Tipo 2 corretamente

In [21]:
# carga: dados originais
transacoes_dia1 = pd.DataFrame({
    'id_transacao': [1, 2, 3, 4],
    'id_cliente': [101, 102, 103, 101],
    'data_transacao': ['2025-03-25'] * 4,
    'valor_transacao': [100.0, 250.0, 400.0, 150.0],
    'tipo_transacao': ['pagamento', 'saque', 'pagamento', 'ted']
})

transacoes_dia1.to_csv('dados/raw/transacoes_2025-03-27.csv', index=False)

### Dia 2 – transacoes_2025-03-28.csv
Alterações:

- Cliente 101 fez novos pagamentos com valores maiores

- Cliente 103 repetiu uma transação (sem mudança)

- Cliente 102 não aparece

In [22]:
# simula alterações e novas transações
transacoes_dia2 = pd.DataFrame({
    'id_transacao': [5, 6, 7],
    'id_cliente': [101, 101, 103],
    'data_transacao': ['2025-03-25'] * 3,
    'valor_transacao': [200.0, 300.0, 400.0],  # cliente 101 mudou soma total, cliente 103 mantém
    'tipo_transacao': ['pagamento', 'ted', 'pagamento']
})

transacoes_dia2.to_csv('dados/raw/transacoes_2025-03-28.csv', index=False)


# testando as cargas

In [23]:
# carga 1

transacoes_dia1 = carregar_transacoes_incrementais("dados/raw/")
fato_transacoes_hist = atualizar_fato_transacoes_scd2(transacoes_dia1)
fato_transacoes_hist.to_csv('dados/gold/fato_transacoes_historico.csv', index=False)


Lendo: transacoes_2025-03-27.csv
Lendo: transacoes_2025-03-28.csv
Lendo: transacoes_financeiras.csv


In [24]:
# carga 2

fato_anterior = pd.read_csv('dados/gold/fato_transacoes_historico.csv', parse_dates=['ano_mes', 'validade_inicio', 'validade_fim'])
transacoes_dia2 = carregar_transacoes_incrementais("dados/raw/")
fato_atualizado = atualizar_fato_transacoes_scd2(transacoes_dia2, fato_anterior)
fato_atualizado.to_csv('dados/gold/fato_transacoes_historico.csv', index=False)


Lendo: transacoes_2025-03-27.csv
Lendo: transacoes_2025-03-28.csv
Lendo: transacoes_financeiras.csv


# Vizualizar o SCD

In [36]:

# Carregar a tabela com SCD
fato_transacoes = pd.read_csv("dados/gold/fato_transacoes_historico.csv", parse_dates=['ano_mes', 'validade_inicio', 'validade_fim'])

# Visualizar todas as versões de um cliente específico (ex: 101)
display(
    fato_transacoes.query("id_cliente == 101")
    .sort_values(["id_cliente", "ano_mes", "tipo_transacao", "versao"])
)


Unnamed: 0,id_cliente,ano_mes,tipo_transacao,valor_total,qtd_transacoes,validade_inicio,validade_fim,versao
981,101,2023-07-01,pagamento,2881.31,1,2025-03-28 18:38:50.196042,NaT,1
982,101,2023-10-01,TED,7105.43,1,2025-03-28 18:38:50.196042,NaT,1
983,101,2023-12-01,TED,5685.16,1,2025-03-28 18:38:50.196042,NaT,1
984,101,2023-12-01,saque,1198.23,1,2025-03-28 18:38:50.196042,NaT,1
985,101,2024-05-01,pagamento,3161.99,1,2025-03-28 18:38:50.196042,NaT,1
986,101,2024-06-01,TED,4044.77,1,2025-03-28 18:38:50.196042,NaT,1
987,101,2024-09-01,pagamento,1137.63,1,2025-03-28 18:38:50.196042,NaT,1
988,101,2024-10-01,DOC,8599.03,1,2025-03-28 18:38:50.196042,NaT,1
989,101,2025-03-01,pagamento,478.61,3,2025-03-28 18:38:50.196042,NaT,1
990,101,2025-03-01,saque,1210.12,1,2025-03-28 18:38:50.196042,NaT,1


In [28]:
# Ver histórico completo, ordenado
fato_ordenado = fato_transacoes.sort_values(by=["id_cliente", "ano_mes", "tipo_transacao", "versao"])
display(fato_ordenado)


Unnamed: 0,id_cliente,ano_mes,tipo_transacao,valor_total,qtd_transacoes,validade_inicio,validade_fim,versao
0,1,2023-06-01,TED,4054.00,1,2025-03-28 18:38:50.196042,2025-03-28 18:38:52.811486,1
1,1,2023-08-01,DOC,2170.21,1,2025-03-28 18:38:50.196042,2025-03-28 18:38:52.811486,1
2,1,2023-09-01,TED,6615.52,1,2025-03-28 18:38:50.196042,2025-03-28 18:38:52.811486,1
3,1,2023-09-01,pagamento,2490.85,1,2025-03-28 18:38:50.196042,2025-03-28 18:38:52.811486,1
4,1,2023-11-01,TED,3297.44,1,2025-03-28 18:38:50.196042,2025-03-28 18:38:52.811486,1
...,...,...,...,...,...,...,...,...
9536,1000,2024-07-01,pagamento,599.64,1,2025-03-28 18:38:50.196042,NaT,1
9537,1000,2024-08-01,DOC,6510.32,1,2025-03-28 18:38:50.196042,NaT,1
9538,1000,2024-09-01,pagamento,3695.32,1,2025-03-28 18:38:50.196042,NaT,1
9539,1000,2024-11-01,pagamento,4792.85,1,2025-03-28 18:38:50.196042,NaT,1


# teste

In [37]:
from pathlib import Path
import pandas as pd

# Função: carregar todos os arquivos de transações incrementalmente
def carregar_transacoes_incrementais(pasta_raw: str) -> list:
    arquivos = sorted(Path(pasta_raw).glob("transacoes_*.csv"))

    # Mantém só os arquivos com nome no padrão de data
    arquivos_filtrados = []
    for arq in arquivos:
        nome = arq.stem.replace("transacoes_", "")
        try:
            pd.to_datetime(nome, format="%Y-%m-%d")  # tenta converter
            arquivos_filtrados.append(arq)
        except:
            print(f"Ignorado (nome fora do padrão): {arq.name}")
            continue

    print(f"Arquivos válidos encontrados: {[a.name for a in arquivos_filtrados]}")
    return arquivos_filtrados


# Função: processar SCD Tipo 2 arquivo por arquivo
def atualizar_fato_transacoes_scd2(lista_arquivos: list) -> pd.DataFrame:
    fato_historico = pd.DataFrame()
    versao = 1

    for arquivo in lista_arquivos:
        print(f"Processando arquivo: {arquivo.name}")
        df = pd.read_csv(arquivo, parse_dates=['data_transacao'])

        # Agregação por cliente, mês e tipo
        df['ano_mes'] = df['data_transacao'].dt.to_period('M').dt.to_timestamp()
        fato_novo = df.groupby(['id_cliente', 'ano_mes', 'tipo_transacao']).agg(
            valor_total=('valor_transacao', 'sum'),
            qtd_transacoes=('valor_transacao', 'count')
        ).reset_index()

        # Se primeira carga
        if fato_historico.empty:
            fato_novo['validade_inicio'] = pd.Timestamp(arquivo.stem.split("_")[1])
            fato_novo['validade_fim'] = pd.NaT
            fato_novo['versao'] = versao
            fato_historico = fato_novo.copy()
            continue

        # Seleciona registros ativos
        fato_atual = fato_historico[fato_historico['validade_fim'].isna()].copy()

        # Merge para detectar mudanças
        chaves = ['id_cliente', 'ano_mes', 'tipo_transacao']
        fato_merge = pd.merge(fato_novo, fato_atual, on=chaves, how='left', suffixes=('_novo', '_atual'))

        # Verifica se mudou ou é novo
        mudou = (
            (fato_merge['valor_total_novo'] != fato_merge['valor_total_atual']) |
            (fato_merge['qtd_transacoes_novo'] != fato_merge['qtd_transacoes_atual']) |
            (fato_merge['valor_total_atual'].isna())
        )

        novas_versoes = fato_merge.loc[mudou, chaves + ['valor_total_novo', 'qtd_transacoes_novo']].copy()
        novas_versoes.columns = chaves + ['valor_total', 'qtd_transacoes']
        versao += 1
        data_execucao = pd.Timestamp(arquivo.stem.split("_")[1])
        novas_versoes['validade_inicio'] = data_execucao
        novas_versoes['validade_fim'] = pd.NaT
        novas_versoes['versao'] = versao

        # Fecha validade das versões antigas
        fato_temp = fato_historico.copy()
        idxs_a_fechar = fato_temp.merge(novas_versoes[chaves], on=chaves, how='inner').index
        fato_temp.loc[idxs_a_fechar, 'validade_fim'] = data_execucao

        # Junta ao histórico
        fato_historico = pd.concat([fato_temp, novas_versoes], ignore_index=True)

    return fato_historico.sort_values(by=chaves + ['versao']).reset_index(drop=True)

# Rodar o pipeline completo
arquivos_transacoes = carregar_transacoes_incrementais("dados/raw/")
fato_transacoes_historico = atualizar_fato_transacoes_scd2(arquivos_transacoes)

# salvar:
fato_transacoes_historico.to_csv("dados/gold/fato_transacoes_historico2.csv", index=False)

# Exibir resultado final
fato_transacoes_historico.head()


Ignorado (nome fora do padrão): transacoes_financeiras.csv
Arquivos válidos encontrados: ['transacoes_2025-03-27.csv', 'transacoes_2025-03-28.csv']
Processando arquivo: transacoes_2025-03-27.csv
Processando arquivo: transacoes_2025-03-28.csv


Unnamed: 0,id_cliente,ano_mes,tipo_transacao,valor_total,qtd_transacoes,validade_inicio,validade_fim,versao
0,101,2025-03-01,pagamento,100.0,1,2025-03-27,2025-03-28,1
1,101,2025-03-01,pagamento,200.0,1,2025-03-28,NaT,2
2,101,2025-03-01,ted,150.0,1,2025-03-27,2025-03-28,1
3,101,2025-03-01,ted,300.0,1,2025-03-28,NaT,2
4,102,2025-03-01,saque,250.0,1,2025-03-27,NaT,1


# Parte 5

# Visões

In [70]:
# Ver registros vigentes (atuais)
fato_versao_atual = fato_transacoes_historico[fato_transacoes_historico['validade_fim'].isna()].copy()
fato_versao_atual = fato_versao_atual.sort_values(by=['id_cliente', 'ano_mes', 'tipo_transacao'])

# Visualização no Jupyter
fato_versao_atual.head(20)  # Mostra os 20 primeiros


Unnamed: 0,id_cliente,ano_mes,tipo_transacao,valor_total,qtd_transacoes,validade_inicio,validade_fim,versao
1,101,2025-03-01,pagamento,200.0,1,2025-03-28,NaT,2
3,101,2025-03-01,ted,300.0,1,2025-03-28,NaT,2
4,102,2025-03-01,saque,250.0,1,2025-03-27,NaT,1
5,103,2025-03-01,pagamento,400.0,1,2025-03-27,NaT,1


In [44]:
# Agrupar e contar quantas versões existem por cliente/tipo
visao_historica = (
    fato_transacoes_historico
    .groupby(['id_cliente', 'tipo_transacao'])
    .agg(qtd_versoes=('versao', 'nunique'))
    .reset_index()
    .sort_values('qtd_versoes', ascending=False)
)

# Exibir a visão
visao_historica.head(20)


Unnamed: 0,id_cliente,tipo_transacao,qtd_versoes
0,101,pagamento,2
1,101,ted,2
2,102,saque,1
3,103,pagamento,1


In [47]:
import os
import pandas as pd

# Lista dos arquivos CSV existentes
arquivos_csv = {
    "fato_transacoes": "dados/gold/fato_transacoes.csv",
    "dim_clientes": "dados/gold/dim_clientes.csv",
    "fato_contratos": "dados/gold/fato_contratos.csv"
}

# Criar pasta de saída para Parquet
pasta_saida = "dados/gold/parquet"
os.makedirs(pasta_saida, exist_ok=True)

# Exportar cada CSV para Parquet
for nome, caminho_csv in arquivos_csv.items():
    try:
        df = pd.read_csv(caminho_csv, parse_dates=True)
        caminho_parquet = os.path.join(pasta_saida, f"{nome}.parquet")
        df.to_parquet(caminho_parquet, index=False)
        print(f"{nome}.parquet salvo em: {caminho_parquet}")
    except Exception as e:
        print(f"Erro ao converter {nome}: {e}")


fato_transacoes.parquet salvo em: dados/gold/parquet\fato_transacoes.parquet
dim_clientes.parquet salvo em: dados/gold/parquet\dim_clientes.parquet
fato_contratos.parquet salvo em: dados/gold/parquet\fato_contratos.parquet


# Banco de Dados

In [58]:
!pip install pymysql




In [67]:
import pandas as pd
from sqlalchemy import create_engine

# Dados de conexão MySQL
usuario = "root"
senha = "root"
host = "localhost"
porta = 3306
banco = "Engenharia_credisis"

# Criar conexão SQLAlchemy
engine = create_engine(f"mysql+pymysql://{usuario}:{senha}@{host}:{porta}/{banco}")

# Carregar e ajustar os DataFrames
dim_clientes = pd.read_csv("dados/gold/dim_clientes.csv", parse_dates=['data_nascimento'])
fato_contratos = pd.read_csv("dados/gold/fato_contratos.csv", parse_dates=['data_contratacao'])
fato_transacoes = pd.read_csv("dados/gold/fato_transacoes_historico.csv", parse_dates=['ano_mes', 'validade_inicio', 'validade_fim'])

# Garantir tipos corretos
dim_clientes['id_cliente'] = dim_clientes['id_cliente'].astype(int)
fato_contratos['id_cliente'] = fato_contratos['id_cliente'].astype(int)
fato_contratos['id_contrato'] = fato_contratos['id_contrato'].astype(int)
fato_transacoes['id_cliente'] = fato_transacoes['id_cliente'].astype(int)

# Enviar para o banco de dados
dim_clientes.to_sql("dim_clientes", con=engine, if_exists="append", index=False)
fato_contratos.to_sql("fato_contratos", con=engine, if_exists="append", index=False)
fato_transacoes.to_sql("fato_transacoes", con=engine, if_exists="append", index=False)

print("Tabelas carregadas com sucesso no MySQL.")


Tabelas carregadas com sucesso no MySQL (modo append).


# pegando os dados do banco de dados e fazendo um teste de salvar os dados vindo do banco de dados

In [72]:
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime
import os

# Configurações de conexão
usuario = "root"
senha = "root"
host = "localhost"
porta = 3306
banco = "Engenharia_credisis"

# Criar conexão SQLAlchemy
engine = create_engine(f"mysql+pymysql://{usuario}:{senha}@{host}:{porta}/{banco}")

# Diretório de saída
pasta_export = "dados/raw/teste_salvando_dados_bd"
os.makedirs(pasta_export, exist_ok=True)

# Data atual formatada
data_hoje = datetime.today().strftime("%Y-%m-%d")

# Função automatizada
def exportar_com_data(tabela):
    caminho_csv = os.path.join(pasta_export, f"{tabela}_{data_hoje}.csv")
    print(f"Exportando {tabela} para {caminho_csv}")
    df = pd.read_sql(f"SELECT * FROM {tabela}", con=engine)
    df.to_csv(caminho_csv, index=False)
    print(f"{tabela} exportada com sucesso!")

# Tabelas a exportar
tabelas = ["dim_clientes", "fato_contratos", "fato_transacoes"]

for tabela in tabelas:
    exportar_com_data(tabela)


Exportando dim_clientes para dados/raw/teste_salvando_dados_bd\dim_clientes_2025-03-28.csv
dim_clientes exportada com sucesso!
Exportando fato_contratos para dados/raw/teste_salvando_dados_bd\fato_contratos_2025-03-28.csv
fato_contratos exportada com sucesso!
Exportando fato_transacoes para dados/raw/teste_salvando_dados_bd\fato_transacoes_2025-03-28.csv
fato_transacoes exportada com sucesso!


# Parte 6
- Algumas partes como arquitetura vai está no readme.md

### Controle básico de Qualidade de Dados (Data Quality Checks)

In [75]:
def checar_qualidade(df: pd.DataFrame, tabela: str):
    print(f"\n Verificando qualidade da tabela: {tabela}")
    
    # Registros totais
    print(f"Total de registros: {len(df)}")

    # Verificar nulos
    nulos = df.isna().sum()
    if nulos.any():
        print("Colunas com valores nulos:")
        print(nulos[nulos > 0])
    else:
        print("Sem valores nulos.")

    # Verificar duplicados (baseado em PK sugerida)
    if 'id_cliente' in df.columns:
        duplicados = df.duplicated(subset=['id_cliente']).sum()
        print(f"Duplicados por id_cliente: {duplicados}")


In [74]:
checar_qualidade(dim_clientes, "dim_clientes")
checar_qualidade(fato_contratos, "fato_contratos")
checar_qualidade(fato_transacoes, "fato_transacoes")


 Verificando qualidade da tabela: dim_clientes
Total de registros: 1000
Sem valores nulos.
Duplicados por id_cliente: 0

 Verificando qualidade da tabela: fato_contratos
Total de registros: 3000
Sem valores nulos.
Duplicados por id_cliente: 2060

 Verificando qualidade da tabela: fato_transacoes
Total de registros: 9597
⚠Colunas com valores nulos:
validade_fim    9541
dtype: int64
Duplicados por id_cliente: 8598


### Como podemos ver a dim_clientes não tem clientes repitidos que o certo e as demais tem clientes e dados repitidos o que é normal pois um usuario vai fazer mais de uma ação

## Fluxo básico Orquestrado com Apache Airflow

- o apache airflow não está instalado em ambientes notebooks e teria uma estrutura maior de conxeção, mas abaixo tem um códifo básico de como imagino que seria:

In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def ingestao_raw(): print("Ingestão de dados brutos...")
def bronze_transform(): print("Transformação Bronze...")
def silver_process(): print("Tratamento Silver...")
def gold_export(): print("Exportação Gold...")
def carregar_mysql(): print("Carga MySQL final...")

with DAG("pipeline_engenharia_dados", start_date=datetime(2025, 3, 27), schedule_interval="@daily", catchup=False) as dag:
    tarefa_1 = PythonOperator(task_id="ingestao_raw", python_callable=ingestao_raw)
    tarefa_2 = PythonOperator(task_id="transform_bronze", python_callable=bronze_transform)
    tarefa_3 = PythonOperator(task_id="process_silver", python_callable=silver_process)
    tarefa_4 = PythonOperator(task_id="export_gold", python_callable=gold_export)
    tarefa_5 = PythonOperator(task_id="salvar_mysql", python_callable=carregar_mysql)

    tarefa_1 >> tarefa_2 >> tarefa_3 >> tarefa_4 >> tarefa_5
