Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions lib/bindings/python/rust/engine.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use super::context::{Context, callable_accepts_kwarg};
use dynamo_runtime::logging::get_distributed_tracing_context;
use std::sync::Arc;

use anyhow::{Error, Result};
use pyo3::prelude::*;
use pyo3::types::{PyDict, PyModule};
use pyo3::{PyAny, PyErr};
use pyo3_async_runtimes::TaskLocals;
use pythonize::{depythonize, pythonize};
use std::sync::Arc;
pub use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
use tokio_util::sync::CancellationToken;

use dynamo_runtime::logging::get_distributed_tracing_context;
pub use dynamo_runtime::{
CancellationToken, Error, Result,
pipeline::{
AsyncEngine, AsyncEngineContextProvider, Data, ManyOut, ResponseStream, SingleIn,
async_trait,
},
pipeline::{AsyncEngine, AsyncEngineContextProvider, Data, ManyOut, ResponseStream, SingleIn},
protocols::annotated::Annotated,
};
pub use serde::{Deserialize, Serialize};

use super::context::{Context, callable_accepts_kwarg};

/// Add bingings from this crate to the provided module
pub fn add_to_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
Expand Down Expand Up @@ -87,7 +87,7 @@ impl PythonAsyncEngine {
}
}

#[async_trait]
#[async_trait::async_trait]
impl<Req, Resp> AsyncEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>, Error> for PythonAsyncEngine
where
Req: Data + Serialize,
Expand Down Expand Up @@ -141,7 +141,7 @@ enum ResponseProcessingError {
OffloadError(String),
}

#[async_trait]
#[async_trait::async_trait]
impl<Req, Resp> AsyncEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>, Error>
for PythonServerStreamingEngine
where
Expand Down
2 changes: 1 addition & 1 deletion lib/bindings/python/rust/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@

use std::sync::Arc;

use anyhow::{Error, Result, anyhow as error};
use pyo3::prelude::*;

use crate::{CancellationToken, engine::*, to_pyerr};

pub use dynamo_llm::endpoint_type::EndpointType;
pub use dynamo_llm::http::service::{error as http_error, service_v2};
pub use dynamo_runtime::{
Error, Result, error,
pipeline::{AsyncEngine, Data, ManyOut, SingleIn, async_trait},
protocols::annotated::Annotated,
};
Expand Down
10 changes: 5 additions & 5 deletions lib/bindings/python/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,20 +441,20 @@ impl DistributedRuntime {
// Try to get existing runtime first, create new Worker only if needed
// This allows multiple DistributedRuntime instances to share the same tokio runtime
let runtime = rs::Worker::runtime_from_existing()
.or_else(|_| {
.or_else(|_| -> anyhow::Result<rs::Runtime> {
// No existing Worker, create new one
let worker = rs::Worker::from_settings()?;

// Initialize pyo3 bridge (only happens once per process)
INIT.get_or_try_init(|| {
INIT.get_or_try_init(|| -> anyhow::Result<()> {
let primary = worker.tokio_runtime()?;
pyo3_async_runtimes::tokio::init_with_runtime(primary).map_err(|e| {
rs::error!("failed to initialize pyo3 static runtime: {:?}", e)
anyhow::anyhow!("failed to initialize pyo3 static runtime: {:?}", e)
})?;
rs::OK(())
Ok(())
})?;

rs::OK(worker.runtime().clone())
Ok(worker.runtime().clone())
})
.map_err(to_pyerr)?;

Expand Down
3 changes: 1 addition & 2 deletions lib/llm/src/block_manager/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
pub mod managed;
pub use managed::ManagedBlockPool;

use anyhow::Result;
use derive_builder::Builder;
use derive_getters::Dissolve;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -31,8 +32,6 @@ use tokio::runtime::Handle;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;

use dynamo_runtime::Result;

// Type aliases to reduce complexity across the module
type BlockPoolResult<T> = Result<T, BlockPoolError>;
type AsyncResponse<T> = Result<oneshot::Receiver<T>, BlockPoolError>;
Expand Down
42 changes: 22 additions & 20 deletions lib/llm/src/kv_router/publisher.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,34 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::kv_router::{
KV_EVENT_SUBJECT, KV_METRICS_SUBJECT,
indexer::{RouterEvent, compute_block_hash_for_seq},
protocols::*,
scoring::LoadEvent,
};
use dynamo_runtime::metrics::{MetricsHierarchy, prometheus_names::kvstats};
use dynamo_runtime::traits::{DistributedRuntimeProvider, events::EventPublisher};
use dynamo_runtime::{
Result,
component::{Component, Namespace},
transports::nats::{NatsQueue, QUEUE_NAME, Slug},
};
use std::fmt;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, OnceLock};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use std::time::Duration;

use anyhow::Result;
use rmp_serde as rmps;
use serde::Deserialize;
use serde::Serialize;
use serde::de::{self, Deserializer, IgnoredAny, MapAccess, SeqAccess, Visitor};
use std::fmt;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use zeromq::{Socket, SocketRecv, SubSocket};

use dynamo_runtime::metrics::{MetricsHierarchy, prometheus_names::kvstats};
use dynamo_runtime::traits::{DistributedRuntimeProvider, events::EventPublisher};
use dynamo_runtime::{
component::{Component, Namespace},
transports::nats::{NatsQueue, QUEUE_NAME, Slug},
};

use crate::kv_router::{
KV_EVENT_SUBJECT, KV_METRICS_SUBJECT,
indexer::{RouterEvent, compute_block_hash_for_seq},
protocols::*,
scoring::LoadEvent,
};

// -------------------------------------------------------------------------
// KV Event Publishers -----------------------------------------------------
// -------------------------------------------------------------------------
Expand Down Expand Up @@ -1025,7 +1027,7 @@ mod tests_startup_helpers {
&self,
event_name: impl AsRef<str> + Send + Sync,
event: &(impl serde::Serialize + Send + Sync),
) -> dynamo_runtime::Result<()> {
) -> anyhow::Result<()> {
let bytes = rmp_serde::to_vec(event).unwrap();
self.published
.lock()
Expand All @@ -1038,7 +1040,7 @@ mod tests_startup_helpers {
&self,
event_name: impl AsRef<str> + Send + Sync,
bytes: Vec<u8>,
) -> dynamo_runtime::Result<()> {
) -> anyhow::Result<()> {
self.published
.lock()
.unwrap()
Expand Down
36 changes: 19 additions & 17 deletions lib/llm/src/mocker/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,33 @@
//! This module provides an AsyncEngine implementation that wraps the Scheduler
//! to provide streaming token generation with realistic timing simulation.

use crate::kv_router::publisher::WorkerMetricsPublisher;
use crate::mocker::protocols::DirectRequest;
use crate::mocker::protocols::{MockEngineArgs, OutputSignal, WorkerType};
use crate::mocker::scheduler::Scheduler;
use crate::protocols::TokenIdType;
use crate::protocols::common::llm_backend::{LLMEngineOutput, PreprocessedRequest};
use dynamo_runtime::DistributedRuntime;
use dynamo_runtime::protocols::annotated::Annotated;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use futures::StreamExt;
use rand::Rng;
use tokio::sync::{Mutex, OnceCell, mpsc};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

use dynamo_runtime::DistributedRuntime;
use dynamo_runtime::protocols::annotated::Annotated;
use dynamo_runtime::{
Result,
component::Component,
engine::AsyncEngineContextProvider,
pipeline::{AsyncEngine, Error, ManyOut, ResponseStream, SingleIn, async_trait},
traits::DistributedRuntimeProvider,
};
use futures::StreamExt;
use rand::Rng;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, OnceCell, mpsc};
use tokio_stream::wrappers::UnboundedReceiverStream;
use uuid::Uuid;

use crate::kv_router::publisher::WorkerMetricsPublisher;
use crate::mocker::protocols::DirectRequest;
use crate::mocker::protocols::{MockEngineArgs, OutputSignal, WorkerType};
use crate::mocker::scheduler::Scheduler;
use crate::protocols::TokenIdType;
use crate::protocols::common::llm_backend::{LLMEngineOutput, PreprocessedRequest};

pub const MOCKER_COMPONENT: &str = "mocker";

Expand Down
1 change: 0 additions & 1 deletion lib/llm/tests/http_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,6 @@ mod integration_tests {
use dynamo_runtime::DistributedRuntime;
use dynamo_runtime::discovery::DiscoveryQuery;
use dynamo_runtime::pipeline::RouterMode;
use dynamo_runtime::traits::DistributedRuntimeProvider;
use std::sync::Arc;

#[tokio::test]
Expand Down
2 changes: 2 additions & 0 deletions lib/runtime/examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/runtime/examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ repository = "https://github.com/ai-dynamo/dynamo.git"

[workspace.dependencies]
# local or crates.io
anyhow = "1"
dynamo-runtime = { path = "../" }
prometheus = { version = "0.14" }
1 change: 1 addition & 0 deletions lib/runtime/examples/hello_world/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ homepage.workspace = true
dynamo-runtime = { workspace = true }

# third-party
anyhow = { workspace = true }
6 changes: 3 additions & 3 deletions lib/runtime/examples/hello_world/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@
// SPDX-License-Identifier: Apache-2.0

use dynamo_runtime::{
DistributedRuntime, Result, Runtime, Worker, logging, pipeline::PushRouter,
DistributedRuntime, Runtime, Worker, logging, pipeline::PushRouter,
protocols::annotated::Annotated, stream::StreamExt,
};
use hello_world::DEFAULT_NAMESPACE;

fn main() -> Result<()> {
fn main() -> anyhow::Result<()> {
logging::init();
let worker = Worker::from_settings()?;
worker.execute(app)
}

async fn app(runtime: Runtime) -> Result<()> {
async fn app(runtime: Runtime) -> anyhow::Result<()> {
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;

let client = distributed
Expand Down
13 changes: 8 additions & 5 deletions lib/runtime/examples/hello_world/src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use dynamo_runtime::{
DistributedRuntime, Result, Runtime, Worker, logging,
DistributedRuntime, Runtime, Worker, logging,
pipeline::{
AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn,
async_trait, network::Ingress,
Expand All @@ -13,13 +13,13 @@ use dynamo_runtime::{
use hello_world::DEFAULT_NAMESPACE;
use std::sync::Arc;

fn main() -> Result<()> {
fn main() -> anyhow::Result<()> {
logging::init();
let worker = Worker::from_settings()?;
worker.execute(app)
}

async fn app(runtime: Runtime) -> Result<()> {
async fn app(runtime: Runtime) -> anyhow::Result<()> {
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;
backend(distributed).await
}
Expand All @@ -34,7 +34,10 @@ impl RequestHandler {

#[async_trait]
impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for RequestHandler {
async fn generate(&self, input: SingleIn<String>) -> Result<ManyOut<Annotated<String>>> {
async fn generate(
&self,
input: SingleIn<String>,
) -> anyhow::Result<ManyOut<Annotated<String>>> {
let (data, ctx) = input.into_parts();

let chars = data
Expand All @@ -48,7 +51,7 @@ impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for Reques
}
}

async fn backend(runtime: DistributedRuntime) -> Result<()> {
async fn backend(runtime: DistributedRuntime) -> anyhow::Result<()> {
// attach an ingress to an engine
let ingress = Ingress::for_engine(RequestHandler::new())?;

Expand Down
1 change: 1 addition & 0 deletions lib/runtime/examples/service_metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ repository.workspace = true
dynamo-runtime = { workspace = true }

# third-party
anyhow = { workspace = true }
futures = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ use futures::StreamExt;
use service_metrics::DEFAULT_NAMESPACE;

use dynamo_runtime::{
DistributedRuntime, Result, Runtime, Worker, logging, pipeline::PushRouter,
DistributedRuntime, Runtime, Worker, logging, pipeline::PushRouter,
protocols::annotated::Annotated, utils::Duration,
};

fn main() -> Result<()> {
fn main() -> anyhow::Result<()> {
logging::init();
let worker = Worker::from_settings()?;
worker.execute(app)
}

async fn app(runtime: Runtime) -> Result<()> {
async fn app(runtime: Runtime) -> anyhow::Result<()> {
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;

let namespace = distributed.namespace(DEFAULT_NAMESPACE)?;
Expand Down
Loading
Loading