Tokio-native typed TCP/UDP pipelines inspired by Netty.
rs-netty is a small Rust networking framework that keeps the useful parts of
Netty's Channel / Pipeline / Handler model while rebuilding the main path around
Rust ownership, async/await, Tokio tasks, typed messages, and bounded queues.
The core idea is simple: a pipeline is not a dynamically reordered bag of handlers. It is a typed sequence:
codec -> inbound* -> business* -> handler -> outbound*
Invalid stage order, message mismatches, and TCP/UDP pipeline mixups are caught by the Rust type checker instead of failing at runtime.
- Tokio-native: built on Tokio TCP/UDP sockets, tasks, channels, and async IO rather than a Java-style event loop abstraction.
- Netty-inspired model: use channels, pipelines, handlers, codecs, lifecycle hooks, and write/flush semantics in a Rust-shaped API.
- Typed pipeline construction: builder states enforce codec -> inbound* -> business* -> handler -> outbound* at compile time.
- Separate TCP and UDP builders: stream pipelines and datagram pipelines are different types, so they cannot be accidentally mixed.
- Bounded outbound queues: channels use bounded Tokio queues to make backpressure visible instead of allowing unbounded writes.
- Graceful shutdown and lifecycle hooks: servers expose shutdown handles and optional hooks for server, connection, and socket lifecycle events.
- Zero unsafe: the crate does not use unsafe.
- Practical codecs and examples: TCP, UDP, JSON, HTTP, MQTT, WebSocket, typed chains, lifecycle hooks, benchmarks, and compile-fail tests are included.
Add the crate:
[dependencies]
rs-netty = "1.1.0"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
The default feature set includes rs-netty-macros, which provides the
#[handler] macro used in the examples below.
Build a typed TCP line echo server:
use rs_netty::{codec::LineCodec, handler, pipeline, Result, TcpServer};
#[tokio::main]
async fn main() -> Result<()> {
    TcpServer::bind("127.0.0.1:9000")
        .pipeline(|| {
            pipeline()
                .codec(LineCodec::new())
                .handler(Echo)
        })
        .run()
        .await
}
struct Echo;
#[handler(Echo)]
async fn echo(msg: String) -> Result<String> {
    Ok(msg)
}
Talk to it with a typed TCP client:
use rs_netty::{codec::LineCodec, handler, pipeline, Result, TcpClient};
use tokio::sync::oneshot;
#[tokio::main]
async fn main() -> Result<()> {
    let (tx, rx) = oneshot::channel();
    let client = TcpClient::connect("127.0.0.1:9000")
        .pipeline_instance(
            pipeline()
                .codec(LineCodec::new())
                .handler(PrintResponse { done: Some(tx) }),
        )
        .run()
        .await?;
    client.write_and_flush("hello".to_string()).await?;
    let _ = rx.await;
    client.close().await?;
    client.wait().await
}
struct PrintResponse {
    done: Option<oneshot::Sender<()>>,
}
#[handler(PrintResponse, write = String)]
async fn print_response(handler: &mut PrintResponse, msg: String) -> Result<()> {
    println!("server -> {msg}");
    if let Some(done) = handler.done.take() {
        let _ = done.send(());
    }
    Ok(())
}
UDP uses a datagram pipeline and a datagram codec:
use rs_netty::{
    codec::Utf8DatagramCodec, datagram_pipeline, DatagramContext, DatagramHandler, Result,
    UdpServer,
};
#[tokio::main]
async fn main() -> Result<()> {
    UdpServer::bind("127.0.0.1:9002")
        .pipeline(|| {
            datagram_pipeline()
                .codec(Utf8DatagramCodec)
                .handler(UdpEcho)
        })
        .run()
        .await
}
struct UdpEcho;
impl DatagramHandler<String> for UdpEcho {
    type Write = String;
    async fn read(&mut self, ctx: &mut DatagramContext<Self::Write>, msg: String) -> Result<()> {
        ctx.write_and_flush(format!("echo: {msg}")).await
    }
}
UDP support is socket-oriented. UdpServer uses one socket-level pipeline and
does not create per-peer child pipelines. If you need per-peer state, store it in
your handler, for example with a HashMap<SocketAddr, PeerState>.
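As a rough illustration of per-peer state held inside a handler, the sketch below keeps a HashMap<SocketAddr, PeerState> and updates it for each datagram. PeerState's fields, the PeerTable wrapper, and the record method are hypothetical names chosen for this example. How the sender address reaches your handler depends on the DatagramContext API, which is not shown here.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;

/// Hypothetical per-peer state; not a type provided by rs-netty.
#[derive(Debug, Default)]
struct PeerState {
    datagrams_seen: u64,
    last_message: Option<String>,
}

/// Handler-owned table keyed by the sender address.
#[derive(Default)]
struct PeerTable {
    peers: HashMap<SocketAddr, PeerState>,
}

impl PeerTable {
    /// Records one datagram from `peer` and returns how many datagrams
    /// that peer has sent so far.
    fn record(&mut self, peer: SocketAddr, msg: &str) -> u64 {
        // Create the entry on first contact, then update it in place.
        let state = self.peers.entry(peer).or_default();
        state.datagrams_seen += 1;
        state.last_message = Some(msg.to_owned());
        state.datagrams_seen
    }

    /// Number of distinct peers seen so far.
    fn peer_count(&self) -> usize {
        self.peers.len()
    }
}
```

Because the single socket-level handler owns the table, no locking is needed in this sketch. A long-running server would also want some eviction policy for idle peers.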
TLS is a TCP transport layer, not a pipeline codec. Enable the tls feature,
build a server or client context, and attach it with .tls(...) before running
the same typed pipeline:
[dependencies]
rs-netty = { version = "1.0.0", features = ["tls"] }
use rs_netty::{codec::LineCodec, pipeline, Result, TcpServer, TlsContextBuilder};
#[tokio::main]
async fn main() -> Result<()> {
    let tls = TlsContextBuilder::for_server()
        .certificate_chain_pem(include_bytes!("certs/server-chain.pem"))
        .private_key_pem(include_bytes!("certs/server-key.pem"))
        .build()?;
    TcpServer::bind("127.0.0.1:9443")
        .tls(tls)
        .pipeline(|| pipeline().codec(LineCodec::new()).handler(Echo))
        .run()
        .await
}
Client trust is selected with a typestate builder, so
TlsContextBuilder::for_client().build() does not compile until you choose a
trust strategy such as root_certificate_pem, native_roots, webpki_roots,
or the feature-gated development helper danger_accept_invalid_certs.
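To show how a typestate builder can make build() unavailable until trust is chosen, here is a minimal standalone sketch. ClientTlsBuilder, NoTrust, TrustChosen, and ClientTlsConfig are hypothetical stand-ins, not the rs-netty types, and the real builder may be structured differently.

```rust
use std::marker::PhantomData;

// Marker types for the builder's trust state (hypothetical names).
struct NoTrust;
struct TrustChosen;

/// Hypothetical stand-in for a client TLS builder; not the rs-netty type.
struct ClientTlsBuilder<State> {
    roots_pem: Vec<Vec<u8>>,
    server_name: Option<String>,
    _state: PhantomData<State>,
}

#[derive(Debug, PartialEq)]
struct ClientTlsConfig {
    root_count: usize,
    server_name: Option<String>,
}

impl ClientTlsBuilder<NoTrust> {
    fn new() -> Self {
        ClientTlsBuilder {
            roots_pem: Vec::new(),
            server_name: None,
            _state: PhantomData,
        }
    }

    /// Choosing a trust strategy moves the builder into `TrustChosen`.
    fn root_certificate_pem(self, pem: &[u8]) -> ClientTlsBuilder<TrustChosen> {
        ClientTlsBuilder {
            roots_pem: vec![pem.to_vec()],
            server_name: self.server_name,
            _state: PhantomData,
        }
    }
}

impl<State> ClientTlsBuilder<State> {
    /// Available in every state because it does not affect trust.
    fn server_name(mut self, name: &str) -> Self {
        self.server_name = Some(name.to_owned());
        self
    }
}

impl ClientTlsBuilder<TrustChosen> {
    /// `build` exists only once a trust strategy has been chosen.
    fn build(self) -> ClientTlsConfig {
        ClientTlsConfig {
            root_count: self.roots_pem.len(),
            server_name: self.server_name,
        }
    }
}

// ClientTlsBuilder::new().build();
// ^ rejected by the compiler: `build` is not defined for `ClientTlsBuilder<NoTrust>`.
```

The missing-trust mistake becomes a compile error rather than a runtime configuration failure, at the cost of one extra type parameter on the builder.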
For required mTLS, configure trusted client roots on the server and a client identity on the client:
let server_tls = TlsContextBuilder::for_server()
    .certificate_chain_pem(include_bytes!("certs/server-chain.pem"))
    .private_key_pem(include_bytes!("certs/server-key.pem"))
    .client_auth_required_pem(include_bytes!("certs/client-ca.pem"))
    .build()?;
let client_tls = TlsContextBuilder::for_client()
    .root_certificate_pem(include_bytes!("certs/server-ca.pem"))
    .client_identity_pem(
        include_bytes!("certs/client-chain.pem"),
        include_bytes!("certs/client-key.pem"),
    )
    .server_name("localhost")
    .build()?;
For optional mTLS, use client_auth_optional_pem or
client_auth_optional_der. Clients may connect without a certificate, while a
certificate is still verified when one is presented:
let server_tls = TlsContextBuilder::for_server()
    .certificate_chain_pem(include_bytes!("certs/server-chain.pem"))
    .private_key_pem(include_bytes!("certs/server-key.pem"))
    .client_auth_optional_pem(include_bytes!("certs/client-ca.pem"))
    .build()?;
Servers and clients can advertise ALPN protocols with alpn_protocols. A
selected protocol is exposed through TlsInfo; if both sides configure ALPN
but there is no common protocol, the TLS handshake fails.
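The negotiation rule can be modeled in a few lines. The sketch below is not how rs-netty or its TLS backend implements ALPN. It assumes the server's preference order wins and that negotiation is skipped when either side advertises nothing. select_alpn and NoCommonProtocol are hypothetical names.

```rust
/// Outcome when both sides advertise ALPN but share no protocol.
#[derive(Debug, PartialEq)]
struct NoCommonProtocol;

/// Picks the first protocol in the server's list that the client also offers.
/// Returns `Ok(None)` when either side advertises nothing (assumed behavior).
fn select_alpn(
    server: &[&[u8]],
    client: &[&[u8]],
) -> Result<Option<Vec<u8>>, NoCommonProtocol> {
    if server.is_empty() || client.is_empty() {
        return Ok(None);
    }
    server
        .iter()
        // Walk the server list in order so its preference decides ties.
        .find(|proto| client.contains(*proto))
        .map(|proto| Some(proto.to_vec()))
        // Both sides advertised protocols but none matched.
        .ok_or(NoCommonProtocol)
}
```

With a server list of h2 then http/1.1 and a client list of http/1.1 then h2, this model selects h2. With disjoint lists it returns the error that corresponds to a failed handshake.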
let server_tls = TlsContextBuilder::for_server()
    .certificate_chain_pem(include_bytes!("certs/server-chain.pem"))
    .private_key_pem(include_bytes!("certs/server-key.pem"))
    .alpn_protocols([b"h2".as_slice(), b"http/1.1".as_slice()])
    .build()?;
One listener can serve multiple certificate identities with SNI. Configure a
default certificate as the fallback, then add named identities with
sni_certificate_pem or sni_certificate_der:
let server_tls = TlsContextBuilder::for_server()
    .certificate_chain_pem(include_bytes!("certs/default-chain.pem"))
    .private_key_pem(include_bytes!("certs/default-key.pem"))
    .sni_certificate_pem(
        "api.example.com",
        include_bytes!("certs/api-chain.pem"),
        include_bytes!("certs/api-key.pem"),
    )
    .build()?;
When TLS is negotiated, ctx.tls() returns TlsInfo from TCP handlers and
stream transformation contexts. ConnInfo::tls() also exposes the same
metadata to lifecycle hooks. TlsInfo includes the peer certificate chain, the
selected ALPN protocol, and the effective server name or server-side SNI.
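The SNI fallback described earlier (named identities first, then the default certificate) can be pictured as a lookup table with a default. This is a conceptual sketch only. SniResolver and Identity are hypothetical, and the case-insensitive match is an assumption based on DNS names being case-insensitive, not a statement about rs-netty internals.

```rust
use std::collections::HashMap;

/// Hypothetical certificate identity (chain plus key), reduced to a label here.
#[derive(Debug, Clone, PartialEq)]
struct Identity(&'static str);

/// Hypothetical resolver: named identities plus a default fallback.
struct SniResolver {
    default: Identity,
    named: HashMap<String, Identity>,
}

impl SniResolver {
    fn new(default: Identity) -> Self {
        SniResolver { default, named: HashMap::new() }
    }

    /// Registers an identity for one server name.
    fn add(mut self, server_name: &str, identity: Identity) -> Self {
        // Keys are stored lowercased so lookups ignore ASCII case.
        self.named.insert(server_name.to_ascii_lowercase(), identity);
        self
    }

    /// Returns the identity for the requested name, or the default when
    /// the client sent no SNI or the name is not registered.
    fn resolve(&self, requested: Option<&str>) -> &Identity {
        requested
            .and_then(|name| self.named.get(&name.to_ascii_lowercase()))
            .unwrap_or(&self.default)
    }
}
```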
TCP stream pipelines start with pipeline():
pipeline()
    .codec(...)
    .inbound(...)*
    .business(...)*
    .handler(...)
    .outbound(...)*
UDP datagram pipelines start with datagram_pipeline():
datagram_pipeline()
    .codec(...)
    .inbound(...)*
    .business(...)*
    .handler(...)
    .outbound(...)*
The builders expose methods only in valid states. Message transitions are checked with trait bounds, so:
- a handler input must match the previous inbound/business output;
- outbound input must match Handler::Write or DatagramHandler::Write;
- the final outbound type must be encodable by the selected codec;
- TcpServer/TcpClient accept only stream pipelines;
- UdpServer/UdpClient accept only datagram pipelines.
This is intentionally different from Java Netty's dynamic pipeline. The Rust API trades runtime handler mutation for compile-time ordering and message checks.
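The compile-time checks can be illustrated with a much smaller, synchronous model. The traits and builder below are hypothetical simplifications (no inbound, business, or outbound stages, no async), not the rs-netty definitions. They only show how builder states and trait bounds reject an out-of-order call or a message-type mismatch.

```rust
/// Hypothetical minimal codec trait; the real one is richer.
trait Codec {
    type In; // decoded message type
    type Out; // type the codec can encode
    fn decode(&self, bytes: &[u8]) -> Self::In;
    fn encode(&self, msg: Self::Out) -> Vec<u8>;
}

/// Hypothetical minimal handler trait.
trait Handler<M> {
    type Write;
    fn read(&mut self, msg: M) -> Self::Write;
}

// Builder states: each state exposes only the methods valid at that point.
struct Start;
struct WithCodec<C>(C);
struct Ready<C, H>(C, H);

struct Pipeline<S>(S);

fn pipeline() -> Pipeline<Start> {
    Pipeline(Start)
}

impl Pipeline<Start> {
    fn codec<C: Codec>(self, codec: C) -> Pipeline<WithCodec<C>> {
        Pipeline(WithCodec(codec))
    }
}

impl<C: Codec> Pipeline<WithCodec<C>> {
    /// The bounds require the handler input to match the codec output and
    /// the handler's `Write` type to be encodable by the codec.
    fn handler<H>(self, handler: H) -> Pipeline<Ready<C, H>>
    where
        H: Handler<C::In, Write = C::Out>,
    {
        let Pipeline(WithCodec(codec)) = self;
        Pipeline(Ready(codec, handler))
    }
}

impl<C, H> Pipeline<Ready<C, H>>
where
    C: Codec,
    H: Handler<C::In, Write = C::Out>,
{
    /// Decodes, runs the handler, and encodes the reply.
    fn process(&mut self, bytes: &[u8]) -> Vec<u8> {
        let Ready(codec, handler) = &mut self.0;
        let msg = codec.decode(bytes);
        codec.encode(handler.read(msg))
    }
}

struct Utf8;
impl Codec for Utf8 {
    type In = String;
    type Out = String;
    fn decode(&self, bytes: &[u8]) -> String {
        String::from_utf8_lossy(bytes).into_owned()
    }
    fn encode(&self, msg: String) -> Vec<u8> {
        msg.into_bytes()
    }
}

struct Shout;
impl Handler<String> for Shout {
    type Write = String;
    fn read(&mut self, msg: String) -> String {
        msg.to_uppercase()
    }
}

// pipeline().handler(Shout);
// ^ rejected by the compiler: `handler` is not defined for `Pipeline<Start>`.
```

In this model, pipeline().codec(Utf8).handler(Shout) type-checks because Shout reads String and writes String. A handler that wrote a type the codec cannot encode would fail the Write = C::Out bound.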
Stream codecs and stages:
- LineCodec
- LengthFieldBasedFrameDecoder
- LengthFieldPrepender
- FixedLengthFrameDecoder
- DelimiterBasedFrameDecoder
- ByteArrayDecoder
- ByteArrayEncoder
- HttpCodec
- MqttCodec
- WebSocketCodec and HttpWsCodec behind the websocket feature
- JsonDecode<T> and JsonEncode<T> behind the json feature
Datagram codecs:
- Utf8DatagramCodec
- BytesDatagramCodec
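For readers new to length-field framing, the idea behind a prepender/decoder pair can be sketched with the standard library alone. This is not the crate's LengthFieldPrepender or LengthFieldBasedFrameDecoder. The 4-byte big-endian length field and the function names are assumptions made for the example.

```rust
/// Prepends a 4-byte big-endian length to `payload`.
fn prepend_length(payload: &[u8]) -> Vec<u8> {
    assert!(
        payload.len() <= u32::MAX as usize,
        "payload too large for a u32 length field"
    );
    let len = payload.len() as u32;
    let mut frame = Vec::with_capacity(4 + payload.len());
    frame.extend_from_slice(&len.to_be_bytes());
    frame.extend_from_slice(payload);
    frame
}

/// Removes and returns the next complete frame payload from `buf`, or `None`
/// if more bytes are needed. Partial frames are left in the buffer.
fn decode_frame(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    if buf.len() < 4 {
        return None; // length field not complete yet
    }
    let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if buf.len() < 4 + len {
        return None; // payload not complete yet
    }
    let frame = buf[4..4 + len].to_vec();
    buf.drain(..4 + len);
    Some(frame)
}
```

A TCP read can end in the middle of a frame, so the decoder has to tolerate partial input and leave it buffered until the rest arrives. A production decoder would also cap the accepted length to avoid unbounded buffering.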
JSON is modeled as ordinary pipeline stages, so framing and serialization remain separate:
[dependencies]
rs-netty = { version = "1.0.0", features = ["json"] }
serde = { version = "1", features = ["derive"] }
use rs_netty::{
    codec::{JsonDecode, JsonEncode, LineCodec},
    handler, pipeline,
};
#[derive(serde::Deserialize)]
struct Request {
    op: String,
}
#[derive(serde::Serialize)]
struct Response {
    ok: bool,
}
struct ApiHandler;
#[handler(ApiHandler)]
async fn handle_api(_req: Request) -> rs_netty::Result<Response> {
    Ok(Response { ok: true })
}
let pipeline = pipeline()
    .codec(LineCodec::new())
    .inbound(JsonDecode::<Request>::new())
    .handler(ApiHandler)
    .outbound(JsonEncode::<Response>::new());
Servers and clients can attach optional lifecycle hooks with .life(...).
Applications that do not need hooks use the default NoLife.
use std::net::SocketAddr;
use rs_netty::{codec::LineCodec, pipeline, Life, Result, TcpServer};
#[derive(Clone, Copy)]
struct TraceLife;
impl Life for TraceLife {
    async fn tcp_server_started(&self, local_addr: SocketAddr) -> Result<()> {
        tracing::info!(%local_addr, "tcp server started");
        Ok(())
    }
}
TcpServer::bind("127.0.0.1:9000")
    .pipeline(|| {
        pipeline()
            .codec(LineCodec::new())
            .handler(MyHandler)
    })
    .life(TraceLife)
    .run()
    .await
Servers also support external shutdown handles:
let server = TcpServer::bind("127.0.0.1:9000")
    .pipeline(|| {
        pipeline()
            .codec(LineCodec::new())
            .handler(MyHandler)
    })
    .start()
    .await?;
server.shutdown();
server.wait().await?;
Channel, Context, DatagramChannel, and DatagramContext expose
write, flush, and write_and_flush:
- write stages outbound messages;
- flush pushes staged messages to the socket task;
- write_and_flush does both;
- a completed flush means the local socket write/send path completed, not that the remote peer acknowledged the message.
Outbound queues are bounded. Server and client builders expose
.outbound_queue_size(...) when you need to tune queue capacity.
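The interaction between staged writes and a bounded queue can be modeled with the standard library. This is a conceptual sketch, not the crate's implementation. It uses std::sync::mpsc::sync_channel in place of a bounded Tokio channel, and a non-blocking try_send so the full-queue condition is observable in synchronous code. StagedWriter is a hypothetical type, and a capacity of at least 1 is assumed.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical channel model: `write` stages, `flush` enqueues.
struct StagedWriter {
    staged: VecDeque<String>,
    queue: SyncSender<String>,
}

impl StagedWriter {
    /// Creates a writer plus the receiving end that a socket task would drain.
    fn new(capacity: usize) -> (Self, Receiver<String>) {
        let (queue, rx) = sync_channel(capacity);
        (StagedWriter { staged: VecDeque::new(), queue }, rx)
    }

    /// Stages a message locally; nothing reaches the queue yet.
    fn write(&mut self, msg: &str) {
        self.staged.push_back(msg.to_owned());
    }

    /// Moves staged messages into the bounded queue in order and returns how
    /// many were accepted. Messages that do not fit stay staged.
    fn flush(&mut self) -> usize {
        let mut sent = 0;
        while let Some(msg) = self.staged.pop_front() {
            match self.queue.try_send(msg) {
                Ok(()) => sent += 1,
                Err(TrySendError::Full(msg)) | Err(TrySendError::Disconnected(msg)) => {
                    // Put the message back so ordering is preserved.
                    self.staged.push_front(msg);
                    break;
                }
            }
        }
        sent
    }
}
```

With a capacity of 2 and three staged messages, the first flush in this model accepts two and leaves one staged until the consumer drains the queue. An async implementation would typically wait for capacity instead of returning early, which is how backpressure reaches the writer.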
The repository includes benchmark harnesses for rs-netty, bare Tokio, and Java
Netty under benchmarks/. They measure throughput, latency percentiles, and
server RSS for TCP line echo, TCP length-field echo, and UDP echo scenarios.
The table below comes from one local non-loopback run with TCP rows using
TCP_NODELAY=true, 100 connections, 1,000,000 messages, 128-byte payloads,
in-flight 16, and 100,000 untimed Netty warmup messages. UDP rows used 100
clients, 1,000,000 datagrams, 128-byte payloads, and 100,000 untimed Netty
warmup datagrams. Treat these numbers as a directional snapshot, not a universal
performance promise.
| Protocol | Implementation | Throughput | P99 Latency | Server Max RSS |
|---|---|---|---|---|
| line | rs-netty | 260,483 msg/s | 10,853 us | 5,056 KB |
| line | Tokio | 266,537 msg/s | 10,521 us | 3,312 KB |
| line | Netty | 176,980 msg/s | 10,657 us | 597,040 KB |
| length-field | rs-netty | 438,633 msg/s | 17,729 us | 5,216 KB |
| length-field | Tokio | 156,356 msg/s | 18,167 us | 2,496 KB |
| length-field | Netty | 177,886 msg/s | 10,992 us | 569,232 KB |
| UDP | rs-netty | 31,090 msg/s | 3,487 us | 2,672 KB |
| UDP | Tokio | 32,270 msg/s | 3,325 us | 2,272 KB |
| UDP | Netty | 35,323 msg/s | 3,112 us | 346,624 KB |
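For context on the P99 column, one common way to compute a latency percentile is the nearest-rank method, sketched here. Whether the harnesses under benchmarks/ use this exact method is not stated, so treat percentile_us as a hypothetical helper rather than a description of the benchmark code.

```rust
/// Nearest-rank percentile over latency samples in microseconds.
/// Returns `None` for an empty slice or a percentile outside (0, 100].
fn percentile_us(samples: &[u64], pct: f64) -> Option<u64> {
    if samples.is_empty() || !(pct > 0.0 && pct <= 100.0) {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort_unstable();
    // Nearest rank is ceil(pct * n / 100), counted from 1.
    let rank = (pct * sorted.len() as f64 / 100.0).ceil() as usize;
    Some(sorted[rank - 1])
}
```

Under this definition, a P99 of 10,853 us would mean that 99% of the measured messages completed in 10,853 us or less.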
Run the examples from the repository root:
cargo run --example tcp_echo_server
cargo run --example tcp_echo_client
cargo run --example tcp_json_line_echo --features json
cargo run --example tcp_lifecycle
cargo run --example tcp_tls_echo --features tls
cargo run --example tcp_typed_chain
cargo run --example tcp_typed_chain_client
cargo run --example udp_echo_server
cargo run --example udp_echo_client
cargo run --example udp_typed_chain
cargo run --example udp_typed_chain_client
cargo run --example websocket_server --features websocket
cargo run --example http_websocket_server --features websocket
rs-netty deliberately does not expose or implement some Java Netty patterns on
the main path:
- No public EventLoop API.
- No reference-counted ByteBuf API.
- No ChannelFuture/Promise API.
- No dynamic Box<dyn Handler> main path.
- No TLS pipeline stage; TLS, required/optional mTLS, ALPN, and SNI are optional TCP transport capabilities.
- No codec registry.
- No automatic UDP reliability, ordering, or retransmission.
- No per-peer UDP child pipeline.
Licensed under the Apache License, Version 2.0. See LICENSE.
