Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #900 from Netflix/feature/grpc_and_rest_to_use_sha…
Browse files Browse the repository at this point in the history
…red_service_layer

Feature/grpc and rest to use shared service layer
  • Loading branch information
Alex committed Dec 18, 2018
2 parents eccf466 + 3232f77 commit 163f88f
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 234 deletions.
26 changes: 23 additions & 3 deletions core/src/main/java/com/netflix/conductor/service/TaskService.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,29 @@ public String updateTask(TaskResult taskResult) {
* @return `true|false` if task if received or not
*/
public String ackTaskReceived(String taskId, String workerId) {
ServiceUtils.checkNotNullOrEmpty(taskId, "TaskId cannot be null or empty.");
LOGGER.debug("Ack received for task: {} from worker: {}", taskId, workerId);
return String.valueOf(ackTaskReceived(taskId));
}

/**
* Ack Task is received.
*
* @param taskId Id of the task
* @return `true|false` if task if received or not
*/
public boolean ackTaskReceived(String taskId) {
ServiceUtils.checkNotNullOrEmpty(taskId, "TaskId cannot be null or empty.");
LOGGER.debug("Ack received for task: {}", taskId);
boolean ackResult;
try {
ackResult = executionService.ackTaskReceived(taskId);
} catch (Exception e) {
// safe to ignore exception here, since the task will not be processed by the worker due to ack failure
// The task will eventually be available to be polled again after the unack timeout
LOGGER.error("Exception when trying to ack task {} from worker {}", taskId, workerId, e);
LOGGER.error("Exception when trying to ack task {}", taskId, e);
ackResult = false;
}
return String.valueOf(ackResult);
return ackResult;
}

/**
Expand Down Expand Up @@ -206,6 +217,15 @@ public Task getTask(String taskId) {
*/
public void removeTaskFromQueue(String taskType, String taskId) {
ServiceUtils.checkNotNullOrEmpty(taskType, "TaskType cannot be null or empty.");
removeTaskFromQueue(taskId);
}

/**
* Remove Task from a Task type queue.
*
* @param taskId ID of the task
*/
public void removeTaskFromQueue(String taskId) {
ServiceUtils.checkNotNullOrEmpty(taskId, "TaskId cannot be null or empty.");
executionService.removeTaskfromQueue(taskId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import com.google.inject.Singleton;
import com.netflix.conductor.annotations.Trace;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.service.common.BulkResponse;
import com.netflix.conductor.service.utils.ServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.List;
Expand All @@ -29,48 +32,124 @@ public class WorkflowBulkService {
private static final int MAX_REQUEST_ITEMS = 1000;
private final WorkflowExecutor workflowExecutor;

private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowBulkService.class);

@Inject
public WorkflowBulkService(WorkflowExecutor workflowExecutor) {
this.workflowExecutor = workflowExecutor;
}

public void pauseWorkflow(List<String> workflowIds) {
/**
* Pause the list of workflows.
* @param workflowIds - list of workflow Ids to perform pause operation on
* @return bulk response object containing a list of succeeded workflows and a list of failed ones with errors
*/
public BulkResponse pauseWorkflow(List<String> workflowIds) {
ServiceUtils.checkNotNullOrEmpty(workflowIds, "WorkflowIds list cannot be null.");
ServiceUtils.checkArgument(workflowIds.size() < MAX_REQUEST_ITEMS, String.format("Cannot process more than %s workflows. Please use multiple requests", MAX_REQUEST_ITEMS));
for (String workflowId : workflowIds) {
workflowExecutor.pauseWorkflow(workflowId);
}

BulkResponse bulkResponse = new BulkResponse();
for (String workflowId : workflowIds) {
try {
workflowExecutor.pauseWorkflow(workflowId);
bulkResponse.appendSuccessResponse(workflowId);
} catch (Exception e) {
LOGGER.error("bulk pauseWorkflow exception, workflowId {}, message: {} ",workflowId, e.getMessage(), e);
bulkResponse.appendFailedResponse(workflowId, e.getMessage());
}
}

return bulkResponse;
}

public void resumeWorkflow(List<String> workflowIds) {
/**
* Resume the list of workflows.
* @param workflowIds - list of workflow Ids to perform resume operation on
* @return bulk response object containing a list of succeeded workflows and a list of failed ones with errors
*/
public BulkResponse resumeWorkflow(List<String> workflowIds) {
ServiceUtils.checkNotNullOrEmpty(workflowIds, "WorkflowIds list cannot be null.");
ServiceUtils.checkArgument(workflowIds.size() < MAX_REQUEST_ITEMS, String.format("Cannot process more than %s workflows. Please use multiple requests", MAX_REQUEST_ITEMS));

BulkResponse bulkResponse = new BulkResponse();
for (String workflowId : workflowIds) {
workflowExecutor.resumeWorkflow(workflowId);
try {
workflowExecutor.resumeWorkflow(workflowId);
bulkResponse.appendSuccessResponse(workflowId);
} catch (Exception e) {
LOGGER.error("bulk resumeWorkflow exception, workflowId {}, message: {} ",workflowId, e.getMessage(), e);
bulkResponse.appendFailedResponse(workflowId, e.getMessage());
}
}
return bulkResponse;
}

public void restart(List<String> workflowIds, boolean useLatestDefinitions) {
/**
* Restart the list of workflows.
*
* @param workflowIds - list of workflow Ids to perform restart operation on
* @param useLatestDefinitions if true, use latest workflow and task definitions upon restart
* @return bulk response object containing a list of succeeded workflows and a list of failed ones with errors
*/
public BulkResponse restart(List<String> workflowIds, boolean useLatestDefinitions) {
ServiceUtils.checkNotNullOrEmpty(workflowIds, "WorkflowIds list cannot be null.");
ServiceUtils.checkArgument(workflowIds.size() < MAX_REQUEST_ITEMS, String.format("Cannot process more than %s workflows. Please use multiple requests", MAX_REQUEST_ITEMS));
BulkResponse bulkResponse = new BulkResponse();
for (String workflowId : workflowIds) {
workflowExecutor.rewind(workflowId, useLatestDefinitions);
try {
workflowExecutor.rewind(workflowId, useLatestDefinitions);
bulkResponse.appendSuccessResponse(workflowId);
} catch (Exception e) {
LOGGER.error("bulk restart exception, workflowId {}, message: {} ",workflowId, e.getMessage(), e);
bulkResponse.appendFailedResponse(workflowId, e.getMessage());
}
}
return bulkResponse;
}

public void retry(List<String> workflowIds) {
/**
* Retry the last failed task for each workflow from the list.
* @param workflowIds - list of workflow Ids to perform retry operation on
* @return bulk response object containing a list of succeeded workflows and a list of failed ones with errors
*/
public BulkResponse retry(List<String> workflowIds) {
ServiceUtils.checkNotNullOrEmpty(workflowIds, "WorkflowIds list cannot be null.");
ServiceUtils.checkArgument(workflowIds.size() < MAX_REQUEST_ITEMS, String.format("Cannot process more than %s workflows. Please use multiple requests", MAX_REQUEST_ITEMS));
BulkResponse bulkResponse = new BulkResponse();
for (String workflowId : workflowIds) {
workflowExecutor.retry(workflowId);
try {
workflowExecutor.retry(workflowId);
bulkResponse.appendSuccessResponse(workflowId);
} catch (Exception e) {
LOGGER.error("bulk retry exception, workflowId {}, message: {} ",workflowId, e.getMessage(), e);
bulkResponse.appendFailedResponse(workflowId, e.getMessage());
}
}
return bulkResponse;
}

public void terminate(List<String> workflowIds, String reason) {
/**
* Terminate workflows execution.
* @param workflowIds - list of workflow Ids to perform terminate operation on
* @param reason - description to be specified for the terminated workflow for future references.
* @return bulk response object containing a list of succeeded workflows and a list of failed ones with errors
*/
public BulkResponse terminate(List<String> workflowIds, String reason) {
ServiceUtils.checkNotNullOrEmpty(workflowIds, "workflowIds list cannot be null.");
ServiceUtils.checkArgument(workflowIds.size() < MAX_REQUEST_ITEMS, String.format("Cannot process more than %s workflows. Please use multiple requests", MAX_REQUEST_ITEMS));
BulkResponse bulkResponse = new BulkResponse();
for (String workflowId : workflowIds) {
workflowExecutor.terminateWorkflow(workflowId, reason);
try {
workflowExecutor.terminateWorkflow(workflowId, reason);
bulkResponse.appendSuccessResponse(workflowId);
} catch (Exception e) {
LOGGER.error("bulk terminate exception, workflowId {}, message: {} ",workflowId, e.getMessage(), e);
bulkResponse.appendFailedResponse(workflowId, e.getMessage());
}
}
return bulkResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

@Singleton
@Trace
Expand All @@ -54,7 +55,7 @@ public WorkflowService(WorkflowExecutor workflowExecutor, ExecutionService execu
this.workflowExecutor = workflowExecutor;
this.executionService = executionService;
this.metadataService = metadataService;
this.maxSearchSize = config.getIntProperty("workflow.max.search.size", 5_000);
this.maxSearchSize = config.getIntProperty("workflow.max.search.size", 5000);
}

/**
Expand All @@ -65,35 +66,51 @@ public WorkflowService(WorkflowExecutor workflowExecutor, ExecutionService execu
*/
public String startWorkflow(StartWorkflowRequest startWorkflowRequest) {
ServiceUtils.checkNotNull(startWorkflowRequest, "StartWorkflowRequest cannot be null");
ServiceUtils.checkNotNullOrEmpty(startWorkflowRequest.getName(), "Workflow name cannot be null or empty");
return startWorkflow(startWorkflowRequest.getName(), startWorkflowRequest.getVersion(), startWorkflowRequest.getCorrelationId(), startWorkflowRequest.getInput(),
startWorkflowRequest.getExternalInputPayloadStoragePath(), startWorkflowRequest.getTaskToDomain(), startWorkflowRequest.getWorkflowDef());
}

WorkflowDef workflowDef = startWorkflowRequest.getWorkflowDef();
/**
* Start a new workflow with StartWorkflowRequest, which allows task to be executed in a domain.
* @param name Name of the workflow you want to start.
* @param version Version of the workflow you want to start.
* @param correlationId CorrelationID of the workflow you want to start.
* @param input Input to the workflow you want to start.
* @param externalInputPayloadStoragePath
* @param taskToDomain
* @param workflowDef - workflow definition
* @return the id of the workflow instance that can be use for tracking.
*/
public String startWorkflow(String name, Integer version, String correlationId, Map<String, Object> input,
String externalInputPayloadStoragePath, Map<String, String> taskToDomain, WorkflowDef workflowDef) {

ServiceUtils.checkNotNullOrEmpty(name, "Workflow name cannot be null or empty");

if (workflowDef == null) {
workflowDef = metadataService.getWorkflowDef(startWorkflowRequest.getName(), startWorkflowRequest.getVersion());
workflowDef = metadataService.getWorkflowDef(name, version);
if (workflowDef == null) {
throw new ApplicationException(ApplicationException.Code.NOT_FOUND,
String.format("No such workflow found by name: %s, version: %d", startWorkflowRequest.getName(),
startWorkflowRequest.getVersion()));
String.format("No such workflow found by name: %s, version: %d", name,
version));
}

return workflowExecutor.startWorkflow(
startWorkflowRequest.getName(),
startWorkflowRequest.getVersion(),
startWorkflowRequest.getCorrelationId(),
startWorkflowRequest.getInput(),
startWorkflowRequest.getExternalInputPayloadStoragePath(),
name,
version,
correlationId,
input,
externalInputPayloadStoragePath,
null,
startWorkflowRequest.getTaskToDomain()
taskToDomain
);
} else {
return workflowExecutor.startWorkflow(
startWorkflowRequest.getWorkflowDef(),
startWorkflowRequest.getInput(),
startWorkflowRequest.getExternalInputPayloadStoragePath(),
startWorkflowRequest.getCorrelationId(),
workflowDef,
input,
externalInputPayloadStoragePath,
correlationId,
null,
startWorkflowRequest.getTaskToDomain()
taskToDomain
);
}
}
Expand Down Expand Up @@ -189,7 +206,7 @@ public void deleteWorkflow(String workflowId, boolean archiveWorkflow) {
public List<String> getRunningWorkflows(String workflowName, Integer version,
Long startTime, Long endTime) {
ServiceUtils.checkNotNullOrEmpty(workflowName,"Workflow name cannot be null or empty.");
if (startTime != null && endTime != null) {
if (Optional.ofNullable(startTime).orElse(0l) != 0 && Optional.ofNullable(endTime).orElse(0l) != 0) {
return workflowExecutor.getWorkflows(workflowName, version, startTime, endTime);
} else {
return workflowExecutor.getRunningWorkflowIds(workflowName);
Expand Down Expand Up @@ -299,11 +316,27 @@ public void terminateWorkflow(String workflowId, String reason) {
* @return instance of {@link SearchResult}
*/
public SearchResult<WorkflowSummary> searchWorkflows(int start, int size, String sort, String freeText, String query) {
ServiceUtils.checkArgument(size < maxSearchSize, String.format("Cannot return more than %d workflows." +
ServiceUtils.checkArgument(size <= maxSearchSize, String.format("Cannot return more than %d workflows." +
" Please use pagination.", maxSearchSize));
return executionService.search(query, freeText, start, size, ServiceUtils.convertStringToList(sort));
}

/**
* Search for workflows based on payload and given parameters. Use sort options as sort ASCor DESC
* e.g. sort=name or sort=workflowId:DESC. If order is not specified, defaults to ASC.
* @param start Start index of pagination
* @param size Number of entries
* @param sort list of sorting options, separated by "|" delimiter
* @param freeText Text you want to search
* @param query Query you want to search
* @return instance of {@link SearchResult}
*/
public SearchResult<WorkflowSummary> searchWorkflows(int start, int size, List<String> sort, String freeText, String query) {
ServiceUtils.checkArgument(size <= maxSearchSize, String.format("Cannot return more than %d workflows." +
" Please use pagination.", maxSearchSize));
return executionService.search(query, freeText, start, size, sort);
}

/**
* Search for workflows based on task parameters. Use sort options as sort ASC or DESC e.g.
* sort=name or sort=workflowId:DESC. If order is not specified, defaults to ASC.
Expand All @@ -318,6 +351,20 @@ public SearchResult<WorkflowSummary> searchWorkflowsByTasks(int start, int size,
return executionService.searchWorkflowByTasks(query, freeText, start, size, ServiceUtils.convertStringToList(sort));
}

/**
* Search for workflows based on task parameters. Use sort options as sort ASC or DESC e.g.
* sort=name or sort=workflowId:DESC. If order is not specified, defaults to ASC.
* @param start Start index of pagination
* @param size Number of entries
* @param sort list of sorting options, separated by "|" delimiter
* @param freeText Text you want to search
* @param query Query you want to search
* @return instance of {@link SearchResult}
*/
public SearchResult<WorkflowSummary> searchWorkflowsByTasks(int start, int size, List<String> sort, String freeText, String query) {
return executionService.searchWorkflowByTasks(query, freeText, start, size, sort);
}

/**
* Get the external storage location where the workflow input payload is stored/to be stored
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.netflix.conductor.server.common;
package com.netflix.conductor.service.common;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down
Loading

0 comments on commit 163f88f

Please sign in to comment.