Fix: memory check handling in scheduler
The memory check did not prevent a scan from starting when there was
insufficient memory: the scan id was already popped from the queue and
the check only logged the condition. This is fixed by moving the check
in front of the pop and skipping the iteration when available memory is
below the configured minimum, so the scan stays queued.
nichtsfrei committed Mar 6, 2024
1 parent a32a282 commit 6e826f9
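
In essence, the fix reorders the loop body as in the following minimal
sketch (a hypothetical standalone helper with simplified names, not the
actual Scheduler method; the sysinfo calls match those used in the diff
below): the memory check now runs before a scan id is popped, so a scan
that cannot be started is never taken out of the queue.

use sysinfo::System;

fn start_queued(queued: &mut Vec<String>, min_free_mem: Option<u64>, amount_to_start: usize) {
    let mut sys = System::new();
    for _ in 0..amount_to_start {
        // Refresh and check memory before touching the queue.
        sys.refresh_memory();
        if let Some(min_free) = min_free_mem {
            if sys.available_memory() < min_free {
                // Skip this slot; the scan id is never popped, so it
                // stays queued for the next scheduling round.
                continue;
            }
        }
        if let Some(scan_id) = queued.pop() {
            // hand the scan over to the scanner here
            println!("starting scan {scan_id}");
        }
    }
}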
Showing 1 changed file with 243 additions and 7 deletions.

rust/openvasd/src/scheduling.rs
@@ -81,6 +81,7 @@ impl From<Error> for ScanError {
 pub struct Scheduler<DB, Scanner> {
     /// Contains the currently queued scan ids.
     queued: RwLock<Vec<String>>,
+
     /// Contains the currently running scan ids.
     running: RwLock<Vec<String>>,
     /// Is used to retrieve scans and update status.
@@ -176,14 +177,15 @@ where

         tracing::debug!(%amount_to_start, "handling scans");
         for _ in 0..amount_to_start {
-            if let Some(scan_id) = queued.pop() {
-                sys.refresh_memory();
-                if let Some(min_free_memory) = self.config.min_free_mem {
-                    let available_memory = sys.available_memory();
-                    if available_memory < min_free_memory {
-                        tracing::debug!(%min_free_memory, %available_memory, %scan_id, "insufficient memory to start scan.");
-                    }
+            sys.refresh_memory();
+            if let Some(min_free_memory) = self.config.min_free_mem {
+                let available_memory = sys.available_memory();
+                if available_memory < min_free_memory {
+                    tracing::debug!(%min_free_memory, %available_memory, "insufficient memory to start a scan.");
+                    continue;
                 }
+            }
+            if let Some(scan_id) = queued.pop() {
                 let (scan, status) = self.db.get_decrypted_scan(&scan_id).await?;
                 tracing::debug!(?status, %scan_id, "starting scan");

@@ -460,6 +462,7 @@ where

 #[cfg(test)]
 mod tests {
+    use tracing_test::traced_test;

     use models::Scan;

@@ -469,9 +472,239 @@ mod tests {
         storage::{inmemory, ScanStorer as _},
     };

+    mod synchronize {
+        use models::scanner::ScanStopper as _;
+
+        use super::*;
+
+        #[traced_test]
+        #[tokio::test]
+        async fn set_running() {
+            let scans = std::iter::repeat(Scan::default())
+                .take(10)
+                .map(|x| {
+                    let mut y = x.clone();
+                    y.scan_id = uuid::Uuid::new_v4().to_string();
+                    y
+                })
+                .collect::<Vec<_>>();
+            let config = config::Scheduler::default();
+            let db = inmemory::Storage::default();
+            for s in scans.clone().into_iter() {
+                db.insert_scan(s).await.unwrap();
+            }
+            let scanner = models::scanner::Lambda::default();
+            let scheduler = Scheduler::new(config, scanner, db);
+            for s in scans {
+                scheduler.start_scan_by_id(&s.scan_id).await.unwrap();
+            }
+            scheduler.sync_scans().await.unwrap();
+            assert_eq!(scheduler.queued.read().await.len(), 0);
+            assert_eq!(scheduler.running.read().await.len(), 10);
+        }
+
+        #[traced_test]
+        #[tokio::test]
+        async fn not_move_from_queue_on_max_running() {
+            let scans = std::iter::repeat(Scan::default())
+                .take(10)
+                .map(|x| {
+                    let mut y = x.clone();
+                    y.scan_id = uuid::Uuid::new_v4().to_string();
+                    y
+                })
+                .collect::<Vec<_>>();
+            let mut config = config::Scheduler::default();
+            config.max_running_scans = Some(5);
+            let db = inmemory::Storage::default();
+            for s in scans.clone().into_iter() {
+                db.insert_scan(s).await.unwrap();
+            }
+            let scanner = models::scanner::Lambda::default();
+            let scheduler = Scheduler::new(config, scanner, db);
+            for s in scans {
+                scheduler.start_scan_by_id(&s.scan_id).await.unwrap();
+            }
+            scheduler.sync_scans().await.unwrap();
+            assert_eq!(scheduler.queued.read().await.len(), 5);
+            assert_eq!(scheduler.running.read().await.len(), 5);
+            // no change
+            scheduler.sync_scans().await.unwrap();
+            assert_eq!(scheduler.queued.read().await.len(), 5);
+            assert_eq!(scheduler.running.read().await.len(), 5);
+            scheduler.running.write().await.clear();
+            scheduler.sync_scans().await.unwrap();
+            assert_eq!(scheduler.queued.read().await.len(), 0);
+            assert_eq!(scheduler.running.read().await.len(), 5);
+        }
+        #[traced_test]
+        #[tokio::test]
+        async fn not_move_from_queue_on_insufficient_memory() {
+            let scans = std::iter::repeat(Scan::default())
+                .take(10)
+                .map(|x| {
+                    let mut y = x.clone();
+                    y.scan_id = uuid::Uuid::new_v4().to_string();
+                    y
+                })
+                .collect::<Vec<_>>();
+            let mut config = config::Scheduler::default();
+
+            let mut sys = sysinfo::System::new();
+            sys.refresh_memory();
+            config.min_free_mem = Some(sys.available_memory() + 1000);
+
+            let db = inmemory::Storage::default();
+            for s in scans.clone().into_iter() {
+                db.insert_scan(s).await.unwrap();
+            }
+            let scanner = models::scanner::Lambda::default();
+            let scheduler = Scheduler::new(config, scanner, db);
+            for s in scans {
+                scheduler.start_scan_by_id(&s.scan_id).await.unwrap();
+            }
+            scheduler.sync_scans().await.unwrap();
+            assert_eq!(scheduler.queued.read().await.len(), 10);
+            assert_eq!(scheduler.running.read().await.len(), 0);
+        }
+
+        #[traced_test]
+        #[tokio::test]
+        async fn not_move_from_queue_on_connection_error() {
+            let scans = std::iter::repeat(Scan::default())
+                .take(10)
+                .map(|x| {
+                    let mut y = x.clone();
+                    y.scan_id = uuid::Uuid::new_v4().to_string();
+                    y
+                })
+                .collect::<Vec<_>>();
+            let config = config::Scheduler::default();
+
+            let db = inmemory::Storage::default();
+            for s in scans.clone().into_iter() {
+                db.insert_scan(s).await.unwrap();
+            }
+            let scanner = models::scanner::LambdaBuilder::new()
+                .with_start(|_| Err(models::scanner::Error::Connection("m".to_string())))
+                .build();
+            let scheduler = Scheduler::new(config, scanner, db);
+            for s in scans {
+                scheduler.start_scan_by_id(&s.scan_id).await.unwrap();
+            }
+            scheduler.sync_scans().await.unwrap();
+            assert_eq!(scheduler.queued.read().await.len(), 10);
+            assert_eq!(scheduler.running.read().await.len(), 0);
+        }
+
+        #[traced_test]
+        #[tokio::test]
+        async fn remove_from_queue_on_any_other_scan_error() {
+            let scans = std::iter::repeat(Scan::default())
+                .take(10)
+                .map(|x| {
+                    let mut y = x.clone();
+                    y.scan_id = uuid::Uuid::new_v4().to_string();
+                    y
+                })
+                .collect::<Vec<_>>();
+            let config = config::Scheduler::default();
+
+            let db = inmemory::Storage::default();
+            for s in scans.clone().into_iter() {
+                db.insert_scan(s).await.unwrap();
+            }
+            let scanner = models::scanner::LambdaBuilder::new()
+                .with_start(|_| Err(models::scanner::Error::Unexpected("m".to_string())))
+                .build();
+            let scheduler = Scheduler::new(config, scanner, db);
+            for s in scans {
+                scheduler.start_scan_by_id(&s.scan_id).await.unwrap();
+            }
+            scheduler.sync_scans().await.unwrap();
+            assert_eq!(scheduler.queued.read().await.len(), 0);
+            assert_eq!(scheduler.running.read().await.len(), 0);
+        }
+
+        #[traced_test]
+        #[tokio::test]
+        async fn remove_from_running_when_stop() {
+            let scans = std::iter::repeat(Scan::default())
+                .take(10)
+                .map(|x| {
+                    let mut y = x.clone();
+                    y.scan_id = uuid::Uuid::new_v4().to_string();
+                    y
+                })
+                .collect::<Vec<_>>();
+            let config = config::Scheduler::default();
+            let db = inmemory::Storage::default();
+            for s in scans.clone().into_iter() {
+                db.insert_scan(s).await.unwrap();
+            }
+            let scanner = models::scanner::Lambda::default();
+            let scheduler = Scheduler::new(config, scanner, db);
+            for s in scans.iter() {
+                scheduler.start_scan_by_id(&s.scan_id).await.unwrap();
+            }
+            scheduler.sync_scans().await.unwrap();
+            assert_eq!(scheduler.queued.read().await.len(), 0);
+            assert_eq!(scheduler.running.read().await.len(), 10);
+            for s in scans {
+                scheduler.stop_scan(s.scan_id).await.unwrap();
+            }
+            assert_eq!(scheduler.queued.read().await.len(), 0);
+            assert_eq!(scheduler.running.read().await.len(), 0);
+        }
+        #[traced_test]
+        #[tokio::test]
+        async fn remove_from_running_when_finished() {
+            let scans = std::iter::repeat(Scan::default())
+                .take(10)
+                .map(|x| {
+                    let mut y = x.clone();
+                    y.scan_id = uuid::Uuid::new_v4().to_string();
+                    y
+                })
+                .collect::<Vec<_>>();
+            let config = config::Scheduler::default();
+            let db = inmemory::Storage::default();
+            for s in scans.clone().into_iter() {
+                db.insert_scan(s).await.unwrap();
+            }
+            let scanner = models::scanner::LambdaBuilder::default()
+                .with_fetch(|s| {
+                    Ok(models::scanner::ScanResults {
+                        id: s.to_string(),
+                        status: models::Status {
+                            start_time: None,
+                            end_time: None,
+                            status: models::Phase::Succeeded,
+                            host_info: None,
+                        },
+                        results: vec![],
+                    })
+                })
+                .build();
+            let scheduler = Scheduler::new(config, scanner, db);
+            for s in scans.iter() {
+                scheduler.start_scan_by_id(&s.scan_id).await.unwrap();
+            }
+            // we cannot use overall sync as it does result fetching
+            //scheduler.sync_scans().await.unwrap();
+            scheduler.coordinate_scans().await.unwrap();
+            assert_eq!(scheduler.queued.read().await.len(), 0);
+            assert_eq!(scheduler.running.read().await.len(), 10);
+            scheduler.handle_results().await.unwrap();
+            assert_eq!(scheduler.queued.read().await.len(), 0);
+            assert_eq!(scheduler.running.read().await.len(), 0);
+        }
+    }
+
     mod start {
         use super::*;

+        #[traced_test]
         #[tokio::test]
         async fn adds_scan_to_queue() {
             let config = config::Scheduler::default();
@@ -484,6 +717,7 @@
             assert_eq!(scheduler.queued.read().await.len(), 1);
             assert_eq!(scheduler.running.read().await.len(), 0);
         }
+        #[traced_test]
         #[tokio::test]
         async fn error_starting_twice() {
             let config = config::Scheduler::default();
@@ -501,6 +735,7 @@
                 "should return a ScanAlreadyQueued"
             );
         }
+        #[traced_test]
         #[tokio::test]
         async fn error_not_found() {
             let config = config::Scheduler::default();
@@ -515,6 +750,7 @@
                 "should return a not found"
             );
         }
+        #[traced_test]
         #[tokio::test]
         async fn error_queue_is_full() {
             let mut config = config::Scheduler::default();
