In [1]:
# Install Dependencies
!pip install -q --upgrade pip

# Install vLLM and its core dependencies
# Pinning versions known to work with vllm 0.8.5
print("--- Installing vLLM and dependencies ---")
!pip install -q --no-cache-dir "vllm==0.8.5"

# --- 3. Install other requirements (UPGRADE TQDM) ---
print("\n--- Installing other requirements (upgrading tqdm) ---")
# Explicitly upgrade tqdm first, then install the rest
!pip install -q --no-cache-dir --upgrade tqdm
!pip install -q --no-cache-dir huggingface_hub pandas seaborn matplotlib "tqdm[asyncio]" openai tabulate nest_asyncio
# Note: Installing tqdm[asyncio] might re-trigger the tqdm upgrade if the first one didn't stick

# --- 4. Install datasets ---
print("\n--- Installing datasets ---")
!pip install -q --no-cache-dir "datasets" # Let it try to pull dependencies

# --- 5. Force Pin vLLM 0.8.5 specific dependencies (CRITICAL) ---
print("\n--- Pinning vLLM 0.8.5 required versions (opentelemetry, numba) ---")
!pip install -q --no-cache-dir --upgrade "opentelemetry-api==1.26.0" "opentelemetry-sdk==1.26.0"
!pip install -q --no-cache-dir --upgrade "numba==0.61.2"
# Note: these pins are applied with --upgrade (not --force-reinstall) after the general requirements, so they replace whatever versions the earlier installs pulled in

# --- 6. Force Pin fsspec potentially for datasets ---
# Datasets usually needs fsspec <= version X. GCSFS needs a newer one. Prioritize datasets.
print("\n--- Pinning fsspec potentially for datasets ---")
!pip install -q --no-cache-dir --upgrade "fsspec<=2025.3.0"


# --- Verify final versions ---
print("\n--- Verifying final key package versions ---")
!pip show vllm torch datasets pandas openai tqdm nest_asyncio protobuf opentelemetry-api opentelemetry-sdk numba fsspec matplotlib seaborn tabulate

print("\n--- Dependencies installation attempt finished ---")

--- Installing vLLM and dependencies ---

--- Installing other requirements (upgrading tqdm) ---
[0m
--- Installing datasets ---

--- Pinning vLLM 0.8.5 required versions (opentelemetry, numba) ---

--- Pinning fsspec potentially for datasets ---

--- Verifying final key package versions ---
Name: vllm
Version: 0.8.5
Summary: A high-throughput and memory-efficient inference and serving engine for LLMs
Home-page: https://github.com/vllm-project/vllm
Author: vLLM Team
Author-email: 
License-Expression: Apache-2.0
Location: /usr/local/lib/python3.11/dist-packages
Requires: aiohttp, blake3, cachetools, cloudpickle, compressed-tensors, depyf, einops, fastapi, filelock, gguf, huggingface-hub, importlib_metadata, lark, llguidance, lm-format-enforcer, mistral_common, msgspec, ninja, numba, numpy, openai, opencv-python-headless, opentelemetry-api, opentelemetry-exporter-otlp, opentelemetry-sdk, opentelemetry-semantic-conventions-ai, outlines, partial-json-parser, pillow, prometheus-fastapi-ins

# Qualitative Model Evaluation

This notebook runs a set of predefined prompts against multiple language models using vLLM and saves the generated outputs for qualitative comparison. It reuses the server management and model handling logic from the dataset evaluation notebook.

In [2]:
# Cell 1: Imports and Basic Setup
import os
import time
import json
import pandas as pd
import subprocess
import re
import requests
import logging
import asyncio
import datetime
import glob
import traceback
import shlex
import IPython
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any

from google.colab import drive
from huggingface_hub import snapshot_download
from openai import AsyncOpenAI
from tqdm.asyncio import tqdm_asyncio
import nest_asyncio

# Setup basic logging.
# force=True removes any handlers already attached to the root logger before configuring it.
# Without it, basicConfig() silently does nothing when the host environment has already
# installed a root handler, and the INFO-level progress messages in this notebook are dropped.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True,
)

