In [1]:
import os
import re
import time
from datetime import datetime
from typing import Dict, List, Optional, Union

import pandas as pd
import torch
from datasets import load_dataset
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM

# Evaluate the base model's performance on MMLU vs. MMLU_RU

## This class is used to manage performance results

In [4]:
class ModelPerformanceTracker:
    """
    Handles tracking and managing model performance metrics for various evaluations.

    Metrics are stored as rows of a single CSV file (``metrics.csv``) inside
    ``dataset_path``; every call to :meth:`log_metrics` appends rows that share
    one ``run_id``.
    """

    # Column order of metrics.csv; also the key order of every logged row.
    COLUMNS = [
        'model_name', 'metric_type', 'metric_name', 'value',
        'task', 'language', 'model_params', 'timestamp',
        'run_id', 'training_config', 'notes'
    ]

    def __init__(self, dataset_path="model_performance_metrics"):
        self.dataset_path = dataset_path
        self.metrics_file = os.path.join(dataset_path, "metrics.csv")
        self.ensure_dataset_structure()

    def ensure_dataset_structure(self):
        """Creates necessary directories and files if they don't exist."""
        os.makedirs(self.dataset_path, exist_ok=True)
        if not os.path.exists(self.metrics_file):
            pd.DataFrame(columns=self.COLUMNS).to_csv(self.metrics_file, index=False)

    def _read_metrics(self) -> pd.DataFrame:
        """Loads the metrics file, keeping run ids as strings."""
        # Without an explicit dtype, purely numeric run ids (e.g. "007") would
        # be parsed as integers and no longer compare equal to the id that
        # log_metrics returned.
        return pd.read_csv(self.metrics_file, dtype={'run_id': str})

    def log_metrics(self,
                   metrics: Dict[str, Union[float, Dict[str, float]]],
                   model_name: str,
                   metric_type: str,
                   task: Optional[str] = None,
                   language: Optional[str] = None,
                   model_params: Optional[dict] = None,
                   training_config: Optional[dict] = None,
                   notes: Optional[str] = None,
                   run_id: Optional[str] = None) -> str:
        """
        Logs various model metrics to the dataset.

        Args:
            metrics: Dictionary of metric_name:value pairs or metric_name:dict pairs.
                Nested dicts are flattened to "metric_name/sub_name" rows.
            model_name: Name of the model being evaluated
            metric_type: Type of metric (e.g., 'accuracy', 'speed', 'memory')
            task: Task name if applicable
            language: Language of evaluation if applicable
            model_params: Model parameters/configuration
            training_config: Training configuration if model was fine-tuned
            notes: Additional notes about the evaluation
            run_id: Existing run identifier to attach these metrics to. When
                omitted, a new timestamp-based identifier is generated.

        Returns:
            run_id: Unique identifier for this evaluation run
        """
        now = datetime.now()
        if run_id is None:
            run_id = now.strftime("%Y%m%d_%H%M%S")
        else:
            # Stored and compared as text, see _read_metrics.
            run_id = str(run_id)

        # Flatten one level of nesting (e.g. per-task accuracies).
        flat_metrics = []
        for metric_name, value in metrics.items():
            if isinstance(value, dict):
                flat_metrics.extend(
                    (f"{metric_name}/{sub_name}", sub_value)
                    for sub_name, sub_value in value.items()
                )
            else:
                flat_metrics.append((metric_name, value))

        # Fields that are identical for every row written by this call.
        shared = {
            'model_name': model_name,
            'metric_type': metric_type,
            'task': task,
            'language': language,
            'model_params': str(model_params) if model_params else None,
            'timestamp': now.isoformat(),
            'run_id': run_id,
            'training_config': str(training_config) if training_config else None,
            'notes': notes,
        }
        new_entries = [
            {**shared, 'metric_name': name, 'value': value}
            for name, value in flat_metrics
        ]
        if not new_entries:
            # Nothing to write; leave the file untouched.
            return run_id

        df_new = pd.DataFrame(new_entries, columns=self.COLUMNS)
        existing = self._read_metrics()
        # Concatenating with an empty frame is deprecated in recent pandas, so
        # the header-only file is simply replaced by the new rows.
        combined = df_new if existing.empty else pd.concat([existing, df_new], ignore_index=True)
        combined.to_csv(self.metrics_file, index=False)

        return run_id

    def get_metrics(self,
                   model_name: Optional[str] = None,
                   metric_type: Optional[str] = None,
                   task: Optional[str] = None,
                   run_id: Optional[str] = None) -> pd.DataFrame:
        """
        Retrieves metrics based on specified filters.

        Every filter that is given (truthy) must match; omitted filters are
        ignored. Returns the matching rows of the metrics file.
        """
        df = self._read_metrics()

        if model_name:
            df = df[df['model_name'] == model_name]
        if metric_type:
            df = df[df['metric_type'] == metric_type]
        if task:
            df = df[df['task'] == task]
        if run_id:
            df = df[df['run_id'] == str(run_id)]

        return df

    def compare_models(self,
                      model_names: List[str],
                      metric_type: Optional[str] = None) -> pd.DataFrame:
        """
        Compares metrics between different models.

        Returns one row per (model_name, metric_type, metric_name) with the
        mean, standard deviation and count of the logged values plus the most
        recent timestamp.
        """
        df = self._read_metrics()
        df = df[df['model_name'].isin(model_names)]

        if metric_type:
            df = df[df['metric_type'] == metric_type]

        comparison = df.groupby(['model_name', 'metric_type', 'metric_name']).agg({
            'value': ['mean', 'std', 'count'],
            'timestamp': 'max'
        }).round(4)

        return comparison

