Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## [v0.6.1] - 2025-07-16

- Can execute pipelines with `input_memory`: It is a `CompactMemory: Dict[str, Dict[str, Any]]`

## [v0.6.0] - 2025-07-15

### Changed
Expand Down
31 changes: 5 additions & 26 deletions pipelex/client/api_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@
from decimal import Decimal
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, cast
from typing import Any, Dict, List, Optional, cast

from pipelex.client.protocol import CompactMemory
from pipelex.core.concept_native import NativeConcept
from pipelex.core.pipe_output import PipeOutput
from pipelex.core.stuff_content import StuffContent, TextContent
from pipelex.core.stuff_factory import StuffContentFactory
from pipelex.core.stuff_content import TextContent
from pipelex.core.working_memory import WorkingMemory
from pipelex.exceptions import ApiSerializationError


class ApiSerializer:
Expand All @@ -21,7 +19,7 @@ class ApiSerializer:
FIELDS_TO_SKIP = ("__class__", "__module__")

@classmethod
def serialize_working_memory_for_api(cls, working_memory: WorkingMemory) -> CompactMemory:
def serialize_working_memory_for_api(cls, working_memory: Optional[WorkingMemory] = None) -> CompactMemory:
"""
Convert WorkingMemory to API-ready format using kajson with proper datetime handling.

Expand All @@ -32,6 +30,8 @@ def serialize_working_memory_for_api(cls, working_memory: WorkingMemory) -> Comp
Dict ready for API transmission with datetime strings and no __class__/__module__
"""
compact_memory: CompactMemory = {}
if working_memory is None:
return compact_memory

for stuff_name, stuff in working_memory.root.items():
if stuff.concept_code == NativeConcept.TEXT.code:
Expand Down Expand Up @@ -101,24 +101,3 @@ def _clean_and_format_content(cls, content: Any) -> Any:
return str(content) # Convert Path to string representation
else:
return content

@classmethod
def make_stuff_content_from_api_data(cls, concept_code: str, value: Dict[str, Any] | str) -> StuffContent:
"""
Create StuffContent from API data using concept code.

Args:
concept_code: The concept code to determine the content type
value: The content value from API

Returns:
StuffContent instance

Raises:
ApiSerializationError: If concept cannot be resolved or content creation fails
"""
try:
return StuffContentFactory.make_stuffcontent_from_concept_code_with_fallback(concept_code=concept_code, value=value)

except Exception as exc:
raise ApiSerializationError(f"Failed to create StuffContent for concept '{concept_code}': {exc}") from exc
16 changes: 15 additions & 1 deletion pipelex/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

from pipelex.client.pipeline_request_factory import PipelineRequestFactory
from pipelex.client.pipeline_response_factory import PipelineResponseFactory
from pipelex.client.protocol import PipelexProtocol, PipelineResponse
from pipelex.client.protocol import CompactMemory, PipelexProtocol, PipelineResponse
from pipelex.core.pipe_run_params import PipeOutputMultiplicity
from pipelex.core.working_memory import WorkingMemory
from pipelex.core.working_memory_factory import WorkingMemoryFactory
from pipelex.exceptions import ClientAuthenticationError
from pipelex.tools.environment import get_required_env

Expand Down Expand Up @@ -75,10 +76,16 @@ async def execute_pipeline(
self,
pipe_code: str,
working_memory: Optional[WorkingMemory] = None,
input_memory: Optional[CompactMemory] = None,
output_name: Optional[str] = None,
output_multiplicity: Optional[PipeOutputMultiplicity] = None,
dynamic_output_concept_code: Optional[str] = None,
) -> PipelineResponse:
if working_memory and input_memory:
raise ValueError(f"working_memory and input_memory cannot be provided together to the API execute_pipeline {pipe_code=}")

