Skip to content

Commit

Permalink
unit test for pegasus timeout (#2964)
Browse files Browse the repository at this point in the history
<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?

<!-- Please give a short brief about these changes. -->

## Related issue number

#2857

Fixes

---------

Co-authored-by: Longbin Lai <longbin.lailb@alibaba-inc.com>
  • Loading branch information
lnfjpt and longbinlai committed Jul 13, 2023
1 parent b1e1891 commit de8d150
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 40 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/gaia.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ jobs:
run: |
cd ${GITHUB_WORKSPACE}/interactive_engine/executor && ./check_format.sh
- name: Build Pegasus and Test
run: |
echo $(pwd)
cd ${GITHUB_WORKSPACE}/interactive_engine/executor/engine/pegasus
cargo build --verbose
cargo test --verbose
- name: Build Ir on Experimental Store and Csr Store
run: |
cd ${GITHUB_WORKSPACE}/interactive_engine/compiler
Expand Down
78 changes: 39 additions & 39 deletions .github/workflows/pegasus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ on:
workflow_dispatch:
push:
branches:
- gaia-x
- gaia
paths:
- 'research/engine/pegasus/**'
- 'interactive_engine/executor/engine/pegasus/**'
- '.github/workflows/pegasus.yml'

pull_request:
branches:
- gaia-x
- gaia
paths:
- 'research/engine/pegasus/**'
- 'interactive_engine/executor/engine/pegasus/**'
- '.github/workflows/pegasus.yml'

env:
Expand All @@ -28,7 +28,7 @@ jobs:
- name: Build & Test
run: |
echo $(pwd)
cd research/engine/pegasus
cd interactive_engine/executor/engine/pegasus
cargo build --verbose
cargo test --verbose
Expand All @@ -48,37 +48,37 @@ jobs:
# Run logistic regression
target/release/examples/logistic_regression --data pegasus/examples/data/binary.csv
k8s-test:
runs-on: [self-hosted, ubuntu2004]
steps:
- uses: actions/checkout@v3

- name: Detect the tmate session
run: |
if grep -v "grep" .github/workflows/pegasus.yml | grep "action-tmate"; then
echo 'WARNING!!!the self-hosted machine can not run tmate session, please debug it manually'
exit 1
fi
- name : Prepare Image
run: |
docker pull registry-vpc.cn-hongkong.aliyuncs.com/graphscope/pegasus-base:latest
docker tag registry-vpc.cn-hongkong.aliyuncs.com/graphscope/pegasus-base:latest \
registry.cn-hongkong.aliyuncs.com/graphscope/pegasus-base:latest
docker build -t registry.cn-hongkong.aliyuncs.com/graphscope/pegasus:${{ github.sha }} \
--network=host \
-f research/engine/pegasus/test/k8s/pegasus.Dockerfile .
echo " - name: CONFIG_MAP_NAME" >> research/engine/pegasus/test/k8s/pegasus-set.yaml
echo " value: \"${{ github.sha }}\"" >> research/engine/pegasus/test/k8s/pegasus-set.yaml
echo " image: registry.cn-hongkong.aliyuncs.com/graphscope/pegasus:${{ github.sha }}" >> research/engine/pegasus/test/k8s/pegasus-set.yaml
- name: Start k8s cluster
run: |
minikube start --base-image='registry-vpc.cn-hongkong.aliyuncs.com/graphscope/kicbase:v0.0.30' \
--cpus='12' --memory='32000mb' --disk-size='40000mb'
minikube image load registry.cn-hongkong.aliyuncs.com/graphscope/pegasus:${{ github.sha }}
kubectl get ns pegasus-ci || kubectl create ns pegasus-ci
kubectl --namespace pegasus-ci create -f research/engine/pegasus/test/k8s/role_binding.yaml
kubectl --namespace pegasus-ci create -f research/engine/pegasus/test/k8s/pegasus-set.yaml
bash research/engine/pegasus/test/k8s/read_result.sh ${{ github.sha }}
# k8s-test:
# runs-on: [self-hosted, ubuntu2004]
# steps:
# - uses: actions/checkout@v3
#
# - name: Detect the tmate session
# run: |
# if grep -v "grep" .github/workflows/pegasus.yml | grep "action-tmate"; then
# echo 'WARNING!!!the self-hosted machine can not run tmate session, please debug it manually'
# exit 1
# fi
#
# - name : Prepare Image
# run: |
# docker pull registry-vpc.cn-hongkong.aliyuncs.com/graphscope/pegasus-base:latest
# docker tag registry-vpc.cn-hongkong.aliyuncs.com/graphscope/pegasus-base:latest \
# registry.cn-hongkong.aliyuncs.com/graphscope/pegasus-base:latest
# docker build -t registry.cn-hongkong.aliyuncs.com/graphscope/pegasus:${{ github.sha }} \
# --network=host \
# -f research/engine/pegasus/test/k8s/pegasus.Dockerfile .
# echo " - name: CONFIG_MAP_NAME" >> research/engine/pegasus/test/k8s/pegasus-set.yaml
# echo " value: \"${{ github.sha }}\"" >> research/engine/pegasus/test/k8s/pegasus-set.yaml
# echo " image: registry.cn-hongkong.aliyuncs.com/graphscope/pegasus:${{ github.sha }}" >> research/engine/pegasus/test/k8s/pegasus-set.yaml
#
# - name: Start k8s cluster
# run: |
# minikube start --base-image='registry-vpc.cn-hongkong.aliyuncs.com/graphscope/kicbase:v0.0.30' \
# --cpus='12' --memory='32000mb' --disk-size='40000mb'
# minikube image load registry.cn-hongkong.aliyuncs.com/graphscope/pegasus:${{ github.sha }}
#
# kubectl get ns pegasus-ci || kubectl create ns pegasus-ci
# kubectl --namespace pegasus-ci create -f research/engine/pegasus/test/k8s/role_binding.yaml
# kubectl --namespace pegasus-ci create -f research/engine/pegasus/test/k8s/pegasus-set.yaml
# bash research/engine/pegasus/test/k8s/read_result.sh ${{ github.sha }}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ impl Decode for Entry {
}
}

