English | 简体ä¸ć–‡
evno is a high-performance, type-safe asynchronous event bus library built on the Tokio runtime. It leverages the low-latency, multicast ring buffer provided by gyre, combined with the structured concurrency of the acty Actor model, to deliver an event distribution system that features middleware capabilities and reliable lifecycle management.
evno is designed to provide a high-performance event system with robust and predictable lifecycle management.
evno ensures that event delivery does not begin until all listeners currently starting up have completed their subscription registration. This eliminates concerns about transient event loss due to asynchronous startup race conditions, guaranteeing that listeners always start receiving events from the initial point in the stream.
Every Listener started via Bus::bind runs in an independent asynchronous task with strict lifecycle control.
| Method | Description |
|---|---|
Bus::bind / Bus::on / Bus::once / Bus::many |
Starts a new event listening task. |
SubscribeHandle |
Provides explicit control for active task cancellation (cancel()) and waiting for completion (.await). |
CancellationToken |
Embedded within the Listener's handle method, enabling the Listener's internal logic to perform conditional self-cancellation. |
evno allows you to build event processing pipelines (middleware) using the Chain and Step Traits. A Step is responsible for transforming an event from type E_in to type E_out, enabling features like context injection, logging, or data normalization.
- Type Safety: The input and output types of the pipeline are determined at compile time, ensuring downstream listeners receive the expected, processed event type.
- Chaining: Use
chain.prepend(Step)to add a new processing step to the front of the existing chain.
Bus instances are cloneable, with all clones sharing the underlying event system and lifecycle state.
| Method | Semantics | Behavior |
|---|---|---|
bus.drain() |
Global Forced Drain. Consumes self. |
Blocks until 1. All Bus clones have been dropped, and 2. All running Listener tasks have completed processing and exited. |
bus.close() |
Conditional Graceful Shutdown. Consumes self. |
If the current Bus instance is the last remaining reference, it executes a full drain(). Otherwise, it only drops the current reference and returns immediately. |
Best Practice: When exiting the application, call close() on the objects holding Bus references. The system will automatically trigger a global drain only when the very last reference is released.
We will demonstrate the core usage of evno through a series of examples.
Adding Dependencies:
Add evno and tokio to your Cargo.toml.
[dependencies]
evno = "1"
tokio = { version = "1", features = ["full"] }Define an event, start a continuous listener, and send events.
use evno::{Bus, Emit, Close, Guard, from_fn};
// 1. Define the event
#[derive(Debug, Clone)]
struct UserAction(String);
#[tokio::main]
async fn main() {
// Initialize Bus with capacity 4
let bus = Bus::new(4);
// 2. Bind a continuous listener (Bus::on is an alias for Bus::bind)
bus.on(from_fn(|event: Guard<UserAction>| async move {
println!("[Listener A] Received action: {}", event.0);
}));
// 3. Bind a second listener; both will receive the same events
bus.on(from_fn(|event: Guard<UserAction>| async move {
// Guard<E> is an ownership wrapper for the event. When it is dropped,
// the bus releases the underlying buffer resources.
println!("[Listener B] Confirming: {}", event.0);
}));
// 4. Emit events
bus.emit(UserAction("Login".to_string())).await;
bus.emit(UserAction("UpdateProfile".to_string())).await;
// 5. Graceful shutdown, waiting for all event processing to complete
bus.close().await;
println!("Bus closed successfully, all listeners finished.");
}Bus provides once (listen once) and many (listen N times) methods, as well as active cancellation via SubscribeHandle.
use evno::{Bus, Emit, Guard, Close, from_fn};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
#[derive(Debug, Clone)]
struct CounterEvent(u32);
#[tokio::main]
async fn main() {
let bus = Bus::new(4);
let counter = Arc::new(AtomicUsize::new(0));
// 1. Listen once (once)
let counter_clone = counter.clone();
bus.once(from_fn(move |_event: Guard<CounterEvent>| {
let c = counter_clone.clone();
async move { c.fetch_add(1, Ordering::SeqCst); }
}));
// 2. Listen three times (many)
let counter_clone = counter.clone();
let handle_many = bus.many(3, from_fn(move |_event: Guard<CounterEvent>| {
let c = counter_clone.clone();
async move { c.fetch_add(1, Ordering::SeqCst); }
}));
// 3. Emit 5 events
for i in 0..5 {
bus.emit(CounterEvent(i)).await;
}
// Wait for the 'many' listener to complete (it automatically exits after the 3rd event)
handle_many.await.unwrap();
assert_eq!(counter.load(Ordering::SeqCst), 4); // 1 (once) + 3 (many)
// 4. Demonstrate active cancellation
let handle_cancel = bus.on(from_fn(move |_event: Guard<CounterEvent>| async move {
unreachable!("This task should have been cancelled.");
}));
// Immediately cancel the task
let join_handle = handle_cancel.cancel();
// Wait for the task to confirm exit
assert!(join_handle.await.is_ok());
bus.close().await;
}Use Chain and Step to implement an event pipeline that injects context data before the event reaches the Bus.
use evno::{Bus, Chain, Close, Emit, Event, Guard, Step, from_fn};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
// 1. Original event type
#[derive(Debug, Clone, PartialEq)]
struct OriginalEvent(String);
// 2. Injected context
#[derive(Debug, Clone, PartialEq)]
struct RequestContext { request_id: u64 }
// 3. Transformed event type
#[derive(Debug, Clone, PartialEq)]
struct ContextualEvent<E>(E, RequestContext);
// 4. Define Step: Request ID Injector
#[derive(Clone)]
struct RequestInjector(Arc<AtomicU64>);
impl Step for RequestInjector {
// Define the output type: Input E -> Output ContextualEvent<E>
type Event<E: Event> = ContextualEvent<E>;
async fn process<E: Event>(self, event: E) -> Self::Event<E> {
let id = self.0.fetch_add(1, Ordering::Relaxed);
ContextualEvent(event, RequestContext { request_id: id })
}
}
#[tokio::main]
async fn main() {
let bus = Bus::new(4);
let counter = Arc::new(AtomicU64::new(100));
// 5. Build the event chain: Bus <- RequestInjector
// All OriginalEvents will first pass through RequestInjector
let chain = Chain::from(bus.clone()).prepend(RequestInjector(counter));
// 6. Bind listener: Must listen for the final type ContextualEvent<OriginalEvent>
bus.on(from_fn(
move |event: Guard<ContextualEvent<OriginalEvent>>| async move {
let (original, context) = (&event.0, &event.1);
println!(
"[Listener] ID: {} -> Event: {}",
context.request_id,
original.0
);
},
));
// 7. Emit the original event through the Chain
chain.emit(OriginalEvent("First request".to_string())).await;
chain.emit(OriginalEvent("Second request".to_string())).await;
// 8. Graceful shutdown
chain.close().await;
bus.close().await;
}You can use to_emitter::<E>() to obtain a sender endpoint for a specific event type. This is convenient for encapsulating sending logic or integrating with other systems. If obtained from a Chain, the returned Emitter automatically applies all Step logic in the chain.
use evno::{Bus, Chain, Emit, TypedEmit, Close};
// Reuse RequestInjector and OriginalEvent definitions from the previous example
#[tokio::main]
async fn main() {
let bus = Bus::new(4);
let counter = Arc::new(AtomicU64::new(200));
let chain = Chain::from(bus.clone()).prepend(RequestInjector(counter));
// 1. Get a Typed Emitter from the Chain
// Events sent through this emitter will automatically pass RequestInjector
let chained_emitter = chain.to_emitter::<OriginalEvent>();
chained_emitter.emit(OriginalEvent("Action via Chained Emitter".to_string())).await;
// 2. Get a raw Typed Emitter directly from the Bus (bypasses the Chain)
let raw_emitter = bus.to_emitter::<OriginalEvent>();
raw_emitter.emit(OriginalEvent("Action via Raw Emitter".to_string())).await;
// Note: Events sent via raw_emitter will NOT be processed by RequestInjector
bus.close().await;
}| Trait / Struct | Description |
|---|---|
Bus |
The core event bus structure, used for event distribution and lifecycle management. |
Emit |
Generic sending Trait, allowing the sending of any type implementing Event (bus.emit(E)). |
TypedEmit |
Specific type sending Trait, used by type-fixed Emitters. |
Drain / Close |
Defines the asynchronous Traits for graceful bus shutdown and resource cleanup. |
Listener |
The Trait implemented by users to define event handling logic, including begin, handle, and after lifecycle hooks. |
Guard<E> |
The event data wrapper type, representing ownership, whose Drop behavior controls the release of underlying resources. |
SubscribeHandle |
The handle for a listener task, used for cancellation or waiting for completion. |
Chain |
The event processing pipeline structure, used to combine multiple Steps. |
Step |
The Trait defining event transformation logic, implementing event type modification. |
This project is distributed under either the MIT license or the Apache 2.0 license, at your option: