# 4 - Mineração de Dados: Detecção de Anomalias Baseada em Regras

Este notebook implementa um **sistema de classificação baseado em regras** (Rule-Based System) fundamentado em:

1. **Russell e Norvig (2020)**: Sistemas baseados em conhecimento especialista
2. **Chandola et al. (2009)**: Taxonomia de detecção de anomalias
3. **Lei 14.133/2021**: Framework legal para licitações públicas

## Por que Regras ao invés de Aprendizado de Máquina?

**Justificativa metodológica:**

- ✅ **Ausência de dados rotulados**: Não há histórico de licitações classificadas como "fraudulentas" ou "regulares"
- ✅ **Explicabilidade**: Auditores precisam entender exatamente por que uma licitação foi sinalizada
- ✅ **Alinhamento legal**: Regras derivadas diretamente da Lei 14.133/2021
- ✅ **Transparência**: Cada decisão é rastreável e justificável

> "Sistemas baseados em regras utilizam conhecimento especialista codificado em regras lógicas para classificação de instâncias, sendo particularmente adequados quando a explicabilidade é um requisito crítico" (RUSSELL; NORVIG, 2020, p. 287)


### Configuração

**Bibliotecas utilizadas.**
- `pandas`/`numpy`: manipulação vetorial.
- `Path`: gerenciamento de caminhos portável.
- `RobustScaler` (scikit-learn): normalização robusta definida por \(z_i = 
rac{x_i - 	ext{mediana}}{	ext{IQR}}\), onde `IQR = Q3 - Q1`. Esta técnica reduz o impacto de outliers, essenciais em valores monetários.

**Rotinas adicionais.**
- Configurações de exibição (`max_columns`, `float_format`).
- Garantia de existência do diretório `exports/` para salvar os resultados da mineração.


In [1]:

import pandas as pd
import numpy as np
from pathlib import Path
from sklearn.preprocessing import RobustScaler

pd.options.display.max_columns = 50
pd.options.display.float_format = lambda x: f"{x:,.2f}"

exports_dir = Path('exports')
exports_dir.mkdir(exist_ok=True)



### Carregamento do dataset pré-processado

O código a seguir:
1. Monta o `Path` para `exports/licitacoes_informatica.csv` (saída do notebook de pré-processamento).
2. Verifica a existência do arquivo (pré-condição para esta etapa do KDD).
3. Carrega os dados com `pd.read_csv(..., sep=';')` e imprime uma amostra inicial.

**Variáveis principais.**
- `df`: `DataFrame` com 100% dos registros filtrados para o nicho de TI.
- `dataset_path`: referência reutilizada na etapa de exportação final.


In [2]:

dataset_path = exports_dir / '2_licitacoes_informatica.csv'

if not dataset_path.exists():
    raise FileNotFoundError('Execute o notebook `pre-processamento.ipynb` até a etapa de exportação antes de prosseguir.')

df = pd.read_csv(dataset_path, sep=';')
print(f'Dados carregados: {len(df):,} linhas')
df.head(3)


Dados carregados: 836 linhas


Unnamed: 0,numprocesso,ano,modalidade,situacao,objeto,orgao,criteriojulgamento,valor_estimado,valor_ou_desconto_homologado,dataabertura,datahomologacao,arquivo,objeto_normalizado,diferenca_valor,variacao_percentual,proporcao_homologado_estimado,flag_homologado_acima_estimado,flag_houve_desconto,lead_time_dias,criterio_menor_preco,flag_situacao_certame,flag_informatica
0,4,2025,Audiência Pública,Fase Certame,Audiência Pública que visa subsidiar o process...,SEAP-Secretaria de Estado da Administração e d...,Menor Preço,0.0,,2025-07-08,,TB_LICITACOES-2025.csv,audiência pública que visa subsidiar o process...,0.0,0.0,0.0,False,True,0.0,True,True,True
1,10,2025,Audiência Pública,Fase Certame,"relativo a contratação de empresa, ou consórci...",SGSD-Superintendência Geral de Governança de S...,Menor Preço,0.0,,2025-09-11,,TB_LICITACOES-2025.csv,"relativo a contratação de empresa, ou consórci...",0.0,0.0,0.0,False,True,0.0,True,True,True
2,23,2025,Chamamento Público,Fase Certame,Credenciamento de Permissionários para exercer...,DER-Departamento de Estradas de Rodagem do Est...,-,0.0,,2025-09-17,,TB_LICITACOES-2025.csv,credenciamento de permissionários para exercer...,0.0,0.0,0.0,False,True,0.0,False,True,True


