Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add(rpc): Add a tonic server in zebra-rpc #8674

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
5 changes: 5 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6014,11 +6014,16 @@ dependencies = [
"jsonrpc-derive",
"jsonrpc-http-server",
"proptest",
"prost",
"rand 0.8.5",
"serde",
"serde_json",
"thiserror",
"tokio",
"tokio-stream",
"tonic 0.11.0",
"tonic-build 0.11.0",
"tonic-reflection",
"tower",
"tracing",
"zcash_address",
Expand Down
61 changes: 52 additions & 9 deletions zebra-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,23 @@ homepage = "https://zfnd.org/zebra/"
# crates.io is limited to 5 keywords and categories
keywords = ["zebra", "zcash"]
# Must be one of <https://crates.io/category_slugs>
categories = ["asynchronous", "cryptography::cryptocurrencies", "encoding", "network-programming"]
categories = [
"asynchronous",
"cryptography::cryptocurrencies",
"encoding",
"network-programming",
]

[features]

indexer-rpcs = [
"tonic-build",
"tonic",
"tonic-reflection",
"prost",
"tokio-stream",
]

# Production features that activate extra dependencies, or extra features in dependencies

# Mining RPC support
Expand All @@ -41,7 +54,10 @@ proptest-impl = [
]

[dependencies]
chrono = { version = "0.4.38", default-features = false, features = ["clock", "std"] }
chrono = { version = "0.4.38", default-features = false, features = [
"clock",
"std",
] }
futures = "0.3.30"

# lightwalletd sends JSON-RPC requests over HTTP 1.1
Expand All @@ -55,14 +71,26 @@ jsonrpc-http-server = "18.0.0"
serde_json = { version = "1.0.117", features = ["preserve_order"] }
indexmap = { version = "2.2.6", features = ["serde"] }

tokio = { version = "1.37.0", features = ["time", "rt-multi-thread", "macros", "tracing"] }
tokio = { version = "1.37.0", features = [
"time",
"rt-multi-thread",
"macros",
"tracing",
] }
tower = "0.4.13"

# indexer-rpcs dependencies
tonic = { version = "0.11.0", optional = true }
tonic-reflection = { version = "0.11.0", optional = true }
prost = { version = "0.12.6", optional = true }
tokio-stream = { version = "0.1.15", optional = true }

tracing = "0.1.39"

hex = { version = "0.4.3", features = ["serde"] }
serde = { version = "1.0.203", features = ["serde_derive"] }


zcash_primitives = { version = "0.15.0" }

# Experimental feature getblocktemplate-rpcs
Expand All @@ -73,13 +101,20 @@ zcash_address = { version = "0.3.2", optional = true }
# Test-only feature proptest-impl
proptest = { version = "1.4.0", optional = true }

zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = ["json-conversion"] }
zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = [
"json-conversion",
] }
zebra-consensus = { path = "../zebra-consensus", version = "1.0.0-beta.38" }
zebra-network = { path = "../zebra-network", version = "1.0.0-beta.38" }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38", features = ["rpc-client"] }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38", features = [
"rpc-client",
] }
zebra-script = { path = "../zebra-script", version = "1.0.0-beta.38" }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38" }

[build-dependencies]
tonic-build = { version = "0.11.0", optional = true }

[dev-dependencies]
insta = { version = "1.39.0", features = ["redactions", "json", "ron"] }

Expand All @@ -88,9 +123,17 @@ proptest = "1.4.0"
thiserror = "1.0.61"
tokio = { version = "1.37.0", features = ["full", "tracing", "test-util"] }

zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = ["proptest-impl"] }
zebra-consensus = { path = "../zebra-consensus", version = "1.0.0-beta.38", features = ["proptest-impl"] }
zebra-network = { path = "../zebra-network", version = "1.0.0-beta.38", features = ["proptest-impl"] }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38", features = ["proptest-impl"] }
zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = [
"proptest-impl",
] }
zebra-consensus = { path = "../zebra-consensus", version = "1.0.0-beta.38", features = [
"proptest-impl",
] }
zebra-network = { path = "../zebra-network", version = "1.0.0-beta.38", features = [
"proptest-impl",
] }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38", features = [
"proptest-impl",
] }

zebra-test = { path = "../zebra-test", version = "1.0.0-beta.38" }
15 changes: 15 additions & 0 deletions zebra-rpc/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//! Compile proto files

fn main() -> Result<(), Box<dyn std::error::Error>> {
#[cfg(feature = "indexer-rpcs")]
{
use std::{env, path::PathBuf};
let out_dir = env::var("OUT_DIR").map(PathBuf::from);
tonic_build::configure()
.type_attribute(".", "#[derive(serde::Deserialize, serde::Serialize)]")
.file_descriptor_set_path(out_dir.unwrap().join("indexer_descriptor.bin"))
.compile(&["proto/indexer.proto"], &[""])?;
}

Ok(())
}
10 changes: 10 additions & 0 deletions zebra-rpc/proto/indexer.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
syntax = "proto3";
package zebra.indexer.rpc;

// Used by methods that take no arguments.
message Empty {};

