From 23488511176a38bb6c46f1329a4e99fbbe53b8bb Mon Sep 17 00:00:00 2001 From: huangxiao <68590897+youngledo@users.noreply.github.com> Date: Fri, 1 Dec 2023 15:35:41 +0800 Subject: [PATCH 1/6] Update WorkflowExecutor.java --- .../workflow/executor/WorkflowExecutor.java | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java index ae75b6c839..c47d5bc43a 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java @@ -154,7 +154,7 @@ public void initWorkers(String packagesToScan) { annotatedWorkerExecutor.initWorkers(packagesToScan); } - public CompletableFuture executeWorkflow(String name, Integer version, Object input) { + private String doExecuteWorkflow(String name, Integer version, WorkflowDef workflowDef, Object input) { CompletableFuture future = new CompletableFuture<>(); Map inputMap = objectMapper.convertValue(input, Map.class); @@ -162,28 +162,27 @@ public CompletableFuture executeWorkflow(String name, Integer version, request.setInput(inputMap); request.setName(name); request.setVersion(version); + request.setWorkflowDef(conductorWorkflow.toWorkflowDef()); - String workflowId = workflowClient.startWorkflow(request); + return workflowClient.startWorkflow(request); + } + + public String executeWorkflow(String name, Integer version, Object input) { + String workflowId = this.doExecuteWorkflow(name, version, input); + runningWorkflowFutures.put(workflowId, future); + return workflowId; + } + + public CompletableFuture executeWorkflow(String name, Integer version, Object input) { + String workflowId = this.doExecuteWorkflow(name, version, input); runningWorkflowFutures.put(workflowId, future); return future; } public CompletableFuture executeWorkflow( ConductorWorkflow conductorWorkflow, Object input) { - - CompletableFuture future = new CompletableFuture<>(); - - Map inputMap = objectMapper.convertValue(input, Map.class); - - StartWorkflowRequest request = new StartWorkflowRequest(); - request.setInput(inputMap); - request.setName(conductorWorkflow.getName()); - request.setVersion(conductorWorkflow.getVersion()); - request.setWorkflowDef(conductorWorkflow.toWorkflowDef()); - - String workflowId = workflowClient.startWorkflow(request); + String workflowId = this.doExecuteWorkflow(conductorWorkflow.getName(), conductorWorkflow.getVersion(), conductorWorkflow.toWorkflowDef(), input); runningWorkflowFutures.put(workflowId, future); - return future; } From 1e1ac469236e97ccbe7ffa2b53a84141a44b9b66 Mon Sep 17 00:00:00 2001 From: huangxiao <68590897+youngledo@users.noreply.github.com> Date: Fri, 1 Dec 2023 15:47:05 +0800 Subject: [PATCH 2/6] Update WorkflowExecutor.java --- .../sdk/workflow/executor/WorkflowExecutor.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java index c47d5bc43a..76d40bd699 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java @@ -154,7 +154,7 @@ public void initWorkers(String packagesToScan) { annotatedWorkerExecutor.initWorkers(packagesToScan); } - private String doExecuteWorkflow(String name, Integer version, WorkflowDef workflowDef, Object input) { + private String startWorkflow(String name, Integer version, WorkflowDef workflowDef, Object input) { CompletableFuture future = new CompletableFuture<>(); Map inputMap = objectMapper.convertValue(input, Map.class); @@ -167,21 +167,20 @@ private String doExecuteWorkflow(String name, Integer version, WorkflowDef workf return workflowClient.startWorkflow(request); } - public String executeWorkflow(String name, Integer version, Object input) { - String workflowId = this.doExecuteWorkflow(name, version, input); + public String executeWorkflowFuture(String name, Integer version, Object input) { + String workflowId = this.startWorkflow(name, version, input); runningWorkflowFutures.put(workflowId, future); return workflowId; } public CompletableFuture executeWorkflow(String name, Integer version, Object input) { - String workflowId = this.doExecuteWorkflow(name, version, input); + String workflowId = this.startWorkflow(name, version, input); runningWorkflowFutures.put(workflowId, future); return future; } - public CompletableFuture executeWorkflow( - ConductorWorkflow conductorWorkflow, Object input) { - String workflowId = this.doExecuteWorkflow(conductorWorkflow.getName(), conductorWorkflow.getVersion(), conductorWorkflow.toWorkflowDef(), input); + public CompletableFuture executeWorkflow(ConductorWorkflow conductorWorkflow, Object input) { + String workflowId = this.startWorkflow(conductorWorkflow.getName(), conductorWorkflow.getVersion(), conductorWorkflow.toWorkflowDef(), input); runningWorkflowFutures.put(workflowId, future); return future; } From 83c6e1d410ef6c6a6a8af0d5431233ca64e6a027 Mon Sep 17 00:00:00 2001 From: huangxiao <68590897+youngledo@users.noreply.github.com> Date: Fri, 1 Dec 2023 15:53:54 +0800 Subject: [PATCH 3/6] Update WorkflowExecutor.java --- .../sdk/workflow/executor/WorkflowExecutor.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java index 76d40bd699..485b218746 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java @@ -155,32 +155,34 @@ public void initWorkers(String packagesToScan) { } private String startWorkflow(String name, Integer version, WorkflowDef workflowDef, Object input) { - CompletableFuture future = new CompletableFuture<>(); Map inputMap = objectMapper.convertValue(input, Map.class); StartWorkflowRequest request = new StartWorkflowRequest(); request.setInput(inputMap); request.setName(name); request.setVersion(version); - request.setWorkflowDef(conductorWorkflow.toWorkflowDef()); + request.setWorkflowDef(workflowDef); return workflowClient.startWorkflow(request); } public String executeWorkflowFuture(String name, Integer version, Object input) { - String workflowId = this.startWorkflow(name, version, input); + String workflowId = this.startWorkflow(name, version, null, input); + CompletableFuture future = new CompletableFuture<>(); runningWorkflowFutures.put(workflowId, future); return workflowId; } public CompletableFuture executeWorkflow(String name, Integer version, Object input) { - String workflowId = this.startWorkflow(name, version, input); + String workflowId = this.startWorkflow(name, version, null, input); + CompletableFuture future = new CompletableFuture<>(); runningWorkflowFutures.put(workflowId, future); return future; } public CompletableFuture executeWorkflow(ConductorWorkflow conductorWorkflow, Object input) { String workflowId = this.startWorkflow(conductorWorkflow.getName(), conductorWorkflow.getVersion(), conductorWorkflow.toWorkflowDef(), input); + CompletableFuture future = new CompletableFuture<>(); runningWorkflowFutures.put(workflowId, future); return future; } From 774997e68e7ab6d06f3436f25bd6fdc146eb8b81 Mon Sep 17 00:00:00 2001 From: huangxiao <68590897+youngledo@users.noreply.github.com> Date: Fri, 1 Dec 2023 19:31:16 +0800 Subject: [PATCH 4/6] Update WorkflowExecutor.java --- .../conductor/sdk/workflow/executor/WorkflowExecutor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java index 485b218746..0089802b79 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java @@ -181,7 +181,8 @@ public CompletableFuture executeWorkflow(String name, Integer version, } public CompletableFuture executeWorkflow(ConductorWorkflow conductorWorkflow, Object input) { - String workflowId = this.startWorkflow(conductorWorkflow.getName(), conductorWorkflow.getVersion(), conductorWorkflow.toWorkflowDef(), input); + String workflowId = this.startWorkflow(conductorWorkflow.getName(), conductorWorkflow.getVersion(), + conductorWorkflow.toWorkflowDef(), input); CompletableFuture future = new CompletableFuture<>(); runningWorkflowFutures.put(workflowId, future); return future; From cd1a2db8418bd7714cb0f52eac004277a7399ed8 Mon Sep 17 00:00:00 2001 From: huangxiao <68590897+youngledo@users.noreply.github.com> Date: Fri, 1 Dec 2023 19:42:09 +0800 Subject: [PATCH 5/6] Update WorkflowExecutor.java --- .../workflow/executor/WorkflowExecutor.java | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java index 0089802b79..2038f1089b 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java @@ -12,16 +12,8 @@ */ package com.netflix.conductor.sdk.workflow.executor; -import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.concurrent.*; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.netflix.conductor.client.http.MetadataClient; import com.netflix.conductor.client.http.TaskClient; import com.netflix.conductor.client.http.WorkflowClient; @@ -34,12 +26,18 @@ import com.netflix.conductor.sdk.workflow.def.tasks.*; import com.netflix.conductor.sdk.workflow.executor.task.AnnotatedWorkerExecutor; import com.netflix.conductor.sdk.workflow.utils.ObjectMapperProvider; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; import com.sun.jersey.api.client.ClientHandler; import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.api.client.filter.ClientFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; public class WorkflowExecutor { @@ -154,7 +152,8 @@ public void initWorkers(String packagesToScan) { annotatedWorkerExecutor.initWorkers(packagesToScan); } - private String startWorkflow(String name, Integer version, WorkflowDef workflowDef, Object input) { + private String startWorkflow( + String name, Integer version, WorkflowDef workflowDef, Object input) { Map inputMap = objectMapper.convertValue(input, Map.class); StartWorkflowRequest request = new StartWorkflowRequest(); @@ -180,9 +179,14 @@ public CompletableFuture executeWorkflow(String name, Integer version, return future; } - public CompletableFuture executeWorkflow(ConductorWorkflow conductorWorkflow, Object input) { - String workflowId = this.startWorkflow(conductorWorkflow.getName(), conductorWorkflow.getVersion(), - conductorWorkflow.toWorkflowDef(), input); + public CompletableFuture executeWorkflow( + ConductorWorkflow conductorWorkflow, Object input) { + String workflowId = + this.startWorkflow( + conductorWorkflow.getName(), + conductorWorkflow.getVersion(), + conductorWorkflow.toWorkflowDef(), + input); CompletableFuture future = new CompletableFuture<>(); runningWorkflowFutures.put(workflowId, future); return future; From b76f1ee161f8ec60ba5893ce45fcfe3ac19df568 Mon Sep 17 00:00:00 2001 From: huangxiao <68590897+youngledo@users.noreply.github.com> Date: Fri, 1 Dec 2023 19:50:08 +0800 Subject: [PATCH 6/6] Update WorkflowExecutor.java --- .../workflow/executor/WorkflowExecutor.java | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java index 2038f1089b..9d62cb0524 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java @@ -12,8 +12,16 @@ */ package com.netflix.conductor.sdk.workflow.executor; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.netflix.conductor.client.http.MetadataClient; import com.netflix.conductor.client.http.TaskClient; import com.netflix.conductor.client.http.WorkflowClient; @@ -26,18 +34,12 @@ import com.netflix.conductor.sdk.workflow.def.tasks.*; import com.netflix.conductor.sdk.workflow.executor.task.AnnotatedWorkerExecutor; import com.netflix.conductor.sdk.workflow.utils.ObjectMapperProvider; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.sun.jersey.api.client.ClientHandler; import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.api.client.filter.ClientFilter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.concurrent.*; public class WorkflowExecutor {