Skip to content

anthropics/connect-rust

Repository files navigation

connectrpc

A Tower-based Rust implementation of the ConnectRPC protocol, providing HTTP-based RPC with protobuf messages in either binary or JSON form.

Overview

connectrpc provides:

  • connectrpc — A Tower-based runtime library implementing the Connect protocol
  • protoc-gen-connect-rust — A protoc plugin that generates service traits, clients, and message types
  • connectrpc-buildbuild.rs integration for generating code at build time

The runtime is built on tower::Service, making it framework-agnostic. It integrates with any tower-compatible HTTP framework including Axum, Hyper, and others.

Quick Start

Define your service

// greet.proto
syntax = "proto3";
package greet.v1;

service GreetService {
  rpc Greet(GreetRequest) returns (GreetResponse);
}

message GreetRequest {
  string name = 1;
}

message GreetResponse {
  string greeting = 1;
}

Generate Rust code

Two workflows are supported. Both produce the same runtime API; pick the one that fits your build pipeline.

Option A - buf generate (recommended for checked-in code)

Runs two codegen plugins (protoc-gen-buffa for message types, protoc-gen-connect-rust for service stubs) and protoc-gen-buffa-packaging twice to assemble the mod.rs module tree for each output directory. The codegen plugins are invoked per-file; only the packaging plugin needs strategy: all.

# buf.gen.yaml
version: v2
plugins:
  - local: protoc-gen-buffa
    out: src/generated/buffa
    opt: [views=true, json=true]
  - local: protoc-gen-buffa-packaging
    out: src/generated/buffa
    strategy: all
  - local: protoc-gen-connect-rust
    out: src/generated/connect
    opt: [buffa_module=crate::proto]
  - local: protoc-gen-buffa-packaging
    out: src/generated/connect
    strategy: all
    opt: [filter=services]
// src/lib.rs
#[path = "generated/buffa/mod.rs"]
pub mod proto;
#[path = "generated/connect/mod.rs"]
pub mod connect;

buffa_module=crate::proto tells the service-stub generator where you mounted the buffa output. For a method input type greet.v1.GreetRequest it emits crate::proto::greet::v1::GreetRequest - the crate::proto root you named, then the proto package as nested modules, then the type. The second packaging invocation uses filter=services so the connect tree's mod.rs only include!s files that actually have service stubs in them. Changing the mount point requires regenerating.

The underlying option is extern_path=.=crate::proto - same format the Buf Schema Registry uses when generating Cargo SDKs. buffa_module=X is shorthand for the . catch-all case.

Option B - build.rs (generated at build time)

Unified output: message types and service stubs in one file per proto, assembled via a single include!. No plugin binaries required at build time.

[build-dependencies]
connectrpc-build = "0.2"
// build.rs
fn main() {
    connectrpc_build::Config::new()
        .files(&["proto/greet.proto"])
        .includes(&["proto/"])
        .include_file("_connectrpc.rs")
        .compile()
        .unwrap();
}
// lib.rs
include!(concat!(env!("OUT_DIR"), "/_connectrpc.rs"));

Implement the server

use connectrpc::{Router, ConnectRpcService, Context, ConnectError};
use buffa::OwnedView;
use std::sync::Arc;

struct MyGreetService;

impl GreetService for MyGreetService {
    async fn greet(
        &self,
        ctx: Context,
        request: OwnedView<GreetRequestView<'static>>,
    ) -> Result<(GreetResponse, Context), ConnectError> {
        // `request` derefs to the view — string fields are borrowed `&str`
        // directly from the request buffer (zero-copy).
        let response = GreetResponse {
            greeting: format!("Hello, {}!", request.name),
            ..Default::default()
        };
        Ok((response, ctx))
    }
}

With Axum (recommended)

use axum::{Router, routing::get};
use connectrpc::Router as ConnectRouter;
use std::sync::Arc;

let service = Arc::new(MyGreetService);
let connect = service.register(ConnectRouter::new());

let app = Router::new()
    .route("/health", get(|| async { "OK" }))
    .fallback_service(connect.into_axum_service());

let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
axum::serve(listener, app).await?;

Standalone server

For simple cases, enable the server feature for a built-in hyper server:

use connectrpc::{Router, Server};
use std::sync::Arc;

let service = Arc::new(MyGreetService);
let router = service.register(Router::new());