service Indexer {
// Notifies listeners of chain tip changes
rpc ChainTipChange(Empty) returns (stream Empty);
}
19 changes: 19 additions & 0 deletions zebra-rpc/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@ pub struct Config {
/// They can also query your node's state.
pub listen_addr: Option<SocketAddr>,

/// IP address and port for the indexer RPC server.
///
/// Note: The indexer RPC server is disabled by default.
/// To enable the indexer RPC server, compile `zebrad` with the
/// `indexer` feature flag and set a listen address in the config:
/// ```toml
/// [rpc]
/// indexer_listen_addr = '127.0.0.1:8232'
/// ```
///
/// # Security
///
/// If you bind Zebra's indexer RPC port to a public IP address,
/// anyone on the internet can query your node's state.
pub indexer_listen_addr: Option<SocketAddr>,

/// The number of threads used to process RPC requests and responses.
///
/// Zebra's RPC server has a separate thread pool and a `tokio` executor for each thread.
Expand Down Expand Up @@ -65,6 +81,9 @@ impl Default for Config {
// Disable RPCs by default.
listen_addr: None,

// Disable indexer RPCs by default.
indexer_listen_addr: None,

// Use a single thread, so we can detect RPC port conflicts.
#[cfg(not(feature = "getblocktemplate-rpcs"))]
parallel_cpu_threads: 1,
Expand Down
13 changes: 13 additions & 0 deletions zebra-rpc/src/indexer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
//! A tonic RPC server for Zebra's indexer API.

#[cfg(test)]
mod tests;

pub mod methods;
pub mod server;

// The generated indexer proto
tonic::include_proto!("zebra.indexer.rpc");

pub(crate) const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("indexer_descriptor");
57 changes: 57 additions & 0 deletions zebra-rpc/src/indexer/methods.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//! Implements `Indexer` methods on the `IndexerRPC` type

use std::pin::Pin;

use futures::Stream;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Response, Status};
use tower::BoxError;

use zebra_chain::chain_tip::ChainTip;

use super::{indexer_server::Indexer, server::IndexerRPC, Empty};

/// The maximum number of messages that can be queued to be streamed to a client
const RESPONSE_BUFFER_SIZE: usize = 10_000;

#[tonic::async_trait]
impl<ReadStateService, Tip> Indexer for IndexerRPC<ReadStateService, Tip>
where
ReadStateService: tower::Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
<ReadStateService as tower::Service<zebra_state::ReadRequest>>::Future: Send,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
type ChainTipChangeStream = Pin<Box<dyn Stream<Item = Result<Empty, Status>> + Send>>;

async fn chain_tip_change(
&self,
_: tonic::Request<Empty>,
) -> Result<Response<Self::ChainTipChangeStream>, Status> {
let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
let response_stream = ReceiverStream::new(response_receiver);
let mut chain_tip_change = self.chain_tip_change.clone();

tokio::spawn(async move {
// Notify the client of chain tip changes until the channel is closed
while let Ok(()) = chain_tip_change.best_tip_changed().await {
let tx = response_sender.clone();
tokio::spawn(async move { tx.send(Ok(Empty {})).await });
}

let _ = response_sender
.send(Err(Status::unavailable(
"chain_tip_change channel has closed",
)))
.await;
});

Ok(Response::new(Box::pin(response_stream)))
}
}
77 changes: 77 additions & 0 deletions zebra-rpc/src/indexer/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//! A tonic RPC server for Zebra's indexer API.

use std::net::SocketAddr;

use tokio::task::JoinHandle;
use tonic::transport::{server::TcpIncoming, Server};
use tower::BoxError;
use zebra_chain::chain_tip::ChainTip;

use super::indexer_server::IndexerServer;

type ServerTask = JoinHandle<Result<(), BoxError>>;

/// Indexer RPC service.
pub struct IndexerRPC<ReadStateService, Tip>
where
ReadStateService: tower::Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
<ReadStateService as tower::Service<zebra_state::ReadRequest>>::Future: Send,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
_read_state: ReadStateService,
pub(super) chain_tip_change: Tip,
}

/// Initializes the indexer RPC server
pub async fn init<ReadStateService, Tip>(
listen_addr: SocketAddr,
_read_state: ReadStateService,
chain_tip_change: Tip,
) -> Result<(ServerTask, SocketAddr), BoxError>
where
ReadStateService: tower::Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
<ReadStateService as tower::Service<zebra_state::ReadRequest>>::Future: Send,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
let indexer_service = IndexerRPC {
_read_state,
chain_tip_change,
};

let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(crate::indexer::FILE_DESCRIPTOR_SET)
.build()
.unwrap();

tracing::info!("Trying to open indexer RPC endpoint at {}...", listen_addr,);

let tcp_listener = tokio::net::TcpListener::bind(listen_addr).await?;
let listen_addr = tcp_listener.local_addr()?;
let incoming = TcpIncoming::from_listener(tcp_listener, true, None)?;

let server_task: JoinHandle<Result<(), BoxError>> = tokio::spawn(async move {
Server::builder()
.add_service(reflection_service)
.add_service(IndexerServer::new(indexer_service))
.serve_with_incoming(incoming)
.await?;

Ok(())
});

Ok((server_task, listen_addr))
}
1 change: 1 addition & 0 deletions zebra-rpc/src/indexer/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod vectors;
Loading
Loading