Skip to content

Commit

Permalink
add oom monitor for rustshim
Browse files Browse the repository at this point in the history
  • Loading branch information
aa624545345 committed Dec 29, 2023
1 parent 9ac1f26 commit cc3b2bb
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 2 deletions.
134 changes: 134 additions & 0 deletions crates/runc-shim/src/cgroup_memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use containerd_shim::{
error::{Error, Result},
io_error, other_error,
};
use std::path::Path;
use tokio::{
fs::{self, File},
io::{AsyncBufReadExt, AsyncReadExt, BufReader},
sync::mpsc::{self, Receiver},
};

use std::os::unix::io::{AsRawFd, FromRawFd};

pub async fn get_path_from_cgorup(pid: u32) -> Result<String> {
let proc_path = format!("/proc/{}/cgroup", pid);
let file = File::open(proc_path.clone())
.await
.map_err(io_error!(e, "open {}.", proc_path))?;
let mut lines = BufReader::new(file).lines();
while let Some(line) =
lines
.next_line()
.await
.map_err(io_error!(e, "read file {} failed.", proc_path))?
{
if line.contains("memory") {
let parts: Vec<&str> = line.split(':').collect();
if let Some(path) = parts.last() {
return Ok(path.to_string());
}
break;
}
}
Err("Memory cgroup path not found").map_err(other_error!(e, "Error get path:"))?
}

pub async fn get_existing_cgroup_mem_path(pid_path: String) -> Result<(String, String)> {
let mut mountinfo_path = get_path_from_mountinfo().await?;
if mountinfo_path.0 == "/" {
mountinfo_path.0 = String::from("");
}
let res = pid_path.trim_start_matches(&mountinfo_path.0).to_string();
Ok((res, mountinfo_path.1))
}

async fn get_path_from_mountinfo() -> Result<(String, String)> {
let mountinfo_file = File::open("/proc/self/mountinfo".clone())
.await
.map_err(io_error!(e, "open {}", "/proc/self/mountinfo"))?;

// for line in BufReader::new(mountinfo_file).lines() {
let mut lines = BufReader::new(mountinfo_file).lines();
while let Some(line) = lines
.next_line()
.await
.map_err(io_error!(e, "read file /proc/self/mountinfo failed."))?
{
if line.contains("cgroup") && line.contains("memory") {
return parse_memory_mountroot(&line);
}
}

Err("No memory found in mountinfo file.")
.map_err(other_error!(e, "Error get path from montinfo:"))?
}

fn parse_memory_mountroot(line: &str) -> Result<(String, String)> {
let s_values: Vec<_> = line.split(" - ").collect();
if s_values.len() != 2 {
return Err("Parse from mountinfo failed")
.map_err(other_error!(e, "Error get path from montinfo:"))?;
}

let s0_values: Vec<_> = s_values[0].trim().split(' ').collect();
if s0_values.len() < 6 {
return Err("Parse from mountinfo failed")
.map_err(other_error!(e, "Error get path from montinfo:"))?;
}
Ok((s0_values[3].to_string(), s0_values[4].to_string()))
}

pub async fn register_memory_event_v1(key: &str, dir: &Path) -> Result<Receiver<String>> {
register_memory_event(key, dir, "memory.oom_control", "").await
}

async fn register_memory_event(
key: &str,
cg_dir: &Path,
event_name: &str,
arg: &str,
) -> Result<Receiver<String>> {
let path = cg_dir.join(event_name);
let event_file = fs::File::open(path.clone())
.await
.map_err(other_error!(e, "Error get path:"))?;

let eventfd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC) };

let event_control_path = cg_dir.join("cgroup.event_control");
let data = if arg.is_empty() {
format!("{} {}", eventfd, event_file.as_raw_fd())
} else {
format!("{} {} {}", eventfd, event_file.as_raw_fd(), arg)
};
fs::write(&event_control_path, data.clone())
.await
.map_err(other_error!(e, "Error write eventfd:"))?;

let mut buf = [0u8; 8];

let (sender, receiver) = mpsc::channel(128);
let key = key.to_string();
let mut eventfd_file = unsafe { File::from_raw_fd(eventfd) };
tokio::spawn(async move {
loop {
match eventfd_file.read(&mut buf).await {
Ok(bytes_read) => {
if bytes_read == 0 {
return;
}
}
Err(_) => {
return;
}
}
if !Path::new(&event_control_path).exists() {
return;
}
sender.send(key.clone()).await.unwrap();
}
});

Ok(receiver)
}
12 changes: 12 additions & 0 deletions crates/runc-shim/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub trait Container {
async fn stats(&self) -> Result<Metrics>;
async fn all_processes(&self) -> Result<Vec<ProcessInfo>>;
async fn close_io(&mut self, exec_id: Option<&str>) -> Result<()>;
async fn mem_cgruop_path(&self) -> String;
async fn mem_root_cgruop_path(&self) -> String;
}

