In [None]:
import json
import pandas as pd
import os
import time
from datetime import datetime
import numpy as np
import re
from google.colab import drive
import torch
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer

class FLANT5Evaluator:
    """Evaluate a HuggingFace seq2seq model on multiple-choice questions.

    The questions file is a JSON array of objects with the keys ``id``,
    ``question``, ``option_a`` .. ``option_d`` and ``correct_answer``
    (these are the keys read by ``format_prompt`` and ``evaluate_model``).
    One result row is recorded per question in ``self.results`` and
    ``save_results`` writes a CSV of the rows plus a JSON summary.
    """

    # Letters accepted as a well-formed answer.
    VALID_ANSWERS = ('A', 'B', 'C', 'D')

    # Column order of a result row; passing it to the DataFrame constructor
    # keeps the metrics code working even when no rows were recorded.
    _RESULT_COLUMNS = (
        'question_id',
        'model',
        'question_input',
        'model_raw_output',
        'answer',
        'correct_answer',
        'correct',
        'time',
        'error',
    )

    # Anchored patterns tried in order by extract_answer.
    _ANSWER_PATTERNS = (
        # The whole response is one letter (any case) plus punctuation: "c", " B.", "(d)".
        re.compile(r"^\W*([A-Da-d])\W*$"),
        # Response starts with a standalone uppercase letter: "A- La caída...", "(B) ...".
        re.compile(r"^\s*\(?([A-D])(?!\w)"),
        # Response starts with a lowercase letter glued to an option delimiter: "b) ...".
        re.compile(r"^\s*\(?([a-d])[).:\-]"),
    )
    # Uppercase letter that is not part of a longer word, anywhere in the text.
    _STANDALONE_LETTER = re.compile(r"(?<!\w)([A-D])(?!\w)")

    def __init__(self, questions_file: str, model_name: str = "google/flan-t5-large", max_consecutive_errors: int = 5):
        """
        Load the questions, then the model and tokenizer.

        questions_file: path to the JSON file with the questions
        model_name: HuggingFace model identifier ("google/flan-t5-large" by default)
        max_consecutive_errors: a test run (``evaluate_model(test_run=True)``)
            is aborted once this many questions in a row fail; full runs
            never abort on errors
        """
        self.max_consecutive_errors = max_consecutive_errors
        self.model_name = model_name
        self.load_questions(questions_file)
        self.results = []

        # Initialize model and tokenizer
        print(f"Loading {model_name}...")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSeq2SeqLM.from_pretrained(model_name)

        # Move model to GPU if available
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = self.model.to(self.device)
        print(f"Model loaded and moved to {self.device}")

    def load_questions(self, file_path: str):
        """Read the JSON questions file into ``self.questions``."""
        with open(file_path, 'r', encoding='utf-8') as f:
            self.questions = json.load(f)
            print(f"Loaded {len(self.questions)} questions")

    def format_prompt(self, question: dict) -> str:
        """Build the Spanish multiple-choice prompt for one question dict."""
        return f"""Responde esta pregunta de opción múltiple. Responde solo con la letra de la respuesta correcta (A, B, C, o D).

Pregunta: {question['question']}

A- {question['option_a']}
B- {question['option_b']}
C- {question['option_c']}
D- {question['option_d']}

Respuesta:"""

    def extract_answer(self, raw_answer):
        """Extract the answer letter from a raw model response.

        Returns 'A', 'B', 'C' or 'D', or None when no answer letter can be
        identified (including when ``raw_answer`` is None or empty).

        Lowercase letters are accepted only when they are the whole response
        or are directly followed by an option delimiter. A letter that merely
        occurs inside a word (the final "a" of "respuesta") is not an answer;
        counting those would score free-text responses as valid picks.
        """
        if not isinstance(raw_answer, str) or not raw_answer.strip():
            return None

        for pattern in self._ANSWER_PATTERNS:
            match = pattern.search(raw_answer)
            if match:
                return match.group(1).upper()

        # Fall back to the last standalone uppercase letter, e.g.
        # "La respuesta es B".
        standalone = self._STANDALONE_LETTER.findall(raw_answer)
        if standalone:
            return standalone[-1]
        return None

    def get_model_response(self, prompt: str) -> str:
        """Generate the model's response to ``prompt`` and return it stripped."""
        # Tokenize input
        inputs = self.tokenizer(prompt, return_tensors="pt", max_length=1024, truncation=True)
        inputs = {k: v.to(self.device) for k, v in inputs.items()}

        # Generate response
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_length=10,  # Short as we only need a letter
                num_beams=4,
                length_penalty=0.6,
                early_stopping=True
            )

        # Decode response
        response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        return response.strip()

    def _evaluate_question(self, question: dict, index: int, total: int) -> dict:
        """Prompt the model with one question, print progress, return its result row."""
        start_time = time.time()
        prompt = self.format_prompt(question)

        print("\nSending prompt:")
        print(prompt)

        # Get model response
        raw_answer = self.get_model_response(prompt)
        end_time = time.time()

        model_answer = self.extract_answer(raw_answer)
        is_valid = model_answer in self.VALID_ANSWERS
        if not is_valid:
            print(f"Warning: Could not extract answer from response: {raw_answer}")

        result = {
            'question_id': question['id'],
            'model': self.model_name,
            'question_input': question['question'],
            'model_raw_output': raw_answer,
            'answer': model_answer if is_valid else None,
            'correct_answer': question['correct_answer'],
            'correct': is_valid and model_answer == question['correct_answer'],
            'time': end_time - start_time,
            'error': None if is_valid else f"Invalid format: {raw_answer}"
        }

        print("\nEvaluation:")
        print(f"Question {index + 1}/{total}")
        print(f"Raw answer: {raw_answer}")
        print(f"Extracted answer: {model_answer if is_valid else 'invalid'}")
        print(f"Correct answer: {question['correct_answer']}")
        print(f"Time: {result['time']:.2f}s")
        print("-" * 40)

        return result

    def _error_result(self, question, error_msg: str) -> dict:
        """Build the result row for a question whose evaluation raised."""
        # Read fields defensively: the exception may have been caused by a
        # malformed question, and the handler must not raise again.
        fields = question if isinstance(question, dict) else {}
        return {
            'question_id': fields.get('id'),
            'model': self.model_name,
            'question_input': fields.get('question'),
            'model_raw_output': None,
            'answer': None,
            'correct_answer': fields.get('correct_answer'),
            'correct': False,
            'time': None,
            'error': error_msg
        }

    def evaluate_model(self, test_run: bool = False):
        """
        Evaluate questions using the loaded model and save the results.

        test_run: if True, only evaluates the first 5 questions and aborts
            after ``max_consecutive_errors`` failures in a row; a full run
            never aborts on errors.

        Returns the summary metrics dict produced by ``save_results``.
        """
        # Start from a clean slate so repeated calls on one evaluator do not
        # mix rows from earlier runs into this run's files and metrics.
        self.results = []
        consecutive_errors = 0
        questions_to_evaluate = self.questions[:5] if test_run else self.questions
        total = len(questions_to_evaluate)

        # Only test runs abort; an infinite threshold disables the check.
        max_errors = self.max_consecutive_errors if test_run else float('inf')

        for i, question in enumerate(questions_to_evaluate):
            try:
                result = self._evaluate_question(question, i, total)
            except Exception as e:
                # Per-question boundary: record the failure and keep going.
                error_msg = str(e)
                print(f"Error on question {i+1}: {error_msg}")
                result = self._error_result(question, error_msg)

            self.results.append(result)

            if result['error'] is None:
                consecutive_errors = 0
            else:
                consecutive_errors += 1

            if consecutive_errors >= max_errors:
                print(f"\nAborting test run: {consecutive_errors} consecutive errors")
                break

        return self.save_results()

    def save_results(self, output_dir: str = '/content/drive/MyDrive/TFM2/TFM-DATASETS/evaluations'):
        """Save results and return summary metrics.

        Writes ``eval_<model>_<timestamp>_results.csv`` and
        ``eval_<model>_<timestamp>_summary.json`` into ``output_dir``
        (created if missing).

        The returned dict holds plain Python values. ``accuracy`` is computed
        over all recorded questions and ``accuracy_valid`` only over those
        with a well-formed answer; both, and ``avg_time``, are None when
        there is nothing to average.
        """
        os.makedirs(output_dir, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Explicit columns keep the lookups below valid for an empty run.
        df = pd.DataFrame(self.results, columns=list(self._RESULT_COLUMNS))

        total = len(df)
        correct = df['correct'].astype(bool)
        valid_mask = df['answer'].notna()
        valid_correct = correct[valid_mask]
        # Failed questions store time=None; coerce and drop them so they do
        # not affect the average or its dtype.
        times = pd.to_numeric(df['time'], errors='coerce').dropna()

        # Calculate metrics (converted to built-in types so json can dump them)
        metrics = {
            'model': self.model_name,
            'total_questions': total,
            'completed': int(df['error'].isna().sum()),
            'errors': int(df['error'].notna().sum()),
            'correct': int(correct.sum()),
            'accuracy': float(correct.mean()) if total else None,
            'avg_time': float(times.mean()) if len(times) else None,
            'invalid_formats': int((~valid_mask).sum()),
            'valid_responses': int(valid_mask.sum()),
            'accuracy_valid': float(valid_correct.mean()) if len(valid_correct) else None,
        }

        # Save results
        model_short_name = self.model_name.split('/')[-1]
        base_path = os.path.join(output_dir, f"eval_{model_short_name}_{timestamp}")
        df.to_csv(f"{base_path}_results.csv", index=False)

        with open(f"{base_path}_summary.json", 'w', encoding='utf-8') as f:
            json.dump(metrics, f, indent=2)

        def as_percent(value):
            return "n/a" if value is None else f"{value:.2%}"

        # Print summary
        print("\nEvaluation Summary:")
        print(f"Model: {self.model_name}")
        print(f"Total Questions: {metrics['total_questions']}")
        print(f"Valid Responses: {metrics['valid_responses']}")
        print(f"Invalid Formats: {metrics['invalid_formats']}")
        print(f"Correct Answers: {metrics['correct']}")
        print(f"Accuracy (of all questions): {as_percent(metrics['accuracy'])}")
        print(f"Accuracy (of valid): {as_percent(metrics['accuracy_valid'])}")
        if metrics['avg_time'] is not None:
            print(f"Avg Time: {metrics['avg_time']:.2f}s")

        return metrics


def run_test_evaluation(model_name: str = "google/flan-t5-large"):
    """Evaluate ``model_name`` in test mode and return the summary metrics.

    Builds a FLANT5Evaluator on the structured questions file stored on
    Google Drive and calls ``evaluate_model(test_run=True)``.
    """
    return FLANT5Evaluator(
        '/content/drive/MyDrive/TFM2/TFM-DATASETS/structured_questions.json',
        model_name,
    ).evaluate_model(test_run=True)

def run_full_evaluation(model_name: str = "google/flan-t5-large"):
    """Evaluate ``model_name`` on every question and return the summary metrics.

    Builds a FLANT5Evaluator on the structured questions file stored on
    Google Drive and calls ``evaluate_model()`` with its default arguments.
    """
    full_run_evaluator = FLANT5Evaluator(
        '/content/drive/MyDrive/TFM2/TFM-DATASETS/structured_questions.json',
        model_name,
    )
    metrics = full_run_evaluator.evaluate_model()
    return metrics

# Notebook driver: install dependencies, mount Drive, run a 5-question smoke
# test, then optionally run the full evaluation.

# Install required packages (IPython shell magic; only valid inside a notebook).
# NOTE(review): this line executes after the `import torch` and
# `from transformers import ...` statements at the top of the cell, so it
# cannot satisfy those imports on the first run of a fresh runtime — consider
# moving it to its own earlier cell.
!pip install -q transformers torch

# Mount Google Drive so the /content/drive/... paths used by the questions
# file and the evaluation output directory resolve.
drive.mount('/content/drive')

# Run test evaluation (first 5 questions) before committing to the full run
print("\nRunning test evaluation with 5 questions...")
test_metrics = run_test_evaluation()

# Ask user if they want to continue; only an answer of "y" (any case) proceeds
if input("\nContinue with full evaluation? (y/n): ").lower() == 'y':
    print("\nRunning full evaluation...")
    # A new evaluator is built here, so the model is loaded a second time.
    full_metrics = run_full_evaluation()

Mounted at /content/drive

Running test evaluation with 5 questions...
Loaded 174 questions
Loading google/flan-t5-large...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/2.54k [00:00<?, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.42M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/2.20k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/662 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/3.13G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

Model loaded and moved to cpu

Sending prompt:
Responde esta pregunta de opción múltiple. Responde solo con la letra de la respuesta correcta (A, B, C, o D).

Pregunta: En relación con el metabolismo del hierro y su control mediado por hepcidina, es cierto que:

A- La caída en la presión parcial de oxígeno promueve la activación del factor inducible por hipoxia (HIF), que aumenta la expresión de hepcidina.
B- El aumento de hierro sérico o la inflamación estimulan la síntesis de hepcidina en el hígado, que regula negativamente la función de la ferroportina.
C- La hepcidina disminuye la absorción intestinal de hierro a través de la inactivación del transportador de metales bivalentes 1 (DMT1).
D- En la hemocromatosis hereditaria de tipo 1, las mutaciones en la proteína de la hemocromatosis humana (HFE) causan un aumento de la producción de hepcidina.

Respuesta:

Evaluation:
Question 1/5
Raw answer: C
Extracted answer: C
Correct answer: B
Time: 9.17s
-------------------------------------