# Mount Google Drive (best effort: a failure is logged and execution continues)
try:
    drive.mount('/content/drive')
except Exception as e:
    logging.warning(f"Could not mount Google Drive: {e}. Using local Colab storage.")

print("Imports and basic setup complete.")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Imports and basic setup complete.


## Configuration

Define paths, vLLM settings, models to test, and generation parameters.

In [3]:
# Cell 2: Configuration Class

@dataclass
class Config:
    """Configuration settings for the qualitative evaluation.

    Instantiating the class has side effects: ``__post_init__`` resolves the
    derived paths and creates the base, results and models directories.

    ``results_dir`` and ``models_base_path`` are always recomputed from
    ``gdrive_base_path``; values passed for them to the constructor are overwritten.
    """
    # --- Paths ---
    # Use the same base path as the original notebook for consistency
    gdrive_base_path: str = "/content/drive/MyDrive/Colab_Notebooks/Multi_MATH_Eval"
    results_dir_name: str = "results"
    models_dir_name: str = "models"
    qualitative_results_subdir: str = "qualitative_results" # Subdirectory for these results

    # --- vLLM Configuration ---
    vllm_port: int = 8000
    gpu_utilization: float = 0.9
    vllm_log_file_template: str = "vllm_server_{model_name}_qualitative.log" # Relative to base path

    # --- Model Configuration (Copy from original notebook or define needed models) ---
    # Maps the served model name to either a Hugging Face repo id or a local weights directory.
    model_configs: Dict[str, str] = field(default_factory=lambda: {
        "deepseek-1.5b-qwen-base": "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
        "deepseek-1.5b-qwen-ft-control": "/content/drive/MyDrive/Colab_Notebooks/LiveBenchRun/models/deepseek-r1-distill-qwen-1.5b-ft-control",
        "deepseek-1.5b-scrambled": "/content/drive/MyDrive/Colab_Notebooks/LiveBenchRun/models/deepseek-r1-distill-qwen-1.5b-scrambled",
        "deepseek-1.5b-val-modified": "/content/drive/MyDrive/Colab_Notebooks/LiveBenchRun/models/deepseek-r1-distill-qwen-1.5b-val-modified",
        "deepseek-1.5b-length-val-modified": "/content/drive/MyDrive/Colab_Notebooks/LiveBenchRun/models/deepseek-r1-distill-qwen-1.5b-length-val-modified",
        "deepseek-1.5b-gradient-ascent": "/content/drive/MyDrive/Colab_Notebooks/LiveBenchRun/models/deepseek-r1-distill-qwen-1.5b-gradient-ascent",
        "deepseek-r1-distill-qwen-1.5b-reduced-eos-gradient-ascent": "/content/drive/MyDrive/Colab_Notebooks/LiveBenchRun/models/deepseek-r1-distill-qwen-1.5b-reduced-eos-gradient-ascent",
        # Add or remove models as needed for qualitative testing
    })

    # --- Generation Parameters ---
    max_new_tokens: int = 8192 # Adjust as needed for qualitative prompts
    temperature: float = 0.6
    top_p: float = 1.0
    parallel_requests: int = 32

    # --- Derived Paths (Initialized after creation) ---
    results_dir: str = ""
    models_base_path: str = ""

    def __post_init__(self):
        """Resolve the base path, derive the output paths and create the directories."""
        drive_mount_point = "/content/drive"
        base_is_on_drive = (
            self.gdrive_base_path == drive_mount_point
            or self.gdrive_base_path.startswith(drive_mount_point + "/")
        )

        # Fall back to local storage only when the requested base path lives on Drive and
        # Drive is not mounted. A base path that points elsewhere was chosen explicitly by
        # the caller and must not be silently replaced.
        if base_is_on_drive and not os.path.exists("/content/drive/MyDrive"):
            self.gdrive_base_path = "/content/Qualitative_Eval_Local"
            logging.warning(f"Using local base path: {self.gdrive_base_path}")

        # Specific results directory for qualitative outputs
        self.results_dir = os.path.join(self.gdrive_base_path, self.results_dir_name, self.qualitative_results_subdir)
        self.models_base_path = os.path.join(self.gdrive_base_path, self.models_dir_name)

        for directory in (self.gdrive_base_path, self.results_dir, self.models_base_path):
            os.makedirs(directory, exist_ok=True)

        logging.info(f"Google Drive Base Path: {self.gdrive_base_path}")
        logging.info(f"Qualitative Results Path: {self.results_dir}")
        logging.info(f"Models Base Path: {self.models_base_path}")
        logging.info(f"Parallel requests: {self.parallel_requests}")