if input_memory is not None:
working_memory = WorkingMemoryFactory.make_from_compact_memory(input_memory)
pipeline_request = PipelineRequestFactory.make_from_working_memory(
working_memory=working_memory,
output_name=output_name,
Expand All @@ -93,10 +100,17 @@ async def start_pipeline(
self,
pipe_code: str,
working_memory: Optional[WorkingMemory] = None,
input_memory: Optional[CompactMemory] = None,
output_name: Optional[str] = None,
output_multiplicity: Optional[PipeOutputMultiplicity] = None,
dynamic_output_concept_code: Optional[str] = None,
) -> PipelineResponse:
if working_memory and input_memory:
raise ValueError(f"working_memory and input_memory cannot be provided together to the API start_pipeline {pipe_code=}")

if input_memory is not None:
working_memory = WorkingMemoryFactory.make_from_compact_memory(input_memory)

pipeline_request = PipelineRequestFactory.make_from_working_memory(
working_memory=working_memory,
output_name=output_name,
Expand Down
42 changes: 4 additions & 38 deletions pipelex/client/pipeline_request_factory.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from typing import Any, Dict, Optional

from pipelex.client.api_serializer import ApiSerializer
from pipelex.client.protocol import COMPACT_MEMORY_KEY, CompactMemory, PipelineRequest
from pipelex.client.protocol import COMPACT_MEMORY_KEY, PipelineRequest
from pipelex.core.pipe_run_params import PipeOutputMultiplicity
from pipelex.core.stuff_factory import StuffFactory
from pipelex.core.working_memory import WorkingMemory
from pipelex.core.working_memory_factory import WorkingMemoryFactory


class PipelineRequestFactory:
Expand All @@ -30,48 +28,16 @@ def make_from_working_memory(
Returns:
PipelineRequest with the working memory serialized to reduced format
"""
compact_memory = None
if working_memory is not None:
compact_memory = ApiSerializer.serialize_working_memory_for_api(working_memory)

return PipelineRequest(
compact_memory=compact_memory,
input_memory=ApiSerializer.serialize_working_memory_for_api(working_memory),
output_name=output_name,
output_multiplicity=output_multiplicity,
dynamic_output_concept_code=dynamic_output_concept_code,
)

@staticmethod
def make_working_memory_from_reduced(compact_memory: Optional[CompactMemory]) -> WorkingMemory:
"""
Create a WorkingMemory from a reduced memory dictionary.

Args:
compact_memory: Dictionary in the format from API

Returns:
WorkingMemory object reconstructed from the reduced format
"""
working_memory = WorkingMemoryFactory.make_empty()
if compact_memory is None:
return working_memory

for stuff_key, stuff_data in compact_memory.items():
concept_code = stuff_data.get("concept_code", "")
content_value = stuff_data.get("content", {})

# Use API serializer to create content
content = ApiSerializer.make_stuff_content_from_api_data(concept_code=concept_code, value=content_value)

# Create stuff directly
stuff = StuffFactory.make_stuff(concept_str=concept_code, name=stuff_key, content=content)

working_memory.add_new_stuff(name=stuff_key, stuff=stuff)

return working_memory

@staticmethod
def make_request_from_body(request_body: Dict[str, Any]) -> PipelineRequest:
def make_from_body(request_body: Dict[str, Any]) -> PipelineRequest:
"""
Create a PipelineRequest from raw request body dictionary.

Expand All @@ -82,7 +48,7 @@ def make_request_from_body(request_body: Dict[str, Any]) -> PipelineRequest:
PipelineRequest object with dictionary working_memory
"""
return PipelineRequest(
compact_memory=request_body.get(COMPACT_MEMORY_KEY),
input_memory=request_body.get(COMPACT_MEMORY_KEY),
output_name=request_body.get("output_name"),
output_multiplicity=request_body.get("output_multiplicity"),
dynamic_output_concept_code=request_body.get("dynamic_output_concept_code"),
Expand Down
6 changes: 3 additions & 3 deletions pipelex/client/pipeline_response_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ def make_from_pipe_output(
Returns:
PipelineResponse with the pipe output serialized to reduced format
"""
reduced_output = None
compact_output = None
if pipe_output is not None:
reduced_output = ApiSerializer.serialize_pipe_output_for_api(pipe_output=pipe_output)
compact_output = ApiSerializer.serialize_pipe_output_for_api(pipe_output=pipe_output)

return PipelineResponse(
pipeline_run_id=pipeline_run_id,
created_at=created_at,
pipeline_state=pipeline_state,
finished_at=finished_at,
pipe_output=reduced_output,
pipe_output=compact_output,
status=status,
message=message,
error=error,
Expand Down
10 changes: 7 additions & 3 deletions pipelex/client/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ class PipelineRequest(BaseModel):
Request for executing a pipeline.

Attributes:
compact_memory (Optional[CompactMemory]): In the format of WorkingMemory.to_compact_memory()
input_memory (Optional[CompactMemory]): In the format of WorkingMemory.to_compact_memory()
output_name (Optional[str]): Name of the output slot to write to
output_multiplicity (Optional[PipeOutputMultiplicity]): Output multiplicity setting
dynamic_output_concept_code (Optional[str]): Override for the dynamic output concept code
"""

compact_memory: Optional[CompactMemory] = None
input_memory: Optional[CompactMemory] = None
output_name: Optional[str] = None
output_multiplicity: Optional[PipeOutputMultiplicity] = None
dynamic_output_concept_code: Optional[str] = None
Expand All @@ -70,7 +70,7 @@ class PipelineResponse(ApiResponse):

Example of pipe_output:
"pipe_output": {
"compact_memory": {
"input_memory": {
"text": {
"concept_code": "native.Text",
"content": "Some text........"
Expand Down Expand Up @@ -125,6 +125,7 @@ async def execute_pipeline(
self,
pipe_code: str,
working_memory: Optional[WorkingMemory] = None,
input_memory: Optional[CompactMemory] = None,
output_name: Optional[str] = None,
output_multiplicity: Optional[PipeOutputMultiplicity] = None,
dynamic_output_concept_code: Optional[str] = None,
Expand All @@ -135,6 +136,7 @@ async def execute_pipeline(
Args:
pipe_code (str): The code identifying the pipeline to execute
working_memory (Optional[WorkingMemory]): Memory context passed to the pipeline
input_memory (Optional[CompactMemory]): Input memory passed to the pipeline
output_name (Optional[str]): Target output slot name
output_multiplicity (Optional[PipeOutputMultiplicity]): Output multiplicity setting
dynamic_output_concept_code (Optional[str]): Override for dynamic output concept
Expand All @@ -152,6 +154,7 @@ async def start_pipeline(
self,
pipe_code: str,
working_memory: Optional[WorkingMemory] = None,
input_memory: Optional[CompactMemory] = None,
output_name: Optional[str] = None,
output_multiplicity: Optional[PipeOutputMultiplicity] = None,
dynamic_output_concept_code: Optional[str] = None,
Expand All @@ -162,6 +165,7 @@ async def start_pipeline(
Args:
pipe_code (str): The code identifying the pipeline to execute
working_memory (Optional[WorkingMemory]): Memory context passed to the pipeline
input_memory (Optional[CompactMemory]): Input memory passed to the pipeline
output_name (Optional[str]): Target output slot name
output_multiplicity (Optional[PipeOutputMultiplicity]): Output multiplicity setting
dynamic_output_concept_code (Optional[str]): Override for dynamic output concept
Expand Down
28 changes: 27 additions & 1 deletion pipelex/core/working_memory_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
from pydantic import BaseModel

from pipelex import log
from pipelex.client.protocol import CompactMemory
from pipelex.core.concept_native import NativeConcept
from pipelex.core.stuff import Stuff
from pipelex.core.stuff_content import ImageContent, PDFContent, StuffContent, TextContent
from pipelex.core.stuff_factory import StuffBlueprint, StuffFactory
from pipelex.core.stuff_factory import StuffBlueprint, StuffContentFactory, StuffFactory
from pipelex.core.working_memory import MAIN_STUFF_NAME, StuffDict, WorkingMemory
from pipelex.exceptions import WorkingMemoryFactoryError
from pipelex.tools.misc.json_utils import load_json_dict_from_path
Expand Down Expand Up @@ -124,6 +125,31 @@ def make_from_memory_file(cls, memory_file_path: str) -> WorkingMemory:
working_memory = WorkingMemory.model_validate(working_memory_dict)
return working_memory

@classmethod
def make_from_compact_memory(cls, compact_memory: CompactMemory) -> WorkingMemory:
"""
Create a WorkingMemory from a compact memory dictionary.

Args:
compact_memory: Dictionary in the format from API serialization

Returns:
WorkingMemory object reconstructed from the compact format
"""
working_memory = cls.make_empty()

for stuff_key, stuff_data in compact_memory.items():
concept_code = stuff_data.get("concept_code", "")
content_value = stuff_data.get("content", {})

content = StuffContentFactory.make_stuffcontent_from_concept_code_with_fallback(concept_code=concept_code, value=content_value)

stuff = StuffFactory.make_stuff(concept_str=concept_code, name=stuff_key, content=content)

working_memory.add_new_stuff(name=stuff_key, stuff=stuff)

return working_memory

@classmethod
def make_for_dry_run(cls, needed_inputs: List[Tuple[str, str, Type[StuffContent]]]) -> "WorkingMemory":
"""
Expand Down
8 changes: 8 additions & 0 deletions pipelex/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,11 @@ class ApiSerializationError(Exception):
"""Exception raised when API serialization fails."""

pass


class StartPipelineException(Exception):
pass


class ExecutePipelineException(Exception):
pass
3 changes: 2 additions & 1 deletion pipelex/pipe_works/pipe_job_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pipelex.core.pipe_run_params import PipeRunParams
from pipelex.core.pipe_run_params_factory import PipeRunParamsFactory
from pipelex.core.working_memory import WorkingMemory
from pipelex.core.working_memory_factory import WorkingMemoryFactory
from pipelex.pipe_works.pipe_job import PipeJob
from pipelex.pipeline.job_metadata import JobMetadata

Expand All @@ -19,7 +20,7 @@ def make_pipe_job(
output_name: Optional[str] = None,
) -> PipeJob:
job_metadata = job_metadata or JobMetadata()
working_memory = working_memory or WorkingMemory()
working_memory = working_memory or WorkingMemoryFactory.make_empty()
if not pipe_run_params:
pipe_run_params = PipeRunParamsFactory.make_run_params()
return PipeJob(
Expand Down
13 changes: 13 additions & 0 deletions pipelex/pipeline/execute.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from typing import Optional

from pipelex.client.protocol import CompactMemory
from pipelex.core.pipe_output import PipeOutput
from pipelex.core.pipe_run_params import FORCE_DRY_RUN_MODE_ENV_KEY, PipeOutputMultiplicity, PipeRunMode
from pipelex.core.pipe_run_params_factory import PipeRunParamsFactory
from pipelex.core.working_memory import WorkingMemory
from pipelex.core.working_memory_factory import WorkingMemoryFactory
from pipelex.exceptions import ExecutePipelineException
from pipelex.hub import get_pipe_router, get_pipeline_manager, get_report_delegate, get_required_pipe
from pipelex.pipe_works.pipe_job_factory import PipeJobFactory
from pipelex.pipeline.job_metadata import JobMetadata
Expand All @@ -13,6 +16,7 @@
async def execute_pipeline(
pipe_code: str,
working_memory: Optional[WorkingMemory] = None,
input_memory: Optional[CompactMemory] = None,
output_name: Optional[str] = None,
output_multiplicity: Optional[PipeOutputMultiplicity] = None,
dynamic_output_concept_code: Optional[str] = None,
Expand All @@ -30,6 +34,8 @@ async def execute_pipeline(
The code of the pipe to execute.
working_memory:
Optional ``WorkingMemory`` instance passed to the pipe.
input_memory:
Optional compact memory to pass to the pipe.
output_name:
Name of the output slot to write to.
output_multiplicity:
Expand All @@ -47,6 +53,13 @@ async def execute_pipeline(
Tuple[PipeOutput, str]
A tuple containing the pipe output and the pipeline run ID.
"""
# Can be either working_memory or compact_memory, but not both
if working_memory and input_memory:
raise ExecutePipelineException(f"Cannot pass both working_memory and input_memory to `execute_pipeline` {pipe_code=}")

if input_memory:
working_memory = WorkingMemoryFactory.make_from_compact_memory(input_memory)

if pipe_run_mode is None:
if run_mode_from_env := get_optional_env(key=FORCE_DRY_RUN_MODE_ENV_KEY):
pipe_run_mode = PipeRunMode(run_mode_from_env)
Expand Down
Loading