#[async_trait]
Expand Down Expand Up @@ -82,6 +84,8 @@ pub struct ContainerTemplate<T, E, P> {
pub process_factory: P,
/// exec processes of this container
pub processes: HashMap<String, E>,
// memory cgroup path of this container
pub mem_cgruop_path: (String, String),
}

#[async_trait]
Expand Down Expand Up @@ -164,6 +168,14 @@ where
self.id.to_string()
}

async fn mem_cgruop_path(&self) -> String {
self.mem_cgruop_path.0.to_string()
}

async fn mem_root_cgruop_path(&self) -> String {
self.mem_cgruop_path.1.to_string()
}

#[cfg(target_os = "linux")]
async fn update(&mut self, resources: &LinuxResources) -> Result<()> {
self.init.update(resources).await
Expand Down
1 change: 1 addition & 0 deletions crates/runc-shim/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::env;

use containerd_shim::{asynchronous::run, parse};

mod cgroup_memory;
mod common;
mod console;
mod container;
Expand Down
9 changes: 9 additions & 0 deletions crates/runc-shim/src/runc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use super::{
container::{ContainerFactory, ContainerTemplate, ProcessFactory},
processes::{ProcessLifecycle, ProcessTemplate},
};
use crate::cgroup_memory;
use crate::{
common::{
check_kill_error, create_io, create_runc, get_spec_from_request, receive_socket,
Expand Down Expand Up @@ -125,6 +126,13 @@ impl ContainerFactory<RuncContainer> for RuncFactory {

let config = CreateConfig::default();
self.do_create(&mut init, config).await?;

// tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
let pid = init.pid;
let path_from_cgorup = cgroup_memory::get_path_from_cgorup(pid as u32).await?;
let get_path_from_mountinfo =
cgroup_memory::get_existing_cgroup_mem_path(path_from_cgorup).await?;

let container = RuncContainer {
id: id.to_string(),
bundle: bundle.to_string(),
Expand All @@ -136,6 +144,7 @@ impl ContainerFactory<RuncContainer> for RuncFactory {
io_gid: opts.io_gid,
},
processes: Default::default(),
mem_cgruop_path: get_path_from_mountinfo,
};
Ok(container)
}
Expand Down
45 changes: 43 additions & 2 deletions crates/runc-shim/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use containerd_shim::{
CloseIORequest, ConnectRequest, ConnectResponse, DeleteResponse, PidsRequest,
PidsResponse, StatsRequest, StatsResponse, UpdateTaskRequest,
},
events::task::{TaskCreate, TaskDelete, TaskExecAdded, TaskExecStarted, TaskIO, TaskStart},
events::task::{
TaskCreate, TaskDelete, TaskExecAdded, TaskExecStarted, TaskIO, TaskOOM, TaskStart,
},
protobuf::MessageDyn,
shim_async::Task,
ttrpc,
Expand All @@ -41,10 +43,16 @@ use containerd_shim::{
};
use log::{debug, info, warn};
use oci_spec::runtime::LinuxResources;
use tokio::sync::{mpsc::Sender, MappedMutexGuard, Mutex, MutexGuard};
use tokio::sync::{
mpsc::{Receiver, Sender},
MappedMutexGuard, Mutex, MutexGuard,
};
use tokio::task::spawn;

use super::container::{Container, ContainerFactory};

use crate::cgroup_memory;
use std::path::Path;
type EventSender = Sender<(String, Box<dyn MessageDyn>)>;

/// TaskService is a Task template struct, it is considered a helper struct,
Expand Down Expand Up @@ -95,6 +103,22 @@ impl<F, C> TaskService<F, C> {
}
}

fn run_oom_monitor(mut rx: Receiver<String>, id: String, tx: EventSender) {
let oom_event = TaskOOM {
container_id: id,
..Default::default()
};
let topic = oom_event.topic();
let oom_box = Box::new(oom_event);
spawn(async move {
while let Some(_item) = rx.recv().await {
tx.send((topic.to_string(), oom_box.clone()))
.await
.unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e));
}
});
}

#[async_trait]
impl<F, C> Task for TaskService<F, C>
where
Expand Down Expand Up @@ -164,6 +188,23 @@ where
..Default::default()
})
.await;
// register oom event
let mem_cgroup_path =
container.mem_root_cgruop_path().await + &container.mem_cgruop_path().await;
let rx =
match cgroup_memory::register_memory_event_v1(&req.id, Path::new(&mem_cgroup_path))
.await
{
Ok(rx) => rx,
Err(e) => {
return Err(ttrpc::Error::Others(format!(
"notify_on_oom_v1 failed: {}",
e
)))
}
};
// run_oom_monitor
run_oom_monitor(rx, req.id.to_string(), self.tx.clone());
} else {
self.send_event(TaskExecStarted {
container_id: req.id.to_string(),
Expand Down

0 comments on commit cc3b2bb

Please sign in to comment.