# Instantiate the config. Construction runs Config.__post_init__, which sets the
# derived paths and creates the base, results and models directories.
config = Config()

## vLLM Server Manager

Class responsible for starting and stopping the vLLM OpenAI-compatible API server for each model.

In [4]:
# Cell 3: vLLM Server Manager Class (Copied from Original_Dataset_Results)

class VLLMServerManager:
    """Manages the lifecycle of the vLLM OpenAI API server.

    The server is launched as a background shell job through IPython and is
    located again for shutdown by matching its command line with ``pkill``.
    No process handle is kept; readiness is detected by polling the server's
    ``/v1/models`` endpoint.
    """

    def __init__(self, config: "Config"):
        self.config = config
        # Path of the log file of the most recently started server, or None.
        self.current_log_file = None

    def stop(self):
        """Attempts to stop the vLLM server using pkill."""
        port = self.config.vllm_port
        logging.info(f"Attempting to stop vLLM server on port {port} using pkill...")
        signalled = self._pkill_server(port)
        self.current_log_file = None
        if signalled:
            time.sleep(5) # Give OS time to release port

    def _pkill_server(self, port: int) -> bool:
        """Uses pkill to find and terminate the server process.

        Args:
            port: Port the server was started with; it is part of the command-line
                pattern used to find the process.

        Returns:
            True if pkill reported that at least one process was signalled,
            False otherwise (nothing matched, pkill failed, or pkill is unavailable).
        """
        pattern = f"vllm.*--port {port}"
        try:
            # Argument list instead of a shell string: with shell=True the wrapping
            # shell's own command line contains the pattern text and can match it.
            completed = subprocess.run(
                ["pkill", "-15", "-f", pattern],
                check=False,
                capture_output=True,
            )
        except Exception as e:
            logging.error(f"Error trying to stop vLLM server with pkill: {e}")
            return False

        if completed.returncode == 0:
            logging.info(f"Sent SIGTERM signal via pkill to process listening on port {port}.")
            return True
        if completed.returncode == 1:
            # pkill exit status 1 means no process matched the pattern.
            logging.info(f"No running vLLM server process found for port {port}.")
            return False

        stderr_text = (completed.stderr or b"").decode(errors="replace").strip()
        logging.error(f"Error trying to stop vLLM server with pkill: exit status {completed.returncode} {stderr_text}")
        return False

    def start(self, model_weights_path: str, model_name: str) -> bool:
        """Starts the vLLM server in the background using IPython.get_ipython().system() and checks readiness.

        Args:
            model_weights_path: Value passed to the server's ``--model`` option.
            model_name: Value passed to ``--served-model-name``; also used in the log file name.

        Returns:
            True once the server lists ``model_name`` on ``/v1/models``; False if not
            running under IPython, if launching raised, or if readiness timed out.
        """
        self.stop() # Ensure any previous server is stopped

        port = self.config.vllm_port
        gpu_util = self.config.gpu_utilization
        log_file_name = self.config.vllm_log_file_template.format(model_name=model_name)
        # Save log in the main base path, not results dir
        self.current_log_file = os.path.join(self.config.gdrive_base_path, log_file_name)

        logging.info(f"Starting vLLM server for model: {model_name} from path: {model_weights_path}")
        logging.info(f"Saving server log to: {self.current_log_file}")

        # Start each run with a fresh log so the tail shown on failure belongs to this run.
        if os.path.exists(self.current_log_file):
            os.remove(self.current_log_file)

        # Construct the command list (without nohup)
        command_list = [
            "python", "-m", "vllm.entrypoints.openai.api_server",
            "--model", model_weights_path,
            "--served-model-name", model_name,
            "--host", "0.0.0.0",
            "--port", str(port),
            "--tensor-parallel-size", "1",
            "--gpu-memory-utilization", str(gpu_util),
            "--trust-remote-code",
        ]

        # Properly quote arguments and create the shell command string for background execution
        command_str = " ".join(shlex.quote(arg) for arg in command_list)
        # Add redirection and background execution
        full_command_str = f"{command_str} > {shlex.quote(self.current_log_file)} 2>&1 &"

        try:
            ipython = IPython.get_ipython()
            if ipython is None:
                logging.error("Not running in an IPython environment. Cannot use IPython.system().")
                return False

            logging.info(f"Executing command via IPython.system(): {full_command_str}")
            # Execute the command in the background using IPython's system call
            ipython.system(full_command_str)

            # Wait a brief moment to allow the process to potentially start/fail quickly
            time.sleep(2)

            logging.info("vLLM server command issued via IPython. Waiting for initialization...")
            # Proceed to check readiness (no process object to track)
            return self._wait_for_server_ready(model_name)

        except Exception as e:
            logging.error(f"Failed to start vLLM server process via IPython.system(): {e}")
            traceback.print_exc()
            self.current_log_file = None
            return False

    def _wait_for_server_ready(self, model_name: str, max_wait: int = 500, poll_interval: float = 5) -> bool:
        """Checks if the server is up and the specified model is loaded.

        Polls ``/v1/models`` until ``model_name`` is listed or the deadline passes.
        On timeout the tail of the server log is logged and ``stop()`` is called.

        Args:
            model_name: Served model name expected in the endpoint's ``data`` list.
            max_wait: Maximum number of seconds to keep polling.
            poll_interval: Seconds to sleep between polls.

        Returns:
            True if the model was listed before the deadline, False otherwise.
        """
        api_url = f"http://localhost:{self.config.vllm_port}/v1/models"
        # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
        deadline = time.monotonic() + max_wait

        while time.monotonic() < deadline:
            try:
                response = requests.get(api_url, timeout=10)
                if response.status_code == 200:
                    response_data = response.json()
                    loaded_models = [m.get('id') for m in response_data.get('data', [])]
                    if model_name in loaded_models:
                        logging.info(f"vLLM server is ready and model '{model_name}' is served.")
                        return True
                    logging.warning(f"vLLM server responding, but model '{model_name}' not confirmed yet: {loaded_models}. Retrying...")
                else:
                    logging.warning(f"Server not ready yet (status code {response.status_code}). Waiting...")
            except requests.exceptions.ConnectionError:
                logging.warning("Server not reachable yet (Connection Error). Waiting...")
            except requests.exceptions.RequestException as req_e:
                logging.warning(f"Server not reachable yet ({req_e}). Waiting...")
            except Exception as e:
                logging.error(f"Error checking server status: {e}. Waiting...")
            time.sleep(poll_interval)

        logging.error(f"vLLM server failed to start/serve model '{model_name}' within {max_wait} seconds. Check log: {self.current_log_file}")
        self._log_server_tail()
        self.stop() # Attempt cleanup
        return False

    def _log_server_tail(self, lines=20):
        """Logs the last few lines of the server log file.

        Args:
            lines: Maximum number of trailing log lines to emit.
        """
        if not self.current_log_file or not os.path.exists(self.current_log_file):
            logging.error("Server log file not found.")
            return
        try:
            # errors="replace": a stray undecodable byte must not hide the whole tail.
            with open(self.current_log_file, 'r', encoding='utf-8', errors='replace') as f:
                log_lines = f.readlines()
            logging.error(f"--- Last {min(lines, len(log_lines))} lines of vLLM log ({self.current_log_file}) ---")
            for line in log_lines[-lines:]:
                logging.error(f"VLLM LOG: {line.strip()}")
            logging.error("--- End of log tail ---")
        except Exception as log_e:
            logging.error(f"Could not read log file {self.current_log_file}: {log_e}")

