Skip to content

Commit

Permalink
feat: integrate ipc service
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse committed Sep 8, 2022
1 parent c56a515 commit 2ac8722
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 19 deletions.
6 changes: 3 additions & 3 deletions anvil/server/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ pub struct IpcEndpoint<Handler> {

impl<Handler: PubSubRpcHandler> IpcEndpoint<Handler> {
/// Creates a new endpoint with the given handler
pub fn new(handler: Handler, endpoint: Endpoint) -> Self {
Self { handler, endpoint }
pub fn new(handler: Handler, endpoint: impl Into<String>) -> Self {
Self { handler, endpoint: Endpoint::new(endpoint.into()) }
}

/// Start listening for incoming connections
Expand Down Expand Up @@ -143,7 +143,7 @@ impl tokio_util::codec::Decoder for JsonRpcCodec {
return match String::from_utf8(bts.as_ref().to_vec()) {
Ok(val) => Ok(Some(val)),
Err(_) => Ok(None),
};
}
}
}
Ok(None)
Expand Down
9 changes: 9 additions & 0 deletions anvil/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ pub struct NodeArgs {
value_parser = Genesis::parse
)]
pub init: Option<Genesis>,

#[clap(
long,
help = "Whether to launch an ipc server.",
value_name = "PATH",
visible_alias = "ipcpath"
)]
pub ipc: Option<Option<String>>,
}

