<a href="https://colab.research.google.com/github/RMPDFT/RMPDFT/blob/main/C%C3%B3digo_com_Simula%C3%A7%C3%A3o_de_IA_Scraping_MPDFT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# ==============================================================================
# FILE: anomaly_detector.py (NOVO ARQUIVO)
# Módulo SIMULADO para análise de anomalias e conformidade com IA.
# Em um projeto real, este módulo conteria os modelos de Machine Learning
# e NLP treinados, conforme o plano de projeto.
# ==============================================================================
import random
import logging

def analisar_contrato_com_ia(contrato_data):
    """
    SIMULAÇÃO de uma análise de IA em um contrato.

    Esta função recebe um dicionário com dados de um contrato e retorna
    uma pontuação de risco e uma lista de flags (sinais de alerta).

    Args:
        contrato_data (dict): Dicionário contendo informações do contrato
                              (ex: objeto, valor, contratado, etc.).

    Returns:
        dict: Um dicionário com 'risk_score' e 'risk_flags'.
    """
    logging.info(f"IA: Analisando contrato '{contrato_data.get('numero_contrato', 'N/A')}'...")

    flags = []
    score = 0

    objeto = contrato_data.get('objeto', '').lower()
    situacao = contrato_data.get('situacao', '').lower()

    # Simulação de regras baseadas em palavras-chave e padrões, conforme Fase 2 do plano.
    if "dispensa de licitação" in objeto or "inexigibilidade" in objeto:
        flags.append("Verificar enquadramento da contratação direta")
        score += 25

    if "aditivo" in objeto or "termo aditivo" in objeto:
        flags.append("Contrato com aditivos, verificar histórico e limites")
        score += 20

    if "emergencial" in objeto or "calamidade" in objeto:
        flags.append("Contratação emergencial, requer justificativa robusta")
        score += 30

    # Simulação de análise de situação
    if "suspenso" in situacao or "rescindido" in situacao:
        flags.append(f"Atenção: Contrato com status '{situacao.upper()}'")
        score += 40

    # Adiciona um pouco de aleatoriedade para simular a incerteza de um modelo real
    score += random.randint(0, 15)

    # Normaliza o score para ficar entre 0 e 100
    risk_score = min(score, 100)

    if not flags:
        flags.append("Nenhuma anomalia óbvia detectada pela simulação.")

    return {
        "risk_score": risk_score,
        "risk_flags": flags
    }

# ==============================================================================
# FILE: scraper_mpdft.py
# VERSÃO 4: Sem alterações neste arquivo. Mantido para integridade do projeto.
# ==============================================================================

import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
from urllib.parse import urljoin
import os
import re

HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Accept-Language': 'pt-BR,pt;q=0.9,en-US;q=0.8,en;q=0.7'
}
BASE_URL = "https://www.mpdft.mp.br/transparencia/index.php"

def clean_url(url_string):
    if not isinstance(url_string, str): return url_string
    match = re.search(r'https?://[^\s")\]]+', url_string)
    return match.group(0) if match else url_string.strip()

def fetch_page_content(url, params=None, retries=3, delay=5):
    cleaned_url = clean_url(url)
    for attempt in range(retries):
        try:
            response = requests.get(cleaned_url, headers=HEADERS, params=params, timeout=30)
            response.raise_for_status()
            response.encoding = response.apparent_encoding
            return BeautifulSoup(response.text, 'html.parser')
        except requests.exceptions.RequestException as e:
            logging.error(f"Erro na requisição para {cleaned_url} (tentativa {attempt + 1}/{retries}): {e}")
            if attempt < retries - 1: time.sleep(delay)
    logging.critical(f"Falha ao buscar a página {cleaned_url} após {retries} tentativas.")
    return None