Server::new(router).serve("127.0.0.1:8080".parse()?).await?;

Client

Enable the client feature for HTTP client support with connection pooling:

use connectrpc::client::{HttpClient, ClientConfig};

let http = HttpClient::plaintext();  // cleartext http:// only; use with_tls() for https://
let config = ClientConfig::new("http://localhost:8080".parse()?);
let client = GreetServiceClient::new(http, config);

let response = client.greet(GreetRequest {
    name: "World".into(),
}).await?;

Per-call options and client-wide defaults

Generated clients expose both a no-options convenience method and a _with_options variant for per-call control (timeout, headers, max message size, compression override):

use connectrpc::client::CallOptions;
use std::time::Duration;

// Per-call timeout
let response = client.greet_with_options(
    GreetRequest { name: "World".into() },
    CallOptions::default().with_timeout(Duration::from_secs(5)),
).await?;

For options you want on every call (e.g. auth headers, a default timeout), set them on ClientConfig instead — the no-options method picks them up automatically:

let config = ClientConfig::new("http://localhost:8080".parse()?)
    .default_timeout(Duration::from_secs(30))
    .default_header("authorization", "Bearer ...");

let client = GreetServiceClient::new(http, config);

// Uses the 30s timeout and auth header without repeating them:
let response = client.greet(request).await?;

Per-call CallOptions override config defaults (options win).

Feature Flags

Feature Default Description
gzip Yes Gzip compression via flate2
zstd Yes Zstandard compression via zstd
streaming Yes Streaming compression via async-compression
client No HTTP client transports (plaintext)
client-tls No TLS for client transports (HttpClient::with_tls, Http2Connection::connect_tls)
server No Standalone hyper-based server
server-tls No TLS for the built-in server (Server::with_tls)
tls No Convenience: enables both server-tls + client-tls
axum No Axum framework integration

Minimal build (no compression)

[dependencies]
connectrpc = { version = "0.2", default-features = false }

With Axum integration

[dependencies]
connectrpc = { version = "0.2", features = ["axum"] }

Generated Code Dependencies

Code generated by protoc-gen-connect-rust requires these dependencies:

