From 8e11d5afcf3e7525ebe2e54ce711740d04d156a6 Mon Sep 17 00:00:00 2001 From: yyc <395976266@qq.com> Date: Mon, 14 Sep 2020 17:03:18 +0800 Subject: [PATCH] fix(pipeline): now if one job in the pipeline stream err, it will trigger the stream's error handler --- .../service/ParseDataPipelineDoService.re | 4 -- .../integration/pipeline/pipeline_api_test.re | 38 +++++++++++++++++++ test/construct/tool/ExpectStreamTool.re | 25 ++++++++++++ test/construct/tool/PipelineTool.re | 14 ++++++- 4 files changed, 75 insertions(+), 6 deletions(-) diff --git a/src/construct/domain_layer/domain/pipeline/pipeline/service/ParseDataPipelineDoService.re b/src/construct/domain_layer/domain/pipeline/pipeline/service/ParseDataPipelineDoService.re index bf09d6125b..30c1cd1565 100755 --- a/src/construct/domain_layer/domain/pipeline/pipeline/service/ParseDataPipelineDoService.re +++ b/src/construct/domain_layer/domain/pipeline/pipeline/service/ParseDataPipelineDoService.re @@ -62,10 +62,6 @@ let parse = ({name, groups, firstGroup}) => { pipelineStream ->Obj.magic ->WonderBsMost.Most.map(_ => Result.succeed(), _) - ->WonderBsMost.Most.recoverWith( - err => {WonderBsMost.Most.just(err->Result.fail)}, - _, - ) }) }) ->Result.mapSuccess(Tuple2.create(PipelineEntity.create(name))); diff --git a/test/construct/integration/pipeline/pipeline_api_test.re b/test/construct/integration/pipeline/pipeline_api_test.re index 53439927cd..f602ba909f 100755 --- a/test/construct/integration/pipeline/pipeline_api_test.re +++ b/test/construct/integration/pipeline/pipeline_api_test.re @@ -172,5 +172,43 @@ let _ = ~message, ); }); + testPromise("if one job fail, then not exec the remain jobs", () => { + let value = ref(0); + let message = "fail"; + PipelineTool.registerJobs( + ~jobs=[ + _createJob(~jobName="fail", ~execFunc=() => { + Result.failWith(message)->WonderBsMost.Most.just + }), + _createJob(~jobName="do", ~execFunc=() => { + value := 10; + Result.succeed()->WonderBsMost.Most.just; + }), + ], + (), + ); + + ExpectStreamTool.testAfterFail( + ~execFunc= + _execPipelineStream( + ~pipelineData={ + name: "init", + firstGroup: "frame", + groups: [ + { + name: "frame", + link: Concat, + elements: [ + {name: "fail", type_: Job}, + {name: "do", type_: Job}, + ], + }, + ], + }: PipelineVOType.pipelineData, + ), + ~handleFunc=errMessage => { + (errMessage, value^)->expect == (message, 0) + }); + }); }); }); diff --git a/test/construct/tool/ExpectStreamTool.re b/test/construct/tool/ExpectStreamTool.re index a0144ac342..04a715b671 100755 --- a/test/construct/tool/ExpectStreamTool.re +++ b/test/construct/tool/ExpectStreamTool.re @@ -23,3 +23,28 @@ let toFail = (), ); }; + +let testAfterFail = + ( + ~execFunc: + ( + ~handleSuccessFunc: unit => 'a, + ~handleFailFunc: Js.Exn.t => unit=?, + unit + ) => + Js.Promise.t('a), + ~handleFunc, + ) => { + open Wonder_jest; + open Expect; + open! Expect.Operators; + + let resultMessage = ref(""); + + execFunc( + ~handleFailFunc= + err => {resultMessage := err->Js.Exn.message->OptionSt.getExn}, + ~handleSuccessFunc=() => {handleFunc(resultMessage^)}, + (), + ); +}; diff --git a/test/construct/tool/PipelineTool.re b/test/construct/tool/PipelineTool.re index aade8b7b26..b4dc286e8e 100755 --- a/test/construct/tool/PipelineTool.re +++ b/test/construct/tool/PipelineTool.re @@ -13,13 +13,23 @@ let execPipelineStream = (), ) => { pipelineStream + ->WonderBsMost.Most.recoverWith( + err => {WonderBsMost.Most.just(err->Result.fail)}, + _, + ) ->WonderBsMost.Most.tap( result => {result->Result.handleFail(handleFailFunc)->ignore}, _, ) ->WonderBsMost.Most.drain - ->Js.Promise.then_(() => handleSuccessFunc()->Js.Promise.resolve, _) - ->Js.Promise.catch(e => {Js.Promise.reject(e->Obj.magic)}, _); + ->Js.Promise.then_( + () => + { + handleSuccessFunc(); + } + ->Js.Promise.resolve, + _, + ); }; let buildEmptyPipelineData = (): PipelineVOType.pipelineData => {