Skip to content

Commit

Permalink
parallel_in_parallel_pend_fix
Browse files Browse the repository at this point in the history
  • Loading branch information
deepakarora3 committed Oct 3, 2023
1 parent bf38f3d commit 1611f25
Show file tree
Hide file tree
Showing 9 changed files with 366 additions and 19 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

<groupId>com.americanexpress.unify.flowret</groupId>
<artifactId>unify-flowret</artifactId>
<version>1.4.3</version>
<version>1.4.4</version>
<packaging>jar</packaging>

<name>unify-flowret</name>
Expand Down Expand Up @@ -75,7 +75,7 @@
<dependency>
<groupId>com.americanexpress.unify.jdocs</groupId>
<artifactId>unify-jdocs</artifactId>
<version>1.4.0</version>
<version>1.5.0</version>
</dependency>

<!-- test dependencies -->
Expand Down
2 changes: 1 addition & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Flowret is available as a jar file in Maven central with the following Maven coo
````pom
<groupId>com.americanexpress.unify.flowret</groupId>
<artifactId>unify-flowret</artifactId>
<version>1.4.3</version>
<version>1.4.4</version>
````

---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,24 +773,16 @@ private String executeThreads(ExecPath parentExecPath, Route route, List<String>
// run threads and wait for them to finish
runThreads(tasks);

// check if all have completed
boolean isPend = false;
String joinPoint = null;
for (int i = 0; i < tasks.length; i++) {
ExecThreadTask in = tasks[i];
// we now check if there has been a pend in any of the levels that this thread has called. A pend
// can not only occur in one of the threads created by this thread but can exist many levels deep
if (pi.getPendExecPath().isEmpty() == true) {
// no pend occurred -> return the join point
ExecThreadTask in = tasks[0];
ExecPath ep = in.execPath;
joinPoint = ep.getStep();

if (ep.getPendWorkBasket().isEmpty() == false) {
isPend = true;
break;
}
}

if (isPend == false) {
return joinPoint;
return ep.getStep();
}
else {
// a pend occurred somewhere in the hierarchy
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class ProcessInfo {

// this variable is populated while writing and is for trouble shooting and information only
// this variable is not used while reading the document
private Unit lastUnitExecuted = null;
private volatile Unit lastUnitExecuted = null;

protected volatile boolean isPendAtSameStep = false;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2020 American Express Travel Related Services Company, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package com.americanexpress.unify.flowret.test_parallel_in_dyn_parallel;

import com.americanexpress.unify.flowret.ProcessComponentFactory;
import com.americanexpress.unify.flowret.ProcessContext;
import com.americanexpress.unify.flowret.UnitType;

/*
* @author Deepak Arora
*/
public class TestComponentFactoryPidp implements ProcessComponentFactory {

@Override
public Object getObject(ProcessContext pc) {
Object o = null;

if ((pc.getCompType() == UnitType.S_ROUTE) || (pc.getCompType() == UnitType.P_ROUTE) || (pc.getCompType() == UnitType.P_ROUTE_DYNAMIC)) {
o = new TestRulePidp(pc);
}
else if (pc.getCompType() == UnitType.STEP) {
o = new TestStepPidp(pc);
}

return o;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2020 American Express Travel Related Services Company, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package com.americanexpress.unify.flowret.test_parallel_in_dyn_parallel;

import com.americanexpress.unify.base.BaseUtils;
import com.americanexpress.unify.base.UnifyException;
import com.americanexpress.unify.flowret.*;
import org.junit.jupiter.api.*;

import java.io.File;

/*
* @author Deepak Arora
*/
public class TestFlowretPidp {

private static String dirPath = "./target/test-data-results/";
private static Rts rts = null;

@BeforeAll
protected static void setEnv() throws Exception {
File directory = new File(dirPath);
if (!directory.exists()) {
directory.mkdir();
}

ERRORS_FLOWRET.load();
Flowret.init(10, 30000, "-");
}

@BeforeEach
protected void beforeEach() {
TestUtils.deleteFiles(dirPath);
StepResponseFactory.clear();
}

@AfterEach
protected void afterEach() {
// nothing to do
}

@AfterAll
protected static void afterAll() {
Flowret.instance().close();
}

// happy path
public static void setScenario0() {
// nothing to do
}

// pend in a step in dynamic parallel route scope
public static void setScenario1() {
StepResponseFactory.addResponse("r2_1_s1", UnitResponseType.ERROR_PEND, "tech", "");
}

private static void runJourney(String journey) {
String json = BaseUtils.getResourceAsString(TestFlowretPidp.class, "/flowret/" + journey + ".json");

if (new File(dirPath + "flowret_journey-3.json ").exists() == false) {
rts.startCase("3", json, null, null);
}

try {
while (true) {
System.out.println();
rts.resumeCase("3");
}
}
catch (UnifyException e) {
System.out.println("Exception -> " + e.getMessage());
}

}

private static void init(FlowretDao dao, ProcessComponentFactory factory, EventHandler handler, ISlaQueueManager sqm) {
rts = Flowret.instance().getRunTimeService(dao, factory, handler, sqm);
}

@Test
void testScenario0() {
setScenario0();
init(new FileDao(dirPath), new TestComponentFactoryPidp(), new TestHandler(), null);
runJourney("pidp_test");
}

@Test
void testScenario1() {
setScenario1();
init(new FileDao(dirPath), new TestComponentFactoryPidp(), new TestHandler(), null);
runJourney("pidp_test");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2020 American Express Travel Related Services Company, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package com.americanexpress.unify.flowret.test_parallel_in_dyn_parallel;

import com.americanexpress.unify.base.BaseUtils;
import com.americanexpress.unify.flowret.InvokableRoute;
import com.americanexpress.unify.flowret.ProcessContext;
import com.americanexpress.unify.flowret.RouteResponse;
import com.americanexpress.unify.flowret.UnitResponseType;

import java.util.ArrayList;
import java.util.List;

/*
* @author Deepak Arora
*/
public class TestRulePidp implements InvokableRoute {

private String name = null;
private ProcessContext pc = null;

public TestRulePidp(ProcessContext pc) {
this.name = pc.getCompName();
this.pc = pc;
}

public String getName() {
return name;
}

public RouteResponse executeRoute() {
List<String> branches = new ArrayList<>();
RouteResponse resp = null;
String name = pc.getCompName();

while (true) {
if (BaseUtils.compareWithMany(name, "r1_c")) {
branches.add("1");
branches.add("2");
branches.add("3");
break;
}

if (BaseUtils.compareWithMany(name, "r2_c")) {
branches.add("1");
branches.add("2");
branches.add("3");
break;
}

break;
}

resp = new RouteResponse(UnitResponseType.OK_PROCEED, branches, null);

return resp;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2020 American Express Travel Related Services Company, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package com.americanexpress.unify.flowret.test_parallel_in_dyn_parallel;

import com.americanexpress.unify.base.BaseUtils;
import com.americanexpress.unify.flowret.*;

/*
* @author Deepak Arora
*/
public class TestStepPidp implements InvokableStep {

private String name = null;
private ProcessContext pc = null;

public TestStepPidp(ProcessContext pc) {
this.name = pc.getCompName();
this.pc = pc;
}

public String getName() {
return name;
}

public StepResponse executeStep() {
String stepName = pc.getStepName();
TestStepResponse tsr = StepResponseFactory.getResponse(stepName);
StepResponse sr = tsr.getStepResponse();
long delay = tsr.getDelay();
if (delay > 0) {
BaseUtils.sleep(delay);
}
return sr;
}

}

0 comments on commit 1611f25

Please sign in to comment.