# Instantiate the server manager (this only stores the config; no server is launched until start() is called)
vllm_manager = VLLMServerManager(config)

## Model Manager

Class responsible for downloading models from Hugging Face Hub if they don't exist locally and providing their paths.

In [5]:
# Cell 4: Model Manager Class (Copied from Original_Dataset_Results)

class ModelManager:
    """Handles downloading and path management for models."""

    def __init__(self, config: "Config"):
        self.config = config

    def get_model_path(self, local_model_name: str, hf_id_or_path: str) -> Optional[str]:
        """Determines the local path for a model, downloading if necessary.

        Args:
            local_model_name: Name used for the download directory under
                ``config.models_base_path`` and in log messages.
            hf_id_or_path: Either an existing local directory or a Hugging Face repo id.

        Returns:
            The directory holding the weights, or None if they could not be obtained.
        """
        if os.path.isdir(hf_id_or_path):
            logging.info(f"Using pre-defined local path for {local_model_name}: {hf_id_or_path}")
            return hf_id_or_path

        # A filesystem-style reference that is not an existing directory can never be a
        # valid Hub repo id. Fail here with a clear message instead of handing it to the
        # downloader, which would only report a repo-id validation error.
        if self._looks_like_filesystem_path(hf_id_or_path):
            logging.error(f"Local model directory for {local_model_name} does not exist: {hf_id_or_path}")
            return None

        # Use the models_base_path from config
        local_path = os.path.join(self.config.models_base_path, local_model_name)
        if self._download_if_needed(hf_repo_id=hf_id_or_path, local_path=local_path):
            return local_path

        logging.error(f"Failed to obtain model weights for {local_model_name} ({hf_id_or_path}).")
        return None

    @staticmethod
    def _looks_like_filesystem_path(value: str) -> bool:
        """Returns True for absolute, dot-relative or home-relative path strings."""
        return os.path.isabs(value) or value.startswith((".", "~"))

    def _download_if_needed(self, hf_repo_id: str, local_path: str) -> bool:
        """Downloads model weights if they don't exist locally.

        The presence of ``config.json`` inside ``local_path`` is the marker for an
        already-downloaded model.

        Args:
            hf_repo_id: Repository id passed to ``snapshot_download``.
            local_path: Target directory for the snapshot.

        Returns:
            True if the weights were already present or the download finished,
            False if the download raised (the target directory is then removed).
        """
        logging.info(f"Checking model weights for '{hf_repo_id}' at: {local_path}")
        config_file = os.path.join(local_path, "config.json")

        # config.json can only exist if local_path exists, so one check covers both.
        if os.path.exists(config_file):
            logging.info(f"Model weights directory already exists. Skipping download for {hf_repo_id}.")
            return True

        logging.info(f"Model weights directory is incomplete or does not exist. Downloading '{hf_repo_id}'...")
        try:
            snapshot_download(
                repo_id=hf_repo_id,
                local_dir=local_path,
                local_dir_use_symlinks=False,
                max_workers=4
            )
            logging.info(f"Model download complete for {hf_repo_id}.")
            # Verify contents briefly
            logging.info(f"--- Contents of {local_path} (first few items) ---")
            for item in os.listdir(local_path)[:10]:
                logging.info(f"  - {item}")
            return True
        except Exception as e:
            logging.error(f"Error during model download for {hf_repo_id}: {e}")
            # Clean up potentially incomplete download
            if os.path.exists(local_path):
                try:
                    import shutil
                    shutil.rmtree(local_path)
                    logging.info(f"Removed incomplete download directory: {local_path}")
                except Exception as rm_e:
                    logging.error(f"Failed to remove incomplete directory {local_path}: {rm_e}")
            return False