### Criação de Flags de Risco

As flags binárias são derivadas diretamente dos requisitos da **Lei 14.133/2021**:

| Flag | Art. Lei 14.133 | Interpretação |
|------|----------------|---------------|
| `flag_homologado_acima_estimado` | Art. 23, 59 | Valor homologado > estimado indica possível sobrepreço |
| `flag_sem_desconto` | Art. 34 | Ausência de desconto sugere baixa competitividade |
| `flag_nao_menor_preco` | Art. 34 | Critério diferente requer justificativa técnica |
| `flag_certame_em_andamento` | - | Processo incompleto (menor confiabilidade) |

**Referências:**
- Brasil (2021): Lei 14.133/2021
- Torres-Berru e Batista (2021): Parâmetros de avaliação de risco

In [3]:

columns_expected = [
    'diferenca_valor',
    'variacao_percentual',
    'proporcao_homologado_estimado',
    'lead_time_dias',
    'flag_homologado_acima_estimado',
    'flag_houve_desconto',
    'criterio_menor_preco',
    'flag_situacao_certame',
    'flag_informatica'
]
missing_cols = [col for col in columns_expected if col not in df.columns]
if missing_cols:
    raise ValueError(f'Colunas ausentes no dataset pré-processado: {missing_cols}')

risk_df = pd.DataFrame(index=df.index)

# Componentes numéricos: foca em sobrepreço (diferença negativa) e prazos longos.
risk_df['sobrepreco_valor'] = np.clip(-df['diferenca_valor'], a_min=0, a_max=None)
risk_df['sobrepreco_percentual'] = np.clip(-df['variacao_percentual'], a_min=0, a_max=None)
risk_df['lead_time_dias'] = df['lead_time_dias']

risk_df = risk_df.fillna(0)

scaler = RobustScaler()
risk_numeric = scaler.fit_transform(risk_df)
risk_numeric = np.clip(risk_numeric, a_min=0, a_max=None)
risk_numeric = pd.DataFrame(risk_numeric, columns=risk_df.columns, index=risk_df.index)
risk_numeric = risk_numeric.div(risk_numeric.max().replace(0, 1))

# Flags convertidas para booleanos (True sinaliza potencial risco)
flags = pd.DataFrame({
    'flag_homologado_acima_estimado': df['flag_homologado_acima_estimado'].astype(bool),
    'flag_certame_em_andamento': df['flag_situacao_certame'].astype(bool),
    'flag_informatica': df['flag_informatica'].astype(bool),
    'flag_sem_desconto': ~df['flag_houve_desconto'].astype(bool),
    'flag_nao_menor_preco': ~df['criterio_menor_preco'].astype(bool)
}, index=df.index)

flag_columns = flags.columns.tolist()

# Garante que as flags estejam disponíveis no DataFrame principal
for col in flag_columns:
    df[col] = flags[col]

risk_numeric.head()

Unnamed: 0,sobrepreco_valor,sobrepreco_percentual,lead_time_dias
0,0.0,0.0,0.0
1,0.0,0.0,0.0
2,0.0,0.0,0.0
3,0.0,0.0,0.06
4,0.0,0.0,0.0


### Sistema de Classificação Baseado em Regras

Implementação de um sistema de detecção de anomalias fundamentado em múltiplos critérios, conforme literatura especializada e framework legal.

#### Definição dos Thresholds (Valores de Corte)

**1. Sobrepreço Significativo: 10%**

