From 1611f25ba6e0f5989c4154d7a3e96ead8ad0ac84 Mon Sep 17 00:00:00 2001 From: Deepak Arora Date: Tue, 3 Oct 2023 12:45:56 -0400 Subject: [PATCH] parallel_in_parallel_pend_fix --- pom.xml | 4 +- readme.md | 2 +- .../unify/flowret/ExecThreadTask.java | 22 ++-- .../unify/flowret/ProcessInfo.java | 2 +- .../TestComponentFactoryPidp.java | 40 +++++++ .../TestFlowretPidp.java | 106 ++++++++++++++++++ .../TestRulePidp.java | 71 ++++++++++++ .../TestStepPidp.java | 48 ++++++++ src/test/resources/flowret/pidp_test.json | 90 +++++++++++++++ 9 files changed, 366 insertions(+), 19 deletions(-) create mode 100644 src/test/java/com/americanexpress/unify/flowret/test_parallel_in_dyn_parallel/TestComponentFactoryPidp.java create mode 100644 src/test/java/com/americanexpress/unify/flowret/test_parallel_in_dyn_parallel/TestFlowretPidp.java create mode 100644 src/test/java/com/americanexpress/unify/flowret/test_parallel_in_dyn_parallel/TestRulePidp.java create mode 100644 src/test/java/com/americanexpress/unify/flowret/test_parallel_in_dyn_parallel/TestStepPidp.java create mode 100644 src/test/resources/flowret/pidp_test.json diff --git a/pom.xml b/pom.xml index 9cb70cc..c314beb 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ com.americanexpress.unify.flowret unify-flowret - 1.4.3 + 1.4.4 jar unify-flowret @@ -75,7 +75,7 @@ com.americanexpress.unify.jdocs unify-jdocs - 1.4.0 + 1.5.0 diff --git a/readme.md b/readme.md index 65c7826..8b8a85d 100644 --- a/readme.md +++ b/readme.md @@ -20,7 +20,7 @@ Flowret is available as a jar file in Maven central with the following Maven coo ````pom com.americanexpress.unify.flowret unify-flowret -1.4.3 +1.4.4 ```` --- diff --git a/src/main/java/com/americanexpress/unify/flowret/ExecThreadTask.java b/src/main/java/com/americanexpress/unify/flowret/ExecThreadTask.java index f1524c7..d5ff46a 100644 --- a/src/main/java/com/americanexpress/unify/flowret/ExecThreadTask.java +++ b/src/main/java/com/americanexpress/unify/flowret/ExecThreadTask.java @@ -773,24 +773,16 @@ private String executeThreads(ExecPath parentExecPath, Route route, List // run threads and wait for them to finish runThreads(tasks); - // check if all have completed - boolean isPend = false; - String joinPoint = null; - for (int i = 0; i < tasks.length; i++) { - ExecThreadTask in = tasks[i]; + // we now check if there has been a pend in any of the levels that this thread has called. A pend + // can not only occur in one of the threads created by this thread but can exist many levels deep + if (pi.getPendExecPath().isEmpty() == true) { + // no pend occurred -> return the join point + ExecThreadTask in = tasks[0]; ExecPath ep = in.execPath; - joinPoint = ep.getStep(); - - if (ep.getPendWorkBasket().isEmpty() == false) { - isPend = true; - break; - } - } - - if (isPend == false) { - return joinPoint; + return ep.getStep(); } else { + // a pend occurred somewhere in the hierarchy return null; } } diff --git a/src/main/java/com/americanexpress/unify/flowret/ProcessInfo.java b/src/main/java/com/americanexpress/unify/flowret/ProcessInfo.java index 3f34359..0a131b2 100644 --- a/src/main/java/com/americanexpress/unify/flowret/ProcessInfo.java +++ b/src/main/java/com/americanexpress/unify/flowret/ProcessInfo.java @@ -54,7 +54,7 @@ public class ProcessInfo { // this variable is populated while writing and is for trouble shooting and information only // this variable is not used while reading the document - private Unit lastUnitExecuted = null; + private volatile Unit lastUnitExecuted = null; protected volatile boolean isPendAtSameStep = false; diff --git a/src/test/java/com/americanexpress/unify/flowret/test_parallel_in_dyn_parallel/TestComponentFactoryPidp.java b/src/test/java/com/americanexpress/unify/flowret/test_parallel_in_dyn_parallel/TestComponentFactoryPidp.java new file mode 100644 index 0000000..6b860ff --- /dev/null +++ b/src/test/java/com/americanexpress/unify/flowret/test_parallel_in_dyn_parallel/TestComponentFactoryPidp.java @@ -0,0 +1,40 @@ +/* + * Copyright 2020 American Express Travel Related Services Company, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package com.americanexpress.unify.flowret.test_parallel_in_dyn_parallel; + +import com.americanexpress.unify.flowret.ProcessComponentFactory; +import com.americanexpress.unify.flowret.ProcessContext; +import com.americanexpress.unify.flowret.UnitType; + +/* + * @author Deepak Arora + */ +public class TestComponentFactoryPidp implements ProcessComponentFactory { + + @Override + public Object getObject(ProcessContext pc) { + Object o = null; + + if ((pc.getCompType() == UnitType.S_ROUTE) || (pc.getCompType() == UnitType.P_ROUTE) || (pc.getCompType() == UnitType.P_ROUTE_DYNAMIC)) { + o = new TestRulePidp(pc); + } + else if (pc.getCompType() == UnitType.STEP) { + o = new TestStepPidp(pc); + } + + return o; + } + +} diff --git a/src/test/java/com/americanexpress/unify/flowret/test_parallel_in_dyn_parallel/TestFlowretPidp.java b/src/test/java/com/americanexpress/unify/flowret/test_parallel_in_dyn_parallel/TestFlowretPidp.java new file mode 100644 index 0000000..a99eea9 --- /dev/null +++ b/src/test/java/com/americanexpress/unify/flowret/test_parallel_in_dyn_parallel/TestFlowretPidp.java @@ -0,0 +1,106 @@ +/* + * Copyright 2020 American Express Travel Related Services Company, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package com.americanexpress.unify.flowret.test_parallel_in_dyn_parallel; + +import com.americanexpress.unify.base.BaseUtils; +import com.americanexpress.unify.base.UnifyException; +import com.americanexpress.unify.flowret.*; +import org.junit.jupiter.api.*; + +import java.io.File; + +/* + * @author Deepak Arora + */ +public class TestFlowretPidp { + + private static String dirPath = "./target/test-data-results/"; + private static Rts rts = null; + + @BeforeAll + protected static void setEnv() throws Exception { + File directory = new File(dirPath); + if (!directory.exists()) { + directory.mkdir(); + } + + ERRORS_FLOWRET.load(); + Flowret.init(10, 30000, "-"); + } + + @BeforeEach + protected void beforeEach() { + TestUtils.deleteFiles(dirPath); + StepResponseFactory.clear(); + } + + @AfterEach + protected void afterEach() { + // nothing to do + } + + @AfterAll + protected static void afterAll() { + Flowret.instance().close(); + } + + // happy path + public static void setScenario0() { + // nothing to do + } + + // pend in a step in dynamic parallel route scope + public static void setScenario1() { + StepResponseFactory.addResponse("r2_1_s1", UnitResponseType.ERROR_PEND, "tech", ""); + } + + private static void runJourney(String journey) { + String json = BaseUtils.getResourceAsString(TestFlowretPidp.class, "/flowret/" + journey + ".json"); + + if (new File(dirPath + "flowret_journey-3.json ").exists() == false) { + rts.startCase("3", json, null, null); + } + + try { + while (true) { + System.out.println(); + rts.resumeCase("3"); + } + } + catch (UnifyException e) { + System.out.println("Exception -> " + e.getMessage()); + } + + } + + private static void init(FlowretDao dao, ProcessComponentFactory factory, EventHandler handler, ISlaQueueManager sqm) { + rts = Flowret.instance().getRunTimeService(dao, factory, handler, sqm); + } + + @Test + void testScenario0() { + setScenario0(); + init(new FileDao(dirPath), new TestComponentFactoryPidp(), new TestHandler(), null); + runJourney("pidp_test"); + } + + @Test + void testScenario1() { + setScenario1(); + init(new FileDao(dirPath), new TestComponentFactoryPidp(), new TestHandler(), null); + runJourney("pidp_test"); + } + +} diff --git a/src/test/java/com/americanexpress/unify/flowret/test_parallel_in_dyn_parallel/TestRulePidp.java b/src/test/java/com/americanexpress/unify/flowret/test_parallel_in_dyn_parallel/TestRulePidp.java new file mode 100644 index 0000000..aab5ea5 --- /dev/null +++ b/src/test/java/com/americanexpress/unify/flowret/test_parallel_in_dyn_parallel/TestRulePidp.java @@ -0,0 +1,71 @@ +/* + * Copyright 2020 American Express Travel Related Services Company, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package com.americanexpress.unify.flowret.test_parallel_in_dyn_parallel; + +import com.americanexpress.unify.base.BaseUtils; +import com.americanexpress.unify.flowret.InvokableRoute; +import com.americanexpress.unify.flowret.ProcessContext; +import com.americanexpress.unify.flowret.RouteResponse; +import com.americanexpress.unify.flowret.UnitResponseType; + +import java.util.ArrayList; +import java.util.List; + +/* + * @author Deepak Arora + */ +public class TestRulePidp implements InvokableRoute { + + private String name = null; + private ProcessContext pc = null; + + public TestRulePidp(ProcessContext pc) { + this.name = pc.getCompName(); + this.pc = pc; + } + + public String getName() { + return name; + } + + public RouteResponse executeRoute() { + List branches = new ArrayList<>(); + RouteResponse resp = null; + String name = pc.getCompName(); + + while (true) { + if (BaseUtils.compareWithMany(name, "r1_c")) { + branches.add("1"); + branches.add("2"); + branches.add("3"); + break; + } + + if (BaseUtils.compareWithMany(name, "r2_c")) { + branches.add("1"); + branches.add("2"); + branches.add("3"); + break; + } + + break; + } + + resp = new RouteResponse(UnitResponseType.OK_PROCEED, branches, null); + + return resp; + } + +} diff --git a/src/test/java/com/americanexpress/unify/flowret/test_parallel_in_dyn_parallel/TestStepPidp.java b/src/test/java/com/americanexpress/unify/flowret/test_parallel_in_dyn_parallel/TestStepPidp.java new file mode 100644 index 0000000..f9585ae --- /dev/null +++ b/src/test/java/com/americanexpress/unify/flowret/test_parallel_in_dyn_parallel/TestStepPidp.java @@ -0,0 +1,48 @@ +/* + * Copyright 2020 American Express Travel Related Services Company, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package com.americanexpress.unify.flowret.test_parallel_in_dyn_parallel; + +import com.americanexpress.unify.base.BaseUtils; +import com.americanexpress.unify.flowret.*; + +/* + * @author Deepak Arora + */ +public class TestStepPidp implements InvokableStep { + + private String name = null; + private ProcessContext pc = null; + + public TestStepPidp(ProcessContext pc) { + this.name = pc.getCompName(); + this.pc = pc; + } + + public String getName() { + return name; + } + + public StepResponse executeStep() { + String stepName = pc.getStepName(); + TestStepResponse tsr = StepResponseFactory.getResponse(stepName); + StepResponse sr = tsr.getStepResponse(); + long delay = tsr.getDelay(); + if (delay > 0) { + BaseUtils.sleep(delay); + } + return sr; + } + +} diff --git a/src/test/resources/flowret/pidp_test.json b/src/test/resources/flowret/pidp_test.json new file mode 100644 index 0000000..2c01aa0 --- /dev/null +++ b/src/test/resources/flowret/pidp_test.json @@ -0,0 +1,90 @@ +{ + "journey": { + "name": "parallel_in_dyn_parallel_test", + "tickets": [ + { + "name": "reset", + "step": "start" + } + ], + "process_variables": [ + { + "name": "pcn", + "type": "string", + "value": "pcn1" + } + ], + "flow": [ + { + "name": "start", + "component": "s1_c", + "next": "r1" + }, + { + "name": "r1", + "type": "p_route_dynamic", + "component": "r1_c", + "next": "r1_s1" + }, + { + "name": "r1_s1", + "component": "r1_s1_c", + "next": "r2" + }, + { + "name": "r2", + "type": "p_route", + "component": "r2_c", + "branches": [ + { + "name": "1", + "next": "r2_1_s1" + }, + { + "name": "2", + "next": "r2_2_s1" + }, + { + "name": "3", + "next": "r2_3_s1" + } + ] + }, + { + "name": "r2_1_s1", + "component": "r2_1_s1_c", + "next": "j2" + }, + { + "name": "r2_2_s1", + "component": "r2_2_s1_c", + "next": "j2" + }, + { + "name": "r2_3_s1", + "component": "r2_3_s1_c", + "next": "j2" + }, + { + "name": "j2", + "type": "p_join", + "next": "r1_s2" + }, + { + "name": "r1_s2", + "component": "r1_s2_c", + "next": "j1" + }, + { + "name": "j1", + "type": "p_join", + "next": "s2" + }, + { + "name": "s2", + "component": "s2_c", + "next": "end" + } + ] + } +}