Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(windows): add subzone worker subprocess crate from abandoned IPC branch #3414

Closed
wants to merge 93 commits into from
Closed
Show file tree
Hide file tree
Changes from 92 commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
d659577
wip(windows): IPC mock
ReactorScram Jan 18, 2024
8ff6a68
wip(windows): IPC mock
ReactorScram Jan 18, 2024
390c38c
Merge remote-tracking branch 'origin/main' into reactorscram/ipc-test
ReactorScram Jan 18, 2024
30950fc
wip(windows): try out ipc-channel crate
ReactorScram Jan 18, 2024
fce1c57
wip(windows): add timeouts and stuff
ReactorScram Jan 18, 2024
88ed540
wip(windows): replace ipc-channel crate with Tokio's named_pipe module
ReactorScram Jan 18, 2024
a30f11f
wip(windows): upgrade from u8 to full enum messages
ReactorScram Jan 18, 2024
5c08b7a
wip(windows): add a subcommand for tests that don't fit into 'cargo t…
ReactorScram Jan 18, 2024
27c05bc
fix(windows): typo
ReactorScram Jan 18, 2024
ee5e424
Merge remote-tracking branch 'origin/main' into reactorscram/ipc-test
ReactorScram Jan 18, 2024
91e5a0e
Merge remote-tracking branch 'origin/main' into reactorscram/ipc-test
ReactorScram Jan 18, 2024
7eac132
refactor(windows): move CLI things inside client.rs
ReactorScram Jan 18, 2024
1fc8656
refactor(windows): remove unused subcommands and redundant arguments,…
ReactorScram Jan 18, 2024
ba432bf
refactor(windows): move debug subcommands closer to their code and fu…
ReactorScram Jan 18, 2024
fe8d358
Merge branch 'reactorscram/refactor-debug-subcommands' into reactorsc…
ReactorScram Jan 18, 2024
5f21c2c
wip(windows): multi-process test for IPC
ReactorScram Jan 18, 2024
3f6a295
wip(windows): make Windows kill child processes when the parent exits
ReactorScram Jan 19, 2024
831ec3f
Merge remote-tracking branch 'origin/main' into reactorscram/refactor…
ReactorScram Jan 19, 2024
37d888c
Merge branch 'reactorscram/refactor-subcommands' into reactorscram/re…
ReactorScram Jan 19, 2024
06dff39
Merge branch 'reactorscram/refactor-debug-subcommands' into reactorsc…
ReactorScram Jan 19, 2024
1145279
Merge branch 'reactorscram/ipc-test' into reactorscram/ipc-crash-test
ReactorScram Jan 19, 2024
47288d3
Merge branch 'reactorscram/ipc-crash-test' into reactorscram/process-…
ReactorScram Jan 19, 2024
1fae3f6
Merge remote-tracking branch 'origin/main' into reactorscram/refactor…
ReactorScram Jan 19, 2024
5de44d1
Merge branch 'main' into reactorscram/refactor-debug-subcommands
ReactorScram Jan 19, 2024
e4692ac
Merge branch 'reactorscram/refactor-debug-subcommands' into reactorsc…
ReactorScram Jan 19, 2024
4afd0e2
Merge branch 'reactorscram/refactor-debug-subcommands' into reactorsc…
ReactorScram Jan 19, 2024
dd8c1c9
Merge branch 'reactorscram/ipc-test' into reactorscram/ipc-crash-test
ReactorScram Jan 19, 2024
fe64c47
Merge branch 'reactorscram/ipc-crash-test' into reactorscram/process-…
ReactorScram Jan 19, 2024
a1a31a4
refactor(windows): move the IPC test code into the IPC module, seems …
ReactorScram Jan 19, 2024
b260a40
ci(windows): trigger the multi-process IPC test during CI runs
ReactorScram Jan 19, 2024
254d0f5
Merge branch 'reactorscram/ipc-crash-test' into reactorscram/process-…
ReactorScram Jan 19, 2024
c904901
fix(windows): fix some bugs and incorrect names in IPC tests
ReactorScram Jan 19, 2024
7b7bb1e
windows: improve the IPC tests
ReactorScram Jan 19, 2024
40aa9c0
Merge branch 'main' into reactorscram/refactor-debug-subcommands
ReactorScram Jan 19, 2024
ff0e2e5
Merge branch 'reactorscram/refactor-debug-subcommands' into reactorsc…
ReactorScram Jan 19, 2024
6666102
Merge branch 'reactorscram/ipc-test' into reactorscram/ipc-crash-test
ReactorScram Jan 19, 2024
b4e589b
Merge branch 'reactorscram/ipc-crash-test' into reactorscram/process-…
ReactorScram Jan 19, 2024
cd3d8cf
wip(windows): check out a security issue
ReactorScram Jan 19, 2024
7905eaf
Merge branch 'reactorscram/process-leak' into reactorscram/ipc-security
ReactorScram Jan 19, 2024
1ddcdb2
Merge remote-tracking branch 'origin/main' into reactorscram/ipc-cras…
ReactorScram Jan 22, 2024
6239043
Merge branch 'reactorscram/ipc-crash-test' into reactorscram/process-…
ReactorScram Jan 22, 2024
600a33f
Merge branch 'reactorscram/process-leak' into reactorscram/ipc-security
ReactorScram Jan 22, 2024
3778006
test(windows): testing security plan for IPC
ReactorScram Jan 22, 2024
ac3a45a
Merge remote-tracking branch 'origin/main' into reactorscram/ipc-secu…
ReactorScram Jan 22, 2024
e46a70f
Merge branch 'main' into reactorscram/ipc-crash-test
ReactorScram Jan 22, 2024
ff6f82d
Merge branch 'reactorscram/ipc-crash-test' into reactorscram/process-…
ReactorScram Jan 22, 2024
cbc6d14
Merge branch 'reactorscram/process-leak' into reactorscram/ipc-security
ReactorScram Jan 22, 2024
652d7d9
Merge branch 'main' into reactorscram/ipc-crash-test
ReactorScram Jan 22, 2024
8da53f2
Merge branch 'reactorscram/ipc-crash-test' into reactorscram/process-…
ReactorScram Jan 22, 2024
b49606d
Merge branch 'reactorscram/process-leak' into reactorscram/ipc-security
ReactorScram Jan 22, 2024
d5c0fa7
refactor(windows): clean up the IPC module's API
ReactorScram Jan 22, 2024
c172bda
refactor: remove AwaitCallback, that's not going to work
ReactorScram Jan 22, 2024
faa40ab
Merge remote-tracking branch 'origin/main' into reactorscram/process-…
ReactorScram Jan 22, 2024
bac2b8d
Merge remote-tracking branch 'origin/reactorscram/process-leak' into …
ReactorScram Jan 22, 2024
8772e36
Merge branch 'reactorscram/ipc-security' into reactorscram/ipc-api
ReactorScram Jan 22, 2024
270ad5e
Merge branch 'main' into reactorscram/ipc-security
ReactorScram Jan 22, 2024
83a4b6f
refactor(windows): remove request-response because that won't work
ReactorScram Jan 22, 2024
d79fcb6
ci(windows): fix cargo clippy error
ReactorScram Jan 22, 2024
c19ba12
Merge branch 'reactorscram/ipc-security' into reactorscram/ipc-api
ReactorScram Jan 22, 2024
985230e
refactor(windows IPC): rename Request and Response since those will n…
ReactorScram Jan 22, 2024
b1eb536
wip(windows): start making a connlib subprocess
ReactorScram Jan 23, 2024
8ca917f
refactor(windows IPC): rename Request and Response (#3348)
ReactorScram Jan 23, 2024
99450a6
Merge branch 'main' into reactorscram/ipc-security
ReactorScram Jan 23, 2024
66e5666
Merge branch 'main' into reactorscram/ipc-security
ReactorScram Jan 23, 2024
1071497
Merge branch 'main' into reactorscram/ipc-security
ReactorScram Jan 23, 2024
e23544d
Merge branch 'reactorscram/ipc-security' into reactorscram/ipc-api
ReactorScram Jan 23, 2024
8947417
Merge remote-tracking branch 'origin/reactorscram/ipc-api' into react…
ReactorScram Jan 23, 2024
cb371a8
wip(windows): named pipes bug
ReactorScram Jan 23, 2024
15dbd27
wip(windows): it's getting closer
ReactorScram Jan 24, 2024
7bfddb2
Merge remote-tracking branch 'origin/main' into reactorscram/ipc-api
ReactorScram Jan 24, 2024
1aeea0b
Merge branch 'main' into reactorscram/ipc-api
ReactorScram Jan 24, 2024
a9e97a3
Merge branch 'reactorscram/ipc-api' into reactorscram/ipc-connlib
ReactorScram Jan 24, 2024
199273c
Merge branch 'main' into reactorscram/ipc-api
ReactorScram Jan 24, 2024
898920e
Merge branch 'reactorscram/ipc-api' into reactorscram/ipc-connlib
ReactorScram Jan 24, 2024
1feca4e
Merge remote-tracking branch 'origin/reactorscram/ipc-connlib' into r…
ReactorScram Jan 24, 2024
b4e6fba
wip(windows IPC): fix numerous bugs in IPC so that the connlib subpro…
ReactorScram Jan 24, 2024
262d654
test(windows): make sure processes really have unique IDs
ReactorScram Jan 24, 2024
7531077
wip(windows): this one works
ReactorScram Jan 25, 2024
3c6a77b
Merge remote-tracking branch 'origin/main' into reactorscram/ipc-connlib
ReactorScram Jan 25, 2024
aacc07a
fix(windows): fix graceful shutdown and other IPC bugs
ReactorScram Jan 25, 2024
cec0d5c
fix(windows): use tokio's process module to remove some sleeps
ReactorScram Jan 25, 2024
a0c990f
Merge remote-tracking branch 'origin/main' into reactorscram/ipc-connlib
ReactorScram Jan 25, 2024
a3a8a32
refactor(windows): apply feedback from Thomas
ReactorScram Jan 25, 2024
1bdd5a9
docs(windows): comments
ReactorScram Jan 25, 2024
c83865d
refactor(windows): split up IPC into sub-modules
ReactorScram Jan 25, 2024
4ce083b
refactor(windows): use 'tokio::io::split' to reduce 'client.rs' lines…
ReactorScram Jan 25, 2024
de87c11
refactor(windows): use 'tokio::io::split' for the server too, also re…
ReactorScram Jan 25, 2024
f49af01
fix(windows): separate log files between the two processes
ReactorScram Jan 26, 2024
8f87069
fix(windows): fix log export and count and add flag to crash connlib …
ReactorScram Jan 26, 2024
fa6e319
refactor(windows): split out the IPC / subprocess stuff into a crate …
ReactorScram Jan 26, 2024
7811e3b
Merge remote-tracking branch 'origin/main' into reactorscram/split-su…
ReactorScram Jan 26, 2024
39a2af2
feat(windows): add subzone worker process crate from abandoned IPC br…
ReactorScram Jan 26, 2024
634a543
Merge branch 'main' into reactorscram/split-subzone-crate
ReactorScram Jan 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions rust/windows-client/subzone/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "subzone"
version = "0.1.0"
keywords = ["command", "process", "subprocess", "worker"]
description = "Worker subprocesses with async IPC for Windows"
edition = "2021"

[dependencies]
anyhow = { version = "1.0" }
clap = { version = "4.4", features = ["derive", "env"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = { version = "1.0", default-features = false }
tokio = { version = "1.33.0", features = ["io-util", "net", "process", "rt-multi-thread", "sync", "time"] }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
uuid = { version = "1.7.0", features = ["v4"] }

[target.'cfg(windows)'.dependencies.windows]
version = "0.52.0"
features = [
# Needed for `CreateJobObjectA`
"Win32_Foundation",
# Needed for `CreateJobObjectA`
"Win32_Security",
# Needed for Windows to automatically kill child processes if the main process crashes
"Win32_System_JobObjects",
# Needed to check process ID of named pipe clients
"Win32_System_Pipes",
"Win32_System_Threading",
]
5 changes: 5 additions & 0 deletions rust/windows-client/subzone/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Testing

```bash
cargo test && cargo run && echo good
```
89 changes: 89 additions & 0 deletions rust/windows-client/subzone/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use anyhow::Result;
use serde::{de::DeserializeOwned, Serialize};
use std::marker::PhantomData;
use tokio::{
io::AsyncWriteExt,
net::windows::named_pipe::{self, NamedPipeClient},
sync::mpsc,
};

use crate::{read_deserialize, write_serialize, Error, ManagerMsgInternal, WorkerMsgInternal};

/// A client that's connected to a server
///
/// Manual testing shows that if the corresponding Server's process crashes, Windows will
/// be nice and return errors for anything trying to read from the Client
pub struct Client<M, W> {
pipe_writer: tokio::io::WriteHalf<NamedPipeClient>,
/// Needed to make `next` cancel-safe
read_rx: mpsc::Receiver<Vec<u8>>,
/// Needed to make `next` cancel-safe
reader_task: tokio::task::JoinHandle<Result<()>>,
_manager_msg: PhantomData<M>,
_worker_msg: PhantomData<W>,
}

impl<M: DeserializeOwned, W: Serialize> Client<M, W> {
/// Creates a `Client` and echoes the security cookie back to the `Server`
///
/// Doesn't block, fails instantly if the server isn't up.
pub async fn new(server_id: &str) -> Result<Self> {
let mut client = Client::new_unsecured(server_id)?;
let mut cookie = String::new();
std::io::stdin().read_line(&mut cookie)?;
let cookie = WorkerMsgInternal::Cookie(cookie.trim().to_string());
client.send_internal(&cookie).await?;
Ok(client)
}

/// Creates a `Client`. Requires a Tokio context
///
/// Doesn't block, will fail instantly if the server isn't ready
#[tracing::instrument(skip_all)]
pub(crate) fn new_unsecured(server_id: &str) -> Result<Self> {
let pipe = named_pipe::ClientOptions::new().open(server_id)?;
let (mut pipe_reader, pipe_writer) = tokio::io::split(pipe);
let (read_tx, read_rx) = mpsc::channel(1);
let reader_task = tokio::spawn(async move {
loop {
let msg = read_deserialize(&mut pipe_reader).await?;
read_tx.send(msg).await?;
}
});

Ok(Self {
pipe_writer,
read_rx,
reader_task,
_manager_msg: Default::default(),
_worker_msg: Default::default(),
})
}

pub async fn close(mut self) -> Result<()> {
self.pipe_writer.shutdown().await?;
self.reader_task.abort();
tracing::debug!("Client closing gracefully");
Ok(())
}

/// Receives a message from the server
///
/// # Cancel safety
///
/// This method is cancel-safe, internally it calls `tokio::sync::mpsc::Receiver::recv`
pub async fn next(&mut self) -> Result<ManagerMsgInternal<M>, Error> {
let buf = self.read_rx.recv().await.ok_or_else(|| Error::Eof)?;
let buf = std::str::from_utf8(&buf)?;
let msg = serde_json::from_str(buf)?;
Ok(msg)
}

pub async fn send(&mut self, msg: W) -> Result<(), Error> {
self.send_internal(&WorkerMsgInternal::User(msg)).await
}

async fn send_internal(&mut self, msg: &WorkerMsgInternal<W>) -> Result<(), Error> {
write_serialize(&mut self.pipe_writer, msg).await
}
}
247 changes: 247 additions & 0 deletions rust/windows-client/subzone/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
//! Worker subprocesses with async IPC for Windows
//!
//! To run the unit tests and multi-process tests, use
//! ```bash
//! cargo test && cargo run && echo good
//! ```
//!
//! # Security
//!
//! The IPC module uses Windows' named pipes primitive.
//!
//! These seem *relatively* secure. Chromium also uses them.
//! Privileged applications with admin powers or kernel
//! modules, like Wireshark, can snoop on named pipes, because they're running as root.
//!
//! Non-privileged processes can enumerate the names of named pipes. To prevent
//! a process that isn't our child from connecting to our named pipe, I check the
//! process ID before communicating, and then require the first message to be a cookie
//! echoed to the child's stdin and back through the pipe, similar to a CSRF token.
//!
//! Also by default, non-elevated processes cannot connect to named pipe servers
//! inside elevated processes.
//!
//! # Design
//!
//! subzone has these features:
//!
//! - Kill unresponsive worker if needed
//! - Graceful shutdown
//! - Automatically kill workers even if the manager process crashes
//! - Bails out if some other process tries to intercept IPC between the two processes

use anyhow::Result;
use clap::Parser;
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, marker::Unpin};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

mod client;
mod server;
// Always enabled, since the integration tests can't run in `cargo test` yet
pub(crate) mod multi_process_tests;

pub use client::Client;
pub use server::{LeakGuard, Server, SubcommandChild, SubcommandExit, Subprocess};

#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Used to detected graceful named pipe closes
#[error("EOF")]
Eof,
/// Any IO error except EOF
#[error(transparent)]
Io(std::io::Error),
#[error(transparent)]
Json(#[from] serde_json::Error),
#[error("Something went wrong while converting message length to u32 or usize")]
MessageLength,
#[error("Protocol error, got Cookie or Shutdown at an incorrect time")]
Protocol,
#[error(transparent)]
Utf8(#[from] std::str::Utf8Error),
}

#[derive(Deserialize, Serialize)]
pub enum ManagerMsgInternal<T> {
Shutdown,
User(T),
}

#[derive(Deserialize, Serialize)]
pub enum WorkerMsgInternal<T> {
Cookie(String),
User(T),
}

impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
Self::Eof
} else {
Self::Io(e)
}
}
}

