[GIE pegasus] Support cancelling query. (#2168)
lnfjpt committed Nov 1, 2022
1 parent 26f0382 commit f983bd6
Showing 6 changed files with 138 additions and 8 deletions.
2 changes: 1 addition & 1 deletion coordinator/gscoordinator/kubernetes_launcher.py
@@ -1090,7 +1090,7 @@ def create_analytical_instance(self):
)
setattr(self._analytical_engine_process, "stdout_watcher", stdout_watcher)
setattr(self._analytical_engine_process, "stderr_watcher", stderr_watcher)
-time.sleep(2) # TODO: monitor engine process instead of sleep
+time.sleep(4) # TODO: monitor engine process instead of sleep

def _delete_dangling_coordinator(self):
# delete service
44 changes: 42 additions & 2 deletions interactive_engine/executor/engine/pegasus/pegasus/src/result.rs
@@ -8,6 +8,7 @@ use dyn_clonable::*;

use crate::api::function::FnResult;
use crate::api::FromStream;
use crate::errors::JobExecError;

#[clonable]
pub trait FromStreamExt<T>: FromStream<T> + Clone {
@@ -46,6 +47,20 @@ impl<T: 'static> ResultSink<T> {
&self.cancel
}

pub fn set_cancel_hook(&mut self, is_canceled: bool) {
self.cancel.store(is_canceled, Ordering::SeqCst);
if is_canceled {
match &mut self.kind {
ResultSinkKind::Customized(tx) => {
let msg = "Job is canceled".to_string();
let err = JobExecError::from(msg);
tx.on_error(Box::new(err));
}
_ => (),
}
}
}

pub fn on_error<E: std::error::Error + Send + 'static>(&mut self, error: E) {
match &mut self.kind {
ResultSinkKind::Default(tx) => {
@@ -131,14 +146,31 @@ impl<T> ResultStream<T> {
self.is_poison.load(Ordering::SeqCst)
}

#[inline]
pub fn is_cancel(&self) -> bool {
self.cancel_hook.load(Ordering::SeqCst)
}

fn report_cancel(&self) -> Option<Result<T, Box<dyn Error + Send>>> {
let err_msg = "Job is canceled;".to_owned();
let err: Box<dyn Error + Send + Sync> = err_msg.into();
Some(Err(err))
}

fn pull_next(&mut self) -> Option<Result<T, Box<dyn Error + Send>>> {
if self.is_exhaust.load(Ordering::SeqCst) {
if self.is_cancel() {
return self.report_cancel();
}
return None;
}

if self.is_poison.load(Ordering::SeqCst) {
let err_msg = "ResultStream is poisoned because an error already occurred;".to_owned();
let err: Box<dyn Error + Send + Sync> = err_msg.into();
if self.is_cancel() {
return self.report_cancel();
}
return Some(Err(err as Box<dyn Error + Send>));
}

@@ -147,11 +179,19 @@
Ok(Ok(res)) => Some(Ok(res)),
Ok(Err(e)) => {
self.is_poison.store(true, Ordering::SeqCst);
-Some(Err(e))
+if self.is_cancel() {
+self.report_cancel()
+} else {
+Some(Err(e))
+}
}
Err(_) => {
self.is_exhaust.store(true, Ordering::SeqCst);
-None
+if self.is_cancel() {
+self.report_cancel()
+} else {
+None
+}
}
}
}
13 changes: 11 additions & 2 deletions interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs
@@ -119,8 +119,15 @@ impl<D: Data, T: Debug + Send + 'static> Worker<D, T> {
}

fn check_cancel(&self) -> bool {
-// TODO: check cancel impl;
-false
+if self.conf.time_limit > 0 {
+let elapsed = self.start.elapsed().as_millis() as u64;
+if elapsed >= self.conf.time_limit {
+return true;
+}
+}
+self.sink
+.get_cancel_hook()
+.load(Ordering::SeqCst)
}

fn release(&mut self) {
@@ -211,6 +218,7 @@ impl<D: Data, T: Debug + Send + 'static> Task for Worker<D, T> {
fn execute(&mut self) -> TaskState {
let _g = crate::worker_id::guard(self.id);
if self.check_cancel() {
self.sink.set_cancel_hook(true);
return TaskState::Finished;
}

@@ -239,6 +247,7 @@ impl<D: Data, T: Debug + Send + 'static> Task for Worker<D, T> {
fn check_ready(&mut self) -> TaskState {
let _g = crate::worker_id::guard(self.id);
if self.check_cancel() {
self.sink.set_cancel_hook(true);
return TaskState::Finished;
}

@@ -14,7 +14,7 @@
//! limitations under the License.
//

-use pegasus::api::{Collect, CorrelatedSubTask, Count, Filter, Fold, FoldByKey, KeyBy, Map, Sink};
+use pegasus::api::{Collect, CorrelatedSubTask, Count, Filter, Fold, FoldByKey, KeyBy, Map, Sink, SortBy};
use pegasus::JobConf;

#[test]
@@ -0,0 +1,81 @@
//
//! Copyright 2020 Alibaba Group Holding Limited.
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! you may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//! http://www.apache.org/licenses/LICENSE-2.0
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//! See the License for the specific language governing permissions and
//! limitations under the License.

use pegasus::api::{IterCondition, Iteration, Map, Sink};
use pegasus::JobConf;
use std::time::Duration;

/// test that a job is canceled once its time limit is exceeded;
#[test]
fn timeout_test_01() {
let mut conf = JobConf::new("timeout_test_01");
conf.time_limit = 5000;
let mut result = pegasus::run(conf, || {
|input, output| {
input
.input_from(vec![0u32])?
.iterate_until(IterCondition::max_iters(20), |iter| {
iter.map(|input| {
std::thread::sleep(Duration::from_millis(1000));
Ok(input + 1)
})
})?
.sink_into(output)
}
})
.expect("submit job failure;");

let mut count = 0;
while let Some(Ok(data)) = result.next() {
count += data;
}
assert!(result.is_cancel());
assert_eq!(0, count);
}

#[test]
fn timeout_test_02() {
let mut conf = JobConf::new("timeout_test_2");
conf.time_limit = 5000;
conf.set_workers(2);
let mut results = pegasus::run(conf, || {
|input, output| {
let worker_id = input.get_worker_index();
input
.input_from(vec![0u32])?
.iterate_until(IterCondition::max_iters(20), move |iter| {
iter.map(move |input| {
if worker_id == 1 {
std::thread::sleep(Duration::from_millis(1000));
}
Ok(input + 1)
})
})?
.sink_into(output)
}
})
.expect("submit job failure;");
let mut count = 0;
while let Some(result) = results.next() {
if let Ok(data) = result {
count += data;
} else {
let err = result.err().unwrap();
assert_eq!(err.to_string(), "Job is canceled;".to_string());
break;
}
}
assert!(results.is_cancel());
}
@@ -44,10 +44,10 @@ heartbeat_sec = 5
# If the cluster is standalone, the number of addresses should be equal to [server_size] set above, and the addresses
# should be in order; the first address corresponds to server 0.
[[network.servers]]
-ip = '192.168.1.1'
+hostname = '192.168.1.1'
port = 8080

[[network.servers]]
-ip = '192.168.1.2'
+hostname = '192.168.1.2'
port = 8080
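
For reference, a minimal application-side sketch (not part of this commit) of how a client might exercise the new cancellation path. It is modeled on the timeout tests above and assumes the same public API shown in the diffs (`JobConf::time_limit`, `pegasus::run`, and the `ResultStream` methods added in result.rs); the function and job names are made up for illustration.

```rust
use std::time::Duration;

use pegasus::api::{IterCondition, Iteration, Map, Sink};
use pegasus::JobConf;

// Hypothetical driver: submit a job with a 5s budget, then consume the
// result stream and report whether the engine canceled it.
fn cancel_demo() {
    let mut conf = JobConf::new("cancel_demo");
    conf.time_limit = 5000; // milliseconds

    let mut results = pegasus::run(conf, || {
        |input, output| {
            input
                .input_from(vec![0u32])?
                .iterate_until(IterCondition::max_iters(20), |iter| {
                    iter.map(|i| {
                        // Each iteration takes ~1s, so 20 iterations exceed the 5s limit.
                        std::thread::sleep(Duration::from_millis(1000));
                        Ok(i + 1)
                    })
                })?
                .sink_into(output)
        }
    })
    .expect("submit job failure;");

    while let Some(item) = results.next() {
        match item {
            Ok(data) => println!("got {}", data),
            Err(err) => {
                // With this change, a timed-out job surfaces "Job is canceled;".
                println!("job stopped: {}", err);
                break;
            }
        }
    }
    if results.is_cancel() {
        println!("the query was canceled by the engine");
    }
}
```

Note that `check_cancel` is evaluated when a worker task is scheduled (`execute` / `check_ready`), so cancellation takes effect at the next scheduling point rather than interrupting an operator that is already blocked.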
