Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
enabled JSONPath evaluation of correlation id in EventHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed May 17, 2019
1 parent 7d3e1df commit 02c93b5
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 174 deletions.
Expand Up @@ -24,9 +24,9 @@
import com.google.inject.multibindings.StringMapKey;
import com.google.inject.name.Named;
import com.netflix.conductor.core.events.ActionProcessor;
import com.netflix.conductor.core.events.ActionProcessorImpl;
import com.netflix.conductor.core.events.SimpleActionProcessor;
import com.netflix.conductor.core.events.EventProcessor;
import com.netflix.conductor.core.events.EventProcessorImpl;
import com.netflix.conductor.core.events.SimpleEventProcessor;
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.EventQueues;
import com.netflix.conductor.core.events.queue.dyno.DynoEventQueueProvider;
Expand Down Expand Up @@ -77,14 +77,14 @@ public JsonUtils getJsonUtils() {
@Provides
@Singleton
public ActionProcessor getActionProcessor(WorkflowExecutor executor, ParametersUtils parametersUtils, JsonUtils jsonUtils) {
return new ActionProcessorImpl(executor, parametersUtils, jsonUtils);
return new SimpleActionProcessor(executor, parametersUtils, jsonUtils);
}

@Provides
@Singleton
public EventProcessor getEventProcessor(ExecutionService executionService, MetadataService metadataService,
ActionProcessor actionProcessor, EventQueues eventQueues, JsonUtils jsonUtils, Configuration configuration) {
return new EventProcessorImpl(executionService, metadataService, actionProcessor, eventQueues, jsonUtils, configuration);
return new SimpleEventProcessor(executionService, metadataService, actionProcessor, eventQueues, jsonUtils, configuration);
}

@ProvidesIntoMap
Expand Down

This file was deleted.

@@ -0,0 +1,153 @@
/*
* Copyright 2017 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.conductor.core.events;

import com.netflix.conductor.common.metadata.events.EventHandler.Action;
import com.netflix.conductor.common.metadata.events.EventHandler.StartWorkflow;
import com.netflix.conductor.common.metadata.events.EventHandler.TaskDetails;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.execution.ParametersUtils;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.utils.JsonUtils;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import javax.inject.Inject;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Viren
* Action Processor subscribes to the Event Actions queue and processes the actions (e.g. start workflow etc)
*/
public class SimpleActionProcessor implements ActionProcessor {
private static final Logger logger = LoggerFactory.getLogger(SimpleActionProcessor.class);

private final WorkflowExecutor executor;
private final ParametersUtils parametersUtils;
private final JsonUtils jsonUtils;

@Inject
public SimpleActionProcessor(WorkflowExecutor executor, ParametersUtils parametersUtils, JsonUtils jsonUtils) {
this.executor = executor;
this.parametersUtils = parametersUtils;
this.jsonUtils = jsonUtils;
}

public Map<String, Object> execute(Action action, Object payloadObject, String event, String messageId) {

logger.debug("Executing action: {} for event: {} with messageId:{}", action.getAction(), event, messageId);

Object jsonObject = payloadObject;
if (action.isExpandInlineJSON()) {
jsonObject = jsonUtils.expand(payloadObject);
}

switch (action.getAction()) {
case start_workflow:
return startWorkflow(action, jsonObject, event, messageId);
case complete_task:
return completeTask(action, jsonObject, action.getComplete_task(), Status.COMPLETED, event, messageId);
case fail_task:
return completeTask(action, jsonObject, action.getFail_task(), Status.FAILED, event, messageId);
default:
break;
}
throw new UnsupportedOperationException("Action not supported " + action.getAction() + " for event " + event);
}

private Map<String, Object> completeTask(Action action, Object payload, TaskDetails taskDetails, Status status, String event, String messageId) {

Map<String, Object> input = new HashMap<>();
input.put("workflowId", taskDetails.getWorkflowId());
input.put("taskId", taskDetails.getTaskId());
input.put("taskRefName", taskDetails.getTaskRefName());
input.putAll(taskDetails.getOutput());

Map<String, Object> replaced = parametersUtils.replace(input, payload);
String workflowId = (String) replaced.get("workflowId");
String taskId = (String) replaced.get("taskId");
String taskRefName = (String) replaced.get("taskRefName");

Task task = null;
if (StringUtils.isNotEmpty(taskId)) {
task = executor.getTask(taskId);
} else if (StringUtils.isNotEmpty(workflowId) && StringUtils.isNotEmpty(taskRefName)) {
Workflow workflow = executor.getWorkflow(workflowId, true);
if (workflow == null) {
replaced.put("error", "No workflow found with ID: " + workflowId);
return replaced;
}
task = workflow.getTaskByRefName(taskRefName);
}

if (task == null) {
replaced.put("error", "No task found with taskId: " + taskId + ", reference name: " + taskRefName + ", workflowId: " + workflowId);
return replaced;
}

task.setStatus(status);
task.setOutputData(replaced);
task.setOutputMessage(taskDetails.getOutputMessage());
task.getOutputData().put("conductor.event.messageId", messageId);
task.getOutputData().put("conductor.event.name", event);

try {
executor.updateTask(new TaskResult(task));
} catch (RuntimeException e) {
logger.error("Error updating task: {} in workflow: {} in action: {} for event: {} for message: {}", taskDetails.getTaskRefName(), taskDetails.getWorkflowId(), action.getAction(), event, messageId, e);
replaced.put("error", e.getMessage());
throw e;
}
return replaced;
}

private Map<String, Object> startWorkflow(Action action, Object payload, String event, String messageId) {
StartWorkflow params = action.getStart_workflow();
Map<String, Object> output = new HashMap<>();
try {
Map<String, Object> inputParams = params.getInput();
Map<String, Object> workflowInput = parametersUtils.replace(inputParams, payload);

Map<String, Object> paramsMap = new HashMap<>();
Optional.ofNullable(params.getCorrelationId())
.ifPresent(value -> paramsMap.put("correlationId", value));
Map<String, Object> replaced = parametersUtils.replace(paramsMap, payload);

workflowInput.put("conductor.event.messageId", messageId);
workflowInput.put("conductor.event.name", event);

String id = executor.startWorkflow(params.getName(), params.getVersion(),
Optional.ofNullable(replaced.get("correlationId")).map(Object::toString)
.orElse(params.getCorrelationId()),
workflowInput, null, event, params.getTaskToDomain());
output.put("workflowId", id);

} catch (RuntimeException e) {
logger.error("Error starting workflow: {}, version: {}, for event: {} for message: {}", params.getName(), params.getVersion(), event, messageId, e);
output.put("error", e.getMessage());
throw e;
}
return output;
}
}
Expand Up @@ -55,10 +55,10 @@
* @author Viren
* Event Processor is used to dispatch actions based on the incoming events to execution queue.
*/
public class EventProcessorImpl implements EventProcessor{
public class SimpleEventProcessor implements EventProcessor{

private static final Logger logger = LoggerFactory.getLogger(EventProcessorImpl.class);
private static final String className = EventProcessorImpl.class.getSimpleName();
private static final Logger logger = LoggerFactory.getLogger(SimpleEventProcessor.class);
private static final String className = SimpleEventProcessor.class.getSimpleName();
private static final int RETRY_COUNT = 3;


Expand All @@ -73,8 +73,8 @@ public class EventProcessorImpl implements EventProcessor{
private final JsonUtils jsonUtils;

@Inject
public EventProcessorImpl(ExecutionService executionService, MetadataService metadataService,
ActionProcessor actionProcessor, EventQueues eventQueues, JsonUtils jsonUtils, Configuration config) {
public SimpleEventProcessor(ExecutionService executionService, MetadataService metadataService,
ActionProcessor actionProcessor, EventQueues eventQueues, JsonUtils jsonUtils, Configuration config) {
this.executionService = executionService;
this.metadataService = metadataService;
this.actionProcessor = actionProcessor;
Expand Down

0 comments on commit 02c93b5

Please sign in to comment.