Skip to content

Commit

Permalink
wms_pend_eor_fix
Browse files Browse the repository at this point in the history
crash_recovery_fix
pause_fix
  • Loading branch information
deepakarora3 committed Aug 9, 2023
1 parent fcff113 commit bf38f3d
Show file tree
Hide file tree
Showing 17 changed files with 784 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

<groupId>com.americanexpress.unify.flowret</groupId>
<artifactId>unify-flowret</artifactId>
<version>1.4.2</version>
<version>1.4.3</version>
<packaging>jar</packaging>

<name>unify-flowret</name>
Expand Down
2 changes: 1 addition & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Flowret is available as a jar file in Maven central with the following Maven coo
````pom
<groupId>com.americanexpress.unify.flowret</groupId>
<artifactId>unify-flowret</artifactId>
<version>1.4.2</version>
<version>1.4.3</version>
````

---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ public class ExecPath {
private String prevPendWorkBasket = "";

// To Be Cleared sla work basket. This will contain the name of the work basket for which SLA milestones
// are to be cleared in case we receive an ok_pend_eor response
// are to be cleared in case we receive an ok_pend_eor response and there is an error_pend after that. Then
// when the application is resumed, we would like the original work basket where the application had
// pended to fire the dequeue SLA when the error is cleared
private String tbcSlaWorkBasket = "";

// this contains the response type return from the last step or route executed by this execution path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ private void processPause(Pause pause) {
logger.info("Case id -> " + pi.getCaseId() + ", executing pause step -> " + pause.getName() + ", execution path -> " + execPath.getName());
try {
pi.getLock().lock();
execPath.set(pause.getName(), UnitResponseType.OK_PEND);
execPath.set(ExecPathStatus.COMPLETED, pause.getName(), UnitResponseType.OK_PEND);
execPath.setPendWorkBasket("flowret_pause");
pi.getSetter().setPendExecPath(execPath.getName());
}
Expand Down
74 changes: 63 additions & 11 deletions src/main/java/com/americanexpress/unify/flowret/Rts.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected Rts(FlowretDao dao, ProcessComponentFactory factory, EventHandler even
this.slaQm = slaQm;
}

protected void invokeEventHandler(EventType event, ProcessContext pc) {
private void invokeEvent(EventType event, ProcessContext pc) {
if (eventHandler == null) {
return;
}
Expand All @@ -59,21 +59,14 @@ protected void invokeEventHandler(EventType event, ProcessContext pc) {
wb = (wb == null) ? "" : wb;
logger.info("Case id -> {}, raising event -> {}, comp name -> {}, work basket -> {}", pi.getCaseId(), event.name(), pc.getCompName(), wb);

// invoke event on application
switch (event) {
case ON_PROCESS_START:
case ON_PROCESS_RESUME:
case ON_PROCESS_COMPLETE:
case ON_PROCESS_PEND:
case ON_PROCESS_REOPEN:
eventHandler.invoke(event, pc);
if ((slad != null) && (slaQm != null)) {
raiseSlaEvent(event, pc);
}
if (event == ON_PROCESS_PEND) {
// set the prev pend work basket
ExecPath ep = pi.getExecPath(pi.getPendExecPath());
ep.setPrevPendWorkBasket(ep.getPendWorkBasket());
}
break;

case ON_PERSIST:
Expand All @@ -89,6 +82,67 @@ protected void invokeEventHandler(EventType event, ProcessContext pc) {
}
}

private void raiseSla(EventType event, ProcessContext pc) {
if ((slad == null) || (slaQm == null)) {
return;
}

switch (event) {
case ON_PROCESS_START:
case ON_PROCESS_RESUME:
case ON_PROCESS_COMPLETE:
case ON_PROCESS_PEND:
case ON_PROCESS_REOPEN:
raiseSlaEvent(event, pc);
break;

case ON_PERSIST:
case ON_TICKET_RAISED:
break;
}
}

private void setPrevPendWorkbasket(EventType event) {
if (event == ON_PROCESS_PEND) {
// set the prev pend work basket
ExecPath ep = pi.getExecPath(pi.getPendExecPath());
ep.setPrevPendWorkBasket(ep.getPendWorkBasket());
}
}

// this method can be used in the future instead of the above one so that the current pend
// wb and the prev pend wb are correctly represented in the audit logs and process info
// in the persistent storage. Reason for not switching over just now is because of more extensive
// regression testing required
// private void setPrevPendWorkbasket(EventType event) {
// switch (event) {
// case ON_PROCESS_START:
// case ON_PROCESS_RESUME: {
// // set the prev pend work basket
// String s = pi.getPendExecPath();
// if (BaseUtils.isNullOrEmpty(s)) {
// return;
// }
// ExecPath ep = pi.getExecPath(s);
// ep.setPrevPendWorkBasket(ep.getPendWorkBasket());
// }
// break;
//
// case ON_PROCESS_PEND:
// case ON_PROCESS_COMPLETE:
// case ON_PROCESS_REOPEN:
// case ON_PERSIST:
// case ON_TICKET_RAISED:
// break;
// }
// }

protected void invokeEventHandler(EventType event, ProcessContext pc) {
invokeEvent(event, pc);
raiseSla(event, pc);
setPrevPendWorkbasket(event);
}

public boolean isCaseStarted(String caseId) {
String key = CONSTS_FLOWRET.DAO.PROCESS_INFO + CONSTS_FLOWRET.DAO.SEP + caseId;
Document d = dao.read(key);
Expand Down Expand Up @@ -285,8 +339,6 @@ public ProcessContext reopenCase(String caseId, String ticket, boolean pendBefor
}

private void raiseSlaEvent(EventType event, ProcessContext pc) {
Document d = null;

switch (event) {
case ON_PROCESS_START: {
Utils.enqueueCaseStartMilestones(pc, slad, slaQm);
Expand Down
30 changes: 22 additions & 8 deletions src/main/java/com/americanexpress/unify/flowret/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected static ProcessInfo getProcessInfo(FlowretDao dao, String caseId, Proce
return pi;
}

private static void setIsComplete(Document pid) {
private static void setIsComplete(String caseId, Document pid) {
Boolean isComplete = pid.getBoolean("$.process_info.is_complete");
boolean b = true;
if (isComplete == null) {
Expand All @@ -87,6 +87,7 @@ private static void setIsComplete(Document pid) {

isComplete = b;
pid.setBoolean("$.process_info.is_complete", isComplete);
logger.info("Case id -> {}, setting $.process_info.is_complete to -> {}", caseId, isComplete);
}
}

Expand All @@ -111,7 +112,7 @@ private static String getShortestExecPath(Document pid) {
return sep;
}

private static boolean checkAndSetTicketInExecPath(Document pid) {
private static boolean checkAndSetTicketInExecPath(String caseId, Document pid) {
boolean isTicketRaised = false;

String ticket = pid.getString("$.process_info.ticket");
Expand All @@ -124,6 +125,7 @@ private static boolean checkAndSetTicketInExecPath(Document pid) {
pid.setString("$.process_info.pend_exec_path", epName);
pid.setString("$.process_info.exec_paths[name=%].pend_workbasket", "flowret_temp_hold");
pid.setString("$.process_info.exec_paths[name=%].unit_response_type", UnitResponseType.OK_PEND.toString().toLowerCase(), epName);
logger.info("Case id -> {}, setting info for execution path -> {}", caseId, epName);
}

// set ticket as blank in all exec paths
Expand All @@ -144,7 +146,6 @@ private static boolean checkAndSetTicketInExecPath(Document pid) {

private static void checkExecPathCompletion(Document pid, String caseId, ProcessDefinition pd) {
int size = pid.getArraySize("$.process_info.exec_paths[]");
int oldLevel = 0;

for (int i = 0; i < size; i++) {
// get status
Expand All @@ -154,10 +155,18 @@ private static void checkExecPathCompletion(Document pid, String caseId, Process
if (epStatus == ExecPathStatus.STARTED) {
// we have an exec path that could not go to completion. Set status and wb
pid.setString("$.process_info.exec_paths[%].status", ExecPathStatus.COMPLETED.toString().toLowerCase(), i + "");
String wb = pid.getString("$.process_info.exec_paths[%].pend_workbasket", i + "");
if (wb == null) {
pid.setString("$.process_info.exec_paths[%].pend_workbasket", "flowret_temp_hold", i + "");
logger.info("Case id -> {}, setting status to complete for execution path -> {}", caseId, epName);

String prevWb = pid.getString("$.process_info.exec_paths[%].prev_pend_workbasket", i + "");
String wb = null;
if ((prevWb == null) || (prevWb.isEmpty() == true)) {
wb = "flowret_temp_hold";
}
else {
wb = prevWb;
}
pid.setString("$.process_info.exec_paths[%].pend_workbasket", wb, i + "");
logger.info("Case id -> {}, setting pend work basket to -> {} for execution path -> {}", caseId, wb, epName);

String urt = pid.getString("$.process_info.exec_paths[%].unit_response_type", i + "");
if (urt == null) {
Expand All @@ -175,16 +184,19 @@ private static void checkExecPathCompletion(Document pid, String caseId, Process
// but before the parent thread could join on child threads the crash happened. Since practically
// it is not possible to take care of every situation, we will live with this risk hoping that one of the child
// process would not have completed in which case we should be OK
logger.info("Case id -> {}, encountered p_route or p_route_dynamic. Doing nothing. Execution path -> {}", caseId, epName);
}
else if (unit.getType() == UnitType.S_ROUTE) {
// urt can be ok_proceed or error_pend. If ok_proceed we need to replace with ok_pend_eor
// as we need the rule to evaluate once again to decide where to go. For error pend we can
// leave it as it is
if (urt.equals(UnitResponseType.OK_PROCEED.toString().toLowerCase()) == true) {
pid.setString("$.process_info.exec_paths[%].unit_response_type", UnitResponseType.OK_PEND_EOR.toString().toLowerCase(), i + "");
logger.info("Case id -> {}, encountered s_route and urt as ok_proceed. Setting urt to ok_pend_eor. Execution path -> {}", caseId, epName);
}
else {
// nothing to do
logger.info("Case id -> {}, encountered s_route and urt as not ok_proceed. Doing nothing. Execution path -> {}", caseId, epName);
}
}
else {
Expand All @@ -197,6 +209,7 @@ else if (unit.getType() == UnitType.S_ROUTE) {
}
else {
// nothing to do
logger.info("Case id -> {}, encountered step and urt as not ok_proceed. Doing nothing. Execution path -> {}", caseId, epName);
}
}
}
Expand Down Expand Up @@ -231,6 +244,7 @@ private static void setPendExecPath(Document pid, String caseId) {

if (pendExecPath.isEmpty() == false) {
pid.setString("$.process_info.pend_exec_path", pendExecPath);
logger.info("Case id -> {}, setting pend exec path -> {}", caseId, pendExecPath);
}
else {
logger.info("Case id -> {}, could not find a exec path to pend", caseId);
Expand Down Expand Up @@ -271,9 +285,9 @@ private static void sanitize(Document pid, String caseId, ProcessDefinition pd)

// this logic should work for both single and multithreaded use cases

setIsComplete(pid);
setIsComplete(caseId, pid);

boolean isTicket = checkAndSetTicketInExecPath(pid);
boolean isTicket = checkAndSetTicketInExecPath(caseId, pid);
if (isTicket == false) {
checkExecPathCompletion(pid, caseId, pd);
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/americanexpress/unify/flowret/Wms.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public void changeWorkBasket(String caseId, String newWb) {
}
}

// copy the new work basket into prev work basket
ep.setPrevPendWorkBasket(newWb);

// write audit log
Utils.writeAuditLog(dao, pi, null, null, "Wms");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2020 American Express Travel Related Services Company, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package com.americanexpress.unify.flowret;

/*
* @author Deepak Arora
*/
public class TestWorkManager implements WorkManager {

@Override
public void changeWorkBasket(ProcessContext pc, String oldWb, String newWb) {
System.out.println("Received change work basket command. Old wb = " + oldWb + ", new wb = " + newWb);
}

}
Loading

0 comments on commit bf38f3d

Please sign in to comment.