<a href="https://colab.research.google.com/github/danieltalon/trabalho_RAG/blob/main/2_verifica_respostas_taxa_acerto_IA_NO_RAG.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [7]:
# -*- coding: utf-8 -*-
"""
Avaliação de IA em Questões Médicas (Adaptado para Google Colab - COM BATCH API)

Este script avalia o desempenho de um modelo de IA generativa (Google Gemini)
em responder questões de múltipla escolha de provas de medicina,
filtradas por áreas específicas, enviando perguntas em lotes para a API.
"""

# @title 1. Instalar Bibliotecas e Importar Módulos Necessários
# Instala as bibliotecas necessárias no ambiente Colab
%pip install -q google-generativeai pandas # Pandas é útil para manipulação de dados e CSV

import json
import csv
import time
import re
import pandas as pd
import google.generativeai as genai
from google.colab import userdata, drive, files # Módulos específicos do Colab
from collections import defaultdict
import io # Para lidar com arquivos carregados

# @title 2. Definir Constantes e Configurações

# --- Constantes de Configuração ---

# Caminho para o arquivo JSON no Google Drive (AJUSTE CONFORME NECESSÁRIO)
JSON_FILE_PATH = '/content/questoes_2021_2022_2023_2024_limit700_classificadas_lote.json'

# Caminho para salvar o arquivo CSV de resultados no Google Drive (AJUSTE CONFORME NECESSÁRIO)
OUTPUT_CSV_FILE = '/content/drive/MyDrive/resultados_avaliacao_ia_lote.csv'

# Áreas médicas para filtrar as questões (Case-sensitive)
AREAS_TO_FILTER = ["Ginecologia", "Obstetrícia", "Pediatria"]

# Parâmetros de Processamento
BATCH_SIZE = 25
DELAY_BETWEEN_BATCHES = 5
MAX_RETRIES = 3
# Defina None ou um número maior que o total de questões filtradas para processar todas
MAX_QUESTIONS_TO_PROCESS = 700

# Modelo Gemini a ser usado
GEMINI_MODEL_NAME = 'gemini-1.5-flash'

# Nome da chave secreta no Colab para a API Key do Google
COLAB_SECRET_KEY_NAME = 'GOOGLE_API_KEY'
#-------------------------------------

# @title 3. Montar Google Drive (Necessário para acessar/salvar arquivos)
try:
    drive.mount('/content/drive')
    print("Google Drive montado com sucesso!")
except Exception as e:
    print(f"Erro ao montar Google Drive: {e}")
    print("Certifique-se de autorizar o acesso quando solicitado.")
    # exit()

# @title 4. Configurar API Key do Google (Usando Colab Secrets)
try:
    API_KEY = userdata.get(COLAB_SECRET_KEY_NAME)
    if not API_KEY:
        raise ValueError(f"Chave secreta '{COLAB_SECRET_KEY_NAME}' não encontrada ou vazia.")
    genai.configure(api_key=API_KEY)
    print("API Key do Google configurada com sucesso via Colab Secrets.")
except Exception as e:
    print(f"Erro ao configurar a API Key: {e}")
    print(f"Verifique se você adicionou a chave '{COLAB_SECRET_KEY_NAME}' nos Secrets do Colab.")
    # exit()

# @title 5. Funções Auxiliares (Carregar, Filtrar, Formatar LOTE, Chamar IA em LOTE, Parsear LOTE, Avaliar, Salvar)

def load_json_data(filepath):
    """Carrega os dados do arquivo JSON."""
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"Erro: Arquivo JSON não encontrado em '{filepath}'. Verifique o caminho no Google Drive.")
        return None
    except json.JSONDecodeError:
        print(f"Erro: Falha ao decodificar o arquivo JSON '{filepath}'")
        return None
    except Exception as e:
        print(f"Erro inesperado ao carregar o JSON: {e}")
        return None

def filter_questions(data, areas):
    """Filtra as questões pelas áreas médicas especificadas."""
    filtered = []
    if not data:
        return filtered
    seen_ids = set()
    for i, question in enumerate(data):
        prova = question.get('prova', 'unk_prova')
        numero = question.get('numero', f'idx_{i}')
        question_id = f"{prova}_{numero}"
        if question_id in seen_ids:
            continue
        question_areas = question.get('areas_medicas', [])
        if any(area in question_areas for area in areas):
            if 'enunciado' in question and 'alternativas' in question and 'resposta' in question:
                if isinstance(question['alternativas'], dict):
                    filtered.append({
                        'id': question_id,
                        'enunciado': question['enunciado'],
                        'alternativas': question['alternativas'],
                        'resposta_correta': str(question['resposta']).strip().upper(),
                        'areas_medicas': question_areas,
                        'continha_imagem': question.get('contains_img', False)
                    })
                    seen_ids.add(question_id)
                else:
                    print(f"Aviso: Alternativas inválidas para a questão ID {question_id}. Pulando.")
            else:
                print(f"Aviso: Questão ID {question_id} faltando campos essenciais. Pulando.")
    return filtered

