From c84e8a5b244b78520f5e4dd5dbc526e36288d1f1 Mon Sep 17 00:00:00 2001 From: Anoop Panicker Date: Thu, 7 Mar 2019 12:53:32 -0800 Subject: [PATCH] add external storage paths to summary --- .../conductor/common/run/TaskSummary.java | 44 ++++++++++++++++++- .../conductor/common/run/WorkflowSummary.java | 41 +++++++++++++++++ .../conductor/grpc/AbstractProtoMapper.java | 20 +++++++++ grpc/src/main/proto/model/tasksummary.proto | 2 + .../main/proto/model/workflowsummary.proto | 2 + grpc/src/main/proto/model/workflowtask.proto | 1 + 6 files changed, 108 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/com/netflix/conductor/common/run/TaskSummary.java b/common/src/main/java/com/netflix/conductor/common/run/TaskSummary.java index c09289218d..5091e4e34a 100644 --- a/common/src/main/java/com/netflix/conductor/common/run/TaskSummary.java +++ b/common/src/main/java/com/netflix/conductor/common/run/TaskSummary.java @@ -25,6 +25,7 @@ import com.github.vmg.protogen.annotations.*; import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.Task.Status; +import org.apache.commons.lang3.StringUtils; /** * @author Viren @@ -86,6 +87,12 @@ public class TaskSummary { @ProtoField(id = 16) private String taskId; + @ProtoField(id = 17) + private String externalInputPayloadStoragePath; + + @ProtoField(id = 18) + private String externalOutputPayloadStoragePath; + public TaskSummary() { } @@ -119,6 +126,13 @@ public TaskSummary(Task task) { if(task.getEndTime() > 0){ this.executionTime = task.getEndTime() - task.getStartTime(); } + + if (StringUtils.isNotBlank(task.getExternalInputPayloadStoragePath())) { + this.externalInputPayloadStoragePath = task.getExternalInputPayloadStoragePath(); + } + if (StringUtils.isNotBlank(task.getExternalOutputPayloadStoragePath())) { + this.externalOutputPayloadStoragePath = task.getExternalOutputPayloadStoragePath(); + } } /** @@ -348,6 +362,32 @@ public String getTaskId() { public void setTaskId(String taskId) { this.taskId = taskId; } - - + + /** + * @return the external storage path for the task input payload + */ + public String getExternalInputPayloadStoragePath() { + return externalInputPayloadStoragePath; + } + + /** + * @param externalInputPayloadStoragePath the external storage path where the task input payload is stored + */ + public void setExternalInputPayloadStoragePath(String externalInputPayloadStoragePath) { + this.externalInputPayloadStoragePath = externalInputPayloadStoragePath; + } + + /** + * @return the external storage path for the task output payload + */ + public String getExternalOutputPayloadStoragePath() { + return externalOutputPayloadStoragePath; + } + + /** + * @param externalOutputPayloadStoragePath the external storage path where the task output payload is stored + */ + public void setExternalOutputPayloadStoragePath(String externalOutputPayloadStoragePath) { + this.externalOutputPayloadStoragePath = externalOutputPayloadStoragePath; + } } diff --git a/common/src/main/java/com/netflix/conductor/common/run/WorkflowSummary.java b/common/src/main/java/com/netflix/conductor/common/run/WorkflowSummary.java index d92e07453e..c2c75a8608 100644 --- a/common/src/main/java/com/netflix/conductor/common/run/WorkflowSummary.java +++ b/common/src/main/java/com/netflix/conductor/common/run/WorkflowSummary.java @@ -25,6 +25,7 @@ import com.github.vmg.protogen.annotations.*; import com.netflix.conductor.common.run.Workflow.WorkflowStatus; +import org.apache.commons.lang3.StringUtils; /** * Captures workflow summary info to be indexed in Elastic Search. @@ -80,6 +81,12 @@ public class WorkflowSummary { @ProtoField(id = 14) private String failedReferenceTaskNames = ""; + + @ProtoField(id = 15) + private String externalInputPayloadStoragePath; + + @ProtoField(id = 16) + private String externalOutputPayloadStoragePath; public WorkflowSummary() { @@ -115,6 +122,12 @@ public WorkflowSummary(Workflow workflow) { } this.event = workflow.getEvent(); this.failedReferenceTaskNames = workflow.getFailedReferenceTaskNames().stream().collect(Collectors.joining(",")); + if (StringUtils.isNotBlank(workflow.getExternalInputPayloadStoragePath())) { + this.externalInputPayloadStoragePath = workflow.getExternalInputPayloadStoragePath(); + } + if (StringUtils.isNotBlank(workflow.getExternalOutputPayloadStoragePath())) { + this.externalOutputPayloadStoragePath = workflow.getExternalOutputPayloadStoragePath(); + } } /** @@ -282,4 +295,32 @@ public void setReasonForIncompletion(String reasonForIncompletion) { public void setExecutionTime(long executionTime) { this.executionTime = executionTime; } + + /** + * @return the external storage path of the workflow input payload + */ + public String getExternalInputPayloadStoragePath() { + return externalInputPayloadStoragePath; + } + + /** + * @param externalInputPayloadStoragePath the external storage path where the workflow input payload is stored + */ + public void setExternalInputPayloadStoragePath(String externalInputPayloadStoragePath) { + this.externalInputPayloadStoragePath = externalInputPayloadStoragePath; + } + + /** + * @return the external storage path of the workflow output payload + */ + public String getExternalOutputPayloadStoragePath() { + return externalOutputPayloadStoragePath; + } + + /** + * @param externalOutputPayloadStoragePath the external storage path where the workflow output payload is stored + */ + public void setExternalOutputPayloadStoragePath(String externalOutputPayloadStoragePath) { + this.externalOutputPayloadStoragePath = externalOutputPayloadStoragePath; + } } diff --git a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java index 224f5c42aa..17fb4465fb 100644 --- a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java +++ b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java @@ -873,6 +873,12 @@ public TaskSummaryPb.TaskSummary toProto(TaskSummary from) { if (from.getTaskId() != null) { to.setTaskId( from.getTaskId() ); } + if (from.getExternalInputPayloadStoragePath() != null) { + to.setExternalInputPayloadStoragePath( from.getExternalInputPayloadStoragePath() ); + } + if (from.getExternalOutputPayloadStoragePath() != null) { + to.setExternalOutputPayloadStoragePath( from.getExternalOutputPayloadStoragePath() ); + } return to.build(); } @@ -1078,6 +1084,12 @@ public WorkflowSummaryPb.WorkflowSummary toProto(WorkflowSummary from) { if (from.getFailedReferenceTaskNames() != null) { to.setFailedReferenceTaskNames( from.getFailedReferenceTaskNames() ); } + if (from.getExternalInputPayloadStoragePath() != null) { + to.setExternalInputPayloadStoragePath( from.getExternalInputPayloadStoragePath() ); + } + if (from.getExternalOutputPayloadStoragePath() != null) { + to.setExternalOutputPayloadStoragePath( from.getExternalOutputPayloadStoragePath() ); + } return to.build(); } @@ -1097,6 +1109,8 @@ public WorkflowSummary fromProto(WorkflowSummaryPb.WorkflowSummary from) { to.setExecutionTime( from.getExecutionTime() ); to.setEvent( from.getEvent() ); to.setFailedReferenceTaskNames( from.getFailedReferenceTaskNames() ); + to.setExternalInputPayloadStoragePath( from.getExternalInputPayloadStoragePath() ); + to.setExternalOutputPayloadStoragePath( from.getExternalOutputPayloadStoragePath() ); return to; } @@ -1126,6 +1140,9 @@ public WorkflowTaskPb.WorkflowTask toProto(WorkflowTask from) { if (from.getCaseExpression() != null) { to.setCaseExpression( from.getCaseExpression() ); } + if (from.getScriptExpression() != null) { + to.setScriptExpression( from.getScriptExpression() ); + } for (Map.Entry> pair : from.getDecisionCases().entrySet()) { to.putDecisionCases( pair.getKey(), toProto( pair.getValue() ) ); } @@ -1156,6 +1173,7 @@ public WorkflowTaskPb.WorkflowTask toProto(WorkflowTask from) { if (from.isRateLimited() != null) { to.setRateLimited( from.isRateLimited() ); } + to.addAllDefaultExclusiveJoinTask( from.getDefaultExclusiveJoinTask() ); return to.build(); } @@ -1173,6 +1191,7 @@ public WorkflowTask fromProto(WorkflowTaskPb.WorkflowTask from) { to.setDynamicTaskNameParam( from.getDynamicTaskNameParam() ); to.setCaseValueParam( from.getCaseValueParam() ); to.setCaseExpression( from.getCaseExpression() ); + to.setScriptExpression( from.getScriptExpression() ); Map> decisionCasesMap = new HashMap>(); for (Map.Entry pair : from.getDecisionCasesMap().entrySet()) { decisionCasesMap.put( pair.getKey(), fromProto( pair.getValue() ) ); @@ -1193,6 +1212,7 @@ public WorkflowTask fromProto(WorkflowTaskPb.WorkflowTask from) { to.setTaskDefinition( fromProto( from.getTaskDefinition() ) ); } to.setRateLimited( from.getRateLimited() ); + to.setDefaultExclusiveJoinTask( from.getDefaultExclusiveJoinTaskList().stream().collect(Collectors.toCollection(ArrayList::new)) ); return to; } diff --git a/grpc/src/main/proto/model/tasksummary.proto b/grpc/src/main/proto/model/tasksummary.proto index 6ad99d0dc3..365e7b6c8e 100644 --- a/grpc/src/main/proto/model/tasksummary.proto +++ b/grpc/src/main/proto/model/tasksummary.proto @@ -24,4 +24,6 @@ message TaskSummary { string input = 14; string output = 15; string task_id = 16; + string external_input_payload_storage_path = 17; + string external_output_payload_storage_path = 18; } diff --git a/grpc/src/main/proto/model/workflowsummary.proto b/grpc/src/main/proto/model/workflowsummary.proto index 6e3d4202a0..226ca7bc5c 100644 --- a/grpc/src/main/proto/model/workflowsummary.proto +++ b/grpc/src/main/proto/model/workflowsummary.proto @@ -22,4 +22,6 @@ message WorkflowSummary { int64 execution_time = 12; string event = 13; string failed_reference_task_names = 14; + string external_input_payload_storage_path = 15; + string external_output_payload_storage_path = 16; } diff --git a/grpc/src/main/proto/model/workflowtask.proto b/grpc/src/main/proto/model/workflowtask.proto index 2cc09ea071..c6dae59fcb 100644 --- a/grpc/src/main/proto/model/workflowtask.proto +++ b/grpc/src/main/proto/model/workflowtask.proto @@ -21,6 +21,7 @@ message WorkflowTask { string dynamic_task_name_param = 6; string case_value_param = 7; string case_expression = 8; + string script_expression = 22; map decision_cases = 9; string dynamic_fork_tasks_param = 10; string dynamic_fork_tasks_input_param_name = 11;