#[ignore]
#[test]
fn ipc_test() {
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ mod test {
channel_test(1, 2, 0, &ServerConf::Local);
}

#[ignore]
#[test]
fn test_channel_between_2_servers() {
pegasus_common::logs::init_log();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

use std::time::Duration;

use pegasus::api::{IterCondition, Iteration, Map, Sink};
use pegasus::api::{Collect, IterCondition, Iteration, Map, Sink};
use pegasus::JobConf;

/// test binary merge pipeline;
Expand Down Expand Up @@ -80,3 +80,56 @@ fn timeout_test_02() {
}
assert!(results.is_cancel());
}

#[test]
fn timeout_resubmit_test() {
let mut conf = JobConf::new("timeout_test");
conf.time_limit = 5000;
conf.set_workers(2);
let mut results = pegasus::run(conf, || {
|input, output| {
let worker_id = input.get_worker_index();
input
.input_from(vec![0u32])?
.iterate_until(IterCondition::max_iters(20), move |iter| {
iter.map(move |input| {
if worker_id == 1 {
std::thread::sleep(Duration::from_millis(1000));
}
Ok(input + 1)
})
})?
.sink_into(output)
}
})
.expect("submit job failure;");
let mut count = 0;
while let Some(result) = results.next() {
if let Ok(data) = result {
count += data;
} else {
let err = result.err().unwrap();
assert_eq!(err.to_string(), "Job is canceled;".to_string());
break;
}
}
assert!(results.is_cancel());
let mut conf = JobConf::new("resubmit_test");
conf.time_limit = 5000;
conf.set_workers(2);
let mut results = pegasus::run(conf, || {
|input, output| {
input
.input_from(vec![0u32])?
.iterate_until(IterCondition::max_iters(20), move |iter| {
iter.map(move |input| Ok(input + 1))
})?
.collect::<Vec<u32>>()?
.sink_into(output)
}
})
.expect("submit job failure;");
let mut result = results.next().unwrap().unwrap();
result.sort();
assert_eq!(result, [20, 20]);
}

0 comments on commit de8d150

Please sign in to comment.