# Instantiate the model manager (this only stores the config; downloads happen later, inside get_model_path())
model_manager = ModelManager(config)

## Define Prompts

Specify the list of prompts that will be sent to each model.

In [6]:
# Cell 5: Define Prompts

# Prompts used for the qualitative comparison, one per capability category.
# The pipeline sends each entry verbatim as a single user message to every configured model.
prompts_to_test = [
    # 1. Factual Recall & Explanation
    "Explain the concept of quantum entanglement in simple terms, as if you were explaining it to a high school student.",

    # 2. Creative Writing & Style Imitation
    "Write a short poem (4-6 lines) about a rainy day from the perspective of a cat, in the style of Dr. Seuss.",

    # 3. Summarization & Information Synthesis
    "Summarize the main arguments for and against the use of nuclear energy in five bullet points.",

    # 4. Problem Solving & Logical Reasoning
    "A farmer has 17 sheep. All but 9 die. How many sheep are left?",

    # 5. Code Generation & Explanation
    "Write a simple Python function that takes a list of strings and returns a new list containing only the strings that are longer than 5 characters. Add comments explaining the code.",

    # 6. Brainstorming & Idea Generation
    "Brainstorm five unconventional and eco-friendly alternatives to plastic packaging.",

    # 7. Role-playing & Scenario Simulation
    "Imagine you are a historian analyzing a newly discovered diary from a citizen living through the French Revolution. Write a brief entry describing your initial thoughts and the potential significance of the find.",

    # 8. Comparison & Contrast
    "Compare and contrast two different common leadership styles (e.g., autocratic vs. democratic, or transformational vs. transactional). Highlight one key advantage and one key disadvantage of each style.",

    # 9. Instruction Following & Formatting
    "Create a short grocery list containing items for making spaghetti bolognese. Organize the list into three categories: Produce, Meat, and Pantry Staples.",

    # 10. Open-ended Philosophical Question
    "If humanity were to establish a colony on another planet, what single principle do you think should be most central to its new governing charter, and why?"
]