Fundamentação:
- **Lei 14.133/2021, Art. 59, III**: Valor estimado como teto da contratação
- **TCU (2023)**: Manual de Auditoria considera sobrepreço acima de 10% como significativo
- **Torres-Berru e Batista (2021)**: Parâmetros anômalos em aquisições públicas

**2. Lead Time Anômalo: Percentil 95**

Fundamentação:
- **Chandola et al. (2009)**: Outliers estatísticos como indicadores de anomalia
- **Carvalho e Filho (2024)**: Prazos excessivos podem indicar direcionamento

**3. Múltiplos Sinais: 3 ou mais flags**

Fundamentação:
- **Potin et al. (2023)**: Combinação de padrões aumenta acurácia na detecção de fraudes
- **Torres-Berru e Batista (2021)**: Múltiplos indicadores simultâneos elevam risco

#### Cálculo dos Thresholds Empíricos

In [4]:
# Threshold 1: Sobrepreço de 10% (fundamentado em TCU 2023)
THRESHOLD_SOBREPRECO_PCT = 10.0

# Threshold 2: Lead time no percentil 95 (Chandola et al., 2009)
# Apenas licitações com lead_time > 0 (já finalizadas)
lead_times_validos = df[df['lead_time_dias'] > 0]['lead_time_dias']
THRESHOLD_LEAD_TIME_P95 = lead_times_validos.quantile(0.95)

# Threshold 3: Mínimo de 3 sinais simultâneos (Potin et al., 2023)
THRESHOLD_MIN_SINAIS = 3

print(f"Thresholds definidos:")
print(f"  - Sobrepreço: {THRESHOLD_SOBREPRECO_PCT}%")
print(f"  - Lead time (P95): {THRESHOLD_LEAD_TIME_P95:.1f} dias")
print(f"  - Mínimo de sinais: {THRESHOLD_MIN_SINAIS}")

Thresholds definidos:
  - Sobrepreço: 10.0%
  - Lead time (P95): 142.2 dias
  - Mínimo de sinais: 3


#### Função de Classificação Multi-Critério

In [5]:
def classificar_licitacao(row):
    """
    Classifica licitação quanto à presença de anomalias utilizando sistema baseado em regras.
    
    Fundamentação:
    - Russell & Norvig (2020): Sistemas baseados em conhecimento
    - Lei 14.133/2021: Framework legal
    - Torres-Berru & Batista (2021): Parâmetros de risco
    
    Critérios implementados:
    1. Sobrepreço grave (> 10% acima do estimado)
    2. Ausência de competitividade (não menor preço + sem desconto)
    3. Lead time anômalo (> percentil 95)
    4. Combinação de múltiplos sinais (≥ 3 flags)
    
    Retorna:
        tuple: (classificacao, justificativa)
            - classificacao: 'Anomalia Crítica', 'Suspeita Moderada' ou 'Regular'
            - justificativa: Lista de critérios violados
    """
    justificativas = []
    
    # CRITÉRIO 1: Sobrepreço Significativo
    # Art. 23, Lei 14.133: valor estimado é teto da contratação
    sobrepreco_grave = (
        row['flag_homologado_acima_estimado'] == True and
        abs(row['variacao_percentual']) > THRESHOLD_SOBREPRECO_PCT
    )
    if sobrepreco_grave:
        justificativas.append(
            f"Sobrepreço de {abs(row['variacao_percentual']):.1f}% "
            f"(acima do limite de {THRESHOLD_SOBREPRECO_PCT}%)"
        )
    
    # CRITÉRIO 2: Ausência de Competitividade
    # Art. 34, Lei 14.133: critério diferente de menor preço requer justificativa
    sem_competitividade = (
        row['flag_nao_menor_preco'] == True and
        row['flag_sem_desconto'] == True
    )
    if sem_competitividade:
        justificativas.append(
            "Ausência de competitividade: critério diferente de menor preço "
            "sem desconto obtido"
        )
    
    # CRITÉRIO 3: Lead Time Anômalo
    # Apenas avalia se houver lead_time válido (> 0)
    lead_time_anomalo = False
    if row['lead_time_dias'] > 0:
        lead_time_anomalo = row['lead_time_dias'] > THRESHOLD_LEAD_TIME_P95
        if lead_time_anomalo:
            justificativas.append(
                f"Prazo excessivo: {row['lead_time_dias']:.0f} dias "
                f"(P95: {THRESHOLD_LEAD_TIME_P95:.0f} dias)"
            )
    
    # CRITÉRIO 4: Combinação de Sinais de Alerta
    sinais_ativos = sum([
        row['flag_homologado_acima_estimado'],
        row['flag_sem_desconto'],
        row['flag_nao_menor_preco'],
        row['flag_certame_em_andamento']
    ])
    
    multiplos_sinais = sinais_ativos >= THRESHOLD_MIN_SINAIS
    if multiplos_sinais:
        justificativas.append(
            f"Múltiplos sinais de alerta: {sinais_ativos} flags ativas"
        )
    
    # DECISÃO DE CLASSIFICAÇÃO
    if sobrepreco_grave or multiplos_sinais:
        return 'Anomalia Crítica', '; '.join(justificativas)
    elif sem_competitividade or lead_time_anomalo:
        return 'Suspeita Moderada', '; '.join(justificativas)
    else:
        return 'Regular', 'Nenhum critério de anomalia identificado'


