Skip to content

Commit

Permalink
Merge pull request #16 from hearth-rs/ipc
Browse files Browse the repository at this point in the history
Set up the `hearth-ipc` crate
  • Loading branch information
marceline-cramer committed Jan 30, 2023
2 parents 17213fd + 8919e73 commit 53d0c87
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 2 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Expand Up @@ -5,6 +5,7 @@ members = [
"crates/hearth-cognito",
"crates/hearth-core",
"crates/hearth-guest",
"crates/hearth-ipc",
"crates/hearth-network",
"crates/hearth-rpc",
"crates/hearth-server",
Expand All @@ -13,10 +14,12 @@ members = [
]

[workspace.dependencies]
hearth-ipc = { path = "crates/hearth-ipc" }
hearth-network = { path = "crates/hearth-network" }
hearth-rpc = { path = "crates/hearth-rpc" }
hearth-types = { path = "crates/hearth-types" }
hearth-wasm = { path = "crates/hearth-wasm" }
tracing = "0.1.37"

[workspace.dependencies.remoc]
version = "0.10"
Expand Down
3 changes: 2 additions & 1 deletion crates/hearth-client/Cargo.toml
Expand Up @@ -5,9 +5,10 @@ edition = "2021"

[dependencies]
clap = { version= "4", features = ["derive"] }
hearth-ipc = { workspace = true }
hearth-network = { workspace = true }
hearth-rpc = { workspace = true }
remoc = { workspace = true, features = ["full"] }
tokio = { version = "1.24", features = ["full"] }
tracing = "0.1.37"
tracing = { workspace = true }
tracing-subscriber = { version = "0.3.16", features = ["fmt"] }
16 changes: 16 additions & 0 deletions crates/hearth-client/src/main.rs
Expand Up @@ -96,6 +96,22 @@ async fn main() {

info!("Successfully connected!");

debug!("Initializing IPC");
let daemon_listener = match hearth_ipc::Listener::new().await {
Ok(l) => l,
Err(err) => {
tracing::error!("IPC listener setup error: {:?}", err);
return;
}
};

let daemon_offer = DaemonOffer {
peer_provider: offer.peer_provider.clone(),
peer_id: offer.new_id,
};

hearth_ipc::listen(daemon_listener, daemon_offer);

debug!("Waiting to join connection thread");
join_connection.await.unwrap().unwrap();
}
Expand Down
9 changes: 9 additions & 0 deletions crates/hearth-ipc/Cargo.toml
@@ -0,0 +1,9 @@
[package]
name = "hearth-ipc"
version = "0.1.0"
edition = "2021"

[dependencies]
hearth-rpc = { workspace = true }
tokio = { version = "1.24", features = ["net"] }
tracing = { workspace = true }
148 changes: 148 additions & 0 deletions crates/hearth-ipc/src/lib.rs
@@ -0,0 +1,148 @@
use std::ops::{Deref, DerefMut};
use std::path::PathBuf;

use hearth_rpc::DaemonOffer;
use tokio::net::{UnixListener, UnixStream};

/// Returns the path of the Hearth IPC socket.
///
/// If the HEARTH_SOCK environment variable is set, then that is used for the
/// path. Otherwise, "$XDG_RUNTIME_DIR/hearth.sock" is used. If XDG_RUNTIME_DIR
/// is not set, then this function returns `None`.
pub fn get_socket_path() -> Option<PathBuf> {
if let Ok(path) = std::env::var("HEARTH_SOCK") {
match path.clone().try_into() {
Ok(path) => return Some(path),
Err(err) => {
tracing::error!("Failed to cast HEARTH_SOCK ({}) to path: {:?}", path, err);
}
}
}

if let Ok(path) = std::env::var("XDG_RUNTIME_DIR") {
match TryInto::<PathBuf>::try_into(path.clone()) {
Ok(path) => {
let path = path.join("hearth.sock");
return Some(path);
}
Err(err) => {
tracing::error!(
"Failed to cast XDG_RUNTIME_DIR ({}) to path: {:?}",
path,
err
);
}
}
}

None
}

pub struct Listener {
pub uds: UnixListener,
pub path: PathBuf,
}

impl Drop for Listener {
fn drop(&mut self) {
match std::fs::remove_file(&self.path) {
Ok(_) => {}
Err(e) => tracing::error!("Could not delete UnixListener {:?}", e),
}
}
}

impl Deref for Listener {
type Target = UnixListener;

fn deref(&self) -> &UnixListener {
&self.uds
}
}

impl DerefMut for Listener {
fn deref_mut(&mut self) -> &mut UnixListener {
&mut self.uds
}
}

impl Listener {
pub async fn new() -> std::io::Result<Self> {
use std::io::{Error, ErrorKind};

let sock_path = match get_socket_path() {
Some(p) => p,
None => {
let kind = ErrorKind::NotFound;
let msg = "Failed to find a socket path";
tracing::error!(msg);
return Err(Error::new(kind, msg));
}
};

match UnixStream::connect(&sock_path).await {
Ok(_) => {
tracing::warn!(
"Socket is already in use. Another instance of Hearth may be running."
);
let kind = ErrorKind::AddrInUse;
let error = Error::new(kind, "Socket is already in use.");
return Err(error);
}
Err(ref err) if err.kind() == ErrorKind::ConnectionRefused => {
tracing::warn!("Found leftover socket; removing.");
std::fs::remove_file(&sock_path)?;
}
Err(ref err) if err.kind() == ErrorKind::NotFound => {}
Err(err) => return Err(err),
}

tracing::info!("Making socket at: {:?}", sock_path);
let uds = UnixListener::bind(&sock_path)?;
let path = sock_path.to_path_buf();
Ok(Self { uds, path })
}
}

/// Spawns a Tokio thread to respond to connections on the domain socket.
pub fn listen(listener: Listener, offer: DaemonOffer) {
tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((socket, addr)) => {
tracing::debug!("Accepting IPC connection from {:?}", addr);
let offer = offer.clone();
tokio::spawn(async move {
on_accept(socket, offer).await;
});
}
Err(err) => {
tracing::error!("IPC listen error: {:?}", err);
}
}
}
});
}

