From 9642665c322d2bc1d43c3180d830fc541e5cecc6 Mon Sep 17 00:00:00 2001 From: Thomas Hebrard Date: Wed, 16 Jul 2025 17:00:18 +0200 Subject: [PATCH 1/2] feature/input_memory (#173) --- CHANGELOG.md | 4 + pipelex/client/api_serializer.py | 31 +- pipelex/client/client.py | 16 +- pipelex/client/pipeline_request_factory.py | 42 +-- pipelex/client/pipeline_response_factory.py | 6 +- pipelex/client/protocol.py | 10 +- pipelex/core/working_memory_factory.py | 28 +- pipelex/exceptions.py | 8 + pipelex/pipe_works/pipe_job_factory.py | 3 +- pipelex/pipeline/execute.py | 13 + pipelex/pipeline/start.py | 12 + pyproject.toml | 2 +- .../pipelex/client/test_api_serialization.py | 352 +----------------- .../core/test_working_memory_factory.py | 174 +++++++++ uv.lock | 2 +- 15 files changed, 279 insertions(+), 424 deletions(-) create mode 100644 tests/unit/pipelex/core/test_working_memory_factory.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 99f4a74a1..404730691 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## [v0.6.1] - 2025-07-16 + +- Can execute pipelines with `input_memory`: It is a `CompactMemory: Dict[str, Dict[str, Any]]` + ## [v0.6.0] - 2025-07-12 ### Changed diff --git a/pipelex/client/api_serializer.py b/pipelex/client/api_serializer.py index 0cbd56163..8de04fea0 100644 --- a/pipelex/client/api_serializer.py +++ b/pipelex/client/api_serializer.py @@ -2,15 +2,13 @@ from decimal import Decimal from enum import Enum from pathlib import Path -from typing import Any, Dict, List, cast +from typing import Any, Dict, List, Optional, cast from pipelex.client.protocol import CompactMemory from pipelex.core.concept_native import NativeConcept from pipelex.core.pipe_output import PipeOutput -from pipelex.core.stuff_content import StuffContent, TextContent -from pipelex.core.stuff_factory import StuffContentFactory +from pipelex.core.stuff_content import TextContent from pipelex.core.working_memory import WorkingMemory -from pipelex.exceptions import ApiSerializationError class ApiSerializer: @@ -21,7 +19,7 @@ class ApiSerializer: FIELDS_TO_SKIP = ("__class__", "__module__") @classmethod - def serialize_working_memory_for_api(cls, working_memory: WorkingMemory) -> CompactMemory: + def serialize_working_memory_for_api(cls, working_memory: Optional[WorkingMemory] = None) -> CompactMemory: """ Convert WorkingMemory to API-ready format using kajson with proper datetime handling. @@ -32,6 +30,8 @@ def serialize_working_memory_for_api(cls, working_memory: WorkingMemory) -> Comp Dict ready for API transmission with datetime strings and no __class__/__module__ """ compact_memory: CompactMemory = {} + if working_memory is None: + return compact_memory for stuff_name, stuff in working_memory.root.items(): if stuff.concept_code == NativeConcept.TEXT.code: @@ -101,24 +101,3 @@ def _clean_and_format_content(cls, content: Any) -> Any: return str(content) # Convert Path to string representation else: return content - - @classmethod - def make_stuff_content_from_api_data(cls, concept_code: str, value: Dict[str, Any] | str) -> StuffContent: - """ - Create StuffContent from API data using concept code. - - Args: - concept_code: The concept code to determine the content type - value: The content value from API - - Returns: - StuffContent instance - - Raises: - ApiSerializationError: If concept cannot be resolved or content creation fails - """ - try: - return StuffContentFactory.make_stuffcontent_from_concept_code_with_fallback(concept_code=concept_code, value=value) - - except Exception as exc: - raise ApiSerializationError(f"Failed to create StuffContent for concept '{concept_code}': {exc}") from exc diff --git a/pipelex/client/client.py b/pipelex/client/client.py index 793507f0f..f5217315d 100644 --- a/pipelex/client/client.py +++ b/pipelex/client/client.py @@ -5,9 +5,10 @@ from pipelex.client.pipeline_request_factory import PipelineRequestFactory from pipelex.client.pipeline_response_factory import PipelineResponseFactory -from pipelex.client.protocol import PipelexProtocol, PipelineResponse +from pipelex.client.protocol import CompactMemory, PipelexProtocol, PipelineResponse from pipelex.core.pipe_run_params import PipeOutputMultiplicity from pipelex.core.working_memory import WorkingMemory +from pipelex.core.working_memory_factory import WorkingMemoryFactory from pipelex.exceptions import ClientAuthenticationError from pipelex.tools.environment import get_required_env @@ -75,10 +76,16 @@ async def execute_pipeline( self, pipe_code: str, working_memory: Optional[WorkingMemory] = None, + input_memory: Optional[CompactMemory] = None, output_name: Optional[str] = None, output_multiplicity: Optional[PipeOutputMultiplicity] = None, dynamic_output_concept_code: Optional[str] = None, ) -> PipelineResponse: + if working_memory and input_memory: + raise ValueError(f"working_memory and input_memory cannot be provided together to the API execute_pipeline {pipe_code=}") + + if input_memory is not None: + working_memory = WorkingMemoryFactory.make_from_compact_memory(input_memory) pipeline_request = PipelineRequestFactory.make_from_working_memory( working_memory=working_memory, output_name=output_name, @@ -93,10 +100,17 @@ async def start_pipeline( self, pipe_code: str, working_memory: Optional[WorkingMemory] = None, + input_memory: Optional[CompactMemory] = None, output_name: Optional[str] = None, output_multiplicity: Optional[PipeOutputMultiplicity] = None, dynamic_output_concept_code: Optional[str] = None, ) -> PipelineResponse: + if working_memory and input_memory: + raise ValueError(f"working_memory and input_memory cannot be provided together to the API start_pipeline {pipe_code=}") + + if input_memory is not None: + working_memory = WorkingMemoryFactory.make_from_compact_memory(input_memory) + pipeline_request = PipelineRequestFactory.make_from_working_memory( working_memory=working_memory, output_name=output_name, diff --git a/pipelex/client/pipeline_request_factory.py b/pipelex/client/pipeline_request_factory.py index d209cca19..6c198361b 100644 --- a/pipelex/client/pipeline_request_factory.py +++ b/pipelex/client/pipeline_request_factory.py @@ -1,11 +1,9 @@ from typing import Any, Dict, Optional from pipelex.client.api_serializer import ApiSerializer -from pipelex.client.protocol import COMPACT_MEMORY_KEY, CompactMemory, PipelineRequest +from pipelex.client.protocol import COMPACT_MEMORY_KEY, PipelineRequest from pipelex.core.pipe_run_params import PipeOutputMultiplicity -from pipelex.core.stuff_factory import StuffFactory from pipelex.core.working_memory import WorkingMemory -from pipelex.core.working_memory_factory import WorkingMemoryFactory class PipelineRequestFactory: @@ -30,48 +28,16 @@ def make_from_working_memory( Returns: PipelineRequest with the working memory serialized to reduced format """ - compact_memory = None - if working_memory is not None: - compact_memory = ApiSerializer.serialize_working_memory_for_api(working_memory) return PipelineRequest( - compact_memory=compact_memory, + input_memory=ApiSerializer.serialize_working_memory_for_api(working_memory), output_name=output_name, output_multiplicity=output_multiplicity, dynamic_output_concept_code=dynamic_output_concept_code, ) @staticmethod - def make_working_memory_from_reduced(compact_memory: Optional[CompactMemory]) -> WorkingMemory: - """ - Create a WorkingMemory from a reduced memory dictionary. - - Args: - compact_memory: Dictionary in the format from API - - Returns: - WorkingMemory object reconstructed from the reduced format - """ - working_memory = WorkingMemoryFactory.make_empty() - if compact_memory is None: - return working_memory - - for stuff_key, stuff_data in compact_memory.items(): - concept_code = stuff_data.get("concept_code", "") - content_value = stuff_data.get("content", {}) - - # Use API serializer to create content - content = ApiSerializer.make_stuff_content_from_api_data(concept_code=concept_code, value=content_value) - - # Create stuff directly - stuff = StuffFactory.make_stuff(concept_str=concept_code, name=stuff_key, content=content) - - working_memory.add_new_stuff(name=stuff_key, stuff=stuff) - - return working_memory - - @staticmethod - def make_request_from_body(request_body: Dict[str, Any]) -> PipelineRequest: + def make_from_body(request_body: Dict[str, Any]) -> PipelineRequest: """ Create a PipelineRequest from raw request body dictionary. @@ -82,7 +48,7 @@ def make_request_from_body(request_body: Dict[str, Any]) -> PipelineRequest: PipelineRequest object with dictionary working_memory """ return PipelineRequest( - compact_memory=request_body.get(COMPACT_MEMORY_KEY), + input_memory=request_body.get(COMPACT_MEMORY_KEY), output_name=request_body.get("output_name"), output_multiplicity=request_body.get("output_multiplicity"), dynamic_output_concept_code=request_body.get("dynamic_output_concept_code"), diff --git a/pipelex/client/pipeline_response_factory.py b/pipelex/client/pipeline_response_factory.py index a21a3b499..5014fc153 100644 --- a/pipelex/client/pipeline_response_factory.py +++ b/pipelex/client/pipeline_response_factory.py @@ -35,16 +35,16 @@ def make_from_pipe_output( Returns: PipelineResponse with the pipe output serialized to reduced format """ - reduced_output = None + compact_output = None if pipe_output is not None: - reduced_output = ApiSerializer.serialize_pipe_output_for_api(pipe_output=pipe_output) + compact_output = ApiSerializer.serialize_pipe_output_for_api(pipe_output=pipe_output) return PipelineResponse( pipeline_run_id=pipeline_run_id, created_at=created_at, pipeline_state=pipeline_state, finished_at=finished_at, - pipe_output=reduced_output, + pipe_output=compact_output, status=status, message=message, error=error, diff --git a/pipelex/client/protocol.py b/pipelex/client/protocol.py index 9531b0aa5..54ef7150b 100644 --- a/pipelex/client/protocol.py +++ b/pipelex/client/protocol.py @@ -45,13 +45,13 @@ class PipelineRequest(BaseModel): Request for executing a pipeline. Attributes: - compact_memory (Optional[CompactMemory]): In the format of WorkingMemory.to_compact_memory() + input_memory (Optional[CompactMemory]): In the format of WorkingMemory.to_compact_memory() output_name (Optional[str]): Name of the output slot to write to output_multiplicity (Optional[PipeOutputMultiplicity]): Output multiplicity setting dynamic_output_concept_code (Optional[str]): Override for the dynamic output concept code """ - compact_memory: Optional[CompactMemory] = None + input_memory: Optional[CompactMemory] = None output_name: Optional[str] = None output_multiplicity: Optional[PipeOutputMultiplicity] = None dynamic_output_concept_code: Optional[str] = None @@ -70,7 +70,7 @@ class PipelineResponse(ApiResponse): Example of pipe_output: "pipe_output": { - "compact_memory": { + "input_memory": { "text": { "concept_code": "native.Text", "content": "Some text........" @@ -125,6 +125,7 @@ async def execute_pipeline( self, pipe_code: str, working_memory: Optional[WorkingMemory] = None, + input_memory: Optional[CompactMemory] = None, output_name: Optional[str] = None, output_multiplicity: Optional[PipeOutputMultiplicity] = None, dynamic_output_concept_code: Optional[str] = None, @@ -135,6 +136,7 @@ async def execute_pipeline( Args: pipe_code (str): The code identifying the pipeline to execute working_memory (Optional[WorkingMemory]): Memory context passed to the pipeline + input_memory (Optional[CompactMemory]): Input memory passed to the pipeline output_name (Optional[str]): Target output slot name output_multiplicity (Optional[PipeOutputMultiplicity]): Output multiplicity setting dynamic_output_concept_code (Optional[str]): Override for dynamic output concept @@ -152,6 +154,7 @@ async def start_pipeline( self, pipe_code: str, working_memory: Optional[WorkingMemory] = None, + input_memory: Optional[CompactMemory] = None, output_name: Optional[str] = None, output_multiplicity: Optional[PipeOutputMultiplicity] = None, dynamic_output_concept_code: Optional[str] = None, @@ -162,6 +165,7 @@ async def start_pipeline( Args: pipe_code (str): The code identifying the pipeline to execute working_memory (Optional[WorkingMemory]): Memory context passed to the pipeline + input_memory (Optional[CompactMemory]): Input memory passed to the pipeline output_name (Optional[str]): Target output slot name output_multiplicity (Optional[PipeOutputMultiplicity]): Output multiplicity setting dynamic_output_concept_code (Optional[str]): Override for dynamic output concept diff --git a/pipelex/core/working_memory_factory.py b/pipelex/core/working_memory_factory.py index ec2befcae..1ca389732 100644 --- a/pipelex/core/working_memory_factory.py +++ b/pipelex/core/working_memory_factory.py @@ -5,10 +5,11 @@ from pydantic import BaseModel from pipelex import log +from pipelex.client.protocol import CompactMemory from pipelex.core.concept_native import NativeConcept from pipelex.core.stuff import Stuff from pipelex.core.stuff_content import ImageContent, PDFContent, StuffContent, TextContent -from pipelex.core.stuff_factory import StuffBlueprint, StuffFactory +from pipelex.core.stuff_factory import StuffBlueprint, StuffContentFactory, StuffFactory from pipelex.core.working_memory import MAIN_STUFF_NAME, StuffDict, WorkingMemory from pipelex.exceptions import WorkingMemoryFactoryError from pipelex.tools.misc.json_utils import load_json_dict_from_path @@ -124,6 +125,31 @@ def make_from_memory_file(cls, memory_file_path: str) -> WorkingMemory: working_memory = WorkingMemory.model_validate(working_memory_dict) return working_memory + @classmethod + def make_from_compact_memory(cls, compact_memory: CompactMemory) -> WorkingMemory: + """ + Create a WorkingMemory from a compact memory dictionary. + + Args: + compact_memory: Dictionary in the format from API serialization + + Returns: + WorkingMemory object reconstructed from the compact format + """ + working_memory = cls.make_empty() + + for stuff_key, stuff_data in compact_memory.items(): + concept_code = stuff_data.get("concept_code", "") + content_value = stuff_data.get("content", {}) + + content = StuffContentFactory.make_stuffcontent_from_concept_code_with_fallback(concept_code=concept_code, value=content_value) + + stuff = StuffFactory.make_stuff(concept_str=concept_code, name=stuff_key, content=content) + + working_memory.add_new_stuff(name=stuff_key, stuff=stuff) + + return working_memory + @classmethod def make_for_dry_run(cls, needed_inputs: List[Tuple[str, str, Type[StuffContent]]]) -> "WorkingMemory": """ diff --git a/pipelex/exceptions.py b/pipelex/exceptions.py index d29f396f6..986534b85 100644 --- a/pipelex/exceptions.py +++ b/pipelex/exceptions.py @@ -264,3 +264,11 @@ class ApiSerializationError(Exception): """Exception raised when API serialization fails.""" pass + + +class StartPipelineException(Exception): + pass + + +class ExecutePipelineException(Exception): + pass diff --git a/pipelex/pipe_works/pipe_job_factory.py b/pipelex/pipe_works/pipe_job_factory.py index c9e2f8801..e1f351d85 100644 --- a/pipelex/pipe_works/pipe_job_factory.py +++ b/pipelex/pipe_works/pipe_job_factory.py @@ -4,6 +4,7 @@ from pipelex.core.pipe_run_params import PipeRunParams from pipelex.core.pipe_run_params_factory import PipeRunParamsFactory from pipelex.core.working_memory import WorkingMemory +from pipelex.core.working_memory_factory import WorkingMemoryFactory from pipelex.pipe_works.pipe_job import PipeJob from pipelex.pipeline.job_metadata import JobMetadata @@ -19,7 +20,7 @@ def make_pipe_job( output_name: Optional[str] = None, ) -> PipeJob: job_metadata = job_metadata or JobMetadata() - working_memory = working_memory or WorkingMemory() + working_memory = working_memory or WorkingMemoryFactory.make_empty() if not pipe_run_params: pipe_run_params = PipeRunParamsFactory.make_run_params() return PipeJob( diff --git a/pipelex/pipeline/execute.py b/pipelex/pipeline/execute.py index adc17e7f0..8bb4b5fa3 100644 --- a/pipelex/pipeline/execute.py +++ b/pipelex/pipeline/execute.py @@ -1,9 +1,12 @@ from typing import Optional +from pipelex.client.protocol import CompactMemory from pipelex.core.pipe_output import PipeOutput from pipelex.core.pipe_run_params import FORCE_DRY_RUN_MODE_ENV_KEY, PipeOutputMultiplicity, PipeRunMode from pipelex.core.pipe_run_params_factory import PipeRunParamsFactory from pipelex.core.working_memory import WorkingMemory +from pipelex.core.working_memory_factory import WorkingMemoryFactory +from pipelex.exceptions import ExecutePipelineException from pipelex.hub import get_pipe_router, get_pipeline_manager, get_report_delegate, get_required_pipe from pipelex.pipe_works.pipe_job_factory import PipeJobFactory from pipelex.pipeline.job_metadata import JobMetadata @@ -13,6 +16,7 @@ async def execute_pipeline( pipe_code: str, working_memory: Optional[WorkingMemory] = None, + input_memory: Optional[CompactMemory] = None, output_name: Optional[str] = None, output_multiplicity: Optional[PipeOutputMultiplicity] = None, dynamic_output_concept_code: Optional[str] = None, @@ -30,6 +34,8 @@ async def execute_pipeline( The code of the pipe to execute. working_memory: Optional ``WorkingMemory`` instance passed to the pipe. + input_memory: + Optional compact memory to pass to the pipe. output_name: Name of the output slot to write to. output_multiplicity: @@ -47,6 +53,13 @@ async def execute_pipeline( Tuple[PipeOutput, str] A tuple containing the pipe output and the pipeline run ID. """ + # Can be either working_memory or compact_memory, but not both + if working_memory and input_memory: + raise ExecutePipelineException(f"Cannot pass both working_memory and input_memory to `execute_pipeline` {pipe_code=}") + + if input_memory: + working_memory = WorkingMemoryFactory.make_from_compact_memory(input_memory) + if pipe_run_mode is None: if run_mode_from_env := get_optional_env(key=FORCE_DRY_RUN_MODE_ENV_KEY): pipe_run_mode = PipeRunMode(run_mode_from_env) diff --git a/pipelex/pipeline/start.py b/pipelex/pipeline/start.py index 2ff82a62c..85027c1e2 100644 --- a/pipelex/pipeline/start.py +++ b/pipelex/pipeline/start.py @@ -1,10 +1,13 @@ import asyncio from typing import Optional +from pipelex.client.protocol import CompactMemory from pipelex.core.pipe_output import PipeOutput from pipelex.core.pipe_run_params import PipeOutputMultiplicity, PipeRunMode from pipelex.core.pipe_run_params_factory import PipeRunParamsFactory from pipelex.core.working_memory import WorkingMemory +from pipelex.core.working_memory_factory import WorkingMemoryFactory +from pipelex.exceptions import StartPipelineException from pipelex.hub import get_pipe_router, get_pipeline_manager, get_report_delegate, get_required_pipe from pipelex.pipe_works.pipe_job_factory import PipeJobFactory from pipelex.pipeline.job_metadata import JobMetadata @@ -13,6 +16,7 @@ async def start_pipeline( pipe_code: str, working_memory: Optional[WorkingMemory] = None, + input_memory: Optional[CompactMemory] = None, output_name: Optional[str] = None, output_multiplicity: Optional[PipeOutputMultiplicity] = None, dynamic_output_concept_code: Optional[str] = None, @@ -31,6 +35,8 @@ async def start_pipeline( The code of the pipe to execute. working_memory: Optional ``WorkingMemory`` instance passed to the pipe. + input_memory: + Optional compact memory to pass to the pipe. output_name: Name of the output slot to write to. output_multiplicity: @@ -46,6 +52,12 @@ async def start_pipeline( can be awaited to get the pipe output. """ + if working_memory and input_memory: + raise StartPipelineException(f"Cannot pass both working_memory and input_memory to `start_pipeline` {pipe_code=}") + + if input_memory: + working_memory = WorkingMemoryFactory.make_from_compact_memory(input_memory) + pipeline = get_pipeline_manager().add_new_pipeline() pipeline_run_id = pipeline.pipeline_run_id get_report_delegate().open_registry(pipeline_run_id=pipeline_run_id) diff --git a/pyproject.toml b/pyproject.toml index 7b157939b..431f6ac36 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "pipelex" -version = "0.6.0" +version = "0.6.1" description = "Pipelex is an open-source dev tool based on a simple declarative language that lets you define replicable, structured, composable LLM pipelines." authors = [{ name = "Evotis S.A.S.", email = "evotis@pipelex.com" }] maintainers = [{ name = "Pipelex staff", email = "oss@pipelex.com" }] diff --git a/tests/unit/pipelex/client/test_api_serialization.py b/tests/unit/pipelex/client/test_api_serialization.py index 5299c0843..504f6e3d0 100644 --- a/tests/unit/pipelex/client/test_api_serialization.py +++ b/tests/unit/pipelex/client/test_api_serialization.py @@ -2,17 +2,14 @@ from datetime import datetime from decimal import Decimal from enum import Enum -from pathlib import Path -from typing import Any, Dict, List, Optional, cast +from typing import Any, Dict, List, Optional import pytest from pydantic import BaseModel -from pipelex.client.api_serializer import ApiSerializationError, ApiSerializer -from pipelex.client.protocol import COMPACT_MEMORY_KEY +from pipelex.client.api_serializer import ApiSerializer from pipelex.core.concept_native import NativeConcept -from pipelex.core.pipe_output import PipeOutput -from pipelex.core.stuff_content import NumberContent, TextContent +from pipelex.core.stuff_content import NumberContent from pipelex.core.stuff_factory import StuffFactory from pipelex.core.working_memory import WorkingMemory from pipelex.core.working_memory_factory import WorkingMemoryFactory @@ -150,346 +147,3 @@ def test_serialize_number_content(self, number_content_memory: WorkingMemory): assert number_blueprint["concept_code"] == "native.Number" assert isinstance(number_blueprint["content"], dict) assert number_blueprint["content"]["number"] == 3.14159 - - def test_serialize_pipe_output(self, datetime_content_memory: WorkingMemory): - """Test that PipeOutput is properly serialized.""" - pipe_output = PipeOutput(working_memory=datetime_content_memory) - reduced_output = ApiSerializer.serialize_pipe_output_for_api(pipe_output) - - assert COMPACT_MEMORY_KEY in reduced_output - - # Should contain the same structure as working memory serialization - working_memory_data = reduced_output[COMPACT_MEMORY_KEY] - assert "project_meeting" in working_memory_data - - # Verify datetime formatting - content = working_memory_data["project_meeting"]["content"] - assert content["start_time"] == "2024-01-15T10:00:00" - - def test_make_stuff_content_from_api_data_text(self): - """Test creating StuffContent from API data for text.""" - content = ApiSerializer.make_stuff_content_from_api_data(concept_code=NativeConcept.TEXT.code, value="Test text content") - - assert isinstance(content, TextContent) - assert content.text == "Test text content" - - def test_make_stuff_content_from_api_data_datetime(self): - """Test creating StuffContent from API datetime data.""" - api_data = { - "event_name": "Test Event", - "start_time": "2024-01-15T10:00:00", - "end_time": "2024-01-15T11:30:00", - "created_at": "2024-01-01T09:00:00", - } - - content = ApiSerializer.make_stuff_content_from_api_data(concept_code="event.DateTimeEvent", value=api_data) - content = cast(DateTimeEvent, content) - assert content.event_name == "Test Event" - assert content.start_time == datetime(2024, 1, 15, 10, 0, 0) - assert content.end_time == datetime(2024, 1, 15, 11, 30, 0) - assert content.created_at == datetime(2024, 1, 1, 9, 0, 0) - - def test_make_stuff_content_from_api_data_error(self): - """Test error handling for invalid concept codes.""" - with pytest.raises(ApiSerializationError, match="Failed to create StuffContent"): - ApiSerializer.make_stuff_content_from_api_data(concept_code="invalid.ConceptCode", value={"some": "data"}) - - def test_make_stuff_content_from_api_data_text_concept_no_structure(self): - """Test creating StuffContent from API data for concept with no structure (should be TextContent).""" - # Test case for concept that has no structure - should be treated as TextContent - content = ApiSerializer.make_stuff_content_from_api_data(concept_code="answer.Question", value="What is the capital of France?") - - assert isinstance(content, TextContent) - assert content.text == "What is the capital of France?" - - def test_make_stuff_content_from_api_data_various_cases(self): - """Test make_stuff_content_from_api_data with various input cases.""" - - # Test 1: Native text concept - text_content = ApiSerializer.make_stuff_content_from_api_data(concept_code=NativeConcept.TEXT.code, value="Simple text") - assert isinstance(text_content, TextContent) - assert text_content.text == "Simple text" - - # Test 2: Concept with no structure should become TextContent - question_content = ApiSerializer.make_stuff_content_from_api_data(concept_code="answer.Question", value="What is 2+2?") - assert isinstance(question_content, TextContent) - assert question_content.text == "What is 2+2?" - - # Test 3: Number content (structured) - number_data = {"number": 42.0} - number_content = ApiSerializer.make_stuff_content_from_api_data(concept_code="native.Number", value=number_data) - assert number_content.__class__.__name__ == "NumberContent" - assert hasattr(number_content, "number") - assert number_content.number == 42.0 # type: ignore - - def test_datetime_format_consistency(self): - """Test that the datetime format is consistent.""" - test_datetime = datetime(2024, 12, 25, 15, 30, 45) - formatted = test_datetime.strftime(ApiSerializer.API_DATETIME_FORMAT) - - assert formatted == "2024-12-25T15:30:45" - - # Verify no microseconds or timezone info - assert "." not in formatted - assert "+" not in formatted - assert "Z" not in formatted - - # ===== COMPLEX TESTS ===== - - def test_serialize_complex_nested_pydantic_models(self): - """Test serialization with deeply nested Pydantic models.""" - # Create complex nested structure - task1 = ComplexTask( - task_id="TASK-001", - title="Implement Feature X", - priority=Priority.HIGH, - status=TaskStatus(is_complete=False, completion_date=None, notes=["Started implementation", "Need to review specs"]), - due_dates=[datetime(2024, 2, 15, 10, 0, 0), datetime(2024, 2, 28, 17, 0, 0)], - metadata={"tags": ["frontend", "ui"], "estimated_hours": 40.5, "assigned_team": "Alpha"}, - score=Decimal("8.75"), - ) - - task2 = ComplexTask( - task_id="TASK-002", - title="Bug Fix", - priority=Priority.MEDIUM, - status=TaskStatus(is_complete=True, completion_date=datetime(2024, 1, 20, 14, 30, 0), notes=["Fixed authentication issue"]), - due_dates=[datetime(2024, 1, 25, 12, 0, 0)], - metadata={"severity": "medium", "component": "auth"}, - score=Decimal("6.00"), - ) - - project = Project( - name="Q1 Development Sprint", - created_at=datetime(2024, 1, 1, 9, 0, 0), - tasks=[task1, task2], - settings={"auto_assign": True, "notification_hours": [9, 14, 18], "max_parallel_tasks": 5}, - ) - - # Create stuff content from the complex model - use a proper StructuredContent subclass - from pipelex.core.stuff_content import StructuredContent - - class ProjectContent(StructuredContent): - name: str - created_at: datetime - tasks: List[ComplexTask] - settings: Dict[str, Any] - - project_content = ProjectContent(name=project.name, created_at=project.created_at, tasks=project.tasks, settings=project.settings) - - stuff = StuffFactory.make_stuff(concept_str="project.Complex", name="complex_project", content=project_content) - memory = WorkingMemoryFactory.make_from_single_stuff(stuff=stuff) - - # Test serialization - compact_memory = ApiSerializer.serialize_working_memory_for_api(memory) - - # Verify JSON serializable - this is the key test! - json_string = json.dumps(compact_memory) - roundtrip = json.loads(json_string) - assert roundtrip == compact_memory - - # Verify complex structure is preserved - content = compact_memory["complex_project"]["content"] - assert content["name"] == "Q1 Development Sprint" - assert len(content["tasks"]) == 2 - - # Verify nested datetime formatting - assert content["created_at"] == "2024-01-01T09:00:00" - assert content["tasks"][0]["due_dates"][0] == "2024-02-15T10:00:00" - assert content["tasks"][1]["status"]["completion_date"] == "2024-01-20T14:30:00" - - # Verify enums are converted to values - assert content["tasks"][0]["priority"] == "high" # Priority.HIGH.value - assert content["tasks"][1]["priority"] == "medium" # Priority.MEDIUM.value - - # Verify Decimal handling (converted to float) - assert content["tasks"][0]["score"] == 8.75 # float(Decimal("8.75")) - assert content["tasks"][1]["score"] == 6.0 # float(Decimal("6.00")) - - def test_serialize_lists_of_datetimes(self): - """Test serialization of lists containing datetime objects.""" - from pipelex.core.stuff_content import StructuredContent - - class ScheduleContent(StructuredContent): - meeting_times: List[datetime] - deadlines: Dict[str, datetime] - - schedule = ScheduleContent( - meeting_times=[datetime(2024, 1, 15, 10, 0, 0), datetime(2024, 1, 16, 14, 30, 0), datetime(2024, 1, 17, 9, 15, 0)], - deadlines={"milestone_1": datetime(2024, 2, 1, 23, 59, 59), "milestone_2": datetime(2024, 3, 15, 17, 0, 0)}, - ) - - stuff = StuffFactory.make_stuff(concept_str="schedule.Complex", name="schedule", content=schedule) - memory = WorkingMemoryFactory.make_from_single_stuff(stuff=stuff) - - compact_memory = ApiSerializer.serialize_working_memory_for_api(memory) - - # Verify JSON serializable - json_string = json.dumps(compact_memory) - roundtrip = json.loads(json_string) - assert roundtrip == compact_memory - - content = compact_memory["schedule"]["content"] - - # Verify list of datetimes - assert len(content["meeting_times"]) == 3 - assert content["meeting_times"][0] == "2024-01-15T10:00:00" - assert content["meeting_times"][1] == "2024-01-16T14:30:00" - assert content["meeting_times"][2] == "2024-01-17T09:15:00" - - # Verify dict of datetimes - assert content["deadlines"]["milestone_1"] == "2024-02-01T23:59:59" - assert content["deadlines"]["milestone_2"] == "2024-03-15T17:00:00" - - def test_serialize_with_none_values(self): - """Test serialization with None/Optional values.""" - from pipelex.core.stuff_content import StructuredContent - - class OptionalContent(StructuredContent): - required_field: str - optional_datetime: Optional[datetime] - optional_list: Optional[List[str]] - nullable_dict: Optional[Dict[str, Any]] - - content = OptionalContent(required_field="test", optional_datetime=None, optional_list=None, nullable_dict=None) - - stuff = StuffFactory.make_stuff(concept_str="optional.Test", name="optional_test", content=content) - memory = WorkingMemoryFactory.make_from_single_stuff(stuff=stuff) - - compact_memory = ApiSerializer.serialize_working_memory_for_api(memory) - - # Verify JSON serializable - json_string = json.dumps(compact_memory) - roundtrip = json.loads(json_string) - assert roundtrip == compact_memory - - content_data = compact_memory["optional_test"]["content"] - assert content_data["required_field"] == "test" - assert content_data["optional_datetime"] is None - assert content_data["optional_list"] is None - assert content_data["nullable_dict"] is None - - def test_serialize_deeply_nested_structure(self): - """Test serialization with deeply nested data structures.""" - from pipelex.core.stuff_content import StructuredContent - - class DeepContent(StructuredContent): - level1: Dict[str, Any] - - deep_structure = { - "level2": { - "level3": { - "level4": { - "timestamps": [datetime(2024, 1, 1, 12, 0, 0), datetime(2024, 1, 2, 13, 30, 0)], - "metadata": {"created": datetime(2024, 1, 1, 10, 0, 0), "modified": datetime(2024, 1, 3, 15, 45, 0)}, - } - } - } - } - - content = DeepContent(level1=deep_structure) - - stuff = StuffFactory.make_stuff(concept_str="deep.Nested", name="deep_test", content=content) - memory = WorkingMemoryFactory.make_from_single_stuff(stuff=stuff) - - compact_memory = ApiSerializer.serialize_working_memory_for_api(memory) - - # Verify JSON serializable - json_string = json.dumps(compact_memory) - roundtrip = json.loads(json_string) - assert roundtrip == compact_memory - - # Navigate to deep nested datetime - content_data = compact_memory["deep_test"]["content"] - level4 = content_data["level1"]["level2"]["level3"]["level4"] - - # Verify deep datetime formatting - assert level4["timestamps"][0] == "2024-01-01T12:00:00" - assert level4["timestamps"][1] == "2024-01-02T13:30:00" - assert level4["metadata"]["created"] == "2024-01-01T10:00:00" - assert level4["metadata"]["modified"] == "2024-01-03T15:45:00" - - def test_serialize_with_class_module_fields_removal(self): - """Test that __class__ and __module__ fields are properly removed.""" - from pipelex.core.stuff_content import StructuredContent - - class TestContent(StructuredContent): - data: Dict[str, Any] - - # Create content with intentional __class__ and __module__ fields - test_data = { - "normal_field": "value", - "__class__": "ShouldBeRemoved", - "__module__": "should.be.removed", - "nested": {"field": "nested_value", "__class__": "NestedShouldBeRemoved", "__module__": "nested.should.be.removed"}, - "list_with_class": [{"item": "value1", "__class__": "ItemClass"}, {"item": "value2", "__module__": "item.module"}], - } - - content = TestContent(data=test_data) - - stuff = StuffFactory.make_stuff(concept_str="test.ClassModule", name="class_test", content=content) - memory = WorkingMemoryFactory.make_from_single_stuff(stuff=stuff) - - compact_memory = ApiSerializer.serialize_working_memory_for_api(memory) - - content_data = compact_memory["class_test"]["content"] - - # Verify top-level __class__ and __module__ removal - assert "__class__" not in content_data - assert "__module__" not in content_data - assert content_data["data"]["normal_field"] == "value" - - # Verify nested __class__ and __module__ removal - assert "__class__" not in content_data["data"] - assert "__module__" not in content_data["data"] - assert "__class__" not in content_data["data"]["nested"] - assert "__module__" not in content_data["data"]["nested"] - assert content_data["data"]["nested"]["field"] == "nested_value" - - # Verify list item __class__ and __module__ removal - assert "__class__" not in content_data["data"]["list_with_class"][0] - assert "__module__" not in content_data["data"]["list_with_class"][1] - assert content_data["data"]["list_with_class"][0]["item"] == "value1" - assert content_data["data"]["list_with_class"][1]["item"] == "value2" - - def test_serialize_with_path_objects(self): - """Test serialization with pathlib.Path objects.""" - from pipelex.core.stuff_content import StructuredContent - - class PathContent(StructuredContent): - file_path: Path - directory_path: Path - nested_paths: Dict[str, Path] - path_list: List[Path] - - content = PathContent( - file_path=Path("/home/user/documents/file.txt"), - directory_path=Path("/var/log/"), - nested_paths={"config": Path("/etc/config.yaml"), "temp": Path("/tmp/temp.log")}, - path_list=[Path("/usr/bin/python"), Path("/opt/app/main.py")], - ) - - stuff = StuffFactory.make_stuff(concept_str="path.Test", name="path_test", content=content) - memory = WorkingMemoryFactory.make_from_single_stuff(stuff=stuff) - - compact_memory = ApiSerializer.serialize_working_memory_for_api(memory) - - # Verify JSON serializable - this is the key test! - json_string = json.dumps(compact_memory) - roundtrip = json.loads(json_string) - assert roundtrip == compact_memory - - # Verify Path objects were converted to strings - content_data = compact_memory["path_test"]["content"] - - # Check that paths are now strings - assert content_data["file_path"] == "/home/user/documents/file.txt" - assert content_data["directory_path"] == "/var/log" - - # Verify nested paths in dict - assert content_data["nested_paths"]["config"] == "/etc/config.yaml" - assert content_data["nested_paths"]["temp"] == "/tmp/temp.log" - - # Verify paths in list - assert content_data["path_list"][0] == "/usr/bin/python" - assert content_data["path_list"][1] == "/opt/app/main.py" diff --git a/tests/unit/pipelex/core/test_working_memory_factory.py b/tests/unit/pipelex/core/test_working_memory_factory.py new file mode 100644 index 000000000..cde6aa5e6 --- /dev/null +++ b/tests/unit/pipelex/core/test_working_memory_factory.py @@ -0,0 +1,174 @@ +from pipelex.client.protocol import CompactMemory +from pipelex.core.concept_native import NativeConcept +from pipelex.core.stuff_content import ImageContent, PageContent, TextAndImagesContent, TextContent +from pipelex.core.working_memory_factory import WorkingMemoryFactory + + +class TestWorkingMemoryFactory: + def test_make_from_compact_memory_with_text_content(self): + """Test deserialization of compact memory with text content.""" + compact_memory: CompactMemory = { + "text_item": { + "concept_code": NativeConcept.TEXT.code, + "content": "Hello, world!", + } + } + + working_memory = WorkingMemoryFactory.make_from_compact_memory(compact_memory) + + assert working_memory is not None + assert "text_item" in working_memory.root + + stuff = working_memory.root["text_item"] + assert stuff.concept_code == NativeConcept.TEXT.code + assert isinstance(stuff.content, TextContent) + assert stuff.content.text == "Hello, world!" + + def test_make_from_compact_memory_with_structured_content(self): + """Test deserialization of compact memory with structured content.""" + compact_memory: CompactMemory = { + "structured_item": { + "concept_code": "some.CustomConcept", + "content": "This is fallback text content since CustomConcept doesn't exist", + } + } + + working_memory = WorkingMemoryFactory.make_from_compact_memory(compact_memory) + + assert working_memory is not None + assert "structured_item" in working_memory.root + + stuff = working_memory.root["structured_item"] + assert stuff.concept_code == "some.CustomConcept" + assert stuff.content is not None + assert isinstance(stuff.content, TextContent) # Falls back to TextContent + + def test_make_from_compact_memory_with_complex_nested_content(self): + """Test deserialization of compact memory with complex nested structured content.""" + compact_memory: CompactMemory = { + "complex_page": { + "concept_code": NativeConcept.PAGE.code, + "content": { + "text_and_images": { + "text": { + "text": "This is a complex document page with multiple images and rich text content. " + "It demonstrates nested structured content handling." + }, + "images": [ + { + "url": "mock_url", + "caption": "First image showing data visualization", + "source_prompt": "Generate a chart showing quarterly sales data", + }, + { + "url": ("data:image/png;base64,mock_base64"), + "caption": "Second image with base64 data", + "base_64": ("mock_base64"), + }, + {"url": "/local/path/diagram.png", "caption": "System architecture diagram"}, + ], + }, + "page_view": {"url": "mock_url", "caption": "Full page screenshot"}, + }, + } + } + + working_memory = WorkingMemoryFactory.make_from_compact_memory(compact_memory) + + assert working_memory is not None + assert "complex_page" in working_memory.root + + stuff = working_memory.root["complex_page"] + assert stuff.concept_code == NativeConcept.PAGE.code + assert isinstance(stuff.content, PageContent) + + # Verify text_and_images structure + page_content = stuff.content + assert isinstance(page_content.text_and_images, TextAndImagesContent) + + # Verify text content + text_content = page_content.text_and_images.text + assert text_content is not None + assert isinstance(text_content, TextContent) + assert "complex document page" in text_content.text + + # Verify images + images = page_content.text_and_images.images + assert images is not None + assert len(images) == 3 + + # Check first image + first_image = images[0] + assert isinstance(first_image, ImageContent) + assert first_image.url == "mock_url" + assert first_image.caption == "First image showing data visualization" + assert first_image.source_prompt == "Generate a chart showing quarterly sales data" + + # Check second image (with base64) + second_image = images[1] + assert isinstance(second_image, ImageContent) + expected_base64 = "mock_base64" + assert second_image.base_64 == expected_base64 + assert second_image.caption == "Second image with base64 data" + + # Check third image + third_image = images[2] + assert isinstance(third_image, ImageContent) + assert third_image.url == "/local/path/diagram.png" + assert third_image.caption == "System architecture diagram" + + # Verify page_view + page_view = page_content.page_view + assert page_view is not None + assert isinstance(page_view, ImageContent) + assert page_view.url == "mock_url" + assert page_view.caption == "Full page screenshot" + + def test_make_from_compact_memory_empty(self): + """Test deserialization of empty compact memory.""" + compact_memory: CompactMemory = {} + + working_memory = WorkingMemoryFactory.make_from_compact_memory(compact_memory) + + assert working_memory is not None + assert len(working_memory.root) == 0 + + def test_make_from_compact_memory_multiple_items(self): + """Test deserialization of compact memory with multiple items.""" + compact_memory: CompactMemory = { + "text1": { + "concept_code": NativeConcept.TEXT.code, + "content": "First text", + }, + "text2": { + "concept_code": NativeConcept.TEXT.code, + "content": "Second text", + }, + "structured": { + "concept_code": "custom.Concept", + "content": "Fallback text for custom concept", + }, + } + + working_memory = WorkingMemoryFactory.make_from_compact_memory(compact_memory) + + assert working_memory is not None + assert len(working_memory.root) == 3 + assert "text1" in working_memory.root + assert "text2" in working_memory.root + assert "structured" in working_memory.root + + # Verify text content + text1_stuff = working_memory.root["text1"] + assert isinstance(text1_stuff.content, TextContent) + assert text1_stuff.content.text == "First text" + + text2_stuff = working_memory.root["text2"] + assert isinstance(text2_stuff.content, TextContent) + assert text2_stuff.content.text == "Second text" + + # Verify structured content (falls back to TextContent) + structured_stuff = working_memory.root["structured"] + assert structured_stuff.concept_code == "custom.Concept" + assert isinstance(structured_stuff.content, TextContent) + assert structured_stuff.content.text == "Fallback text for custom concept" diff --git a/uv.lock b/uv.lock index 5f7317061..401aa9e90 100644 --- a/uv.lock +++ b/uv.lock @@ -1619,7 +1619,7 @@ wheels = [ [[package]] name = "pipelex" -version = "0.6.0" +version = "0.6.1" source = { editable = "." } dependencies = [ { name = "aiofiles" }, From bbe135f0c0f9ba6fe53c611cb13d7e357aed8116 Mon Sep 17 00:00:00 2001 From: thomashebrard Date: Wed, 16 Jul 2025 17:05:17 +0200 Subject: [PATCH 2/2] fix changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e14fb22b4..7cd580bd3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ - Can execute pipelines with `input_memory`: It is a `CompactMemory: Dict[str, Dict[str, Any]]` -## [v0.6.0] - 2025-07-12 +## [v0.6.0] - 2025-07-15 ### Changed - **Enhanced `Pipelex.make()` method**: Complete overhaul of the initialization method with new path configuration options and robust validation: