Skip to content

Commit

Permalink
Add streaming server
Browse files Browse the repository at this point in the history
Signed-off-by: Sascha Grunert <sgrunert@redhat.com>
  • Loading branch information
saschagrunert committed Mar 21, 2024
1 parent 526bab4 commit b5702dc
Show file tree
Hide file tree
Showing 17 changed files with 1,827 additions and 266 deletions.
329 changes: 311 additions & 18 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ integration-static: .install.ginkgo # It needs to be release so we correctly tes
$(MAKE) release-static; \
fi && \
export RUNTIME_BINARY="$(RUNTIME_PATH)" && \
export MAX_RSS_KB=7500 && \
export MAX_RSS_KB=9500 && \
sudo -E "$(GOTOOLS_BINDIR)/ginkgo" $(TEST_FLAGS) $(GINKGO_FLAGS)

.install.ginkgo:
Expand Down
18 changes: 18 additions & 0 deletions conmon-rs/common/proto/conmon.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,22 @@ interface Conmon {
key @0 :Text;
value @1 :Text;
}

###############################################
# ServeExecContainer
struct ServeExecContainerRequest {
metadata @0 :Metadata; # Standard metadata to carry.
id @1 :Text;
cmd @2 :List(Text);
tty @3 :Bool;
stdin @4 :Bool;
stdout @5 :Bool;
stderr @6 :Bool;
}

struct ServeExecContainerResponse {
url @0 :Text;
}

serveExecContainer @8 (request: ServeExecContainerRequest) -> (response: ServeExecContainerResponse);
}
3 changes: 3 additions & 0 deletions conmon-rs/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ path = "src/main.rs"

[dependencies]
anyhow = "1.0.81"
async-channel = "2.2.0"
axum = { version = "0.7.4", features = ["ws"]}
capnp = "0.19.2"
capnp-rpc = "0.19.0"
clap = { version = "4.3.8", features = ["color", "cargo", "deprecated", "derive", "deprecated", "env", "string", "unicode", "wrap_help"] }
Expand Down Expand Up @@ -41,6 +43,7 @@ tokio = { version = "1.36.0", features = ["fs", "io-std", "io-util", "macros", "
tokio-eventfd = "0.2.1"
tokio-seqpacket = "0.7.1"
tokio-util = { version = "0.7.10", features = ["compat"] }
tower-http = { version = "0.5.2", features = ["trace"] }
tracing = "0.1.40"
tracing-opentelemetry = "0.23.0"
tracing-subscriber = "0.3.18"
Expand Down
5 changes: 5 additions & 0 deletions conmon-rs/server/src/attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ impl SharedContainerAttach {
}
Ok(())
}

/// Retrieve a clone of the stdin sender.
pub fn stdin(&self) -> &Sender<Vec<u8>> {
&self.read_half_tx
}
}

#[derive(Clone, Debug)]
Expand Down
131 changes: 131 additions & 0 deletions conmon-rs/server/src/bounded_hashmap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use std::{
collections::HashMap,
fmt::Debug,
hash::Hash,
time::{Duration, Instant},
};
use tracing::warn;

#[derive(Debug)]
/// A HashMap bounded by element age and maximum amount of items
pub struct BoundedHashMap<K, V> {
map: HashMap<K, (Instant, V)>,
max_duration: Duration,
max_items: usize,
}

impl<K, V> BoundedHashMap<K, V>
where
K: Eq + Hash + Clone + Debug,
V: Debug,
{
/// Insert an element into the hashmap by:
/// - removing timed-out elements
/// - removing the oldest element if no space left
pub fn insert(&mut self, k: K, v: V) -> Option<V> {
let now = Instant::now();

// Remove timed-out items
let old_len = self.map.len();
self.map
.retain(|_, (inserted, _)| now - *inserted <= self.max_duration);
if old_len < self.map.len() {
warn!("Removed {} timed out elements", self.map.len() - old_len)
}

// Remove the oldest entry if still not enough space left
if self.map.len() >= self.max_items {
let mut key_to_remove = k.clone();

let mut oldest = now;
for (key, (inserted, _)) in self.map.iter() {
if *inserted < oldest {
oldest = *inserted;
key_to_remove = key.clone();
}
}

warn!("Removing oldest key: {:?}", key_to_remove);
self.map.remove(&key_to_remove);
}

self.map.insert(k, (Instant::now(), v)).map(|v| v.1)
}

/// Remove an element from the hashmap and return it if the element has not expired.
pub fn remove(&mut self, k: &K) -> Option<V> {
let now = Instant::now();

if let Some((key, (inserted, value))) = self.map.remove_entry(k) {
if now - inserted > self.max_duration {
warn!("Max duration expired for key: {:?}", key);
None
} else {
Some(value)
}
} else {
None
}
}
}

