Skip to content

Commit

Permalink
add oom monitor for rustshim
Browse files Browse the repository at this point in the history
  • Loading branch information
aa624545345 committed Feb 1, 2024
1 parent 9ac1f26 commit 796f337
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 5 deletions.
5 changes: 4 additions & 1 deletion crates/runc-shim/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@ containerd-shim = { path = "../shim", version = "0.6.0", features = ["async"] }
crossbeam = "0.8.1"
libc.workspace = true
log.workspace = true
nix = { workspace = true, features = ["socket", "uio", "term"] }
nix = { workspace = true, features = ["socket", "uio", "term", "event"] }
oci-spec.workspace = true
runc = { path = "../runc", version = "0.2.0", features = ["async"] }
serde.workspace = true
serde_json.workspace = true
time.workspace = true
uuid.workspace = true

[target.'cfg(target_os = "linux")'.dependencies]
cgroups-rs = "0.3.3"

# Async dependencies
async-trait.workspace = true
futures.workspace = true
Expand Down
184 changes: 184 additions & 0 deletions crates/runc-shim/src/cgroup_memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
use containerd_shim::{
error::{Error, Result},
io_error, other_error,
};
use std::path::Path;
use tokio::{
fs::{self, read_to_string, File},
io::AsyncReadExt,
sync::mpsc::{self, Receiver},
};

use nix::sys::eventfd;
use std::os::unix::io::{AsRawFd, FromRawFd};

pub async fn get_path_from_cgorup(pid: u32) -> Result<String> {
let proc_path = format!("/proc/{}/cgroup", pid);
let path_string = read_to_string(proc_path.clone()).await.map_err(io_error!(
e,
"open {}.",
proc_path.clone()
))?;

let (_, path) = path_string
.lines()
.find(|line| line.contains("memory"))
.ok_or(Error::Other("Memory line not found".into()))?
.split_once(":memory:")
.ok_or(Error::Other("Failed to parse memory line".into()))?;

Ok(path.to_string())
}

pub async fn get_existing_cgroup_mem_path(pid_path: String) -> Result<(String, String)> {
let (mut mount_root, mount_point) = get_path_from_mountinfo().await?;
if mount_root == "/" {
mount_root = String::from("");
}
let mount_root = pid_path.trim_start_matches(&mount_root).to_string();
Ok((mount_root, mount_point))
}

async fn get_path_from_mountinfo() -> Result<(String, String)> {
let mountinfo_path = "/proc/self/mountinfo".to_string();
let mountinfo_string = read_to_string(mountinfo_path.clone())
.await
.map_err(io_error!(e, "open {}.", mountinfo_path.clone()))?;

let line = mountinfo_string
.lines()
.find(|line| line.contains("cgroup") && line.contains("memory"))
.ok_or(Error::Other(
"Lines containers cgroup and memory not found in mountinfo".into(),
))?;

parse_memory_mountroot(line)
}

fn parse_memory_mountroot(line: &str) -> Result<(String, String)> {
let mut columns = line.split_whitespace();
columns.nth(2);
let mount_root = columns.next().ok_or(Error::Other(
"Invalid input information about mountinfo".into(),
))?;
let mount_point = columns.next().ok_or(Error::Other(
"Invalid input information about mountinfo".into(),
))?;
Ok((mount_root.to_string(), mount_point.to_string()))
}

pub async fn register_memory_event(
key: &str,
cg_dir: &Path,
event_name: &str,
) -> Result<Receiver<String>> {
let path = cg_dir.join(event_name);
let event_file = fs::File::open(path.clone())
.await
.map_err(other_error!(e, "Error get path:"))?;

let eventfd = eventfd::eventfd(0, eventfd::EfdFlags::EFD_CLOEXEC)?;

let event_control_path = cg_dir.join("cgroup.event_control");
let data = format!("{} {}", eventfd.as_raw_fd(), event_file.as_raw_fd());
fs::write(&event_control_path, data.clone())
.await
.map_err(other_error!(e, "Error write eventfd:"))?;

let mut buf = [0u8; 8];

let (sender, receiver) = mpsc::channel(128);
let key = key.to_string();

tokio::spawn(async move {
let mut eventfd_file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) };
loop {
match eventfd_file.read(&mut buf).await {
Ok(bytes_read) if bytes_read == 0 => return,
Err(_) => return,
_ => (),
}
if !Path::new(&event_control_path).exists() {
return;
}
sender.send(key.clone()).await.unwrap();
}
});

Ok(receiver)
}