## These are the main evaluation functions used to evaluate the model on any MMLU task

In [5]:
def construct_prompt(example, lang="en"):
    """
    Builds the multiple-choice prompt for one dataset example.

    The question and choices are read from the language-specific keys
    ("question_ru"/"choices_ru" for Russian, "question_en"/"choices_en"
    otherwise). If the language-specific choices key is absent, the first of
    the generic keys present in the example is used instead; a missing
    question falls back to "question" and then to an empty string.

    Raises:
        KeyError: if no usable choices key exists in the example.
    """
    language_keys = {"ru": ("question_ru", "choices_ru")}
    question_key, choices_key = language_keys.get(
        lang.lower(), ("question_en", "choices_en")
    )

    if choices_key not in example:
        generic_keys = ("choices", "options", "possible_answers", "answers")
        # Keep the language-specific key when no generic key exists so the
        # check below reports the problem.
        choices_key = next((key for key in generic_keys if key in example), choices_key)
    if choices_key not in example:
        raise KeyError("Example must contain a valid choices key.")

    question_text = example.get(question_key, example.get("question", ""))

    # Options are labelled A, B, C, ... in dataset order.
    option_lines = [
        f"{chr(65 + position)}. {option}\n"
        for position, option in enumerate(example[choices_key])
    ]
    return f"Question: {question_text}\nChoices:\n" + "".join(option_lines) + "Answer:"

In [6]:
# A capital answer letter standing on its own ("B", "B.", "is C").
_ANSWER_UPPER_RE = re.compile(r"\b[A-G]\b")
# Fallback for models that answer in lower case ("b").
_ANSWER_ANYCASE_RE = re.compile(r"\b[A-Ga-g]\b")


def _extract_answer_letter(generated_text):
    """Returns the first stand-alone answer letter (A-G) in the text, or "" if none."""
    match = (_ANSWER_UPPER_RE.search(generated_text)
             or _ANSWER_ANYCASE_RE.search(generated_text))
    return match.group(0).upper() if match else ""


def evaluate_dataset(dataset, model, tokenizer, device, lang="ru"):
    """
    Iterates over the dataset, generates the model output for each prompt,
    extracts the predicted answer letter and compares it to the ground truth
    to compute accuracy.

    Args:
        dataset: Sized iterable of examples; each needs an "answer" entry
            (integer index or letter) plus the keys construct_prompt reads.
        model: Causal LM exposing ``generate``.
        tokenizer: Tokenizer matching the model.
        device: Device the tokenized inputs are moved to.
        lang: Language passed to construct_prompt. Defaults to "ru", which is
            what this function always used before the parameter existed.

    Returns:
        Accuracy in percent (0-100); 0.0 for an empty dataset.
    """
    total = len(dataset)
    if total == 0:
        # Avoid dividing by zero when a split has no examples.
        return 0.0

    correct = 0
    for example in tqdm(dataset, desc="Evaluating"):
        prompt = construct_prompt(example, lang=lang)
        inputs = tokenizer(prompt, return_tensors="pt").to(device)
        # Greedy decoding; no temperature is passed because it is unused
        # when do_sample=False.
        outputs = model.generate(
            **inputs,
            max_new_tokens=10,
            do_sample=False
        )
        # Slice by token count: the decoded prompt is not guaranteed to be
        # character-identical to the original prompt string.
        prompt_token_count = inputs["input_ids"].shape[1]
        generated_text = tokenizer.decode(
            outputs[0][prompt_token_count:], skip_special_tokens=True
        ).strip()
        answer_pred = _extract_answer_letter(generated_text)

        ground_truth = example["answer"]
        if isinstance(ground_truth, int):
            # Integer labels are option indices: 0 -> A, 1 -> B, ...
            ground_truth = chr(65 + ground_truth)
        else:
            ground_truth = ground_truth.strip().upper()
        if answer_pred == ground_truth:
            correct += 1

    return correct / total * 100