# Aplicar classificação a todas as licitações
resultados = df.apply(classificar_licitacao, axis=1)
df['classificacao'] = resultados.apply(lambda x: x[0])
df['justificativa_anomalia'] = resultados.apply(lambda x: x[1])

# Flag binária para compatibilidade com código existente
df['flag_anomalia'] = df['classificacao'].isin(['Anomalia Crítica', 'Suspeita Moderada'])

# Estatísticas da classificação
print("\n" + "="*60)
print("DISTRIBUIÇÃO DAS CLASSIFICAÇÕES")
print("="*60)
print(df['classificacao'].value_counts())
print(f"\nTotal de anomalias detectadas: {df['flag_anomalia'].sum()} "
      f"({df['flag_anomalia'].mean()*100:.1f}%)")


DISTRIBUIÇÃO DAS CLASSIFICAÇÕES
classificacao
Regular              799
Suspeita Moderada     34
Anomalia Crítica       3
Name: count, dtype: int64

Total de anomalias detectadas: 37 (4.4%)



### Exportação dos resultados

Salva dois conjuntos de saída para documentação e iteração futura:
- `licitacoes_com_score.csv`: todos os registros com colunas originais + `score_anomalia`, `nivel_risco`, `flag_anomalia`.
- `licitacoes_anomalias.csv`: subconjunto filtrado (`flag_anomalia = True`).

Ambos são escritos com `sep=';'` para manter compatibilidade com os dados de origem e possíveis ferramentas do órgão.


In [6]:

todos_path = exports_dir / '3_licitacoes_com_score.csv'
anomalias_path = exports_dir / '4_licitacoes_anomalias.csv'

# Exportar dataset completo com classificações
df.to_csv(todos_path, sep=';', index=False)

# Exportar apenas anomalias (Críticas + Suspeitas) com justificativas
anomalias = df[df['flag_anomalia']].copy()
anomalias.to_csv(anomalias_path, sep=';', index=False)

print(f"\n✓ Datasets exportados:")
print(f"  - Completo: {todos_path} ({len(df):,} registros)")
print(f"  - Anomalias: {anomalias_path} ({len(anomalias):,} registros)")
print(f"\nNovos campos adicionados:")
print(f"  - classificacao: Categoria de risco (3 níveis)")
print(f"  - justificativa_anomalia: Critérios violados")
print(f"  - flag_anomalia: Binário (True para Crítica + Suspeita)")



✓ Datasets exportados:
  - Completo: exports/3_licitacoes_com_score.csv (836 registros)
  - Anomalias: exports/4_licitacoes_anomalias.csv (37 registros)

Novos campos adicionados:
  - classificacao: Categoria de risco (3 níveis)
  - justificativa_anomalia: Critérios violados
  - flag_anomalia: Binário (True para Crítica + Suspeita)