#[derive(Parser)]
struct Cli {
#[command(subcommand)]
cmd: Option<multi_process_tests::Subcommand>,
}

/// Don't use. This is just for internal tests that are difficult to do with `cargo test`
pub fn run_multi_process_tests() -> Result<()> {
let cli = Cli::parse();
multi_process_tests::run(cli.cmd)
}

/// Returns a random valid named pipe ID based on a UUIDv4
///
/// e.g. "\\.\pipe\dev.firezone.client\9508e87c-1c92-4630-bb20-839325d169bd"
///
/// Normally you don't need to call this directly. Tests may need it to inject
/// a known pipe ID into a process controlled by the test.
pub(crate) fn random_pipe_id() -> String {
named_pipe_path(&uuid::Uuid::new_v4().to_string())
}

/// Returns a valid named pipe ID
///
/// e.g. "\\.\pipe\dev.firezone.client\{path}"
pub(crate) fn named_pipe_path(path: &str) -> String {
format!(r"\\.\pipe\subzone\{path}")
}

/// Reads a message from an async reader, with a 32-bit little-endian length prefix
async fn read_deserialize<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Vec<u8>, Error> {
let mut len_buf = [0u8; 4];
reader.read_exact(&mut len_buf).await?;
let len = u32::from_le_bytes(len_buf);
tracing::trace!(?len, "reading message");
let len = usize::try_from(len).map_err(|_| Error::MessageLength)?;
let mut buf = vec![0u8; len];
reader.read_exact(&mut buf).await?;
Ok(buf)
}

