From 14c075c450410f0969880e18659e90a65980fedb Mon Sep 17 00:00:00 2001
From: Simon Strandgaard
Date: Mon, 9 Mar 2026 13:59:03 +0100
Subject: [PATCH] Add LLM-as-judge task output scorer for pipeline quality
 evaluation

Foundation for autonomous prompt optimization (#94) and A/B testing
promotion (#59).

Scores pipeline task outputs against a 5-dimension rubric (Specificity,
Actionability, Completeness, Internal Consistency, Conciseness) using
structured LLM output. Includes CLI helper for scoring tasks from
completed run directories.

Co-Authored-By: Claude Opus 4.6
---
 .../worker_plan_internal/scoring/__init__.py  |   0
 .../scoring/score_run_task.py                 | 116 +++++++++
 .../scoring/task_output_scorer.py             | 245 ++++++++++++++++++
 3 files changed, 361 insertions(+)
 create mode 100644 worker_plan/worker_plan_internal/scoring/__init__.py
 create mode 100644 worker_plan/worker_plan_internal/scoring/score_run_task.py
 create mode 100644 worker_plan/worker_plan_internal/scoring/task_output_scorer.py

diff --git a/worker_plan/worker_plan_internal/scoring/__init__.py b/worker_plan/worker_plan_internal/scoring/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/worker_plan/worker_plan_internal/scoring/score_run_task.py b/worker_plan/worker_plan_internal/scoring/score_run_task.py
new file mode 100644
index 00000000..8ed2da75
--- /dev/null
+++ b/worker_plan/worker_plan_internal/scoring/score_run_task.py
@@ -0,0 +1,116 @@
+"""
+CLI helper: score a single task output from a completed run directory.
+
+Loads a task output JSON file from a run_id_dir, reads the plan prompt
+from 001-2-plan.txt, and scores it using TaskOutputScorer.
+
+PROMPT> python -m worker_plan_internal.scoring.score_run_task --run-id-dir /path/to/run --task-filename 014-1-swot_analysis_raw.json
+"""
+import argparse
+import json
+import logging
+import sys
+from pathlib import Path
+from worker_plan_internal.scoring.task_output_scorer import TaskOutputScorer
+from worker_plan_internal.llm_factory import get_llm
+
+logger = logging.getLogger(__name__)
+
+
+def load_plan_prompt(run_id_dir: Path) -> str:
+    """Load the plan prompt from the run directory."""
+    plan_file = run_id_dir / "001-2-plan.txt"
+    if not plan_file.exists():
+        raise FileNotFoundError(f"Plan prompt file not found: {plan_file}")
+    return plan_file.read_text(encoding="utf-8").strip()
+
+
+def load_task_output(run_id_dir: Path, task_filename: str) -> dict:
+    """Load a task output JSON file from the run directory."""
+    task_file = run_id_dir / task_filename
+    if not task_file.exists():
+        raise FileNotFoundError(f"Task output file not found: {task_file}")
+    return json.loads(task_file.read_text(encoding="utf-8"))
+
+
+def derive_task_name(task_filename: str) -> str:
+    """Derive a human-readable task name from the filename.
+
+    Example: '014-1-swot_analysis_raw.json' -> 'swot_analysis_raw'
+    """
+    stem = Path(task_filename).stem
+    # Strip leading NNN-N- prefix
+    parts = stem.split("-", 2)
+    if len(parts) >= 3 and parts[0].isdigit() and parts[1].isdigit():
+        return parts[2]
+    return stem
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser(
+        description="Score a pipeline task output using an LLM judge."
+    )
+    parser.add_argument(
+        "--run-id-dir",
+        type=str,
+        required=True,
+        help="Path to the completed run directory.",
+    )
+    parser.add_argument(
+        "--task-filename",
+        type=str,
+        required=True,
+        help="Filename of the task output JSON within the run directory.",
+    )
+    parser.add_argument(
+        "--llm-name",
+        type=str,
+        default=None,
+        help="LLM model name to use as judge. Uses default if not specified.",
+    )
+    parser.add_argument(
+        "--output",
+        type=str,
+        default=None,
+        help="Optional output file path for the score JSON.",
+    )
+    args = parser.parse_args()
+
+    logging.basicConfig(level=logging.WARNING)
+    logging.getLogger("worker_plan_internal.scoring").setLevel(logging.INFO)
+
+    run_id_dir = Path(args.run_id_dir)
+    if not run_id_dir.is_dir():
+        print(f"Error: run-id-dir is not a directory: {run_id_dir}", file=sys.stderr)
+        sys.exit(1)
+
+    plan_prompt = load_plan_prompt(run_id_dir)
+    task_output = load_task_output(run_id_dir, args.task_filename)
+    task_name = derive_task_name(args.task_filename)
+
+    logger.info(f"Scoring task '{task_name}' from {run_id_dir}")
+
+    if args.llm_name:
+        llm = get_llm(args.llm_name)
+    else:
+        llm = get_llm()
+
+    result = TaskOutputScorer.score(
+        llm=llm,
+        task_output_json=task_output,
+        plan_prompt=plan_prompt,
+        task_name=task_name,
+    )
+
+    result_dict = result.to_dict(include_system_prompt=False, include_user_prompt=False)
+    result_json = json.dumps(result_dict, indent=2)
+
+    if args.output:
+        Path(args.output).write_text(result_json, encoding="utf-8")
+        print(f"Score saved to: {args.output}")
+    else:
+        print(result_json)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/worker_plan/worker_plan_internal/scoring/task_output_scorer.py b/worker_plan/worker_plan_internal/scoring/task_output_scorer.py
new file mode 100644
index 00000000..2b104003
--- /dev/null
+++ b/worker_plan/worker_plan_internal/scoring/task_output_scorer.py
@@ -0,0 +1,245 @@
+"""
+LLM-as-judge that scores a single pipeline task output against a 5-dimension rubric.
+
+This is the core building block for autonomous prompt optimization (#94)
+and A/B testing promotion (#59).
+
+PROMPT> python -m worker_plan_internal.scoring.task_output_scorer
+"""
+import json
+import time
+import logging
+from math import ceil
+from pathlib import Path
+from dataclasses import dataclass
+from typing import Optional
+from pydantic import BaseModel, Field
+from llama_index.core.llms import ChatMessage, MessageRole
+from llama_index.core.llms.llm import LLM
+from worker_plan_internal.format_json_for_use_in_query import format_json_for_use_in_query
+
+logger = logging.getLogger(__name__)
+
+
+class ScoreDimension(BaseModel):
+    """A single scoring dimension with its score and justification."""
+    dimension: str = Field(description="Name of the scoring dimension.")
+    score: int = Field(description="Score from 1 to 10.", ge=1, le=10)
+    justification: str = Field(
+        description="Why this score was given. ~30 words."
+    )
+
+
+class TaskOutputScore(BaseModel):
+    """Complete score for a task output across all dimensions."""
+    dimensions: list[ScoreDimension] = Field(
+        description="Scores for each evaluation dimension."
+    )
+    composite_score: float = Field(
+        description="Weighted average score across all dimensions (1.0-10.0)."
+    )
+    overall_assessment: str = Field(
+        description="Brief overall assessment of the task output quality. 2-3 sentences."
+    )
+
+
+DEFAULT_WEIGHTS: dict[str, float] = {
+    "Specificity": 0.25,
+    "Actionability": 0.25,
+    "Completeness": 0.20,
+    "Internal Consistency": 0.15,
+    "Conciseness": 0.15,
+}
+
+SCORER_SYSTEM_PROMPT = """You are an expert evaluator of AI-generated project planning outputs. \
+Your task is to score a single task output on exactly 5 dimensions using a 1-10 scale.
+
+DIMENSIONS AND WEIGHTS:
+1. Specificity (weight 0.25) — Is the output concrete and grounded in the project context? \
+Does it reference specific details from the plan prompt rather than generic advice? \
+Score 1 = entirely generic, 10 = deeply tailored to this project.
+
+2. Actionability (weight 0.25) — Can someone act on this output directly? \
+Are there clear next steps, owners, or deliverables? \
+Score 1 = vague platitudes, 10 = ready to execute.
+
+3. Completeness (weight 0.20) — Are the obvious aspects covered? \
+Are there glaring omissions given the task type? \
+Score 1 = major gaps, 10 = thorough coverage.
+
+4. Internal Consistency (weight 0.15) — Does the output align with the upstream plan context? \
+Are there contradictions or unsupported leaps? \
+Score 1 = contradictory, 10 = fully coherent.
+
+5. Conciseness (weight 0.15) — Is the output free of filler, redundancy, and padding? \
+Score 1 = extremely bloated, 10 = every word earns its place.
+
+SCORING RULES:
+- Score each dimension independently on a 1-10 integer scale.
+- Provide a ~30 word justification for each score.
+- Compute composite_score as the weighted average: \
+sum(score_i * weight_i) for all dimensions.
+- Write an overall_assessment of 2-3 sentences.
+- Be calibrated: reserve 9-10 for genuinely excellent output, 1-3 for poor output. \
+Most competent outputs should land in the 5-7 range.
+"""
+
+
+@dataclass
+class TaskOutputScorer:
+    """Scores a single pipeline task output using an LLM judge."""
+    system_prompt: str
+    user_prompt: str
+    response: TaskOutputScore
+    metadata: dict
+    markdown: str
+
+    @classmethod
+    def score(
+        cls,
+        llm: LLM,
+        task_output_json: dict,
+        plan_prompt: str,
+        task_name: str,
+        weights: Optional[dict[str, float]] = None,
+    ) -> 'TaskOutputScorer':
+        """
+        Score a task output against the 5-dimension rubric.
+
+        Args:
+            llm: LLM instance to use as judge.
+            task_output_json: The task output to score (dict from JSON file).
+            plan_prompt: The original plan prompt for context.
+            task_name: Name of the pipeline task that produced this output.
+            weights: Optional custom weights. Defaults to DEFAULT_WEIGHTS.
+        """
+        if not isinstance(llm, LLM):
+            raise ValueError("Invalid LLM instance.")
+        if not isinstance(task_output_json, dict):
+            raise ValueError("task_output_json must be a dict.")
+        if not isinstance(plan_prompt, str) or not plan_prompt.strip():
+            raise ValueError("plan_prompt must be a non-empty string.")
+
+        effective_weights = weights or DEFAULT_WEIGHTS
+
+        # Strip metadata before presenting to judge
+        cleaned_output = format_json_for_use_in_query(task_output_json)
+
+        system_prompt = SCORER_SYSTEM_PROMPT
+
+        user_prompt = f"""Score the following task output.
+
+TASK NAME: {task_name}
+
+PLAN PROMPT (the original project description):
+{plan_prompt}
+
+TASK OUTPUT TO SCORE:
+{cleaned_output}
+
+WEIGHTS TO USE:
+{json.dumps(effective_weights, indent=2)}
+
+Score each dimension, compute the weighted composite_score, and provide an overall_assessment."""
+
+        chat_message_list = [
+            ChatMessage(role=MessageRole.SYSTEM, content=system_prompt),
+            ChatMessage(role=MessageRole.USER, content=user_prompt),
+        ]
+
+        sllm = llm.as_structured_llm(TaskOutputScore)
+        start_time = time.perf_counter()
+        try:
+            chat_response = sllm.chat(chat_message_list)
+        except Exception as e:
+            logger.debug(f"LLM scoring interaction failed: {e}")
+            logger.error("LLM scoring interaction failed.", exc_info=True)
+            raise ValueError("LLM scoring interaction failed.") from e
+
+        end_time = time.perf_counter()
+        duration = int(ceil(end_time - start_time))
+
+        task_output_score: TaskOutputScore = chat_response.raw
+
+        metadata = dict(llm.metadata)
+        metadata["llm_classname"] = llm.class_name()
+        metadata["duration"] = duration
+        metadata["task_name"] = task_name
+
+        markdown = cls._to_markdown(task_output_score, task_name)
+
+        return cls(
+            system_prompt=system_prompt,
+            user_prompt=user_prompt,
+            response=task_output_score,
+            metadata=metadata,
+            markdown=markdown,
+        )
+
+    def to_dict(
+        self,
+        include_metadata: bool = True,
+        include_system_prompt: bool = True,
+        include_user_prompt: bool = True,
+    ) -> dict:
+        d: dict = {
+            "response": self.response.model_dump(),
+        }
+        if include_metadata:
+            d["metadata"] = self.metadata
+        if include_system_prompt:
+            d["system_prompt"] = self.system_prompt
+        if include_user_prompt:
+            d["user_prompt"] = self.user_prompt
+        return d
+
+    def save_raw(self, file_path: str) -> None:
+        Path(file_path).write_text(json.dumps(self.to_dict(), indent=2), encoding="utf-8")
+
+    def save_markdown(self, output_file_path: str) -> None:
+        with open(output_file_path, 'w', encoding='utf-8') as f:
+            f.write(self.markdown)
+
+    @staticmethod
+    def _to_markdown(score: TaskOutputScore, task_name: str) -> str:
+        rows = []
+        rows.append(f"# Task Output Score: {task_name}\n")
+        rows.append(f"**Composite Score: {score.composite_score:.1f}/10**\n")
+        rows.append("## Dimension Scores\n")
+        rows.append("| Dimension | Score | Justification |")
+        rows.append("|---|---|---|")
+        for dim in score.dimensions:
+            rows.append(f"| {dim.dimension} | {dim.score}/10 | {dim.justification} |")
+        rows.append("\n## Overall Assessment\n")
+        rows.append(score.overall_assessment)
+        return "\n".join(rows)
+
+
+if __name__ == "__main__":
+    from worker_plan_internal.llm_factory import get_llm
+
+    logging.basicConfig(level=logging.WARNING)
+    logging.getLogger("__main__").setLevel(logging.INFO)
+    logging.getLogger("httpx").setLevel(logging.WARNING)
+
+    # Example: score a sample task output
+    sample_output = {
+        "strengths": ["Strong team", "Clear vision"],
+        "weaknesses": ["Limited budget"],
+        "opportunities": ["Growing market"],
+        "threats": ["Competition"],
+        "metadata": {"duration": 42},
+    }
+    sample_plan_prompt = "Build a SaaS platform for project management targeting small teams."
+
+    llm = get_llm("ollama-llama3.1")
+    result = TaskOutputScorer.score(
+        llm=llm,
+        task_output_json=sample_output,
+        plan_prompt=sample_plan_prompt,
+        task_name="swot_analysis",
+    )
+
+    print(json.dumps(result.to_dict(include_system_prompt=False, include_user_prompt=False), indent=2))
+    print("\nMarkdown:")
+    print(result.markdown)
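
Note on the composite arithmetic: the judge LLM is asked to compute
composite_score itself, so a consumer can recompute the weighted average
from DEFAULT_WEIGHTS as a guard against drift. A minimal sketch with
hypothetical dimension scores (7, 6, 8, 7, 5; not taken from any real run):

    from worker_plan_internal.scoring.task_output_scorer import (
        DEFAULT_WEIGHTS,
        ScoreDimension,
    )

    # Hypothetical judge output; dimension names must match DEFAULT_WEIGHTS keys.
    dims = [
        ScoreDimension(dimension="Specificity", score=7, justification="example"),
        ScoreDimension(dimension="Actionability", score=6, justification="example"),
        ScoreDimension(dimension="Completeness", score=8, justification="example"),
        ScoreDimension(dimension="Internal Consistency", score=7, justification="example"),
        ScoreDimension(dimension="Conciseness", score=5, justification="example"),
    ]

    # Weighted average as specified in the system prompt:
    # 7*0.25 + 6*0.25 + 8*0.20 + 7*0.15 + 5*0.15 = 6.65
    expected = sum(DEFAULT_WEIGHTS[d.dimension] * d.score for d in dims)
    assert abs(expected - 6.65) < 1e-6

    # Against a real TaskOutputScore, allow slack, since the judge may round:
    # assert abs(score.composite_score - expected) <= 0.25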
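
For batch use (e.g., the optimization loop in #94), the CLI helpers can be
reused programmatically instead of shelling out. A minimal sketch; the run
directory path is hypothetical and the score output filenames are
illustrative, not a convention this patch establishes:

    from pathlib import Path

    from worker_plan_internal.llm_factory import get_llm
    from worker_plan_internal.scoring.score_run_task import (
        derive_task_name,
        load_plan_prompt,
        load_task_output,
    )
    from worker_plan_internal.scoring.task_output_scorer import TaskOutputScorer

    run_id_dir = Path("/path/to/run")  # hypothetical completed run directory
    task_filename = "014-1-swot_analysis_raw.json"

    result = TaskOutputScorer.score(
        llm=get_llm(),  # default judge model; pass a name to pin a specific one
        task_output_json=load_task_output(run_id_dir, task_filename),
        plan_prompt=load_plan_prompt(run_id_dir),
        task_name=derive_task_name(task_filename),  # -> 'swot_analysis_raw'
    )

    # Persist both representations next to the task output (names illustrative).
    result.save_raw(str(run_id_dir / "014-1-swot_analysis_score.json"))
    result.save_markdown(str(run_id_dir / "014-1-swot_analysis_score.md"))
    print(f"composite: {result.response.composite_score:.1f}/10")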