In [None]:
import pandas as pd
import glob
from pathlib import Path
import logging
from datetime import datetime
import sys

sys.path.append('..')
from config.settings import DATA_SOURCES, QUALITY_CHECKS, PROCESSING_CONFIG, AGIBANK_FILTERS

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

---

In [None]:
"""
Esta função assegura que os arquivos CSV esperados estão realmente disponíveis no diretório. Observações técnicas:

1. Usa glob.glob() para listar os arquivos.
2. Conta o total encontrado e dispara um erro (FileNotFoundError) se não houver nenhum.
3. Utiliza Path(file).name para extrair o nome do arquivo isolado (sem o caminho completo).
4. A verificação "2025" not in Path(file).name cria uma sanity check básica para garantir que o arquivo é do período correto.
"""
def validate_files():
    """Task 1: Validação dos arquivos disponíveis"""
    logger.info("Validando arquivos disponíveis...")

    consumidor_files = glob.glob("../data/bronze/consumidor_gov/*.csv") # ⚠️ DATA_SOURCES['bronze']['consumidor_gov']
    print(f"Temos {len(consumidor_files)} arquivos do Consumidor.gov")

    if len(consumidor_files) == 0:
        raise FileNotFoundError("Nenhum arquivo Consumidor.gov encontrado!")
    
    for file in consumidor_files:
        if "2025" not in Path(file).name:
            logger.warning(f"Arquivo pode não ser de 2025: {Path(file).name}")

    logger.info(f"Validação de arquivos concluída:")
    return consumidor_files


"""
Esta função tem como objetivo inspecionar a estrutura interna de um dos arquivos (colunas, formatos e dimensões).
"""
def explore_data_structure(file_path):
    """Task 2: Exploração da estrutura dos dados"""
    logger.info(f"Explorando estrutura: {Path(file_path).name}")

    sample_df = pd.read_csv(file_path, encoding='utf-8')

    info = {
        'columns': list(sample_df.columns),
        'dtypes': sample_df.dtypes.to_dict(),
        'sample_shape': sample_df.shape,
        'file_name': Path(file_path).name
    }

    logger.info(f"  Colunas: {len(info['columns'])}")
    logger.info(f"  Primeiras colunas: {info['columns'][:5]}")
    logger.info(f"  Info dtype: {info['dtypes']}")

    return info


"""
Adiciona colunas úteis de rastreabilidade e contexto, permitindo saber:

1. De onde o dado veio (data_source)
2. Que arquivo originou (file_origin)
3. Quando foi processado (processed_at)
4. Qual período ele representa (file_month)
5. Flag de algum filtro específico (is_agibank)
"""
def add_metadata_columns(df, file_path, source_type):
    """Task 3: Adicionar colunas de metadados"""
    logger.info("Adicionando metadados...")

    file_name = Path(file_path).name

    df['data_source'] = source_type
    df['file_origin'] = file_name
    df['processed_at'] = datetime.now()

    if source_type == 'consumidor_gov':
        try:
            month_year = file_name.split('2025-')[1].split('.')[0]
            df['file_month'] = f"{month_year}/2025"
        except:
            df['file_month'] = 'Desconhecido'

    df['is_agibank'] = False

    logger.info("Metadados adicionados com sucessso")
    logger.info(df.columns[-5:])
    return df


"""
Identificar quais registros dentro do dataset pertencem ao Agibank e marcar esses registros na coluna is_agibank.
Observações técnicas:

1. O condicionamento 'if company_col' entra caso a função tenha encontrado uma coluna que represente o nome da empresa.
2. O operador `'|'.join(lista)` junta os elementos da lista colocando o caractere pipe (|) entre eles.
3. `.str.contains()` aplica busca por strings em todas as linhas de uma coluna simultaneamente.
"""
def filter_agibank_records(df):
    """Task 4: Identificar e marcar registros do Agibank"""
    logger.info("Identificando registros Agibank...")

    possible_company_cols = ['Nome Fantasia']
    company_col = None

    for col in possible_company_cols:
        if col in df.columns:
            company_col = col
            break

    if company_col:
        agibank_mask = df[company_col].str.contains(
            '|'.join(AGIBANK_FILTERS['bank_names']),
            case=False,
            na=False
        )

        df['is_agibank'] = agibank_mask
        agibank_count = agibank_mask.sum()

        logger.info(f"Registros Agibank encontrados: {agibank_count}")
        logger.info(f"Percentual Agibank: {(agibank_count/len(df)*100):.2f}%")
    else:
        logger.warning("Coluna de empresa não identificada")

    return df


"""
Essa função usa os parâmetros configurados em QUALITY_CHECKS pra avaliar a qualidade dos dados do arquivo em ingestão.
"""
def quality_check(df, file_name):
    """Task 5: Verificação de qualidade dos dados"""
    logger.info(f"Executando verificações de qualidade: {file_name}")

    issues = []


    if len(df) < QUALITY_CHECKS['min_rows_expected']:
        issues.append(f"Poucas linhas: {len(df)} < {QUALITY_CHECKS['min_rows_expected']}")


    null_percentages = df.isnull().mean()
    high_null_cols = null_percentages[null_percentages > QUALITY_CHECKS['max_null_percentage']]

    if not high_null_cols.empty:
        issues.append(f"Colunas com muitos nulos: {list(high_null_cols)}")


    duplicates = df.duplicated().sum()
    if duplicates > 0:
        issues.append(f"Registros duplicados: {duplicates}")

    if issues:
        for issue in issues:
            logger.warning(f"{issue}")
    else:
        logger.info("Todos os checks de qualidade passaram")

    df['quality_score'] = 1.0 - (len(issues) * 0.1)

    return df, issues


