## 💡 Splitter Pattern Flow Summary: Purchase Order Processing

This implementation demonstrates the **Splitter** Enterprise Integration Pattern in the **Rustic AI** framework, specifically designed for modular and scalable purchase order processing.

---

### 📦 Scenario

A client sends a **`PurchaseOrderRequest`** containing multiple items. Each item needs to be processed independently — such as validation, inventory check, or fulfillment. To achieve this, the message is **split** into individual per-item messages, which are then routed downstream for further handling.

---

### 🧠 Key Concepts Demonstrated

#### ✅ Splitter Agent (`Purchase Order Splitter`)

* **Input:** `PurchaseOrderRequest` with list of items.
* **Behavior:** Splits the items into individual `ItemProcessingResult` messages.
* **Output:** Sends each split item to the topic `item_processing_results`.

#### 🔀 Routing and Traceability

* Each outgoing message maintains a full **message history** and **routing slip**, ensuring auditability and traceability.
* The `SplitAndSend` method logs:

  * Original message index
  * Origin topic (`purchase_orders`)
  * Destination topic (`item_processing_results`)
  * Agent and processor involved

#### 🔂 Message Breakdown

* Each item (e.g., `{id: item-001, quantity: 2}`) is turned into an individual message.
* These messages can now be routed independently — allowing parallel or asynchronous downstream processing.

---

### 🔎 Message Trace Example

#### Input Message:

```python
PurchaseOrderRequest(id="PO-12345", items=[{"id": "item-001", "quantity": 2}, {"id": "item-002", "quantity": 1}])
```

#### Splitter Output:

Two distinct `ItemProcessingResult` messages sent to `item_processing_results`:

```python
Message 0: {'id': 'item-001', 'quantity': 2}
Message 1: {'id': 'item-002', 'quantity': 1}
```

#### Trace:

```
Message at index 0:
  purchase_orders → [Purchase Order Splitter/SplitterAgent:split_and_send] → item_processing_results

Message at index 1:
  purchase_orders → [Purchase Order Splitter/SplitterAgent:split_and_send] → item_processing_results
```

---

### 🧩 EIP Pattern Context

#### Pattern Used: **Splitter**

#### 🔧 Components Involved:

* `SplitterAgent`: Implements the splitting logic
* `RoutingSlip`: Carries delivery metadata
* `ProcessEntry`: Captures routing history for observability
* `Topic-Based Dispatch`: Each split message is published to a specific topic (`item_processing_results`)

---

### 🔁 Why Use the Splitter Pattern?

This pattern is ideal for scenarios such as:

* Multi-item processing in procurement and logistics
* Handling composite tasks like batch uploads, multi-sensor data, or survey responses
* Distributing subtasks for concurrent processing

---

### ✅ Benefits

* **Scalable**: Each item can be processed in parallel.
* **Modular**: Logic separation between splitting and downstream processing.
* **Auditable**: Full trace of message lifecycle via routing slips and history.
* **Reusable**: Works with any `ListSplitter` + downstream agents or transformers (e.g., `FormatSelector`, `TransformerAgent`).

---

### 🧬 Pattern Summary

```
1 Purchase Order → N Item Tasks → Parallel Processing
```

This design ensures clean separation of responsibilities and sets the stage for further EIP integrations like **Content-Based Router**, **Aggregator**, or **Resequencer**.

In [1]:
from typing import Any, Dict, List, Optional
from pydantic import BaseModel

class PurchaseOrderRequest(BaseModel):
    order_id: str
    items: List[Dict[str, Any]]
    customer: str


class ItemOrderList(BaseModel):
    item1: Dict
    item2: Dict


class ItemProcessingResult(BaseModel):
    id: Optional[str]
    quantity: Optional[int]


In [2]:
# Build the scatter-gather guild with bootstrap for state management
import os

from rustic_ai.core.guild.builders import GuildBuilder
from rustic_ai.core.guild.metastore.database import Metastore

guild_builder = GuildBuilder.from_yaml_file("./007_splitter.yaml")

# Bootstrap the guild so we have GuildManager for state management
db = "sqlite:///splitter_demo.db"

if os.path.exists("splitter_demo.db"):
    os.remove("splitter_demo.db")

Metastore.initialize_engine(db)
Metastore.get_engine(db)
Metastore.create_db()

guild = guild_builder.bootstrap(metastore_database_url=db, organization_id="myorg")  # Use SQLite for simplicity





In [3]:
import time

from rustic_ai.core.agents.testutils.probe_agent import ProbeAgent
from rustic_ai.core.guild.builders import AgentBuilder

time.sleep(2)

# Create probe agent to monitor the entire flow
probe_spec = (
    AgentBuilder(ProbeAgent)
    .set_id("TestProbe")
    .set_name("Test Probe Agent")
    .set_description("Monitors the entire scatter-gather flow")
    .add_additional_topic("purchase_orders") # Initial request
    .add_additional_topic("item_processing_results")  # Final results
    .build_spec()
)

# Add probe agent to the bootstrapped guild
probe_agent: ProbeAgent = guild._add_local_agent(probe_spec)  # type: ignore

In [4]:
# Create test data for analysis
test_order = PurchaseOrderRequest(
    order_id="PO-12345",
    customer="ACME Corp",
    items=[{"id": "item-001", "quantity": 2}, {"id": "item-002", "quantity": 1}],
)


print(f"Sending PurchaseOrderRequest with ID: {test_order.order_id}")
print(f"Items: {test_order.items}")

# Send through probe agent to trigger the scatter-gather flow
probe_agent.publish_with_guild_route(topic="purchase_orders", payload=test_order)

Sending PurchaseOrderRequest with ID: PO-12345
Items: [{'id': 'item-001', 'quantity': 2}, {'id': 'item-002', 'quantity': 1}]


<rustic_ai.core.utils.gemstone_id.GemstoneID at 0x7f537c4ef6b0>

In [5]:
# Let's check the message history to see the complete flow
probe_agent.print_all_history()


For message at index 0 (9567363465551548416):
	(purchase_orders) -> [Purchase Order Splitter/SplitterAgent:split_and_send] -> (item_processing_results)

For message at index 1 (9567363465694154752):
	(purchase_orders) -> [Purchase Order Splitter/SplitterAgent:split_and_send] -> (item_processing_results)


In [6]:
probe_agent.get_messages()

[Message(sender=AgentTag(id='SplitterAgent', name='Purchase Order Splitter'), topics='item_processing_results', recipient_list=[], payload={'id': 'item-001', 'quantity': 2}, format='__main__.ItemProcessingResult', in_response_to=9567363465480249344, thread=[9567363465480249344], conversation_id=None, forward_header=None, routing_slip=RoutingSlip(steps=[RoutingRule(agent=AgentTag(id=None, name='Purchase Order Splitter'), agent_type=None, method_name=None, origin_filter=None, message_format='__main__.ItemProcessingResult', destination=RoutingDestination(topics='item_processing_results', recipient_list=[], priority=None), mark_forwarded=False, route_times=-2, transformer=None, agent_state_update=None, guild_state_update=None, process_status=None)]), message_history=[ProcessEntry(agent=AgentTag(id='SplitterAgent', name='Purchase Order Splitter'), origin=9567363465480249344, result=9567363465551548416, processor='split_and_send', from_topic='purchase_orders', to_topics=['item_processing_res