# Record how many prompts each model will receive.
logging.info(f"Defined {len(prompts_to_test)} prompts for qualitative evaluation.")

## Query Logic and Orchestration

Contains the asynchronous function to query a model with a single prompt and the main function to orchestrate the evaluation across all models and prompts.

In [7]:
# Cell 6: Query Logic and Orchestration

async def query_model(client: "AsyncOpenAI", prompt: str, model_name: str, semaphore: asyncio.Semaphore) -> Optional[Dict[str, Any]]:
    """Sends a single prompt to the specified model asynchronously.

    Generation settings (max tokens, temperature, top_p) are read from the
    module-level ``config``. Failures are not raised: they are logged and returned
    as a result record whose ``generation`` starts with ``"ERROR:"`` and whose
    ``latency`` is ``-1.0``.

    Args:
        client: OpenAI-compatible async client.
        prompt: Text sent as the only user message.
        model_name: Served model name to query; copied into the result record.
        semaphore: Limits how many requests are in flight at once.

    Returns:
        A dict with the keys ``model``, ``prompt``, ``generation`` and ``latency``
        (seconds spent in the completion call).
    """
    async with semaphore:
        logging.info(f"Sending prompt to {model_name}: '{prompt[:50]}...'" )
        try:
            # perf_counter is monotonic, so the measured latency cannot go negative
            # or jump if the wall clock is adjusted mid-request.
            started = time.perf_counter()
            response = await client.chat.completions.create(
                model=model_name,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=config.max_new_tokens,
                temperature=config.temperature,
                top_p=config.top_p,
            )
            latency = time.perf_counter() - started

            # The message content may be None; treat that as an empty generation
            # instead of letting .strip() raise and mislabel the sample as a request error.
            content = response.choices[0].message.content
            if content is None:
                logging.warning(f"Model {model_name} returned no message content for prompt '{prompt[:50]}...'")
                content = ""
            generated_text = content.strip()

            result = {
                "model": model_name,
                "prompt": prompt,
                "generation": generated_text,
                "latency": latency
            }
            logging.info(f"Received response from {model_name} in {latency:.2f}s")
            return result

        except Exception as e:
            logging.error(f"Error querying model {model_name} for prompt '{prompt[:50]}...': {e}")
            traceback.print_exc()
            return {
                "model": model_name,
                "prompt": prompt,
                "generation": f"ERROR: {e}",
                "latency": -1.0
            }