"""
O objetivo é percorrer cada arquivo CSV dentro da pasta bronze e processá‑lo separadamente utilizando as funções estruturadas anteriormente.
"""
def process_consumidor_gov():
    """Task 6: Processamento completo Consumidor.gov"""
    logger.info("Iniciando processamento Consumidor.gov...")

    consumidor_files = glob.glob("../data/bronze/consumidor_gov/*.csv")
    all_dataframes = []
    all_issues = []

    for file_path in sorted(consumidor_files): # assegura que a lista 'consumidor_files' esteja em ordem alfabetica e/ou numérica
        logger.info(f"  Processando: {Path(file_path).name}")

        try:
            
            df = pd.read_csv(file_path, encoding='utf-8')
            original_rows = len(df)

            df = add_metadata_columns(df, file_path, 'consumidor_gov')
            df = filter_agibank_records(df)
            df, issues = quality_check(df, Path(file_path).name)

            all_issues.extend(issues)
            all_dataframes.append(df)

            logger.info(f"  {Path(file_path).name}: {original_rows} -> {len(df)} registros")

        except Exception as e:
            logger.error(f" Erro processando {Path(file_path).name}: {str(e)}")
            continue # faz o loop seguir para o próximo arquivo sem parar a execução global

    if all_dataframes:
        combined_df = pd.concat(all_dataframes, ignore_index= True)
        logger.info(f"Consumidor.gov processado {len(combined_df)} registros totais")
        return combined_df, all_issues
    else:
        raise Exception("Nenhum arquivo foi processado com sucesso!")


"""
Essa função salva o DataFrame consolidado em disco. Evita que o script falhe caso rode em um novo ambiente vazio ou num servidor limpo.
"""
def save_bronze_output(df, output_path):
    """Task 7: Salvar dados processados"""
    logger.info(f"Salvando dados bronze: {output_path}")

    Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    df.to_csv(output_path, index=False, encoding='utf-8')

    logger.info(f"Total de registros: {len(df)}")
    logger.info(f"Registro Agibank: {df['is_agibank'].sum()}")
    logger.info(f"Colunas: {len(df.columns)}")

    return True



---

In [None]:
"""
Esta inicia toda a pipeline bronze quando você usa o arquivo como script independendente.
"""
def bronze_dag():
    """DAG principal da camada bronze"""
    logger.info("Iniciando DAG Bronze...")
    start_time = datetime.now()

    try:
        consumidor_files = validate_files() # retorna uma array de arquivos

        if consumidor_files:
            explore_data_structure(consumidor_files[0]) # quick check de colunas e tipos no primeiro arquivo (verificação amostral)

            df_consumidor, issues = process_consumidor_gov()

            save_bronze_output(df_consumidor, "../data/silver/consumidor_gov_bronze.csv")

            end_time = datetime.now()
            duration = end_time - start_time

            logger.info("-"*70)
            logger.info("RELATÓRIO BRONZE DAG\n")
            logger.info(f"Duração: {duration}")
            logger.info(f"Registros processados: {len(df_consumidor)}")
            logger.info(f"Registros Agibank: {df_consumidor['is_agibank'].sum()}")
            logger.info(f"Issues de qualidade: {len(issues)}")
            logger.info("\nDAG Bronze concluida com sucesso!")
            logger.info("-"*70)

    except Exception as e:
        logger.error(f"Erro DAG Bronze: {str(e)}")
        raise

if __name__ == "__main__":
    bronze_dag()

---

In [21]:
validate_files()

2026-01-21 17:51:52,087 - INFO - Validando arquivos disponíveis...
2026-01-21 17:51:52,090 - INFO - Validação de arquivos concluída:


Temos 12 arquivos do Consumidor.gov


['../data/bronze/consumidor_gov\\basecompleta2025-01.csv',
 '../data/bronze/consumidor_gov\\basecompleta2025-02.csv',
 '../data/bronze/consumidor_gov\\basecompleta2025-03.csv',
 '../data/bronze/consumidor_gov\\basecompleta2025-04.csv',
 '../data/bronze/consumidor_gov\\basecompleta2025-05.csv',
 '../data/bronze/consumidor_gov\\basecompleta2025-06.csv',
 '../data/bronze/consumidor_gov\\basecompleta2025-07.csv',
 '../data/bronze/consumidor_gov\\basecompleta2025-08.csv',
 '../data/bronze/consumidor_gov\\basecompleta2025-09.csv',
 '../data/bronze/consumidor_gov\\basecompleta2025-10.csv',
 '../data/bronze/consumidor_gov\\basecompleta2025-11.csv',
 '../data/bronze/consumidor_gov\\basecompleta2025-12.csv']