def format_batch_for_ai(batch_questions):
    """Formata um LOTE de questões para um único prompt da IA."""
    prompt = "Responda às seguintes questões de múltipla escolha. Para cada questão, forneça APENAS a letra da alternativa correta, precedida pelo número da questão no lote e um ponto (ex: 1. A, 2. C, 3. B).\n\n"
    for i, question_data in enumerate(batch_questions):
        prompt += f"--- Questão {i+1} ---\n"
        prompt += f"Enunciado: {question_data['enunciado']}\n\nAlternativas:\n"
        alternatives = question_data['alternativas']
        try:
            sorted_keys = sorted(alternatives.keys())
        except TypeError:
            sorted_keys = alternatives.keys()
        for key in sorted_keys:
            prompt += f"{key}: {alternatives[key]}\n"
        if question_data['continha_imagem']:
            prompt += "(Observação: Esta questão originalmente continha uma imagem, que não pode ser exibida aqui.)\n"
        prompt += "\n" # Espaço entre questões
    prompt += "Respostas (formato: 1. Letra, 2. Letra, ...):\n"
    return prompt

def parse_ai_batch_response(response_text, batch_size):
    """Analisa a resposta da IA para um lote de questões."""
    answers = {} # Dicionário para armazenar {indice_no_lote: resposta}
    if not response_text:
        print("Aviso: Resposta da IA para o lote está vazia.")
        # Marca todas as questões do lote como erro
        for i in range(batch_size):
            answers[i] = "Erro: Resposta Vazia no Lote"
        return answers

    # Tenta extrair respostas no formato "Número. Letra"
    # Regex para encontrar "Número. Letra" (com ou sem espaço após o ponto)
    pattern = re.compile(r"^\s*(\d+)\s*\.\s*([A-E])\b", re.MULTILINE)
    found_answers = pattern.findall(response_text.strip().upper())

    if found_answers:
        for num_str, letter in found_answers:
            try:
                question_index = int(num_str) - 1 # Ajusta para índice 0-based
                if 0 <= question_index < batch_size:
                    if question_index not in answers: # Pega a primeira ocorrência para um número
                         answers[question_index] = letter
                    else:
                         print(f"Aviso: Resposta duplicada para questão {question_index + 1} no lote. Usando a primeira encontrada.")
                else:
                    print(f"Aviso: Índice de questão inválido ({num_str}) encontrado na resposta do lote.")
            except ValueError:
                print(f"Aviso: Não foi possível converter o número da questão '{num_str}' em inteiro.")
    else:
        # Se o formato principal falhar, tenta uma abordagem mais simples (ex: A, B, C em linhas separadas)
        # Isso é menos confiável e pode exigir ajustes
        lines = response_text.strip().split('\n')
        potential_answers = [line.strip() for line in lines if len(line.strip()) == 1 and 'A' <= line.strip().upper() <= 'E']
        if len(potential_answers) == batch_size:
             print("Aviso: Usando formato alternativo de parsing (uma letra por linha).")
             for i, ans in enumerate(potential_answers):
                 answers[i] = ans.upper()
        else:
             print(f"Aviso: Não foi possível parsear a resposta do lote no formato esperado. Resposta recebida:\n{response_text}")


    # Preenche as respostas faltantes com erro
    for i in range(batch_size):
        if i not in answers:
            answers[i] = "Erro de Parsing no Lote"

    # Converte o dicionário para uma lista ordenada pelo índice
    answer_list = [answers.get(i, "Erro: Resposta Ausente no Lote") for i in range(batch_size)]
    return answer_list


