Skip to content

Commit

Permalink
Add streaming server
Browse files Browse the repository at this point in the history
Signed-off-by: Sascha Grunert <sgrunert@redhat.com>
  • Loading branch information
saschagrunert committed Mar 20, 2024
1 parent 1e8095d commit a875baa
Show file tree
Hide file tree
Showing 11 changed files with 1,576 additions and 243 deletions.
285 changes: 267 additions & 18 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions conmon-rs/common/proto/conmon.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,22 @@ interface Conmon {
key @0 :Text;
value @1 :Text;
}

###############################################
# ServeExecContainer
struct ServeExecContainerRequest {
metadata @0 :Metadata; # Standard metadata to carry.
id @1 :Text;
cmd @2 :List(Text);
tty @3 :Bool;
stdin @4 :Bool;
stdout @5 :Bool;
stderr @6 :Bool;
}

struct ServeExecContainerResponse {
url @0 :Text;
}

serveExecContainer @8 (request: ServeExecContainerRequest) -> (response: ServeExecContainerResponse);
}
2 changes: 2 additions & 0 deletions conmon-rs/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ path = "src/main.rs"

[dependencies]
anyhow = "1.0.81"
axum = { version = "0.7.4", features = ["ws"]}
capnp = "0.17.2"
capnp-rpc = "0.17.0"
clap = { version = "4.3.8", features = ["color", "cargo", "deprecated", "derive", "deprecated", "env", "string", "unicode", "wrap_help"] }
Expand Down Expand Up @@ -41,6 +42,7 @@ tokio = { version = "1.36.0", features = ["fs", "io-std", "io-util", "macros", "
tokio-eventfd = "0.2.1"
tokio-seqpacket = "0.7.1"
tokio-util = { version = "0.7.10", features = ["compat"] }
tower-http = { version = "0.5.2", features = ["trace"] }
tracing = "0.1.40"
tracing-opentelemetry = "0.23.0"
tracing-subscriber = "0.3.18"
Expand Down
4 changes: 3 additions & 1 deletion conmon-rs/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ macro_rules! prefix {
};
}

#[derive(CopyGetters, Debug, Deserialize, Eq, Getters, Parser, PartialEq, Serialize, Setters)]
#[derive(
Clone, CopyGetters, Debug, Deserialize, Eq, Getters, Parser, PartialEq, Serialize, Setters,
)]
#[serde(rename_all = "kebab-case")]
#[command(
after_help("More info at: https://github.com/containers/conmon-rs"),
Expand Down
27 changes: 27 additions & 0 deletions conmon-rs/server/src/container_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ impl SharedContainerIO {
pub async fn attach(&self) -> SharedContainerAttach {
self.0.read().await.attach().clone()
}

/// Retrieve the underlying ContainerIO instance.
pub fn container_io(&mut self) -> Arc<RwLock<ContainerIO>> {
self.0.clone()
}
}

#[derive(Debug, Getters, MutGetters)]
Expand Down Expand Up @@ -155,6 +160,28 @@ impl ContainerIO {
Ok(path)
}

pub fn channels(
&mut self,
) -> Result<(
&mut UnboundedReceiver<Message>,
Option<&mut UnboundedReceiver<Message>>,
)> {
match self.typ_mut() {
ContainerIOType::Terminal(t) => {
if let Some(message_rx) = t.message_rx_mut() {
Ok((message_rx, None))
} else {
bail!("read_all_with_timeout called before message_rx was registered");
}
}
ContainerIOType::Streams(s) => {
let stdout_rx = &mut s.message_rx_stdout;
let stderr_rx = &mut s.message_rx_stderr;
Ok((stdout_rx, Some(stderr_rx)))
}
}
}

pub async fn read_all_with_timeout(
&mut self,
time_to_timeout: Option<Instant>,
Expand Down
1 change: 1 addition & 0 deletions conmon-rs/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod oom_watcher;
mod pause;
mod rpc;
mod server;
mod streaming_server;
mod streams;
mod telemetry;
mod terminal;
Expand Down
42 changes: 42 additions & 0 deletions conmon-rs/server/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,4 +458,46 @@ impl conmon::Server for Server {
.instrument(debug_span!("promise")),
)
}

fn serve_exec_container(
&mut self,
params: conmon::ServeExecContainerParams,
mut results: conmon::ServeExecContainerResults,
) -> Promise<(), capnp::Error> {
debug!("Got a serve exec container request");
let req = pry!(pry!(params.get()).get_request());

let span = debug_span!(
"serve_exec_container",
uuid = Uuid::new_v4().to_string().as_str()
);
let _enter = span.enter();
pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));

let id = pry!(req.get_id()).to_string();
let cmd = capnp_vec_str!(req.get_cmd());
let (tty, stdin, stdout, stderr) = (
req.get_tty(),
req.get_stdin(),
req.get_stdout(),
req.get_stderr(),
);
let mut streaming_server = self.streaming_server().clone();
let child_reaper = self.reaper().clone();
let config = self.config().clone();

Promise::from_future(
async move {
let url = capnp_err!(
streaming_server
.exec_url(child_reaper, config, id, cmd, tty, stdin, stdout, stderr)
.await
)?;

results.get().init_response().set_url(&url);
Ok(())
}
.instrument(debug_span!("promise")),
)
}
}
30 changes: 23 additions & 7 deletions conmon-rs/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
journal::Journal,
listener::{DefaultListener, Listener},
pause::Pause,
streaming_server::StreamingServer,
telemetry::Telemetry,
version::Version,
};
Expand Down Expand Up @@ -53,6 +54,9 @@ pub struct Server {
/// Fd socket instance.
#[getset(get = "pub(crate)")]
fd_socket: Arc<FdSocket>,

#[getset(get = "pub(crate)")]
streaming_server: Box<StreamingServer>,
}

impl Server {
Expand All @@ -62,6 +66,7 @@ impl Server {
config: Default::default(),
reaper: Default::default(),
fd_socket: Default::default(),
streaming_server: Default::default(),
};

if let Some(v) = server.config().version() {
Expand Down Expand Up @@ -273,7 +278,12 @@ impl Server {
.context("remove existing fd socket file")
}

async fn start_backend(self, mut shutdown_rx: oneshot::Receiver<()>) -> Result<()> {
async fn start_backend(mut self, mut shutdown_rx: oneshot::Receiver<()>) -> Result<()> {
self.streaming_server
.start()
.await
.context("start streaming server")?;

let listener =
Listener::<DefaultListener>::default().bind_long_path(self.config().socket())?;
let client: conmon::Client = capnp_rpc::new_client(self);
Expand Down Expand Up @@ -357,6 +367,17 @@ impl GenerateRuntimeArgs<'_> {

/// Generate the OCI runtime CLI arguments from the provided parameters.
pub(crate) fn exec_sync_args(&self, command: Reader) -> Result<Vec<String>> {
let mut args = self.exec_sync_args_without_command();

for arg in command {
args.push(arg?.to_string());
}

debug!("Exec args {:?}", args.join(" "));
Ok(args)
}

pub(crate) fn exec_sync_args_without_command(&self) -> Vec<String> {
let mut args = vec![];

if let Some(rr) = self.config.runtime_root() {
Expand All @@ -378,11 +399,6 @@ impl GenerateRuntimeArgs<'_> {
args.push(format!("--pid-file={}", self.pidfile.display()));
args.push(self.id.into());

for arg in command {
args.push(arg?.to_string());
}

debug!("Exec args {:?}", args.join(" "));
Ok(args)
args
}
}

0 comments on commit a875baa

Please sign in to comment.