Skip to content

Commit

Permalink
feat: add ipc support
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse committed Sep 8, 2022
1 parent 3251fe1 commit d5aa9ff
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 81 deletions.
133 changes: 110 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion anvil/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,24 @@ tracing = "0.1"
# async
parking_lot = "0.12"
futures = "0.3"

# ipc
parity-tokio-ipc = { version = "0.9.0", optional = true }
tokio-serde = { version = "0.8", features = ["json"], optional = true }
tokio = { version = "1.21.0", optional = true }

# misc
serde_json = "1.0.67"
serde = { version = "1.0.136", features = ["derive"] }
async-trait = "0.1.53"
thiserror = "1.0.34"

clap = { version = "3.0.10", features = [
"derive",
"env",
], optional = true }
pin-project = "1.0.12"

[features]
default = ["ipc"]
ipc = ["parity-tokio-ipc"]
ipc = ["parity-tokio-ipc", "tokio-serde", "tokio"]
14 changes: 14 additions & 0 deletions anvil/server/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//! Error variants used to unify different connection streams

/// An error that can occur when reading an incoming request
#[derive(Debug, thiserror::Error)]
pub enum RequestError {
#[error(transparent)]
Axum(#[from] axum::Error),
#[error(transparent)]
Serde(#[from] serde_json::Error),
#[error(transparent)]
Io(#[from] std::io::Error),
#[error("Disconnect")]
Disconnect,
}
43 changes: 41 additions & 2 deletions anvil/server/src/ipc.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,53 @@
//! IPC handling

use crate::PubSubRpcHandler;
use anvil_rpc::{request::Request, response::Response};
use futures::stream::StreamExt;
use parity_tokio_ipc::Endpoint;
use tokio_serde::{formats::Json, Framed};
use tracing::{error, trace};
use crate::pubsub::PubSubConnection;

/// An IPC connection for anvil
pub struct IpcEndpoint<Handler: PubSubRpcHandler> {
///
/// A Future that listens for incoming connections and spawns new connections
pub struct IpcEndpoint<Handler> {
/// the handler for the websocket connection
handler: Handler,
/// The endpoint we listen for incoming transactions
endpoint: Endpoint,
// TODO add shutdown
}

async fn on_connection() {}
impl<Handler: PubSubRpcHandler> IpcEndpoint<Handler> {
/// Creates a new endpoint with the given handler
pub fn new(handler: Handler, endpoint: Endpoint) -> Self {
Self { handler, endpoint }
}

/// Start listening for incoming connections
pub async fn start(self) {
let IpcEndpoint { handler, endpoint } = self;
trace!(target: "ipc", endpoint=?endpoint.path(), "starting ipc server" );

let mut connections = match endpoint.incoming() {
Ok(connections) => connections,
Err(err) => {
error!(target: "ipc", ?err, "Failed to create ipc listener");
return
}
};

while let Some(Ok(stream)) = connections.next().await {
trace!(target: "ipc", "successful incoming IPC connection");

let framed: Framed<_, Request, Response, _> =
Framed::new(stream, Json::<Request, Response>::default());

// TOOD need to convert the stream into a Sink+Stream

// PubSubConnection::new(framed, handler.clone()).await

}
}
}
1 change: 1 addition & 0 deletions anvil/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use tracing::{error, trace};
mod config;

// #[cfg(feature = "ipc")]
mod error;
/// handlers for axum server
mod handler;
mod ipc;
Expand Down

0 comments on commit d5aa9ff

Please sign in to comment.