/// Writes a message to an async writer, with a 32-bit little-endian length prefix
async fn write_serialize<W: AsyncWrite + Unpin, T: Serialize>(
writer: &mut W,
msg: &T,
) -> Result<(), Error> {
// Using JSON because `bincode` couldn't decode `ResourceDescription`
let buf = serde_json::to_string(msg)?;
let len = u32::try_from(buf.len())
.map_err(|_| Error::MessageLength)?
.to_le_bytes();
tracing::trace!(len = buf.len(), "writing message");
writer.write_all(&len).await?;
writer.write_all(buf.as_bytes()).await?;
Ok(())
}

#[cfg(test)]
mod tests {
use super::server::UnconnectedServer;
use super::*;
use crate::multi_process_tests::{Callback, ManagerMsg, WorkerMsg};
use anyhow::Context;
use std::time::{Duration, Instant};
use tokio::runtime::Runtime;

/// Because it turns out `bincode` can't deserialize `ResourceDescription` or something.
#[test]
fn round_trip_serde() -> Result<()> {
let cb: WorkerMsg = WorkerMsg::Callback(Callback::OnUpdateResources(sample_resources()));

let v = serde_json::to_string(&cb)?;
let roundtripped: WorkerMsg = serde_json::from_str(&v)?;

assert_eq!(roundtripped, cb);

Ok(())
}

/// Test just the happy path
/// It's hard to simulate a process crash because:
/// - If I Drop anything, Tokio will clean it up
/// - If I `std::mem::forget` anything, the test process is still running, so Windows will not clean it up
#[test]
#[tracing::instrument(skip_all)]
fn happy_path() -> Result<()> {
tracing_subscriber::fmt::try_init().ok();

let rt = Runtime::new()?;
rt.block_on(async move {
// Pretend we're in the main process
let (server, server_id) = UnconnectedServer::new()?;

let worker_task = tokio::spawn(async move {
// Pretend we're in a worker process
let mut client: Client<ManagerMsg, WorkerMsg> = Client::new_unsecured(&server_id)?;

client
.send(WorkerMsg::Callback(Callback::OnUpdateResources(
sample_resources(),
)))
.await?;

// Handle requests from the main process
loop {
let Ok(ManagerMsgInternal::User(req)) = client.next().await else {
tracing::debug!("shutting down worker_task");
break;
};
tracing::debug!(?req, "worker_task got request");
let resp = WorkerMsg::Response(req.clone());
client.send(resp).await?;
}
client.close().await?;
Ok::<_, anyhow::Error>(())
});

let mut server: Server<ManagerMsg, WorkerMsg> = server.accept().await?;

let start_time = Instant::now();

let cb = server
.next()
.await
.context("should have gotten a OnUpdateResources callback")?;
assert_eq!(
cb,
WorkerMsg::Callback(Callback::OnUpdateResources(sample_resources()))
);

server.send(ManagerMsg::Connect).await?;
assert_eq!(
server.next().await.unwrap(),
WorkerMsg::Response(ManagerMsg::Connect)
);
server.send(ManagerMsg::Connect).await?;
assert_eq!(
server.next().await.unwrap(),
WorkerMsg::Response(ManagerMsg::Connect)
);

let elapsed = start_time.elapsed();
assert!(elapsed < Duration::from_millis(20), "{:?}", elapsed);

server.close().await?;

// Make sure the worker 'process' exited
worker_task.await??;

Ok::<_, anyhow::Error>(())
})?;
Ok(())
}

fn sample_resources() -> Vec<String> {
vec![
"2efe9c25-bd92-49a0-99d7-8b92da014dd5".into(),
"613eaf56-6efa-45e5-88aa-ea4ad64d8c18".into(),
]
}
}
6 changes: 6 additions & 0 deletions rust/windows-client/subzone/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
//! Test driver for subzone's multi-process test, which are difficult to run
//! inside Cargo's test harness.

fn main() -> anyhow::Result<()> {
subzone::run_multi_process_tests()
}
Loading
Loading