impl<K, V> Default for BoundedHashMap<K, V> {
fn default() -> Self {
Self {
map: Default::default(),
max_duration: Duration::new(60 * 60, 0), // 1 hour
max_items: 1000,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::thread::sleep;

#[test]
fn bounded_hashmap_test() {
let mut sut = BoundedHashMap::default();
sut.max_items = 2;

assert_eq!(sut.map.len(), 0);

// Insert first item should be fine
assert!(sut.insert(0, 0).is_none());
assert_eq!(sut.map.len(), 1);

// Insert second item should be fine, removal should work as well
assert!(sut.insert(1, 0).is_none());
assert_eq!(sut.map.len(), 2);
assert!(sut.remove(&1).is_some());
assert_eq!(sut.map.len(), 1);
assert!(sut.insert(1, 0).is_none());

// Insert third item should be fine, but remove oldest
assert!(sut.insert(2, 0).is_none());
assert_eq!(sut.map.len(), 2);
assert!(sut.map.get(&0).is_none());
assert!(sut.map.get(&1).is_some());
assert!(sut.map.get(&2).is_some());

// Insert another item should be fine, but remove oldest
assert!(sut.insert(3, 0).is_none());
assert_eq!(sut.map.len(), 2);
assert!(sut.map.get(&1).is_none());
assert!(sut.map.get(&2).is_some());
assert!(sut.map.get(&3).is_some());

// Change the max age of the elements, all should be timed out
sut.max_duration = Duration::from_millis(100);
sleep(Duration::from_millis(200));
assert!(sut.insert(0, 0).is_none());
assert!(sut.map.get(&1).is_none());
assert!(sut.map.get(&2).is_none());
assert!(sut.map.get(&3).is_none());

// The last element should be also timed out if we wait
sleep(Duration::from_millis(200));
assert!(sut.remove(&0).is_none());
}
}
4 changes: 3 additions & 1 deletion conmon-rs/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ macro_rules! prefix {
};
}

#[derive(CopyGetters, Debug, Deserialize, Eq, Getters, Parser, PartialEq, Serialize, Setters)]
#[derive(
Clone, CopyGetters, Debug, Deserialize, Eq, Getters, Parser, PartialEq, Serialize, Setters,
)]
#[serde(rename_all = "kebab-case")]
#[command(
after_help("More info at: https://github.com/containers/conmon-rs"),
Expand Down
43 changes: 32 additions & 11 deletions conmon-rs/server/src/container_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{
terminal::Terminal,
};
use anyhow::{bail, Context, Result};
use async_channel::{Receiver, Sender};
use getset::{Getters, MutGetters};
use nix::errno::Errno;
use std::{
Expand All @@ -15,10 +16,7 @@ use tempfile::Builder;
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
select,
sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
RwLock,
},
sync::RwLock,
time::{self, Instant},
};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -59,6 +57,11 @@ impl SharedContainerIO {
pub async fn attach(&self) -> SharedContainerAttach {
self.0.read().await.attach().clone()
}

/// Retrieve the underlying stdout and stderr channels.
pub async fn channels(&mut self) -> Result<(Receiver<Message>, Receiver<Message>)> {
self.0.read().await.channels()
}
}

#[derive(Debug, Getters, MutGetters)]
Expand Down Expand Up @@ -155,6 +158,22 @@ impl ContainerIO {
Ok(path)
}

pub fn channels(&self) -> Result<(Receiver<Message>, Receiver<Message>)> {
match self.typ() {
ContainerIOType::Terminal(t) => {
if let Some(message_rx) = t.message_rx() {
let (_, fake_rx) = async_channel::unbounded();
Ok((message_rx.clone(), fake_rx))
} else {
bail!("channels called before message_rx was registered");
}
}
ContainerIOType::Streams(s) => {
Ok((s.message_rx_stdout.clone(), s.message_rx_stderr.clone()))
}
}
}

pub async fn read_all_with_timeout(
&mut self,
time_to_timeout: Option<Instant>,
Expand Down Expand Up @@ -184,27 +203,26 @@ impl ContainerIO {

async fn read_stream_with_timeout(
time_to_timeout: Option<Instant>,
receiver: &mut UnboundedReceiver<Message>,
receiver: &mut Receiver<Message>,
) -> (Vec<u8>, bool) {
let mut stdio = vec![];
let mut timed_out = false;
loop {
let msg = if let Some(time_to_timeout) = time_to_timeout {
{
match time::timeout_at(time_to_timeout, receiver.recv()).await {
Ok(Some(msg)) => msg,
Err(_) => {
Ok(Ok(msg)) => msg,
_ => {
timed_out = true;
Message::Done
}
Ok(None) => unreachable!(),
}
}
} else {
{
match receiver.recv().await {
Some(msg) => msg,
None => Message::Done,
Ok(msg) => msg,
_ => Message::Done,
}
}
};
Expand All @@ -230,7 +248,7 @@ impl ContainerIO {
mut reader: T,
pipe: Pipe,
logger: SharedContainerLog,
message_tx: UnboundedSender<Message>,
message_tx: Sender<Message>,
mut attach: SharedContainerAttach,
) -> Result<()>
where
Expand All @@ -251,6 +269,7 @@ impl ContainerIO {
if !message_tx.is_closed() {
message_tx
.send(Message::Done)
.await
.context("send done message")?;
}

Expand All @@ -275,6 +294,7 @@ impl ContainerIO {
if !message_tx.is_closed() {
message_tx
.send(Message::Data(data.into(), pipe))
.await
.context("send data message")?;
}
}
Expand All @@ -290,6 +310,7 @@ impl ContainerIO {
if !message_tx.is_closed() {
message_tx
.send(Message::Done)
.await
.context("send done message")?;
}
return Ok(());
Expand Down
2 changes: 2 additions & 0 deletions conmon-rs/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub use version::Version;
mod macros;

mod attach;
mod bounded_hashmap;
mod capnp_util;
mod child;
mod child_reaper;
Expand All @@ -24,6 +25,7 @@ mod oom_watcher;
mod pause;
mod rpc;
mod server;
mod streaming_server;
mod streams;
mod telemetry;
mod terminal;
Expand Down

0 comments on commit b5702dc

Please sign in to comment.