From c891d6ce4e0420d8a3d618496b9440dc8f02c9d9 Mon Sep 17 00:00:00 2001 From: Simon Strandgaard Date: Sun, 3 May 2026 16:44:15 +0200 Subject: [PATCH] worker_plan: represent plan_raw.json as InitialPlanRawTask MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `plan_raw.json` is supplied by the caller before the pipeline runs. Three tasks needed to read it: - `SetupTask` (always, to produce `plan.txt`) - `PromptAdherenceTask` (wants the bare prompt) - `ScreenPlanningPromptTask` (wants the bare prompt) Each was reaching directly into the run directory: raw_path = self.run_id_dir / FilenameEnum.INITIAL_PLAN_RAW.value with `SetupTask` additionally hand-rolling its own existence check. Both `PromptAdherence` and `ScreenPlanningPrompt` listed `SetupTask` in `requires()` purely for ordering, even though neither actually consumed `plan.txt`. Introduce `InitialPlanRawTask` (`luigi.ExternalTask`) that exposes `plan_raw.json` as a Luigi target. Consumers now go through the DAG: def requires(self): return self.clone(InitialPlanRawTask) def run_with_llm(self, llm): plan_file = PlanFile.load(self.input().path) plan_prompt = plan_file.plan_prompt `SetupTask`'s manual `FileNotFoundError` check is removed — Luigi's `ExternalTask.complete()` already gates the pipeline on the file's existence. `PromptAdherenceTask` swaps its `'setup'` requires entry for `'plan_raw'`, dropping the unused dependency on the formatted `plan.txt`. --- .../plan/nodes/initial_plan_raw.py | 16 ++++++++++++++++ .../plan/nodes/prompt_adherence.py | 7 +++---- .../plan/nodes/screen_planning_prompt.py | 9 +++++---- .../worker_plan_internal/plan/nodes/setup.py | 12 +++++------- 4 files changed, 29 insertions(+), 15 deletions(-) create mode 100644 worker_plan/worker_plan_internal/plan/nodes/initial_plan_raw.py diff --git a/worker_plan/worker_plan_internal/plan/nodes/initial_plan_raw.py b/worker_plan/worker_plan_internal/plan/nodes/initial_plan_raw.py new file mode 100644 index 000000000..dbc60a22f --- /dev/null +++ b/worker_plan/worker_plan_internal/plan/nodes/initial_plan_raw.py @@ -0,0 +1,16 @@ +"""InitialPlanRawTask - ExternalTask exposing the user-supplied plan_raw.json.""" +from pathlib import Path +import luigi +from worker_plan_api.filenames import FilenameEnum + + +class InitialPlanRawTask(luigi.ExternalTask): + """The user-supplied plan_raw.json that must exist before the pipeline starts. + + Tasks that want the bare prompt (rather than SetupTask's + formatted plan.txt) require this and read it via PlanFile. + """ + run_id_dir = luigi.Parameter() + + def output(self): + return luigi.LocalTarget(str(Path(self.run_id_dir) / FilenameEnum.INITIAL_PLAN_RAW.value)) diff --git a/worker_plan/worker_plan_internal/plan/nodes/prompt_adherence.py b/worker_plan/worker_plan_internal/plan/nodes/prompt_adherence.py index 95c83b0cc..5f869a2d5 100644 --- a/worker_plan/worker_plan_internal/plan/nodes/prompt_adherence.py +++ b/worker_plan/worker_plan_internal/plan/nodes/prompt_adherence.py @@ -4,7 +4,7 @@ from worker_plan_internal.llm_util.llm_executor import LLMExecutor from worker_plan_api.filenames import FilenameEnum from worker_plan_api.plan_file import PlanFile -from worker_plan_internal.plan.nodes.setup import SetupTask +from worker_plan_internal.plan.nodes.initial_plan_raw import InitialPlanRawTask from worker_plan_internal.plan.nodes.project_plan import ProjectPlanTask from worker_plan_internal.plan.nodes.executive_summary import ExecutiveSummaryTask from worker_plan_internal.plan.nodes.consolidate_assumptions_markdown import ConsolidateAssumptionsMarkdownTask @@ -21,7 +21,7 @@ def output(self): def requires(self): return { - 'setup': self.clone(SetupTask), + 'plan_raw': self.clone(InitialPlanRawTask), 'project_plan': self.clone(ProjectPlanTask), 'executive_summary': self.clone(ExecutiveSummaryTask), 'consolidate_assumptions_markdown': self.clone(ConsolidateAssumptionsMarkdownTask), @@ -30,8 +30,7 @@ def requires(self): def run_inner(self): llm_executor: LLMExecutor = self.create_llm_executor() - plan_raw_path = self.run_id_dir / FilenameEnum.INITIAL_PLAN_RAW.value - plan_file = PlanFile.load(str(plan_raw_path)) + plan_file = PlanFile.load(self.input()['plan_raw'].path) plan_prompt = plan_file.plan_prompt with self.input()['project_plan']['markdown'].open("r") as f: project_plan_markdown = f.read() diff --git a/worker_plan/worker_plan_internal/plan/nodes/screen_planning_prompt.py b/worker_plan/worker_plan_internal/plan/nodes/screen_planning_prompt.py index 17eddef3d..9467618f0 100644 --- a/worker_plan/worker_plan_internal/plan/nodes/screen_planning_prompt.py +++ b/worker_plan/worker_plan_internal/plan/nodes/screen_planning_prompt.py @@ -3,13 +3,14 @@ from worker_plan_internal.plan.run_plan_pipeline import PlanTask from worker_plan_internal.diagnostics.screen_planning_prompt import ScreenPlanningPrompt from worker_plan_api.filenames import FilenameEnum -from worker_plan_internal.plan.nodes.setup import SetupTask +from worker_plan_api.plan_file import PlanFile +from worker_plan_internal.plan.nodes.initial_plan_raw import InitialPlanRawTask class ScreenPlanningPromptTask(PlanTask): """Flag prompts as UNUSABLE when there is high confidence the prompt is garbage.""" def requires(self): - return self.clone(SetupTask) + return self.clone(InitialPlanRawTask) def output(self): return { @@ -18,8 +19,8 @@ def output(self): } def run_with_llm(self, llm: LLM) -> None: - with self.input().open("r") as f: - plan_prompt = f.read() + plan_file = PlanFile.load(self.input().path) + plan_prompt = plan_file.plan_prompt result = ScreenPlanningPrompt.execute(llm, plan_prompt) diff --git a/worker_plan/worker_plan_internal/plan/nodes/setup.py b/worker_plan/worker_plan_internal/plan/nodes/setup.py index 56ce171ea..152609faf 100644 --- a/worker_plan/worker_plan_internal/plan/nodes/setup.py +++ b/worker_plan/worker_plan_internal/plan/nodes/setup.py @@ -2,21 +2,19 @@ from worker_plan_internal.plan.run_plan_pipeline import PlanTask from worker_plan_api.filenames import FilenameEnum from worker_plan_api.plan_file import PlanFile +from worker_plan_internal.plan.nodes.initial_plan_raw import InitialPlanRawTask class SetupTask(PlanTask): """Read plan_raw.json and produce plan.txt from the template.""" + def requires(self): + return self.clone(InitialPlanRawTask) + def output(self): return self.local_target(FilenameEnum.INITIAL_PLAN) def run(self): - raw_path = self.run_id_dir / FilenameEnum.INITIAL_PLAN_RAW.value - if not raw_path.exists(): - raise FileNotFoundError( - f"Before starting the pipeline the '{FilenameEnum.INITIAL_PLAN_RAW.value}' file " - f"must be present in the run_id_dir: {self.run_id_dir!r}" - ) - plan_file = PlanFile.load(str(raw_path)) + plan_file = PlanFile.load(self.input().path) plan_text = plan_file.to_plan_text() with open(self.output().path, "w", encoding="utf-8") as f: f.write(plan_text)