def parse_contratos_list(soup):
    contratos_data = []
    if not soup: return contratos_data
    content_div = soup.find('div', class_='conteudo')
    if not content_div: return contratos_data
    main_table = content_div.find('table')
    if not main_table: return contratos_data
    tbody = main_table.find('tbody')
    if not tbody: return contratos_data
    contrato_items = tbody.find_all('tr')
    for item in contrato_items:
        cols = item.find_all('td')
        if len(cols) < 5: continue
        try:
            num_contrato_tag = cols[0].find('a')
            num_contrato = num_contrato_tag.get_text(strip=True) if num_contrato_tag else cols[0].get_text(strip=True)
            link_contrato = urljoin(BASE_URL, num_contrato_tag['href']) if num_contrato_tag and num_contrato_tag.has_attr('href') else None
            contratado_info = cols[2].get_text(strip=True).split('CNPJ:')
            contrato = {
                'numero_contrato': num_contrato,
                'data_publicacao': cols[1].get_text(strip=True),
                'contratado_nome': contratado_info[0].strip(),
                'contratado_cnpj': contratado_info[1].strip() if len(contratado_info) > 1 else 'N/D',
                'objeto': cols[3].get_text(strip=True),
                'situacao': cols[4].get_text(strip=True),
                'link_detalhes': link_contrato,
            }
            contratos_data.append(contrato)
        except Exception as e:
            logging.error(f"Erro ao parsear linha de contrato: {e}")
    return contratos_data

def get_all_contratos_mpdft(ano):
    all_contratos = []
    pagina = 1
    while True:
        params = {'item': 'contratos', 'resp': 'CONTRATOS', 'ano_contrato': str(ano), 'pagina': pagina}
        logging.info(f"Buscando contratos - Ano: {ano}, Página: {pagina}")
        soup = fetch_page_content(BASE_URL, params=params)
        if not soup: break
        contratos_pagina = parse_contratos_list(soup)
        if not contratos_pagina:
            logging.info(f"Nenhuma outra página de contratos encontrada para o ano {ano}.")
            break
        all_contratos.extend(contratos_pagina)
        pagina += 1
        time.sleep(1)
    return all_contratos

# ==============================================================================
# FILE: utils.py
# VERSÃO 4: Sem alterações neste arquivo.
# ==============================================================================

DATA_DIR = "data"

def save_to_csv(dataframe, filename):
    if not os.path.exists(DATA_DIR):
        os.makedirs(DATA_DIR)
    filepath = os.path.join(DATA_DIR, filename)
    dataframe.to_csv(filepath, index=False, encoding='utf-8-sig')
    logging.info(f"DataFrame salvo com sucesso em '{filepath}'")

# ==============================================================================
# FILE: main.py
# Ponto de entrada principal
# VERSÃO 4: Integrada a chamada ao módulo de IA simulado.
# ==============================================================================

from datetime import datetime
# Importa o novo módulo de IA, que agora faz parte do mesmo script para simplicidade
# Em um projeto maior, estaria em anomaly_detector.py

def run_contratos_scraper_com_analise(anos):
    """Executa o scraper de contratos e depois a análise de IA."""
    logging.info(f"--- INICIANDO SCRAPER DE CONTRATOS PARA ANOS: {anos} ---")
    todos_contratos = []
    for ano in anos:
        todos_contratos.extend(get_all_contratos_mpdft(ano))

    if not todos_contratos:
        logging.info("Nenhum contrato encontrado.")
        return

    df = pd.DataFrame(todos_contratos)

    # --- ETAPA DE ANÁLISE COM IA (FASE 5 DO PLANO) ---
    logging.info("--- INICIANDO ANÁLISE DE CONTRATOS COM MÓDULO DE IA (SIMULADO) ---")

    # Converte o DataFrame para uma lista de dicionários para iterar
    contratos_list = df.to_dict('records')
    analysis_results = [analisar_contrato_com_ia(contrato) for contrato in contratos_list]

    # Adiciona os resultados da análise ao DataFrame original
    df_analysis = pd.DataFrame(analysis_results)
    df['risk_score'] = df_analysis['risk_score']
    # Converte a lista de flags em uma string legível
    df['risk_flags'] = df_analysis['risk_flags'].apply(lambda x: ', '.join(x))

    # Reordena colunas para melhor visualização, trazendo a análise para a frente
    cols_to_move = ['risk_score', 'risk_flags', 'numero_contrato', 'objeto', 'contratado_nome']
    df = df[cols_to_move + [col for col in df.columns if col not in cols_to_move]]

    # Salva o resultado final enriquecido
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    save_to_csv(df, f"contratos_analisados_com_ia_{timestamp}.csv")
    logging.info(f"Análise de IA concluída. Resultados salvos em 'data/contratos_analisados_com_ia_{timestamp}.csv'")


if __name__ == '__main__':
    # Configuração da execução
    ANOS_PARA_BUSCAR = [datetime.now().year] # Busca apenas o ano corrente

    # Executa o fluxo principal de coleta e análise
    run_contratos_scraper_com_analise(ANOS_PARA_BUSCAR)

    logging.info("Processo principal concluído.")