Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Addressed code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
pgolash committed Jul 28, 2021
1 parent 1859fff commit f7c6271
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 4 deletions.
Expand Up @@ -136,9 +136,11 @@ public long getCallbackAfterSeconds() {
}

/**
* @return Return the retry delay for the FAILED tasks.
* @return the retry delay for the FAILED tasks.
*/
public int getRetryDelaySeconds() {return retryDelaySeconds; }
public int getRetryDelaySeconds() {
return retryDelaySeconds;
}

/**
* When set to non-zero values, the task remains in the queue for the specified seconds before sent back to the
Expand All @@ -154,8 +156,8 @@ public void setCallbackAfterSeconds(long callbackAfterSeconds) {
/**
* Retry delay. It follows following logic to decide on retry interval:
* <p>NO retry delay if the worker sends a negative value (<0) in the TaskResult</p>
* <p>Retry delay from the task definition or (workflow task?) if the worker sends 0 in the TaskResult</p>
* <p>Retry delay from the workflow task if the worker sends 0 in the TaskResult</p>
* <p>Retry delay from the task definition if the worker sends 0 in the TaskResult</p>
* <p>Retry delay from the workflow task if the worker sends a positive value (>0) value in the TaskResult</p>
*
* @param retryDelaySeconds
*/
Expand Down
Expand Up @@ -3641,6 +3641,92 @@ public void testRetries() {

}

private void verifyRetriedTask(String wfId, String taskType, String workerId, boolean failed, int retryDelay, boolean terminal) {
Task task = workflowExecutionService.poll(taskType, workerId);
assertNotNull(task);
assertTrue(workflowExecutionService.ackTaskReceived(task.getTaskId()));
if (failed) {
task.setStatus(FAILED);
task.setStartDelayInSeconds(retryDelay);
task.setReasonForIncompletion("failure...0");
} else {
task.setStatus(COMPLETED);
}
workflowExecutionService.updateTask(task);
Workflow es = workflowExecutionService.getExecutionStatus(wfId, false);
if (terminal && !failed) {
assertEquals(WorkflowStatus.COMPLETED, es.getStatus());
} else {
assertEquals(WorkflowStatus.RUNNING, es.getStatus());
}
}

@Test
public void testCustomRetryPolicy() {
String taskName = "junit_task_2";
TaskDef taskDef = notFoundSafeGetTaskDef(taskName);
taskDef.setRetryCount(2);
taskDef.setRetryLogic(RetryLogic.CUSTOM);
taskDef.setRetryDelaySeconds(2);
metadataService.updateTaskDef(taskDef);

taskName = "junit_task_3";
taskDef = notFoundSafeGetTaskDef(taskName);
taskDef.setRetryCount(2);
taskDef.setRetryLogic(RetryLogic.CUSTOM);
taskDef.setRetryDelaySeconds(2);
metadataService.updateTaskDef(taskDef);

metadataService.getWorkflowDef(TEST_WORKFLOW, 1);

String correlationId = "unit_test_1";
Map<String, Object> input = new HashMap<String, Object>();
String inputParam1 = "p1 value";
input.put("param1", inputParam1);
input.put("param2", "p2 value");
String wfid = startOrLoadWorkflowExecution(TEST_WORKFLOW, 1, correlationId, input, null, null);
System.out.println("testRetries.wfid=" + wfid);
assertNotNull(wfid);

List<String> ids = workflowExecutionService.getRunningWorkflows(TEST_WORKFLOW, 1);
assertNotNull(ids);
assertTrue("found no ids: " + ids, ids.size() > 0); //if there are concurrent tests running, this would be more than 1
boolean foundId = false;
for (String id : ids) {
if (id.equals(wfid)) {
foundId = true;
}
}
assertTrue(foundId);
Workflow es = workflowExecutionService.getExecutionStatus(wfid, true);
assertNotNull(es);
assertEquals(RUNNING, es.getStatus());

Task task = workflowExecutionService.poll("junit_task_1", "task1.junit.worker");
assertNotNull(task);
assertTrue(workflowExecutionService.ackTaskReceived(task.getTaskId()));
task.setStatus(COMPLETED);
workflowExecutionService.updateTask(task);

verifyRetriedTask(wfid, "junit_task_2", "task2.junit.worker", true, -1, false);
// Should have not delay since retry delay < 0
Uninterruptibles.sleepUninterruptibly(0, TimeUnit.SECONDS);
verifyRetriedTask(wfid, "junit_task_2", "task2.junit.worker", false, -1, false);

verifyRetriedTask(wfid, "junit_task_3", "task3.junit.worker", true, 0, true);
// Should use retry from task definition since retry delay = 0
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
verifyRetriedTask(wfid, "junit_task_3", "task3.junit.worker", false, 0, true);

es = workflowExecutionService.getExecutionStatus(wfid, true);
assertEquals(5, es.getTasks().size());
assertEquals(COMPLETED, es.getTasks().get(0).getStatus());
assertEquals(FAILED, es.getTasks().get(1).getStatus());
assertEquals(COMPLETED, es.getTasks().get(2).getStatus());
assertEquals(FAILED, es.getTasks().get(3).getStatus());
assertEquals(COMPLETED, es.getTasks().get(4).getStatus());
}

@Test
public void testSuccess() {

Expand Down

0 comments on commit f7c6271

Please sign in to comment.