impl NodeArgs {
Expand Down Expand Up @@ -160,6 +168,7 @@ impl NodeArgs {
.with_chain_id(self.evm_opts.chain_id)
.with_transaction_order(self.order)
.with_genesis(self.init)
.with_ipc(self.ipc)
}

fn account_generator(&self) -> AccountGenerator {
Expand Down
32 changes: 32 additions & 0 deletions anvil/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ pub struct NodeConfig {
pub fork_retry_backoff: Duration,
/// available CUPS
pub compute_units_per_second: u64,
/// The ipc path
pub ipc_path: Option<Option<String>>,
}

impl NodeConfig {
Expand Down Expand Up @@ -339,6 +341,7 @@ impl Default for NodeConfig {
fork_retry_backoff: Duration::from_millis(1_000),
// alchemy max cpus <https://github.com/alchemyplatform/alchemy-docs/blob/master/documentation/compute-units.md#rate-limits-cups>
compute_units_per_second: ALCHEMY_FREE_TIER_CUPS,
ipc_path: None,
}
}
}
Expand Down Expand Up @@ -503,6 +506,18 @@ impl NodeConfig {
self
}

/// Sets the ipc path to use
///
/// Note: this is a double Option for
/// - `None` -> no opc
/// - `Some(None)` -> use default path
/// - `Some(Some(path))` -> use custom path
#[must_use]
pub fn with_ipc(mut self, ipc_path: Option<Option<String>>) -> Self {
self.ipc_path = ipc_path;
self
}

/// Sets the file path to write the Anvil node's config info to.
#[must_use]
pub fn set_config_out(mut self, config_out: Option<String>) -> Self {
Expand Down Expand Up @@ -600,6 +615,23 @@ impl NodeConfig {
self
}

/// Returns the ipc path for the ipc endpoint if any
pub fn get_ipc_path(&self) -> Option<String> {
match self.ipc_path.as_ref() {
Some(path) => path.clone().or_else(|| {
#[cfg(windows)]
{
Some(r"\\.\pipe\anvil-ipc".to_string())
}
#[cfg(not(windows))]
{
Some("/tmp/anvil-ipc".to_string())
}
}),
None => None,
}
}

/// Prints the config info
pub fn print(&self, fork: Option<&ClientFork>) {
if self.config_out.is_some() {
Expand Down
40 changes: 33 additions & 7 deletions anvil/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,32 @@ use ethers::{
types::{Address, U256},
};
use foundry_evm::revm;
use futures::FutureExt;
use futures::{FutureExt, TryFutureExt};
use parking_lot::Mutex;
use std::{
future::Future,
io,
net::{IpAddr, Ipv4Addr, SocketAddr},
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use tokio::{runtime::Handle, task::JoinError};
use tokio::{
runtime::Handle,
task::{JoinError, JoinHandle},
};

/// contains the background service that drives the node
mod service;

mod config;
pub use config::{AccountGenerator, NodeConfig, CHAIN_ID, VERSION_MESSAGE};
mod hardfork;
use crate::server::{
error::{NodeError, NodeResult},
spawn_ipc,
};
pub use hardfork::Hardfork;

/// ethereum related implementations
Expand Down Expand Up @@ -160,7 +169,7 @@ pub async fn spawn(mut config: NodeConfig) -> (EthApi, NodeHandle) {
addr = server.local_addr();

// spawn the server on a new task
let serve = tokio::task::spawn(server);
let serve = tokio::task::spawn(server.map_err(NodeError::from));

// select over both tasks
let inner = futures::future::select(node_service, serve);
Expand All @@ -169,12 +178,15 @@ pub async fn spawn(mut config: NodeConfig) -> (EthApi, NodeHandle) {
let (signal, on_shutdown) = shutdown::signal();
let task_manager = TaskManager::new(tokio_handle, on_shutdown);

let ipc_task = config.get_ipc_path().map(|path| spawn_ipc(api.clone(), path));

let handle = NodeHandle {
config,
inner: Box::pin(async move {
// wait for the first task to finish
inner.await.into_inner().0
}),
ipc_task,
address: addr,
_signal: Some(signal),
task_manager,
Expand All @@ -185,17 +197,21 @@ pub async fn spawn(mut config: NodeConfig) -> (EthApi, NodeHandle) {
(api, handle)
}

type NodeFuture = Pin<Box<dyn Future<Output = Result<hyper::Result<()>, JoinError>>>>;
type NodeFuture = Pin<Box<dyn Future<Output = Result<NodeResult<()>, JoinError>>>>;

type IpcTask = JoinHandle<io::Result<()>>;

/// A handle to the spawned node and server tasks
///
/// This future will resolve if either the node or server task resolve/fail.
pub struct NodeHandle {
config: NodeConfig,
/// the address of the running rpc server
/// The address of the running rpc server
address: SocketAddr,
/// the future that drives the rpc service and the node service
/// The future that joins the rpc service and the node service
inner: NodeFuture,
// The future that joins the ipc server, if any
ipc_task: Option<IpcTask>,
/// A signal that fires the shutdown, fired on drop.
_signal: Option<Signal>,
/// A task manager that can be used to spawn additional tasks
Expand Down Expand Up @@ -234,6 +250,11 @@ impl NodeHandle {
format!("ws://{}", self.socket_address())
}

/// Returns the path of the launched ipc server, if any
pub fn ipc_path(&self) -> Option<String> {
self.config.get_ipc_path()
}

/// Returns a Provider for the http endpoint
pub fn http_provider(&self) -> Provider<Http> {
Provider::<Http>::try_from(self.http_endpoint())
Expand Down Expand Up @@ -306,10 +327,15 @@ impl NodeHandle {
}

impl Future for NodeHandle {
type Output = Result<hyper::Result<()>, JoinError>;
type Output = Result<NodeResult<()>, JoinError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let pin = self.get_mut();
if let Some(mut ipc) = pin.ipc_task.take() {
if let Poll::Ready(res) = ipc.poll_unpin(cx) {
return Poll::Ready(res.map(|res| res.map_err(NodeError::from)))
}
}
pin.inner.poll_unpin(cx)
}
}
Expand Down
11 changes: 11 additions & 0 deletions anvil/src/server/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/// Result alias
pub type NodeResult<T> = Result<T, NodeError>;

/// An error that can occur when launching a anvil instance
#[derive(Debug, thiserror::Error)]
pub enum NodeError {
#[error(transparent)]
Hyper(#[from] hyper::Error),
#[error(transparent)]
Io(#[from] std::io::Error),
}
8 changes: 4 additions & 4 deletions anvil/src/server/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ impl RpcHandler for HttpEthRpcHandler {
}
}

/// A `RpcHandler` that expects `EthRequest` rpc calls and `EthPubSub` via websocket
/// A `RpcHandler` that expects `EthRequest` rpc calls and `EthPubSub` via pubsub connection
#[derive(Clone)]
pub struct WsEthRpcHandler {
pub struct PubSubEthRpcHandler {
/// Access to the node
api: EthApi,
}

impl WsEthRpcHandler {
impl PubSubEthRpcHandler {
/// Creates a new instance of the handler using the given `EthApi`
pub fn new(api: EthApi) -> Self {
Self { api }
Expand Down Expand Up @@ -105,7 +105,7 @@ impl WsEthRpcHandler {
}

#[async_trait::async_trait]
impl PubSubRpcHandler for WsEthRpcHandler {
impl PubSubRpcHandler for PubSubEthRpcHandler {
type Request = EthRpcCall;
type SubscriptionId = SubscriptionId;
type Subscription = EthSubscription;
Expand Down
21 changes: 18 additions & 3 deletions anvil/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
//! Contains the code to launch an ethereum RPC-Server
use crate::EthApi;
use anvil_server::{AnvilServer, ServerConfig};
use handler::{HttpEthRpcHandler, WsEthRpcHandler};
use anvil_server::{ipc::IpcEndpoint, AnvilServer, ServerConfig};
use handler::{HttpEthRpcHandler, PubSubEthRpcHandler};
use std::net::SocketAddr;
use tokio::{io, task::JoinHandle};

mod handler;

pub mod error;

/// Configures an [axum::Server] that handles [EthApi] related JSON-RPC calls via HTTP and WS
pub fn serve(addr: SocketAddr, api: EthApi, config: ServerConfig) -> AnvilServer {
let http = HttpEthRpcHandler::new(api.clone());
let ws = WsEthRpcHandler::new(api);
let ws = PubSubEthRpcHandler::new(api);
anvil_server::serve_http_ws(addr, config, http, ws)
}

/// Launches an ipc server at the given path in a new task
pub fn spawn_ipc(api: EthApi, path: impl Into<String>) -> JoinHandle<io::Result<()>> {
let path = path.into();
tokio::task::spawn(async move {
let handler = PubSubEthRpcHandler::new(api);
let ipc = IpcEndpoint::new(handler, path);
ipc.listen().await;

Ok(())
})
}
4 changes: 2 additions & 2 deletions anvil/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
},
filter::Filters,
mem::{storage::MinedBlockOutcome, Backend},
NodeResult,
};
use futures::{FutureExt, Stream, StreamExt};
use std::{
Expand Down Expand Up @@ -63,8 +64,7 @@ impl NodeService {
}

impl Future for NodeService {
// Note: this is out of convenience as this gets joined with the server
type Output = hyper::Result<()>;
type Output = NodeResult<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let pin = self.get_mut();
Expand Down

0 comments on commit 2ac8722

Please sign in to comment.