Skip to content

Commit

Permalink
fix(cubestore): Do not block scheduler loop during event processing
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Feb 17, 2021
1 parent af920ca commit 3a0875e
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
2 changes: 1 addition & 1 deletion rust/cubestore/src/config/mod.rs
Expand Up @@ -53,7 +53,7 @@ impl CubeServices {
let cluster = self.cluster.clone();
tokio::spawn(async move { ClusterImpl::listen_on_metastore_port(cluster).await });
let scheduler = self.scheduler.clone();
tokio::spawn(async move { scheduler.run_scheduler().await });
tokio::spawn(async move { SchedulerImpl::run_scheduler(scheduler).await });
} else {
let cluster = self.cluster.clone();
tokio::spawn(async move { ClusterImpl::listen_on_worker_port(cluster).await });
Expand Down
17 changes: 10 additions & 7 deletions rust/cubestore/src/scheduler/mod.rs
Expand Up @@ -42,10 +42,10 @@ impl SchedulerImpl {
}
}

pub async fn run_scheduler(&self) -> Result<(), CubeError> {
pub async fn run_scheduler(scheduler: Arc<SchedulerImpl>) -> Result<(), CubeError> {
loop {
let mut stop_receiver = self.stop_receiver.lock().await;
let mut event_receiver = self.event_receiver.lock().await;
let mut stop_receiver = scheduler.stop_receiver.lock().await;
let mut event_receiver = scheduler.event_receiver.lock().await;
let event = tokio::select! {
res = stop_receiver.changed() => {
if res.is_err() || *stop_receiver.borrow() {
Expand All @@ -58,10 +58,13 @@ impl SchedulerImpl {
event?
}
};
let res = self.process_event(event.clone()).await;
if let Err(e) = res {
error!("Error processing event {:?}: {}", event, e);
}
let scheduler_to_move = scheduler.clone();
tokio::spawn(async move {
let res = scheduler_to_move.process_event(event.clone()).await;
if let Err(e) = res {
error!("Error processing event {:?}: {}", event, e);
}
});
}
}

Expand Down

0 comments on commit 3a0875e

Please sign in to comment.