#[cfg(test)]
mod tests {
use std::path::Path;

use tokio::{fs::remove_file, io::AsyncWriteExt, process::Command};

use crate::cgroup_memory;
use cgroups_rs::{
hierarchies::{self, is_cgroup2_unified_mode},
memory::MemController,
Cgroup, CgroupPid,
};

#[tokio::test]
async fn test_cgroupv1_oom_monitor() {
if !is_cgroup2_unified_mode() {
// Create a memory cgroup with limits on both memory and swap.
let path = "cgroupv1_oom_monitor";
let cg = Cgroup::new(hierarchies::auto(), path).unwrap();

let mem_controller: &MemController = cg.controller_of().unwrap();
mem_controller.set_limit(10 * 1024 * 1024).unwrap(); // 10M
mem_controller.set_swappiness(0).unwrap();

// Create a sh sub process, and let it wait for the stdinput.
let mut child_process = Command::new("sh")
.stdin(std::process::Stdio::piped())
.spawn()
.unwrap();

let pid = child_process.id().unwrap();

// Add the sh subprocess to the cgroup.
cg.add_task_by_tgid(CgroupPid::from(pid as u64)).unwrap();

// Set oom monitor
let path_from_cgorup = cgroup_memory::get_path_from_cgorup(pid).await.unwrap();
let (mount_root, mount_point) =
cgroup_memory::get_existing_cgroup_mem_path(path_from_cgorup)
.await
.unwrap();

let mem_cgroup_path = mount_point + &mount_root;
let mut rx = cgroup_memory::register_memory_event(
&pid.to_string().as_str(),
Path::new(&mem_cgroup_path),
"memory.oom_control",
)
.await
.unwrap();

// Exec the sh subprocess to a dd command that consumes more than 10M of memory.
if let Some(mut stdin) = child_process.stdin.take() {
stdin
.write_all(
b"exec dd if=/dev/zero of=/tmp/test_oom_monitor_file bs=11M count=1\n",
)
.await
.unwrap();
stdin.flush().await.unwrap();
}

// Wait for the oom message.
if let Some(item) = rx.recv().await {
assert_eq!(pid.to_string(), item, "Receive error oom message");
}

// Clean.
child_process.wait().await.unwrap();
cg.delete().unwrap();
remove_file("/tmp/test_oom_monitor_file").await.unwrap();
}
}
}
1 change: 1 addition & 0 deletions crates/runc-shim/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::env;

use containerd_shim::{asynchronous::run, parse};

mod cgroup_memory;
mod common;
mod console;
mod container;
Expand Down
2 changes: 1 addition & 1 deletion crates/runc-shim/src/runc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -740,8 +740,8 @@ mod tests {
write_str_to_file(Path::new(test_dir).join(LOG_JSON_FILE).as_path(), log_json)
.await
.expect("write log json should not be error");

let expectd_msg = "panic";

let actual_err = runtime_error(test_dir, empty_err, "").await;
remove_dir_all(test_dir)
.await
Expand Down
50 changes: 47 additions & 3 deletions crates/runc-shim/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,37 @@ use containerd_shim::{
},
asynchronous::ExitSignal,
event::Event,
other_error,
protos::{
api::{
CloseIORequest, ConnectRequest, ConnectResponse, DeleteResponse, PidsRequest,
PidsResponse, StatsRequest, StatsResponse, UpdateTaskRequest,
},
events::task::{TaskCreate, TaskDelete, TaskExecAdded, TaskExecStarted, TaskIO, TaskStart},
events::task::{
TaskCreate, TaskDelete, TaskExecAdded, TaskExecStarted, TaskIO, TaskOOM, TaskStart,
},
protobuf::MessageDyn,
shim_async::Task,
ttrpc,
ttrpc::r#async::TtrpcContext,
},
util::{convert_to_any, convert_to_timestamp, AsOption},
TtrpcResult,
Error, TtrpcResult,
};
use log::{debug, info, warn};
use oci_spec::runtime::LinuxResources;
use tokio::sync::{mpsc::Sender, MappedMutexGuard, Mutex, MutexGuard};
use tokio::sync::{
mpsc::{Receiver, Sender},
MappedMutexGuard, Mutex, MutexGuard,
};
use tokio::task::spawn;

use super::container::{Container, ContainerFactory};

use crate::cgroup_memory;
use cgroups_rs::hierarchies::is_cgroup2_unified_mode;

use std::path::Path;
type EventSender = Sender<(String, Box<dyn MessageDyn>)>;

/// TaskService is a Task template struct, it is considered a helper struct,
Expand Down Expand Up @@ -95,6 +106,22 @@ impl<F, C> TaskService<F, C> {
}
}

fn run_oom_monitor(mut rx: Receiver<String>, id: String, tx: EventSender) {
let oom_event = TaskOOM {
container_id: id,
..Default::default()
};
let topic = oom_event.topic();
let oom_box = Box::new(oom_event);
spawn(async move {
while let Some(_item) = rx.recv().await {
tx.send((topic.to_string(), oom_box.clone()))
.await
.unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e));
}
});
}

#[async_trait]
impl<F, C> Task for TaskService<F, C>
where
Expand Down Expand Up @@ -164,6 +191,23 @@ where
..Default::default()
})
.await;
std::thread::sleep(std::time::Duration::from_secs(30));
if !is_cgroup2_unified_mode() {
let path_from_cgorup = cgroup_memory::get_path_from_cgorup(resp.pid).await?;
let (mount_root, mount_point) =
cgroup_memory::get_existing_cgroup_mem_path(path_from_cgorup).await?;

let mem_cgroup_path = mount_point + &mount_root;
let rx = cgroup_memory::register_memory_event(
&req.id,
Path::new(&mem_cgroup_path),
"memory.oom_control",
)
.await
.map_err(other_error!(e, "register_memory_event failed:"))?;

run_oom_monitor(rx, req.id.to_string(), self.tx.clone());
}
} else {
self.send_event(TaskExecStarted {
container_id: req.id.to_string(),
Expand Down

0 comments on commit 796f337

Please sign in to comment.