def _error_results(model_name: str, prompts: List[str], message: str) -> List[Dict[str, Any]]:
    """Builds one placeholder result per prompt for a model that could not be queried."""
    return [
        {
            "model": model_name,
            "prompt": prompt,
            "generation": message,
            "latency": -1.0
        }
        for prompt in prompts
    ]


async def run_qualitative_evaluation(prompts: List[str]) -> List[Dict[str, Any]]:
    """Runs the qualitative evaluation loop across models and prompts.

    For every entry in ``config.model_configs`` this resolves the weights, starts
    a vLLM server, sends all prompts concurrently, and stops the server again.
    A model that cannot be loaded or served contributes one error record per
    prompt, so every (model, prompt) pair is represented in the output.

    Args:
        prompts: Prompt texts to send to each model.

    Returns:
        Result records with the keys ``model``, ``prompt``, ``generation`` and
        ``latency``, grouped by model in configuration order.
    """
    all_qualitative_results = []
    semaphore = asyncio.Semaphore(config.parallel_requests)

    for local_model_name, hf_model_id_or_path in config.model_configs.items():
        logging.info(f"\n--- Processing Model for Qualitative Eval: {local_model_name} ---")
        model_start_time = time.time()

        model_weights_path = model_manager.get_model_path(local_model_name, hf_model_id_or_path)
        if not model_weights_path:
            logging.error(f"Skipping model {local_model_name} due to issues obtaining weights.")
            all_qualitative_results.extend(
                _error_results(local_model_name, prompts, "ERROR: Could not load model weights.")
            )
            continue

        server_started = False
        try:
            server_started = vllm_manager.start(model_weights_path, local_model_name)

            if server_started:
                client = AsyncOpenAI(
                    base_url=f"http://localhost:{config.vllm_port}/v1",
                    api_key="dummy-key"
                )
                try:
                    tasks = [query_model(client, prompt, local_model_name, semaphore) for prompt in prompts]
                    model_results = await tqdm_asyncio.gather(*tasks, desc=f"Querying {local_model_name}")
                finally:
                    # One client is created per model; close it so its HTTP
                    # connection pool is released before the next server starts.
                    await client.close()
                all_qualitative_results.extend(res for res in model_results if res is not None)

            else:
                logging.error(f"Skipping queries for {local_model_name} because vLLM server failed to start.")
                all_qualitative_results.extend(
                    _error_results(local_model_name, prompts, "ERROR: vLLM server failed to start.")
                )

        except Exception as e:
            logging.error(f"An unexpected error occurred during evaluation for {local_model_name}: {e}")
            traceback.print_exc()
            # Add error entries only for prompts that have no record yet for this model.
            recorded_prompts = {
                r['prompt'] for r in all_qualitative_results if r['model'] == local_model_name
            }
            for prompt in prompts:
                if prompt not in recorded_prompts:
                    recorded_prompts.add(prompt)
                    all_qualitative_results.extend(
                        _error_results(local_model_name, [prompt], f"ERROR: {e}")
                    )
        finally:
            if server_started:
                vllm_manager.stop()
                logging.info(f"Stopped vLLM server for {local_model_name}.")
            model_end_time = time.time()
            logging.info(f"--- Finished processing model {local_model_name} in {model_end_time - model_start_time:.2f} seconds ---")

    # Final cleanup
    vllm_manager.stop()
    logging.info("\n--- Qualitative Evaluation Loop Completed ---")
    return all_qualitative_results