def get_ai_answers_for_batch(batch_questions, model_name=GEMINI_MODEL_NAME, max_retries=MAX_RETRIES):
    """Envia um LOTE de questões para a IA e retorna a lista de respostas."""
    prompt = format_batch_for_ai(batch_questions)
    model = genai.GenerativeModel(model_name)
    attempts = 0
    batch_size = len(batch_questions)

    # Configuração de segurança
    safety_settings = [
        {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
        {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
        {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
        {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
    ]

    # Define a configuração de geração com baixa temperatura
    generation_config = genai.types.GenerationConfig(
        temperature=0.2 # Valor baixo para maior determinismo e precisão
        # Outros parâmetros como top_p, top_k podem ser ajustados se necessário,
        # mas a temperatura é o principal para este caso.
    )


    while attempts < max_retries:
        try:
            # response = model.generate_content(prompt, safety_settings=safety_settings)
            response = model.generate_content(
                prompt,
                generation_config=generation_config,
                safety_settings=safety_settings
            )

            # Verifica bloqueio ou resposta vazia
            if not response.parts:
                 block_reason = "Razão Desconhecida"
                 finish_reason = "Não especificado"
                 if hasattr(response, 'prompt_feedback') and response.prompt_feedback:
                     block_reason = response.prompt_feedback.block_reason or block_reason
                 if hasattr(response, 'candidates') and response.candidates and hasattr(response.candidates[0], 'finish_reason'):
                     finish_reason = response.candidates[0].finish_reason or finish_reason

                 if finish_reason != 'STOP':
                     print(f"Aviso: Resposta bloqueada para o LOTE. Razão: {block_reason} / Finish Reason: {finish_reason}")
                     return [f"Bloqueado ({block_reason})" for _ in range(batch_size)]
                 else:
                     print("Aviso: Resposta vazia ou sem partes para o LOTE, mas Finish Reason é STOP.")
                     return ["Erro: Resposta Vazia no Lote" for _ in range(batch_size)]

            ai_answers_raw = response.text
            parsed_answers = parse_ai_batch_response(ai_answers_raw, batch_size)
            return parsed_answers
        except Exception as e:
            attempts += 1
            print(f"Erro ao chamar a API Gemini para o LOTE (Tentativa {attempts}/{max_retries}): {e}")
            if attempts < max_retries:
                wait_time = 2 ** attempts
                print(f"  Aguardando {wait_time}s antes de tentar novamente...")
                time.sleep(wait_time)
            else:
                print(f"Falha ao obter resposta para o LOTE após {max_retries} tentativas.")
                return ["Erro de API" for _ in range(batch_size)] # Retorna erro para todas no lote

    return ["Erro: Máximo de tentativas atingido" for _ in range(batch_size)] # Retorna erro para todas

def evaluate_questions(questions_to_process):
    """Processa as questões em lotes, obtém respostas da IA em lote e avalia."""
    results = []
    total_questions = len(questions_to_process)
    print(f"Iniciando avaliação de {total_questions} questões em lotes de {BATCH_SIZE}...")

    for i in range(0, total_questions, BATCH_SIZE):
        batch_questions_data = questions_to_process[i:min(i + BATCH_SIZE, total_questions)]
        current_batch_size = len(batch_questions_data)
        print(f"\n--- Processando Lote {i // BATCH_SIZE + 1}/{(total_questions + BATCH_SIZE - 1) // BATCH_SIZE} (Questões {i + 1} a {i + current_batch_size}) ---")

        # Chama a IA UMA VEZ para o lote inteiro
        ai_answers_for_batch = get_ai_answers_for_batch(batch_questions_data)

        # Processa os resultados para cada questão no lote
        for j, question_data in enumerate(batch_questions_data):
            ai_answer = ai_answers_for_batch[j] # Pega a resposta correspondente do lote
            correct_answer = question_data['resposta_correta']

            is_valid_ai_answer = not ai_answer.startswith("Erro") and not ai_answer.startswith("Bloqueado")
            is_correct = is_valid_ai_answer and (ai_answer == correct_answer)

            results.append({
                'id': question_data['id'],
                'enunciado': question_data['enunciado'],
                'resposta_ia': ai_answer,
                'resposta_correta': correct_answer,
                'resultado': 'CERTO' if is_correct else ('ERRADO' if is_valid_ai_answer else 'ERRO_IA'),
                'areas_medicas': ", ".join(question_data['areas_medicas']),
                'continha_imagem': question_data['continha_imagem']
            })

        # Pausa entre os LOTES de chamadas à API
        if i + BATCH_SIZE < total_questions:
            print(f"--- Fim do Lote. Aguardando {DELAY_BETWEEN_BATCHES} segundos antes da próxima chamada à API... ---")
            time.sleep(DELAY_BETWEEN_BATCHES)

    print("\nAvaliação de todos os lotes concluída.")
    return results

# Função calculate_accuracy (sem alterações, já calcula com base nos resultados individuais)
def calculate_accuracy(results, areas_of_interest):
    """Calcula a acurácia geral e por área médica."""
    total_processed = len(results)
    total_correct = 0
    area_counts = defaultdict(lambda: {'total': 0, 'correct': 0, 'errors': 0})
    errors_parsing = 0
    errors_api = 0
    errors_blocked = 0
    errors_total_ia = 0

    for result in results:
        is_error = False
        if result['resultado'] == 'ERRO_IA':
            errors_total_ia += 1
            is_error = True
            # Detalha o tipo de erro
            if "Parsing" in result['resposta_ia']:
                errors_parsing += 1
            elif "API" in result['resposta_ia'] or "Erro:" in result['resposta_ia']:
                 errors_api += 1
            elif "Bloqueado" in result['resposta_ia']:
                 errors_blocked += 1
            elif "Ausente" in result['resposta_ia']: # Erro específico do parsing de lote
                 errors_parsing += 1 # Ou pode criar uma categoria nova
            elif "Vazia" in result['resposta_ia']:
                 errors_api +=1 # Ou categoria nova
        elif result['resultado'] == 'CERTO':
            total_correct += 1

        # Calcula acurácia por área
        question_areas = result['areas_medicas'].split(', ')
        processed_for_area_calc = set()
        for area in question_areas:
            if area in areas_of_interest and area not in processed_for_area_calc:
                area_counts[area]['total'] += 1
                if result['resultado'] == 'CERTO':
                    area_counts[area]['correct'] += 1
                elif result['resultado'] == 'ERRO_IA':
                     area_counts[area]['errors'] += 1
                processed_for_area_calc.add(area)

    overall_accuracy = (total_correct / total_processed * 100) if total_processed > 0 else 0
    valid_processed_for_accuracy = total_processed - errors_total_ia
    accuracy_excluding_errors = (total_correct / valid_processed_for_accuracy * 100) if valid_processed_for_accuracy > 0 else 0

    area_accuracy = {}
    for area, counts in area_counts.items():
        valid_area_processed = counts['total'] - counts['errors']
        area_accuracy[area] = (counts['correct'] / valid_area_processed * 100) if valid_area_processed > 0 else 0

    summary = {
        'total_questions_processed': total_processed,
        'total_correct': total_correct,
        'overall_accuracy': overall_accuracy,
        'accuracy_excluding_errors': accuracy_excluding_errors,
        'area_accuracy': area_accuracy,
        'area_counts': area_counts,
        'errors_parsing': errors_parsing,
        'errors_api': errors_api,
        'errors_blocked': errors_blocked,
        'errors_total_ia': errors_total_ia,
        'valid_processed_for_accuracy': valid_processed_for_accuracy
    }
    return summary

# Função save_results_to_csv (sem alterações)
def save_results_to_csv(results, filepath):
    """Salva os resultados detalhados em um arquivo CSV usando Pandas."""
    if not results:
        print("Nenhum resultado para salvar.")
        return
    try:
        df = pd.DataFrame(results)
        cols = ['id', 'enunciado', 'resposta_ia', 'resposta_correta', 'resultado', 'areas_medicas', 'continha_imagem']
        df = df.reindex(columns=cols, fill_value='')
        df.to_csv(filepath, index=False, encoding='utf-8-sig')
        print(f"Resultados salvos com sucesso em '{filepath}' no Google Drive.")
    except IOError:
        print(f"Erro: Não foi possível escrever no arquivo CSV '{filepath}'. Verifique o caminho e as permissões no Google Drive.")
    except Exception as e:
        print(f"Erro inesperado ao salvar CSV com Pandas: {e}")

# @title 6. Execução Principal do Script
if __name__ == "__main__":
    print("--- Iniciando Script de Avaliação (Processamento em Lote para IA) ---")

    if 'API_KEY' not in locals() or not API_KEY:
        print("!!! ERRO CRÍTICO: API Key não configurada. Interrompendo. !!!")
    else:
        print("\nCarregando dados do JSON...")
        all_data = load_json_data(JSON_FILE_PATH)

        if all_data:
            print(f"Total de questões no arquivo: {len(all_data)}")
            print(f"Filtrando por áreas: {', '.join(AREAS_TO_FILTER)}...")
            filtered_data = filter_questions(all_data, AREAS_TO_FILTER)
            print(f"Número de questões filtradas: {len(filtered_data)}")

            if not filtered_data:
                print("Nenhuma questão encontrada para as áreas especificadas.")
            else:
                questions_to_process = filtered_data
                if MAX_QUESTIONS_TO_PROCESS is not None and MAX_QUESTIONS_TO_PROCESS > 0 and MAX_QUESTIONS_TO_PROCESS < len(filtered_data):
                    print(f"Limitando o processamento às primeiras {MAX_QUESTIONS_TO_PROCESS} questões filtradas.")
                    questions_to_process = filtered_data[:MAX_QUESTIONS_TO_PROCESS]
                elif MAX_QUESTIONS_TO_PROCESS is not None and MAX_QUESTIONS_TO_PROCESS > 0:
                     print(f"Processando todas as {len(filtered_data)} questões filtradas (MAX_QUESTIONS_TO_PROCESS >= total filtrado).")
                elif MAX_QUESTIONS_TO_PROCESS == 0:
                    print("MAX_QUESTIONS_TO_PROCESS definido como 0. Nenhuma questão será processada.")
                    questions_to_process = []
                else: # MAX_QUESTIONS_TO_PROCESS é None
                    print(f"Processando todas as {len(filtered_data)} questões filtradas (MAX_QUESTIONS_TO_PROCESS não definido).")

                if questions_to_process:
                    evaluation_results = evaluate_questions(questions_to_process)
                    accuracy_summary = calculate_accuracy(evaluation_results, AREAS_TO_FILTER)
                    save_results_to_csv(evaluation_results, OUTPUT_CSV_FILE)

                    # Mostra o resumo da acurácia
                    print("\n--- Resumo da Avaliação ---")
                    print(f"Modelo IA Utilizado: {GEMINI_MODEL_NAME}")
                    print(f"Total de Questões Processadas: {accuracy_summary['total_questions_processed']}")
                    print(f"Total de Acertos: {accuracy_summary['total_correct']}")
                    print(f"Total de Erros da IA (API/Parsing/Bloqueio/Ausente): {accuracy_summary['errors_total_ia']}")
                    print(f"  - Erros de Parsing/Ausente no Lote: {accuracy_summary['errors_parsing']}")
                    print(f"  - Erros de API/Outros: {accuracy_summary['errors_api']}")
                    print(f"  - Respostas Bloqueadas: {accuracy_summary['errors_blocked']}")
                    print(f"Questões Válidas para Cálculo de Acurácia (sem erros IA): {accuracy_summary['valid_processed_for_accuracy']}")
                    print(f"\nAcurácia Geral (sobre todas processadas): {accuracy_summary['overall_accuracy']:.2f}%")
                    print(f"Acurácia (excluindo erros da IA): {accuracy_summary['accuracy_excluding_errors']:.2f}%")

                    print("\nAcurácia por Área Médica (calculada sobre questões válidas para cada área):")
                    if accuracy_summary['area_counts']:
                        for area in AREAS_TO_FILTER:
                             if area in accuracy_summary['area_counts']:
                                 counts = accuracy_summary['area_counts'][area]
                                 acc = accuracy_summary['area_accuracy'].get(area, 0)
                                 valid_area_proc = counts['total'] - counts['errors']
                                 print(f"  - {area}: {acc:.2f}% ({counts['correct']} de {valid_area_proc} válidas / {counts['total']} total / {counts['errors']} erros IA)")
                             else:
                                 print(f"  - {area}: Nenhuma questão processada.")
                    else:
                         print("  Nenhuma questão encontrada para as áreas especificadas.")

                    print(f"\nResultados detalhados salvos em: {OUTPUT_CSV_FILE}")
                else:
                    print("Nenhuma questão selecionada para processamento.")
        else:
            print("Não foi possível carregar os dados do JSON.")

    print("\n--- Fim do Script ---")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Google Drive montado com sucesso!
API Key do Google configurada com sucesso via Colab Secrets.
--- Iniciando Script de Avaliação (Processamento em Lote para IA) ---

Carregando dados do JSON...
Total de questões no arquivo: 670
Filtrando por áreas: Ginecologia, Obstetrícia, Pediatria...
Número de questões filtradas: 209
Processando todas as 209 questões filtradas (MAX_QUESTIONS_TO_PROCESS >= total filtrado).
Iniciando avaliação de 209 questões em lotes de 25...

--- Processando Lote 1/9 (Questões 1 a 25) ---
--- Fim do Lote. Aguardando 5 segundos antes da próxima chamada à API... ---

--- Processando Lote 2/9 (Questões 26 a 50) ---
--- Fim do Lote. Aguardando 5 segundos antes da próxima chamada à API... ---

--- Processando Lote 3/9 (Questões 51 a 75) ---
--- Fim do Lote. Aguardando 5 segundos antes da próxima chamada à API... ---

--- Processando Lote 4/9 (Q