## Initializing the model

In [None]:
# Use the GPU when torch can see one, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Hugging Face model id; the same string is logged as model_name with the metrics.
model_name = "Qwen/Qwen2.5-7B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Weights are requested in float16, with placement delegated to device_map="auto".
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,
    device_map="auto"
)
# Put the model in evaluation mode; it is only used for generation here.
model.eval()

# Tracker writing to its default dataset path ("model_performance_metrics").
tracker = ModelPerformanceTracker()

tokenizer_config.json:   0%|          | 0.00/7.30k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/2.78M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/1.67M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/7.03M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/663 [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/27.8k [00:00<?, ?B/s]

Downloading shards:   0%|          | 0/4 [00:00<?, ?it/s]

model-00001-of-00004.safetensors:   0%|          | 0.00/3.95G [00:00<?, ?B/s]

In [None]:
# The 57 MMLU subjects. The 'auxiliary_train' config is deliberately left out:
# it is a training-only config rather than a subject, so loading its "test"
# split in the evaluation loops can only fail.
mmlu_tasks = [
  'abstract_algebra', 'anatomy', 'astronomy',
  'business_ethics', 'clinical_knowledge', 'college_biology', 'college_chemistry',
  'college_computer_science', 'college_mathematics', 'college_medicine', 'college_physics',
  'computer_security', 'conceptual_physics', 'econometrics', 'electrical_engineering',
  'elementary_mathematics', 'formal_logic', 'global_facts', 'high_school_biology',
  'high_school_chemistry', 'high_school_computer_science', 'high_school_european_history',
  'high_school_geography', 'high_school_government_and_politics', 'high_school_macroeconomics',
  'high_school_mathematics', 'high_school_microeconomics', 'high_school_physics',
  'high_school_psychology', 'high_school_statistics', 'high_school_us_history',
  'high_school_world_history', 'human_aging', 'human_sexuality', 'international_law',
  'jurisprudence', 'logical_fallacies', 'machine_learning', 'management', 'marketing',
  'medical_genetics', 'miscellaneous', 'moral_disputes', 'moral_scenarios', 'nutrition',
  'philosophy', 'prehistory', 'professional_accounting', 'professional_law',
  'professional_medicine', 'professional_psychology', 'public_relations', 'security_studies',
  'sociology', 'us_foreign_policy', 'virology', 'world_religions'
]

## Evaluation

In [None]:
# Evaluate MMLU_RU performance
# Accuracy in percent per task; tasks that fail to load or evaluate are left out.
mmlu_results = {}
for task in mmlu_tasks:
    try:
        # Each task is a separate dataset config; only the test split is scored.
        dataset_mmlu = load_dataset("NLPCoreTeam/mmlu_ru", task, split="test")
        accuracy = evaluate_dataset(dataset_mmlu, model, tokenizer, device)
        mmlu_results[task] = accuracy
    except Exception as e:
        # Best effort: report the failure and carry on with the remaining tasks.
        print(f"Error evaluating {task}: {e}")

# Log MMLU_RU results
# The nested dict is stored as one "task_accuracy/<task>" row per task.
run_id = tracker.log_metrics(
    metrics={'task_accuracy': mmlu_results},
    model_name=model_name,
    metric_type='accuracy',
    language="ru",
    model_params={
        "model_size": "7B",
        "dtype": "float16",
        "device": str(device)
    },
    notes="MMLU-RU evaluation"
)

In [None]:
# Evaluate MMLU performance
# NOTE: reuses the name mmlu_results, replacing the MMLU_RU results held in the kernel.
mmlu_results = {}
for task in mmlu_tasks:
    try:
        # Load the original MMLU dataset
        dataset_mmlu = load_dataset("cais/mmlu", task, split="test")
        # Evaluate the dataset
        # NOTE(review): no language argument is passed here; confirm the prompt
        # keys evaluate_dataset selects match this dataset's schema.
        accuracy = evaluate_dataset(dataset_mmlu, model, tokenizer, device)
        # Store the results
        mmlu_results[task] = accuracy
    except Exception as e:
        # Best effort: report the failure and carry on with the remaining tasks.
        print(f"Error evaluating {task}: {e}")

# Log MMLU results
# run_id is overwritten here, so later cells see the id of this English run.
run_id = tracker.log_metrics(
    metrics={'task_accuracy': mmlu_results},
    model_name=model_name,
    metric_type='accuracy',
    language="en",
    model_params={
        "model_size": "7B",
        "dtype": "float16",
        "device": str(device)
    },
    notes="MMLU evaluation"
)


# Evaluate the speed of text generation in Russian vs. English

In [None]:
def evaluate_generation_speed(model, tokenizer, device,
                            num_samples=100,
                            input_lengths=(128, 512),
                            output_lengths=(128, 512),
                            sample_sentence="The quick brown fox jumps over the lazy dog. "):
    """
    Evaluates model generation speed for different input and output lengths.

    Args:
        model: Causal LM exposing ``generate``.
        tokenizer: Tokenizer matching the model.
        device: Device the tokenized prompt is moved to.
        num_samples: Number of timed generations per length combination.
        input_lengths: Prompt lengths in tokens.
        output_lengths: Values passed as ``max_new_tokens``.
        sample_sentence: Sentence repeated to build the prompt; pass text in
            another language (e.g. Russian) to measure that language.

    Returns:
        Dictionary keyed by "input_{input_len}_output_{output_len}", each
        value containing:
        - tokens_per_second: Generated tokens per second
        - latency_ms: Average latency per generation
        - throughput: Tokens processed per second including input

    Raises:
        ValueError: if num_samples is smaller than 1.
    """
    if num_samples < 1:
        raise ValueError("num_samples must be at least 1")

    metrics = {}

    for input_len in input_lengths:
        # Repeat the sentence once per requested token (more than enough as
        # long as a sentence yields at least one token) and let the tokenizer
        # cut the prompt at input_len tokens. Slicing the string instead would
        # measure characters, not tokens.
        input_text = sample_sentence * input_len
        inputs = tokenizer(
            input_text,
            return_tensors="pt",
            truncation=True,
            max_length=input_len
        ).to(device)
        prompt_tokens = len(inputs['input_ids'][0])

        for output_len in output_lengths:
            # Warm-up run (not timed)
            model.generate(**inputs, max_new_tokens=output_len)

            total_generation_time = 0.0
            total_tokens = 0

            # Timed runs; the prompt is tokenized once above, outside the timing.
            for _ in range(num_samples):
                start_time = time.perf_counter()
                outputs = model.generate(**inputs, max_new_tokens=output_len)
                total_generation_time += time.perf_counter() - start_time
                # generate returns prompt + completion for decoder-only models.
                total_tokens += len(outputs[0]) - prompt_tokens

            # Calculate metrics
            avg_latency = (total_generation_time / num_samples) * 1000  # ms
            if total_generation_time > 0:
                tokens_per_second = total_tokens / total_generation_time
                throughput = (total_tokens + prompt_tokens * num_samples) / total_generation_time
            else:
                # Timer could not resolve the elapsed time at all.
                tokens_per_second = throughput = float("inf")

            metrics[f"input_{input_len}_output_{output_len}"] = {
                'latency_ms': avg_latency,
                'tokens_per_second': tokens_per_second,
                'throughput': throughput,
            }

    return metrics

In [None]:
# Evaluate generation speed
# Uses the function's default input/output lengths; only the sample count is lowered.
speed_metrics = evaluate_generation_speed(
    model, 
    tokenizer, 
    device,
    num_samples=10  # Reduced for testing, increase for better statistics
)

# Log speed metrics
# Each "input_X_output_Y" entry is a nested dict, so it is stored as one row per sub-metric.
# NOTE(review): this call passes run_id=; confirm the tracker's log_metrics accepts
# that keyword, otherwise the call fails with a TypeError.
tracker.log_metrics(
    metrics=speed_metrics,
    model_name=model_name,
    metric_type='speed',
    model_params={
        "model_size": "7B",
        "dtype": "float16",
        "device": str(device)
    },
    notes="Generation speed evaluation",
    run_id=run_id  # Use same run_id to group related metrics
)

# Print summary
# run_id was last assigned by the English MMLU logging cell, so the summary is
# limited to rows carrying that id.
print("\nEvaluation Summary:")
print(tracker.get_metrics(run_id=run_id))