Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #433 from Netflix/cleanup
Browse files Browse the repository at this point in the history
Logger statements and clean up
  • Loading branch information
pctreddy committed Feb 23, 2018
2 parents 4df0f91 + 17abc49 commit 504bd40
Show file tree
Hide file tree
Showing 24 changed files with 356 additions and 306 deletions.
Expand Up @@ -31,9 +31,8 @@
public class TaskResult {

public enum Status {

IN_PROGRESS, FAILED, COMPLETED, SCHEDULED; //SCHEDULED is added for the backward compatibility and should NOT be used when updating the task result
};
IN_PROGRESS, FAILED, COMPLETED, SCHEDULED; //SCHEDULED is added for the backward compatibility and should NOT be used when updating the task result
}

private String workflowInstanceId;

Expand Down Expand Up @@ -100,7 +99,7 @@ public long getCallbackAfterSeconds() {
/**
* When set to non-zero values, the task remains in the queue for the specified seconds before sent back to the worker when polled.
* Useful for the long running task, where the task is updated as IN_PROGRESS and should not be polled out of the queue for a specified amount of time. (delayed queue implementation)
* @param callbackAfterSeconds. Amount of time in seconds the task should be held in the queue before giving it to a polling worker.
* @param callbackAfterSeconds Amount of time in seconds the task should be held in the queue before giving it to a polling worker.
*/
public void setCallbackAfterSeconds(long callbackAfterSeconds) {
this.callbackAfterSeconds = callbackAfterSeconds;
Expand Down Expand Up @@ -219,7 +218,6 @@ public static TaskResult newTaskResult(Status status) {
result.setStatus(status);
return result;
}




}
Expand Up @@ -37,7 +37,7 @@ public class EventQueues {

private static Logger logger = LoggerFactory.getLogger(EventQueues.class);

private static ParametersUtils pu = new ParametersUtils();
private static ParametersUtils parametersUtils = new ParametersUtils();

private static Map<String, EventQueueProvider> providers = new HashMap<>();

Expand All @@ -54,7 +54,7 @@ public static List<String> providers() {
}

public static ObservableQueue getQueue(String eventt, boolean throwException) {
String event = pu.replace(eventt).toString();
String event = parametersUtils.replace(eventt).toString();
String type = event.substring(0, event.indexOf(':'));
String queueURI = event.substring(event.indexOf(':') + 1);
EventQueueProvider provider = providers.get(type);
Expand Down
Expand Up @@ -32,7 +32,7 @@
@SuppressWarnings("serial")
public class ApplicationException extends RuntimeException {

public static enum Code {
public enum Code {
INVALID_INPUT(400), INTERNAL_ERROR(500), NOT_FOUND(404), CONFLICT(409), UNAUTHORIZED(403), BACKEND_ERROR(500);

private int statusCode;
Expand Down
Expand Up @@ -70,7 +70,7 @@ public DeciderService(MetadataDAO metadataDAO, @Named("TaskMappers") Map<String,
}

//QQ public method validation of the input params
public DeciderOutcome decide(Workflow workflow, WorkflowDef def) throws TerminateWorkflow {
public DeciderOutcome decide(Workflow workflow, WorkflowDef def) throws TerminateWorkflowException {

workflow.setSchemaVersion(def.getSchemaVersion());
//In case of a new workflow the list of tasks will be empty
Expand All @@ -91,7 +91,7 @@ public DeciderOutcome decide(Workflow workflow, WorkflowDef def) throws Terminat
return decide(def, workflow, tasksToBeScheduled);
}

private DeciderOutcome decide(final WorkflowDef def, final Workflow workflow, List<Task> preScheduledTasks) throws TerminateWorkflow {
private DeciderOutcome decide(final WorkflowDef def, final Workflow workflow, List<Task> preScheduledTasks) throws TerminateWorkflowException {

DeciderOutcome outcome = new DeciderOutcome();

Expand Down Expand Up @@ -193,7 +193,7 @@ private DeciderOutcome decide(final WorkflowDef def, final Workflow workflow, Li

}

private List<Task> startWorkflow(Workflow workflow, WorkflowDef def) throws TerminateWorkflow {
private List<Task> startWorkflow(Workflow workflow, WorkflowDef def) throws TerminateWorkflowException {

logger.debug("Starting workflow " + def.getName() + "/" + workflow.getWorkflowId());
//The tasks will be empty in case of new workflow
Expand All @@ -202,7 +202,7 @@ private List<Task> startWorkflow(Workflow workflow, WorkflowDef def) throws Term
if (workflow.getReRunFromWorkflowId() == null || tasks.isEmpty()) {

if (def.getTasks().isEmpty()) {
throw new TerminateWorkflow("No tasks found to be executed", WorkflowStatus.COMPLETED);
throw new TerminateWorkflowException("No tasks found to be executed", WorkflowStatus.COMPLETED);
}

WorkflowTask taskToSchedule = def.getTasks().getFirst(); //Nothing isSystemTask running yet - so schedule the first task
Expand All @@ -227,7 +227,7 @@ private List<Task> startWorkflow(Workflow workflow, WorkflowDef def) throws Term
.orElseThrow(() -> {
String reason = String.format("The workflow %s isSystemTask marked for re-run from %s but could not find the starting task",
workflow.getWorkflowId(), workflow.getReRunFromWorkflowId());
return new TerminateWorkflow(reason);
return new TerminateWorkflowException(reason);
});

return Arrays.asList(rerunFromTask);
Expand All @@ -251,7 +251,7 @@ private void updateOutput(final WorkflowDef def, final Workflow workflow) {
workflow.setOutput(output);
}

private boolean checkForWorkflowCompletion(final WorkflowDef def, final Workflow workflow) throws TerminateWorkflow {
private boolean checkForWorkflowCompletion(final WorkflowDef def, final Workflow workflow) throws TerminateWorkflowException {

List<Task> allTasks = workflow.getTasks();
if (allTasks.isEmpty()) {
Expand Down Expand Up @@ -319,13 +319,13 @@ private String getNextTasksToBeScheduled(WorkflowDef def, Workflow workflow, Tas
}

@VisibleForTesting
Task retry(TaskDef taskDefinition, WorkflowTask workflowTask, Task task, Workflow workflow) throws TerminateWorkflow {
Task retry(TaskDef taskDefinition, WorkflowTask workflowTask, Task task, Workflow workflow) throws TerminateWorkflowException {

int retryCount = task.getRetryCount();
if (!task.getStatus().isRetriable() || SystemTaskType.isBuiltIn(task.getTaskType()) || taskDefinition == null || taskDefinition.getRetryCount() <= retryCount) {
WorkflowStatus status = task.getStatus().equals(Status.TIMED_OUT) ? WorkflowStatus.TIMED_OUT : WorkflowStatus.FAILED;
task.setRetried(true);
throw new TerminateWorkflow(task.getReasonForIncompletion(), status, task);
throw new TerminateWorkflowException(task.getReasonForIncompletion(), status, task);
}

// retry... - but not immediately - put a delay...
Expand Down Expand Up @@ -393,7 +393,7 @@ void checkForTimeout(TaskDef taskType, Task task) {
case TIME_OUT_WF:
task.setStatus(Status.TIMED_OUT);
task.setReasonForIncompletion(reason);
throw new TerminateWorkflow(reason, WorkflowStatus.TIMED_OUT, task);
throw new TerminateWorkflowException(reason, WorkflowStatus.TIMED_OUT, task);
}
}

Expand Down Expand Up @@ -428,8 +428,10 @@ public List<Task> getTasksToBeScheduled(WorkflowDef def, Workflow workflow,

public List<Task> getTasksToBeScheduled(WorkflowDef workflowDefinition, Workflow workflowInstance,
WorkflowTask taskToSchedule, int retryCount, String retriedTaskId) {

Map<String, Object> input = parametersUtils.getTaskInput(taskToSchedule.getInputParameters(),
workflowInstance, null, null);

Type taskType = Type.USER_DEFINED;
String type = taskToSchedule.getType();
if (Type.isSystemTask(type)) {
Expand All @@ -455,7 +457,7 @@ private boolean isTaskSkipped(WorkflowTask taskToSchedule, Workflow workflow) {
}
return retval;
} catch (Exception e) {
throw new TerminateWorkflow(e.getMessage());
throw new TerminateWorkflowException(e.getMessage());
}

}
Expand Down
Expand Up @@ -25,21 +25,21 @@
*
*/
@SuppressWarnings("serial")
public class TerminateWorkflow extends RuntimeException {
public class TerminateWorkflowException extends RuntimeException {

WorkflowStatus workflowStatus;

Task task;

public TerminateWorkflow(String reason) {
public TerminateWorkflowException(String reason) {
this(reason, WorkflowStatus.FAILED);
}

public TerminateWorkflow(String reason, WorkflowStatus workflowStatus) {
public TerminateWorkflowException(String reason, WorkflowStatus workflowStatus) {
this(reason, workflowStatus, null);
}

public TerminateWorkflow(String reason, WorkflowStatus workflowStatus, Task task) {
public TerminateWorkflowException(String reason, WorkflowStatus workflowStatus, Task task) {
super(reason);
this.workflowStatus = workflowStatus;
this.task = task;
Expand Down

0 comments on commit 504bd40

Please sign in to comment.