# Dynamo runtime: Transport Agnostic Dynamo Pipelines
Status: Draft

Authors: [biswapanda](https://github.com/biswapanda)

Category: Architecture

Reviewers: [Ryan Olson](https://github.com/ryanolson), [Neelay Shah](https://github.com/nnshah1), [Graham King](https://github.com/grahamking), [Maksim Khadkevich](https://github.com/hutm), [Rudy Pei](https://github.com/PeaBrane), [Kyle Kranen](https://github.com/kkranen)


## Overview

The high-level goal is to decouple NATS (transport and object store) from the Dynamo runtime.

- introduce abstractions for current NATS usages (e.g. event plane, request plane, object store) that allow different implementations to be plugged in.

- deprecate the NATS object store and reduce dependencies on NATS.

## Requirements
- deliver messages across dynamo instances with an at-least-once delivery guarantee.
- switch between transports at runtime.
- support long-term architecture goals for Dynamo GA

### Transport Agnostic API

Dynamo communication primitives need to support:
- peer-to-peer (req/reply: request plane) and scoped broadcasts (event plane)
- communication regimes: single process, single node (multi-process), and multi-node
- transport options: NATS, raw TCP, ZMQ, HTTP SSE, gRPC, UCX active messaging
- cancellation support for the request/reply pattern
- events do not require a response

### Deprecate NATS object store usage
- Router snapshots are stored in the NATS object store.
- Model files are stored in the NATS object store.

### Long-term architectural goals to support:

- separation of the Frontend (3-in-1 across in-process, same node, or remote node)

- HTTP-based endpoint for one-off usage of a component (KV router, etc.)

- batching/pipelining messages for requests/responses:
  - batching multiple requests together over a network round-trip can yield a perf benefit.

- simplify `dynamo namespace` usage and process hierarchy (namespace, component, etc.)
  - `dynamo namespace` causes unnecessary cognitive complexity for end users.
  - support mapping to more meaningful Grove concepts like PodClique, PodCliqueSet, etc.

## Usage Patterns

More details in [NATS use cases](#nats-use-cases)

| Plane | Purpose / Protocol | Delivery Guarantee / Notes | Current NATS Usage Example |
|---------------|---------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------|-------------------------------------------|
| Request Plane | Protocol for request context, control messages, and request cancellation | At least once (with idempotent handlers); supports request/reply, cancellation, and timeouts | Service stats collection, direct requests |
| Event Plane | Delivery of KV events and other system events | At least once (preferred); events are idempotent, redelivered messages are ignored | KV router event publishing/consumption |
| Metrics | Collection and reporting of runtime/service metrics | At least once (metrics can be aggregated, duplicate-safe) | Service stats, health checks |
| Object Store | Ephemeral storage for model files and artifacts | At least once (object upload/download, may use JetStream Object Store) | Model file storage, file upload/download |

**Notes:**
- All planes should support pluggable transports (NATS, ZMQ, HTTP SSE, gRPC, UCX, etc.).
- At-least-once delivery is preferred for reliability; consumers must be idempotent.
- The Request Plane requires a protocol for cancellation and context propagation.
- Object store usage via NATS is being deprecated in favor of more direct or persistent solutions.


## Proposal
![alt text](./dyn_entities.png)

![alt text](./comm.png)

### Transport Agnostic API

![alt text](./generic_event_req.jpg)

Use unique identifiers to identify the target entity.

`DynamoEntity`: a uniquely addressable compute unit (engine, instance, component, etc.).
- the dynamo application's communication primitives (publish, subscribe, request) take an entity to route messages to the target in a transport-agnostic manner.
- an opaque `u128` identifier uniquely identifies a single instance or a collection of instances (component, namespace, etc.)
- each dynamo entity has a unique ID and can be used to communicate with other dynamo entities.
- `DynamoNetworkManager` uses the entity to identify the target and establish a connection.
- the discovery service API can be used to find participating dynamo entities.

`DynamoNetworkManager`: manages communication between dynamo entities.
- manages listening ports and client connections to remote peers
- serializes and deserializes messages for the different transports
- handles timeout errors, retries, and cancellation for the request/reply pattern
- sends and receives messages
- handles remote peer connections: in-process, local host, or remote host

![alt text](./net_manager.png)


High-level `DynamoNetworkManager` API:

The following high-level network manager interface will be available to the dynamo application layer for use by component authors.

```rust
publish(topic: &str, message: InputType, entity: &DynamoEntity) -> Result<(), Error>

subscribe(topic: &str,
          handler: fn(message: InputType) -> Result<(), Error>,
          entity: &DynamoEntity) -> Result<(), Error>

request(topic: &str, message: InputType, entity_id: u128) -> Future<Result<OutputType, Error>>
```
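
For illustration, a hypothetical call sequence built on this interface; the `manager` handle, the `MetricsEvent` and `StatsQuery` message types, and the selector value are assumptions for this sketch, not part of the proposal:

```rust
// Hypothetical usage; `manager`, `MetricsEvent`, `StatsQuery`, and the
// selector value below are illustrative assumptions.
let prefill = DynamoEntity::from_selector(/* {"component": "prefill"} */);

// Event plane: fire-and-forget publish, no response expected.
manager.publish("metrics", MetricsEvent::new(), &prefill)?;

// Request plane: await a single reply from a specific instance by ID.
let stats = manager.request("stats", StatsQuery::new(), prefill_leader_id).await?;
```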

### PubSub interface
The event plane uses the pubsub interface to publish and subscribe to events.

#### Topics
Topics group events of the same nature, for example "kv_events", "metrics", etc.

#### Dynamo entity based filtering / scoping
Guiding principle: "flat is better than nested".
Inspired by experience with Kubernetes (selectors and labels) and MongoDB document filters.

A DynamoEntity can be used as the filtering criterion for publishing and subscribing to events. Matching is done at the network layer and is transparent to the application layer.

For example, an event published by publisher_entity is delivered to subscriber_entity if the subscriber_entity's metadata matches (is a subset of) the publisher_entity's metadata.

- metadata is fetched from the service discovery layer; the wire protocol does not (de)serialize metadata.

- the dynamo entity acts as a filter to identify the target entities for publish/subscribe, so matching works in the absence of a dedicated broker service. This can be done in the network manager layer.

- perf: matching results can be cached in the network manager layer to avoid re-matching for each message. A service discovery watch can invalidate the cache when metadata changes.


```
publisher_entity: DynamoEntity {
    metadata: {"k1": "v1", "k2": "v2"}
}

// matches: {"k2": "v2"} is a subset of publisher_entity's metadata {"k1": "v1", "k2": "v2"}
subscriber_entity: DynamoEntity {
    metadata: {"k2": "v2"}
}

// matches: {"k1": "v1"} is a subset of publisher_entity's metadata {"k1": "v1", "k2": "v2"}
subscriber_entity: DynamoEntity {
    metadata: {"k1": "v1"}
}

// does not match
subscriber_entity: DynamoEntity {
    metadata: {"k1": "v2"}
}
```
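
A minimal sketch of the subset-matching rule illustrated above, assuming metadata is a `HashMap<String, String>`; the function name and standalone `main` are illustrative, not part of the proposed API:

```rust
use std::collections::HashMap;

/// A subscriber matches a publisher when every key/value pair in the
/// subscriber's metadata is also present in the publisher's metadata.
fn metadata_matches(
    subscriber: &HashMap<String, String>,
    publisher: &HashMap<String, String>,
) -> bool {
    subscriber.iter().all(|(k, v)| publisher.get(k) == Some(v))
}

fn main() {
    let publisher = HashMap::from([
        ("k1".to_string(), "v1".to_string()),
        ("k2".to_string(), "v2".to_string()),
    ]);
    // {"k2": "v2"} is a subset of the publisher's metadata: matches.
    let subset = HashMap::from([("k2".to_string(), "v2".to_string())]);
    // {"k1": "v2"} conflicts on "k1": does not match.
    let conflict = HashMap::from([("k1".to_string(), "v2".to_string())]);

    assert!(metadata_matches(&subset, &publisher));
    assert!(!metadata_matches(&conflict, &publisher));
}
```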

### DynamoEntity

#### Current problems: dynamo namespace and hierarchy-based addressing scheme

- we can't support multiple models in a single dynamo namespace.
  Currently we use `dynamo_namespace.component.endpoint` as the addressing scheme to orchestrate communication (p2p/pubsub) across pods. The lack of scoping can cause cross-model communication.

- the current 3-level hierarchy (namespace, component, endpoint) cannot address pods with Grove.
  We need an extensible way to address specific pods within the [grove hierarchy](https://github.com/NVIDIA/grove/blob/bcc412140323d5d781c2d16c9828befa2e965bb8/docs/assets/multinode-disaggregated.excalidraw.png).

- we need to identify a specific instance or a collection of instances within a deployment based on a query / selector.

#### Solution

Improve the addressing scheme for communication (p2p/pubsub).

Use proven data models to identify and query entities:
- Kubernetes labels and selectors
- SQL WHERE clauses and ORMs (SQLAlchemy, Django, etc.)
- MongoDB document filters
- inodes and (parent) process IDs from operating systems, as inspiration for representing hierarchy

A DynamoEntity uniquely identifies a single instance or a collection of instances (currently component, namespace, etc.) across a deployment using a selector.

1. Each dynamo process registers itself with the discovery service and advertises its metadata (similar to Kubernetes labels).

For example, for a prefill worker in a deepseek deployment with model deepseek-r1-671b, the corresponding JSON-serialized dynamo entity is:
```json
{
  "id": "123",
  "entity_type": "instance",
  "name": "<prefill-pod-name>",
  "metadata": {
    "deployment_name": "deepseek",
    "model_name": "deepseek-r1-671b",
    "component": "prefill",
    "role": "leader",
    "rank": "0"
  }
}
```

The router can query the discovery service to find the prefill leader workers, using a `DynamoEntity` in serialized form as the query:
```rust
// Create a dynamo entity selecting all prefill leader workers
prefill_leaders = DynamoEntity::from_selector({
    "deployment_name": "deepseek",
    "model_name": "deepseek-r1-671b",
    "component": "prefill",
    "role": "leader",
})
```

A decode worker can publish KV events to all router workers by specifying the collection entity:
```rust
// Create a dynamo entity selecting all router workers
routers = DynamoEntity::from_selector({
    "deployment_name": "deepseek",
    "model_name": "deepseek-r1-671b",
    "component": "router",
})

// Publish KV events to all router workers
publish("kv_events", kv_block_removed_event, routers)
```
![alt text](./dynamo-entity.png)

Collection entity types are useful and can be mapped to the [grove hierarchy](https://github.com/NVIDIA/grove/blob/bcc412140323d5d781c2d16c9828befa2e965bb8/docs/assets/multinode-disaggregated.excalidraw.png), e.g. PodClique and PodCliqueSet:

- the current `dynamo_namespace` maps to a PodCliqueSet;
  its children are components (PodClique)

- the current `component` maps to a PodClique;
  its children are instances (Pod)

```rust
use std::collections::HashMap;
use std::sync::Arc;

pub struct DynamoEntity {
    pub id: u128,
    pub entity_type: EntityType,
    pub name: Option<Arc<String>>,         // (Optional) human-readable name
    pub children: Vec<DynamoEntity>,       // (Optional) children for a collection
    pub metadata: HashMap<String, String>, // (Optional) metadata for matching
}

impl DynamoEntity {
    /// Create from name with automatic hashing
    pub fn from_name(name: &str, entity_type: EntityType) -> Self {
        Self {
            id: hash_name(name), // hash_name: stable hash of the name (defined elsewhere)
            name: Some(Arc::new(name.to_string())),
            entity_type,
            children: Vec::new(),
            metadata: HashMap::new(),
        }
    }
}

enum EntityType {
    Instance,
    Collection,
}
```

`EntityType`: a single instance or a collection of instances (currently component, namespace, etc.)

`name`: String (optional), a human-readable identifier for:
- debugging and logging
- configuration files (YAML/JSON)
- command-line interfaces
- observability
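
For example, constructing a named instance entity with the constructor above (the name is illustrative):

```rust
// The ID is derived automatically by hashing the name.
let worker = DynamoEntity::from_name("prefill-0", EntityType::Instance);
```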

### Object Store Interface

TODO: add a clean interface for the object store; a sketch follows.
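
A minimal sketch of what such an interface could cover, based on the NATS object store operations catalogued under [NATS use cases](#nats-use-cases) below (file upload/download, bucket management and cleanup); all names and the `Error` type are illustrative assumptions, not a committed API:

```rust
// Sketch only; method names are assumptions that mirror the operations
// listed under "Object Store (JetStream Object Store)" below.
pub trait ObjectStore {
    async fn put_object(&self, bucket: &str, key: &str, data: Vec<u8>) -> Result<(), Error>;
    async fn get_object(&self, bucket: &str, key: &str) -> Result<Vec<u8>, Error>;
    async fn delete_object(&self, bucket: &str, key: &str) -> Result<(), Error>;
    async fn create_bucket(&self, bucket: &str) -> Result<(), Error>;
    async fn delete_bucket(&self, bucket: &str) -> Result<(), Error>;
}
```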

### Implementation
- Phase 1
  * degraded feature set
  * users can choose not to use the KV router; best effort otherwise
  * NATS as the transport
  * no HA guarantees for the router
  * operate without high availability, with a single router
- Phase 2
  * explore additional transports
  * durability
  * exactly-once delivery

## Guiding principles

### Generic Messaging Protocol
Decouple the messaging protocol from the underlying transport, whether raw TCP, ZMQ, HTTP, gRPC, or UCX active messages.

Phased approach: start with NATS, ZMQ, and HTTP SSE.
Later, incrementally expand to support more advanced transports, ensuring that the protocol remains adaptable to requirements.

### Handshake and Closure Protocols
Robust handshake and closure protocols, using sentinels and message headers to signal end-of-stream or cancellation.
A common semantic for closing requests and handling errors will be generalized across the different transports.

### Multipart Message Structure
Use a multipart message structure, inspired by ZMQ's native multipart support, to encapsulate headers, body, and control signals (such as closure control signals or error notifications).
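
A minimal sketch of such a message layout; the field and variant names are illustrative assumptions, not a wire format specification:

```rust
// Sketch of a multipart message: one frame each for headers, body, and an
// optional control signal, mirroring ZMQ-style multipart framing.
pub enum ControlSignal {
    EndOfStream,   // sentinel: no more frames follow for this request
    Cancel,        // the caller cancelled the request
    Error(String), // in-band error notification
}

pub struct MultipartMessage {
    pub headers: Vec<(String, String)>, // frame 1: routing and context headers
    pub body: Vec<u8>,                  // frame 2: serialized payload
    pub control: Option<ControlSignal>, // frame 3: optional control frame
}
```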

### Better Python-Rust Interoperability and Data Class Generation

Improve Python-Rust interoperability, focusing on auto-generating Python data classes from Rust structs using Pydantic.
This keeps message schemas aligned and reduces manual coding and serialization errors.


## Additional notes

## NATS use cases

### 1. NatsQueue Python binding
- **Location**: `lib/bindings/python/rust/llm/nats.rs` (`NatsQueue`)
- **Functionality**:
  - Deprecated: the `NatsQueue` Python binding is no longer used; the Rust `NatsQueue` is used instead.
  - The Python binding and its associated tests can be removed to simplify the codebase.

### 2. JetStream-backed Queue/Event Bus
- **Location**: `lib/runtime/src/transports/nats.rs` (`NatsQueue`)
- **Functionality**:
  - Stream creation per subject pattern `{stream_name}.*`
  - Publisher-only, worker-group, and broadcast consumer modes
  - Durable consumers with pull-based consumption
  - Administrative operations (purge, consumer management)

### 3. Event Publishing for KV Router
- **Location**: `lib/llm/src/kv_router/publisher.rs`
- **Functionality**:
  - Publishes KV cache events from ZMQ or direct sources
  - Uses the `EventPublisher` trait to send events

### 4. Event Consumption for KV Router
- **Location**: `lib/llm/src/kv_router/subscriber.rs`
- **Functionality**:
  - Consumes `RouterEvent` messages via durable consumers
  - Handles state snapshots and stream purging

### 5. Object Store (JetStream Object Store)
- **Location**: `lib/runtime/src/transports/nats.rs`
- **Functionality**:
  - File upload/download operations
  - Typed data serialization with bincode
  - Bucket management and cleanup

### 6. Key-Value Store (JetStream KV)
- **Location**: `lib/runtime/src/storage/key_value_store/nats.rs`
- **Functionality**:
  - Implements the `KeyValueStore` trait
  - CRUD operations with conflict resolution
  - Watch streams for real-time updates

### 7. Request/Reply Pattern
- **Location**: `lib/runtime/src/transports/nats.rs`
- **Functionality**:
  - Service stats collection via broadcast requests
  - Each service responds once to stats queries

### 8. KVBM NATS usage (TODO)


## Message Delivery Guarantees

### At-least-once delivery (preferred)
- No message loss: each message is delivered to consumers at least once.
- Consumers should be idempotent and able to handle duplicate messages (see the sketch below).
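
A minimal sketch of an idempotent consumer under at-least-once delivery; the message ID scheme and the in-memory set are illustrative assumptions (a production consumer would bound or persist this state):

```rust
use std::collections::HashSet;

// Redelivered messages are recognized by ID and skipped.
struct IdempotentConsumer {
    seen: HashSet<u128>, // IDs of messages already processed
}

impl IdempotentConsumer {
    fn handle(&mut self, msg_id: u128, payload: &[u8]) {
        // insert() returns false if the ID was already present: a duplicate.
        if !self.seen.insert(msg_id) {
            return;
        }
        process(payload);
    }
}

fn process(_payload: &[u8]) { /* application logic */ }
```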

### Exactly-once delivery
- Requires stateful tracking of messages and ack/nack coordination to ensure exactly-once delivery.

### At-most-once delivery
- Message loss is possible.