## Save Results

Function to save the collected prompt-generation pairs to a JSON file.

In [8]:
# Cell 7: Save Results

def save_qualitative_results(results_list: List[Dict[str, Any]]):
    """Writes the collected results to a timestamped JSON file in ``config.results_dir``.

    An empty ``results_list`` is logged and skipped. On success the first record
    is printed as a sample. Any failure is logged rather than raised.
    """
    if not results_list:
        logging.warning("No qualitative results generated. Skipping saving.")
        return

    # Second-resolution timestamp keeps successive runs in separate files.
    run_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    target_path = os.path.join(
        config.results_dir,
        f"qualitative_evaluation_results_{run_stamp}.json",
    )

    try:
        with open(target_path, 'w', encoding='utf-8') as out_file:
            json.dump(results_list, out_file, ensure_ascii=False, indent=4)
        logging.info(f"Qualitative results saved successfully to JSON: {target_path}")

        first_record = results_list[0]
        print(f"\n--- Sample Qualitative Result ---")
        print(json.dumps(first_record, indent=2))
    except Exception as e:
        logging.error(f"Failed to save qualitative results to JSON: {e}")

## Run Pipeline

Execute the main evaluation function and save the results.

In [9]:
# Cell 8: Run Pipeline

# Module-level holder for the results; main() rebinds it so the records remain
# available in the notebook namespace after the run.
collected_qualitative_results = []

async def main():
    """Runs the evaluation over prompts_to_test, stores the results globally and saves them to JSON."""
    global collected_qualitative_results
    logging.info("--- Starting Qualitative Evaluation Pipeline ---")
    collected_qualitative_results = await run_qualitative_evaluation(prompts_to_test)
    save_qualitative_results(collected_qualitative_results)
    logging.info("--- Qualitative Evaluation Pipeline Finished ---")

# Apply nest_asyncio patch and run the pipeline
# NOTE(review): the patch is presumably needed because the notebook kernel already runs
# an event loop, which asyncio.run() would otherwise reject — confirm for this runtime.
nest_asyncio.apply()
asyncio.run(main())

Querying deepseek-1.5b-qwen-base: 100%|██████████| 10/10 [00:17<00:00,  1.78s/it]
Querying deepseek-1.5b-qwen-ft-control: 100%|██████████| 10/10 [00:17<00:00,  1.78s/it]
Querying deepseek-1.5b-scrambled: 100%|██████████| 10/10 [00:43<00:00,  4.38s/it]
Querying deepseek-1.5b-val-modified: 100%|██████████| 10/10 [02:10<00:00, 13.04s/it]
Querying deepseek-1.5b-length-val-modified: 100%|██████████| 10/10 [00:28<00:00,  2.87s/it]
Querying deepseek-1.5b-gradient-ascent: 100%|██████████| 10/10 [00:09<00:00,  1.03it/s]
Querying deepseek-r1-distill-qwen-1.5b-reduced-eos-gradient-ascent: 100%|██████████| 10/10 [00:11<00:00,  1.16s/it]



--- Sample Qualitative Result ---
{
  "model": "deepseek-1.5b-qwen-base",
  "prompt": "Explain the concept of quantum entanglement in simple terms, as if you were explaining it to a high school student.",
  "generation": "Okay, so I need to explain quantum entanglement to a high school student. Hmm, I remember from my classes that it's something to do with particles being connected in a way that their properties are linked. But I'm not entirely sure I grasp all the details. Let me try to break it down.\n\nFirst, I think entanglement is when particles, like electrons or photons, share information. But wait, how exactly does that work? Do they communicate with each other, or are there physical connections? I remember something about particles being in multiple places at once, but that doesn't sound right. Maybe it's more about the state of the particles.\n\nI think entanglement has to do with quantum superposition. When a particle is in a superposition of states, like being in two place