async fn on_accept(socket: UnixStream, offer: DaemonOffer) {
let (sock_rx, sock_tx) = tokio::io::split(socket);

use hearth_rpc::remoc::{
rch::base::{Receiver, Sender},
Cfg, Connect,
};

let cfg = Cfg::default();
let (conn, mut tx, _rx): (_, Sender<DaemonOffer>, Receiver<()>) =
match Connect::io(cfg, sock_rx, sock_tx).await {
Ok(v) => v,
Err(err) => {
tracing::error!("Remoc connection failure: {:?}", err);
return;
}
};

tokio::spawn(conn);

tx.send(offer).await.unwrap();
}
10 changes: 10 additions & 0 deletions crates/hearth-rpc/src/lib.rs
Expand Up @@ -37,6 +37,16 @@ pub struct ClientOffer {
pub peer_api: PeerApiClient,
}

/// The data sent from an IPC daemon to a client on connection.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct DaemonOffer {
/// A [PeerProvider] to all peers on the daemon's network.
pub peer_provider: PeerProviderClient,

/// The ID of this daemon's peer.
pub peer_id: PeerId,
}

/// Top-level interface for a peer. Provides access to its metadata as well as
/// its lower-level interfaces.
///
Expand Down
2 changes: 1 addition & 1 deletion crates/hearth-server/Cargo.toml
Expand Up @@ -9,5 +9,5 @@ hearth-network = { workspace = true }
hearth-rpc = { workspace = true }
remoc = { workspace = true, features = ["full"] }
tokio = { version = "1.24", features = ["full"] }
tracing = "0.1.37"
tracing = { workspace = true }
tracing-subscriber = { version = "0.3.16", features = ["fmt"] }

0 comments on commit 53d0c87

Please sign in to comment.