[dependencies]
connectrpc = { version = "0.2", features = ["client"] }
buffa = { version = "0.2", features = ["json"] }
buffa-types = { version = "0.2", features = ["json"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
http-body = "1"

Protocol Support

Protocol Status
Connect (unary + streaming)
gRPC over HTTP/2
gRPC-Web

All 3,600 ConnectRPC server conformance tests pass for all three protocols, plus 1,514 TLS conformance tests and 1,444 client conformance tests. Run with task conformance:test.

RPC type Status
Unary ✓ (POST + GET for idempotent methods)
Server streaming
Client streaming
Bidirectional streaming

Not yet implemented: gRPC server reflection.

Performance

Comparison against tonic 0.14 (the standard Rust gRPC implementation, built on the same hyper/h2 stack). Measured on Intel Xeon Platinum 8488C with buffa as the proto library. Higher is better unless noted.

Single-request latency

Criterion benchmarks at concurrency=1 (no h2 contention), measuring per-request framework + proto work in isolation. Lower is better.

Single-request latency

Raw data (μs, lower is better)
Benchmark connectrpc-rs tonic ratio
unary_small (1 int32 + nested msg) 87.6 170.8 1.95×
unary_logs_50 (50 log records, ~15 KB) 195.0 338.5 1.74×
client_stream (10 messages) 166.1 223.8 1.35×
server_stream (10 messages) 109.8 110.1 1.00×

Run with task bench:cross:quick.

Echo throughput

64-byte string echo, 8 h2 connections (to avoid single-connection mutex contention — see h2 #531). Measures framework dispatch + envelope framing + proto encode/decode with minimal handler work.

Echo throughput

Raw data (req/s)
Concurrency connectrpc-rs tonic
c=16 170,292 168,811 (−1%)
c=64 238,498 234,304 (−2%)
c=256 252,000 247,167 (−2%)

Run with task bench:echo -- --multi-conn=8.

Log ingest (decode-heavy)

50 structured log records per request (~22 KB batch): varints, string fields, nested message, map entries. Handler iterates every field to force full decode. This is where the proto library matters — buffa's zero-copy views avoid the per-string allocations that prost's owned types require.

Log ingest throughput

Raw data (req/s)
Concurrency connectrpc-rs tonic
c=16 32,257 28,110 (−13%)
c=64 73,313 68,690 (−6%)
c=256 112,027 84,171 (−25%)

At c=256, connectrpc-rs decodes 5.6M records/sec vs tonic's 4.2M.

Raw mode (strict_utf8_mapping): For trusted-source log ingestion where UTF-8 validation is unnecessary, buffa can emit &[u8] instead of &str for string fields (editions utf8_validation = NONE + the strict_utf8_mapping codegen option). CPU profile shows this eliminates 11.8% of server CPU (str::from_utf8 drops to zero). End-to-end throughput gain in this benchmark is smaller (~1%) because client encode becomes the bottleneck when both run on one machine — in production with separate client/server, the server sees ~15% more capacity.

Run with task bench:log.

Fortunes (realistic workload + backing store)

Handler performs a network round-trip to a valkey container (HGETALL of 12 fortune messages, ~800 bytes), adds an ephemeral record, sorts, and encodes a 13-message response. This is the shape of a typical read-mostly service: RPC framing + async I/O wait + moderate-size response. All three servers use an 8-connection valkey pool; client uses 8 h2 connections so protocol framing is the only variable.

Raw data (req/s, c=256)

Cross-implementation (gRPC protocol):

Implementation req/s vs connectrpc-rs
connectrpc-rs 199,574
tonic 192,127 −4%
connect-go 88,054 −56%

Protocol framing (connectrpc-rs server):

Protocol c=16 c=64 c=256 Connect ÷ gRPC
Connect 73,511 177,700 245,173 1.23×
gRPC 69,706 157,481 199,574
gRPC-Web 69,067 153,727 191,811

Connect's ~20% unary throughput advantage over gRPC at c=256 comes from simpler framing: no envelope header, no trailing HEADERS frame. At 200k+ req/s, gRPC's trailer frame is ~200k extra h2 HEADERS encodes per second. The gap grows with throughput (5% @ c=16 → 23% @ c=256).

Run with task bench:fortunes:protocols:h2. Requires docker for the valkey sibling container (image pulled automatically on first run).

Where the advantage comes from

CPU profile breakdown (log-ingest, c=64, 30s, task profile:log):

Cost center connectrpc-rs tonic
Proto decode (views/owned) 14.7% 2.1%
UTF-8 validation 11.2% 4.0%
Varint decode 2.2% 3.5%
String alloc + copy ~0 6.2%
HashMap ops (map fields) ~0 8.5%
Total proto 27.1% ~24% (+allocator)
Allocator (malloc/free/realloc) 3.6% 9.6%

connectrpc-rs spends a larger fraction of CPU in proto decode — because it spends so much less everywhere else. buffa's view types borrow string data directly from the request buffer (zero allocs per string field); MapView is a flat Vec<(K,V)> scan with no hashing. tonic/prost must fully materialize String + HashMap<String,String> for every record before the handler runs.

The framework itself contributes: codegen-emitted FooServiceServer<T> with compile-time match dispatch (no Arc<dyn Handler> vtable), a two-frame GrpcUnaryBody for the common unary case, and stream-message batching into fewer h2 DATA frames.

Custom Compression

The compression system is pluggable:

use connectrpc::{CompressionProvider, CompressionRegistry, ConnectError};
use bytes::Bytes;

struct MyCompression;

impl CompressionProvider for MyCompression {
    fn name(&self) -> &'static str { "my-algo" }
    fn compress(&self, data: &[u8]) -> Result<Bytes, ConnectError> { /* ... */ }
    fn decompressor<'a>(&self, data: &'a [u8]) -> Result<Box<dyn std::io::Read + 'a>, ConnectError> {
        // Return a reader that yields decompressed bytes. The framework
        // controls how much is read, so decompress_with_limit is safe by default.
        /* ... */
    }
}

let registry = CompressionRegistry::default().register(MyCompression);

Protocol Specifications

This implementation tracks the following upstream specifications:

Local copies can be fetched with task specs:fetch (see docs/specs/).

Contributing

By submitting a pull request, you agree to the terms of our Contributor License Agreement.

License

This project is licensed under the Apache License, Version 2.0.

About

An implementation of the ConnectRPC protocol for Rust

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages