Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
remove deprecated client APIs and fix start workflow from event
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Oct 19, 2018
1 parent c428a7d commit 458f160
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 239 deletions.
Expand Up @@ -117,15 +117,6 @@ public WorkflowDef getWorkflowDef(String name, Integer version) {
return getForEntity("metadata/workflow/{name}", new Object[]{"version", version}, WorkflowDef.class, name);
}

/**
* @deprecated This API is deprecated and will be removed in the next version
* This API can return 503 for a large number of workflow definitions because of no pagination.
*/
@Deprecated
public List<WorkflowDef> getAllWorkflowDefs() {
return getForEntity("metadata/workflow", null, workflowDefList);
}

/**
* Removes the workflow definition of a workflow from the conductor server.
* It does not remove associated workflows. Use with caution.
Expand Down Expand Up @@ -161,15 +152,6 @@ public void updateTaskDef(TaskDef taskDef) {
put("metadata/taskdefs", null, taskDef);
}

/**
* @deprecated This API is deprecated and will be removed in the next version
* This API can return 503 for a large number of workflow definitions because of no pagination.
*/
@Deprecated
public List<TaskDef> getAllTaskDefs() {
return getForEntity("metadata/taskdefs", null, taskDefList);
}

/**
* Retrieve the task definition of a given task type
*
Expand Down
Expand Up @@ -22,7 +22,6 @@
import com.netflix.conductor.client.task.WorkflowTaskMetrics;
import com.netflix.conductor.common.metadata.tasks.PollData;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.run.SearchResult;
Expand Down Expand Up @@ -53,9 +52,6 @@ public class TaskClient extends ClientBase {
private static GenericType<List<Task>> taskList = new GenericType<List<Task>>() {
};

private static GenericType<List<TaskDef>> taskDefList = new GenericType<List<TaskDef>>() {
};

private static GenericType<List<TaskExecLog>> taskExecLogList = new GenericType<List<TaskExecLog>>() {
};

Expand All @@ -65,6 +61,9 @@ public class TaskClient extends ClientBase {
private static GenericType<SearchResult<TaskSummary>> searchResultTaskSummary = new GenericType<SearchResult<TaskSummary>>() {
};

private static GenericType<Map<String, Integer>> queueSizeMap = new GenericType<Map<String, Integer>>() {
};

private static final Logger logger = LoggerFactory.getLogger(TaskClient.class);

/**
Expand Down Expand Up @@ -130,15 +129,6 @@ public Task pollTask(String taskType, String workerId, String domain) {
return task;
}

/**
* @deprecated This API is deprecated and will be removed in the next version
* use {@link #batchPollTasksByTaskType(String, String, int, int)} instead
*/
@Deprecated
public List<Task> poll(String taskType, String workerId, int count, int timeoutInMillisecond) {
return batchPollTasksByTaskType(taskType, workerId, count, timeoutInMillisecond);
}

/**
* Perform a batch poll for tasks by task type. Batch size is configurable by count.
*
Expand All @@ -159,15 +149,6 @@ public List<Task> batchPollTasksByTaskType(String taskType, String workerId, int
return tasks;
}

/**
* @deprecated This API is deprecated and will be removed in the next version
* use {@link #batchPollTasksInDomain(String, String, String, int, int)} instead
*/
@Deprecated
public List<Task> poll(String taskType, String domain, String workerId, int count, int timeoutInMillisecond) {
return batchPollTasksInDomain(taskType, domain, workerId, count, timeoutInMillisecond);
}

/**
* Batch poll for tasks in a domain. Batch size is configurable by count.
*
Expand Down Expand Up @@ -202,15 +183,6 @@ private void populateTaskInput(Task task) {
}
}

/**
* @deprecated This API is deprecated and will be removed in the next version
* use {@link #getPendingTasksByType(String, String, Integer)} instead
*/
@Deprecated
public List<Task> getTasks(String taskType, String startKey, Integer count) {
return getPendingTasksByType(taskType, startKey, count);
}

/**
* Retrieve pending tasks by type
*
Expand Down Expand Up @@ -294,15 +266,6 @@ public Boolean ack(String taskId, String workerId) {
return Boolean.valueOf(response);
}

/**
* @deprecated This API is deprecated and will be removed in the next version
* use {@link #logMessageForTask(String, String)} instead
*/
@Deprecated
public void log(String taskId, String logMessage) {
logMessageForTask(taskId, logMessage);
}

/**
* Log execution messages for a task.
*
Expand All @@ -324,15 +287,6 @@ public List<TaskExecLog> getTaskLogs(String taskId) {
return getForEntity("tasks/{taskId}/log", null, taskExecLogList, taskId);
}

/**
* @deprecated This API is deprecated and will be removed in the next version
* use {@link #getTaskDetails(String)} instead
*/
@Deprecated
public Task get(String taskId) {
return getTaskDetails(taskId);
}

/**
* Retrieve information about the task
*
Expand Down Expand Up @@ -360,9 +314,9 @@ public void removeTaskFromQueue(String taskType, String taskId) {
public int getQueueSizeForTask(String taskType) {
Preconditions.checkArgument(StringUtils.isNotBlank(taskType), "Task type cannot be blank");

Map<String, Integer> queueSizeMap = getForEntity("tasks/queue/sizes", new Object[]{"taskType", taskType}, Map.class);
if (queueSizeMap.keySet().contains(taskType)) {
return queueSizeMap.get(taskType);
Map<String, Integer> taskTypeToQueueSizeMap = getForEntity("tasks/queue/sizes", new Object[]{"taskType", taskType}, queueSizeMap);
if (taskTypeToQueueSizeMap.keySet().contains(taskType)) {
return taskTypeToQueueSizeMap.get(taskType);
}
return 0;
}
Expand Down Expand Up @@ -432,43 +386,4 @@ public SearchResult<TaskSummary> search(Integer start, Integer size, String sort
Object[] params = new Object[]{"start", start, "size", size, "sort", sort, "freeText", freeText, "query", query};
return getForEntity("tasks/search", params, searchResultTaskSummary);
}

/**
* @deprecated This API is deprecated and will be removed in the next version
* use {@link MetadataClient#getAllTaskDefs()} instead
*/
@Deprecated
public List<TaskDef> getTaskDef() {
return getForEntity("metadata/taskdefs", null, taskDefList);
}

/**
* @deprecated This API is deprecated and will be removed in the next version
* use {@link MetadataClient#getTaskDef(String)} instead
*/
@Deprecated
public TaskDef getTaskDef(String taskType) {
Preconditions.checkArgument(StringUtils.isNotBlank(taskType), "Task type cannot be blank");
return getForEntity("metadata/taskdefs/{tasktype}", null, TaskDef.class, taskType);
}

/**
* @deprecated This API is deprecated and will be removed in the next version
* use {@link MetadataClient#unregisterTaskDef(String)} instead
*/
@Deprecated
public void unregisterTaskDef(String taskType) {
Preconditions.checkArgument(StringUtils.isNotBlank(taskType), "Task type cannot be blank");
delete("metadata/taskdefs/{tasktype}", taskType);
}

/**
* @deprecated This API is deprecated and will be removed in the next version
* use {@link MetadataClient#registerTaskDefs(List)} instead
*/
@Deprecated
public void registerTaskDefs(List<TaskDef> taskDefs) {
Preconditions.checkNotNull(taskDefs, "Task defs cannot be null");
postForEntityWithRequestOnly("metadata/taskdefs", taskDefs);
}
}
Expand Up @@ -22,7 +22,6 @@
import com.netflix.conductor.client.task.WorkflowTaskMetrics;
import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest;
import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.WorkflowSummary;
Expand All @@ -39,17 +38,13 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;


/**
* @author Viren
*/
public class WorkflowClient extends ClientBase {

private static GenericType<List<WorkflowDef>> workflowDefList = new GenericType<List<WorkflowDef>>() {
};

private static GenericType<SearchResult<WorkflowSummary>> searchResultWorkflowSummary = new GenericType<SearchResult<WorkflowSummary>>() {
};

Expand Down Expand Up @@ -99,63 +94,6 @@ public WorkflowClient(ClientConfig config, ConductorClientConfiguration clientCo
}
}


//Metadata Operations

/**
* @deprecated This API is deprecated and will be removed in the next version
* use {@link MetadataClient#getAllWorkflowDefs()} instead
*/
@Deprecated
public List<WorkflowDef> getAllWorkflowDefs() {
return getForEntity("metadata/workflow", null, workflowDefList);
}

/**
* @deprecated This API is deprecated and will be removed in the next version
* use {@link MetadataClient#registerWorkflowDef(WorkflowDef)} instead
*/
@Deprecated
public void registerWorkflow(WorkflowDef workflowDef) {
Preconditions.checkNotNull(workflowDef, "Worfklow definition cannot be null");
postForEntityWithRequestOnly("metadata/workflow", workflowDef);
}

/**
* @deprecated This API is deprecated and will be removed in the next version
* use {@link MetadataClient#getWorkflowDef(String, Integer)} instead
*/
@Deprecated
public WorkflowDef getWorkflowDef(String name, Integer version) {
Preconditions.checkArgument(StringUtils.isNotBlank(name), "name cannot be blank");
return getForEntity("metadata/workflow/{name}", new Object[]{"version", version}, WorkflowDef.class, name);
}


//Runtime Operations

/**
* Starts a workflow identified by the name and version
*
* @param name the name of the workflow
* @param version the version of the workflow def
* @param correlationId the correlation id
* @param input the input to set in the workflow
* @return the id of the workflow instance that can be used for tracking
* @deprecated This API is deprecated and will be removed in the next version
* use {@link #startWorkflow(StartWorkflowRequest)} instead
*/
@Deprecated
public String startWorkflow(String name, Integer version, String correlationId, Map<String, Object> input) {
Preconditions.checkArgument(StringUtils.isNotBlank(name), "name cannot be blank");
StartWorkflowRequest startWorkflowRequest = new StartWorkflowRequest();
startWorkflowRequest.setName(name);
startWorkflowRequest.setVersion(version);
startWorkflowRequest.setCorrelationId(correlationId);
startWorkflowRequest.setInput(input);
return startWorkflow(startWorkflowRequest);
}

/**
* Starts a workflow.
* If the size of the workflow input payload is bigger than {@link ConductorClientConfiguration#getWorkflowInputPayloadThresholdKB()},
Expand Down Expand Up @@ -196,16 +134,6 @@ public String startWorkflow(StartWorkflowRequest startWorkflowRequest) {
return postForEntity("workflow", startWorkflowRequest, null, String.class, startWorkflowRequest.getName());
}

/**
* @deprecated This API is deprecated and will be removed in the next version
* use {@link #getWorkflow(String, boolean)} instead
*/
@Deprecated
public Workflow getExecutionStatus(String workflowId, boolean includeTasks) {
Preconditions.checkArgument(StringUtils.isNotBlank(workflowId), "workflow id cannot be blank");
return getWorkflow(workflowId, includeTasks);
}

/**
* Retrieve a workflow by workflow id
*
Expand Down
@@ -1,3 +1,15 @@
/*
* Copyright 2016 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.common.metadata.workflow;

import com.github.vmg.protogen.annotations.ProtoField;
Expand Down Expand Up @@ -66,10 +78,13 @@ public StartWorkflowRequest withCorrelationId(String correlationId) {
public String getExternalInputPayloadStoragePath() {
return externalInputPayloadStoragePath;
}

public void setExternalInputPayloadStoragePath(String externalInputPayloadStoragePath) {
this.externalInputPayloadStoragePath = externalInputPayloadStoragePath;
}
public StartWorkflowRequest withExternalInputPayloadStoragePath(String externalInputPayloadStoragePath) {
this.externalInputPayloadStoragePath = externalInputPayloadStoragePath;
return this;
}

public Map<String, Object> getInput() {
return input;
Expand All @@ -96,11 +111,9 @@ public StartWorkflowRequest withTaskToDomain(Map<String, String> taskToDomain) {
public WorkflowDef getWorkflowDef() {
return workflowDef;
}

public void setWorkflowDef(WorkflowDef workflowDef) {
this.workflowDef = workflowDef;
}

public StartWorkflowRequest withWorkflowDef(WorkflowDef workflowDef) {
this.workflowDef = workflowDef;
return this;
Expand Down
Expand Up @@ -79,8 +79,7 @@ public Map<String, Object> execute(Action action, Object payloadObject, String e
throw new UnsupportedOperationException("Action not supported " + action.getAction() + " for event " + event);
}

@VisibleForTesting
Map<String, Object> completeTask(Action action, Object payload, TaskDetails taskDetails, Status status, String event, String messageId) {
private Map<String, Object> completeTask(Action action, Object payload, TaskDetails taskDetails, Status status, String event, String messageId) {

Map<String, Object> input = new HashMap<>();
input.put("workflowId", taskDetails.getWorkflowId());
Expand Down Expand Up @@ -135,7 +134,7 @@ private Map<String, Object> startWorkflow(Action action, Object payload, String
workflowInput.put("conductor.event.messageId", messageId);
workflowInput.put("conductor.event.name", event);

String id = executor.startWorkflow(params.getName(), params.getVersion(), params.getCorrelationId(), workflowInput, event);
String id = executor.startWorkflow(params.getName(), params.getVersion(), params.getCorrelationId(), workflowInput, null, event);
output.put("workflowId", id);

} catch (RuntimeException e) {
Expand Down
Expand Up @@ -53,7 +53,7 @@ public void testStartWorkflow() throws Exception {
workflowDef.setName("testWorkflow");
workflowDef.setVersion(1);

when(workflowExecutor.startWorkflow(eq("testWorkflow"), eq(null), any(), any(), eq("testEvent")))
when(workflowExecutor.startWorkflow(eq("testWorkflow"), eq(null), any(), any(), any(), eq("testEvent")))
.thenReturn("workflow_1");

Map<String, Object> output = actionProcessor.execute(action, payload, "testEvent", "testMessage");
Expand All @@ -62,7 +62,7 @@ public void testStartWorkflow() throws Exception {
assertEquals("workflow_1", output.get("workflowId"));

ArgumentCaptor<Map> argumentCaptor = ArgumentCaptor.forClass(Map.class);
verify(workflowExecutor).startWorkflow(eq("testWorkflow"), eq(null), any(), argumentCaptor.capture(), eq("testEvent"));
verify(workflowExecutor).startWorkflow(eq("testWorkflow"), eq(null), any(), argumentCaptor.capture(), any(), eq("testEvent"));
assertEquals("test_1", argumentCaptor.getValue().get("testInput"));
assertEquals("testMessage", argumentCaptor.getValue().get("conductor.event.messageId"));
assertEquals("testEvent", argumentCaptor.getValue().get("conductor.event.name"));
Expand Down

0 comments on commit 458f160

Please sign in to comment.