Skip to content

Commit

Permalink
add oom monitor for rustshim
Browse files Browse the repository at this point in the history
  • Loading branch information
aa624545345 committed Dec 29, 2023
1 parent 9ac1f26 commit 751a811
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 6 deletions.
12 changes: 12 additions & 0 deletions crates/runc-shim/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub trait Container {
async fn stats(&self) -> Result<Metrics>;
async fn all_processes(&self) -> Result<Vec<ProcessInfo>>;
async fn close_io(&mut self, exec_id: Option<&str>) -> Result<()>;
async fn mem_cgruop_path(&self) -> String;
async fn mem_root_cgruop_path(&self) -> String;
}

#[async_trait]
Expand Down Expand Up @@ -82,6 +84,8 @@ pub struct ContainerTemplate<T, E, P> {
pub process_factory: P,
/// exec processes of this container
pub processes: HashMap<String, E>,
// memory cgroup path of this container
pub mem_cgruop_path: (String, String),
}

#[async_trait]
Expand Down Expand Up @@ -164,6 +168,14 @@ where
self.id.to_string()
}

async fn mem_cgruop_path(&self) -> String {
self.mem_cgruop_path.0.to_string()
}

async fn mem_root_cgruop_path(&self) -> String {
self.mem_cgruop_path.1.to_string()
}

#[cfg(target_os = "linux")]
async fn update(&mut self, resources: &LinuxResources) -> Result<()> {
self.init.update(resources).await
Expand Down
1 change: 1 addition & 0 deletions crates/runc-shim/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::env;

use containerd_shim::{asynchronous::run, parse};

mod cgroup_memory;
mod common;
mod console;
mod container;
Expand Down
9 changes: 9 additions & 0 deletions crates/runc-shim/src/runc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use super::{
container::{ContainerFactory, ContainerTemplate, ProcessFactory},
processes::{ProcessLifecycle, ProcessTemplate},
};
use crate::cgroup_memory;
use crate::{
common::{
check_kill_error, create_io, create_runc, get_spec_from_request, receive_socket,
Expand Down Expand Up @@ -125,6 +126,13 @@ impl ContainerFactory<RuncContainer> for RuncFactory {

let config = CreateConfig::default();
self.do_create(&mut init, config).await?;

// tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
let pid = init.pid;
let path_from_cgorup = cgroup_memory::get_path_from_cgorup(pid as u32).await?;
let get_path_from_mountinfo =
cgroup_memory::get_existing_cgroup_mem_path(path_from_cgorup).await?;

let container = RuncContainer {
id: id.to_string(),
bundle: bundle.to_string(),
Expand All @@ -136,6 +144,7 @@ impl ContainerFactory<RuncContainer> for RuncFactory {
io_gid: opts.io_gid,
},
processes: Default::default(),
mem_cgruop_path: get_path_from_mountinfo,
};
Ok(container)
}
Expand Down
47 changes: 41 additions & 6 deletions crates/runc-shim/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use containerd_shim::{
CloseIORequest, ConnectRequest, ConnectResponse, DeleteResponse, PidsRequest,
PidsResponse, StatsRequest, StatsResponse, UpdateTaskRequest,
},
events::task::{TaskCreate, TaskDelete, TaskExecAdded, TaskExecStarted, TaskIO, TaskStart},
events::task::{
TaskCreate, TaskDelete, TaskExecAdded, TaskExecStarted, TaskIO, TaskStart, TaskOOM,
},
protobuf::MessageDyn,
shim_async::Task,
ttrpc,
Expand All @@ -41,11 +43,11 @@ use containerd_shim::{
};
use log::{debug, info, warn};
use oci_spec::runtime::LinuxResources;
use tokio::sync::{mpsc::Sender, MappedMutexGuard, Mutex, MutexGuard};

use super::container::{Container, ContainerFactory};

type EventSender = Sender<(String, Box<dyn MessageDyn>)>;
use tokio::sync::{
mpsc::{Receiver, Sender},
MappedMutexGuard, Mutex, MutexGuard,
};
use tokio::task::spawn;

/// TaskService is a Task template struct, it is considered a helper struct,
/// which has already implemented `Task` trait, so that users can make it the type `T`
Expand Down Expand Up @@ -95,6 +97,22 @@ impl<F, C> TaskService<F, C> {
}
}

fn run_oom_monitor(mut rx: Receiver<String>, id: String, tx: EventSender) {
let oom_event = TaskOOM {
container_id: id,
..Default::default()
};
let topic = oom_event.topic();
let oom_box = Box::new(oom_event);
spawn(async move {
while let Some(_item) = rx.recv().await {
tx.send((topic.to_string(), oom_box.clone()))
.await
.unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e));
}
});
}

#[async_trait]
impl<F, C> Task for TaskService<F, C>
where
Expand Down Expand Up @@ -164,6 +182,23 @@ where
..Default::default()
})
.await;
// register oom event
let mem_cgroup_path =
container.mem_root_cgruop_path().await + &container.mem_cgruop_path().await;
let rx =
match cgroup_memory::register_memory_event_v1(&req.id, Path::new(&mem_cgroup_path))
.await
{
Ok(rx) => rx,
Err(e) => {
return Err(ttrpc::Error::Others(format!(
"notify_on_oom_v1 failed: {}",
e
)))
}
};
// run_oom_monitor
run_oom_monitor(rx, req.id.to_string(), self.tx.clone());
} else {
self.send_event(TaskExecStarted {
container_id: req.id.to_string(),
Expand Down

0 comments on commit 751a811

Please sign in to comment.