Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
add support for domains when starting workflows with event handler
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Jan 10, 2019
1 parent e77a6f3 commit ee34e7b
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 5 deletions.
Expand Up @@ -339,6 +339,9 @@ public static class StartWorkflow {
@ProtoField(id = 5)
private Any inputMessage;

@ProtoField(id = 6)
private Map<String, String> taskToDomain;

/**
* @return the name
*/
Expand Down Expand Up @@ -407,6 +410,14 @@ public Any getInputMessage() {
public void setInputMessage(Any inputMessage) {
this.inputMessage = inputMessage;
}

public Map<String, String> getTaskToDomain() {
return taskToDomain;
}

public void setTaskToDomain(Map<String, String> taskToDomain) {
this.taskToDomain = taskToDomain;
}
}

}
Expand Up @@ -130,7 +130,7 @@ private Map<String, Object> startWorkflow(Action action, Object payload, String
workflowInput.put("conductor.event.messageId", messageId);
workflowInput.put("conductor.event.name", event);

String id = executor.startWorkflow(params.getName(), params.getVersion(), params.getCorrelationId(), workflowInput, null, event);
String id = executor.startWorkflow(params.getName(), params.getVersion(), params.getCorrelationId(), workflowInput, null, event, params.getTaskToDomain());
output.put("workflowId", id);

} catch (RuntimeException e) {
Expand Down
Expand Up @@ -32,12 +32,14 @@
import org.junit.Test;
import org.mockito.ArgumentCaptor;

import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyMap;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand All @@ -61,6 +63,10 @@ public void testStartWorkflow() throws Exception {
startWorkflow.setName("testWorkflow");
startWorkflow.getInput().put("testInput", "${testId}");

Map<String, String> taskToDomain = new HashMap<>();
taskToDomain.put("*", "dev");
startWorkflow.setTaskToDomain(taskToDomain);

Action action = new Action();
action.setAction(Type.start_workflow);
action.setStart_workflow(startWorkflow);
Expand All @@ -71,7 +77,7 @@ public void testStartWorkflow() throws Exception {
workflowDef.setName("testWorkflow");
workflowDef.setVersion(1);

when(workflowExecutor.startWorkflow(eq("testWorkflow"), eq(null), any(), any(), any(), eq("testEvent")))
when(workflowExecutor.startWorkflow(eq("testWorkflow"), eq(null), any(), any(), any(), eq("testEvent"), anyMap()))
.thenReturn("workflow_1");

Map<String, Object> output = actionProcessor.execute(action, payload, "testEvent", "testMessage");
Expand All @@ -80,10 +86,12 @@ public void testStartWorkflow() throws Exception {
assertEquals("workflow_1", output.get("workflowId"));

ArgumentCaptor<Map> argumentCaptor = ArgumentCaptor.forClass(Map.class);
verify(workflowExecutor).startWorkflow(eq("testWorkflow"), eq(null), any(), argumentCaptor.capture(), any(), eq("testEvent"));
ArgumentCaptor<Map> captor = ArgumentCaptor.forClass(Map.class);
verify(workflowExecutor).startWorkflow(eq("testWorkflow"), eq(null), any(), argumentCaptor.capture(), any(), eq("testEvent"), captor.capture());
assertEquals("test_1", argumentCaptor.getValue().get("testInput"));
assertEquals("testMessage", argumentCaptor.getValue().get("conductor.event.messageId"));
assertEquals("testEvent", argumentCaptor.getValue().get("conductor.event.name"));
assertEquals(taskToDomain, captor.getValue());
}

@Test
Expand Down
Expand Up @@ -113,11 +113,15 @@ public void testEventProcessor() {
eventHandler.setName(UUID.randomUUID().toString());
eventHandler.setActive(true);

Map<String, String> taskToDomain = new HashMap<>();
taskToDomain.put("*", "dev");

Action startWorkflowAction = new Action();
startWorkflowAction.setAction(Type.start_workflow);
startWorkflowAction.setStart_workflow(new StartWorkflow());
startWorkflowAction.getStart_workflow().setName("workflow_x");
startWorkflowAction.getStart_workflow().setVersion(1);
startWorkflowAction.getStart_workflow().setTaskToDomain(taskToDomain);
eventHandler.getActions().add(startWorkflowAction);

Action completeTaskAction = new Action();
Expand All @@ -140,7 +144,7 @@ public void testEventProcessor() {
doAnswer((Answer<String>) invocation -> {
started.set(true);
return id;
}).when(workflowExecutor).startWorkflow(startWorkflowAction.getStart_workflow().getName(), startWorkflowAction.getStart_workflow().getVersion(), startWorkflowAction.getStart_workflow().getCorrelationId(), startWorkflowAction.getStart_workflow().getInput(), null, event);
}).when(workflowExecutor).startWorkflow(startWorkflowAction.getStart_workflow().getName(), startWorkflowAction.getStart_workflow().getVersion(), startWorkflowAction.getStart_workflow().getCorrelationId(), startWorkflowAction.getStart_workflow().getInput(), null, event, taskToDomain);

AtomicBoolean completed = new AtomicBoolean(false);
doAnswer((Answer<String>) invocation -> {
Expand Down Expand Up @@ -212,7 +216,7 @@ public void testEventHandlerWithCondition() {
doAnswer((Answer<String>) invocation -> {
started.set(true);
return id;
}).when(workflowExecutor).startWorkflow(startWorkflowAction.getStart_workflow().getName(), startWorkflowAction.getStart_workflow().getVersion(), startWorkflowAction.getStart_workflow().getCorrelationId(), startWorkflowAction.getStart_workflow().getInput(), null, event);
}).when(workflowExecutor).startWorkflow(startWorkflowAction.getStart_workflow().getName(), startWorkflowAction.getStart_workflow().getVersion(), startWorkflowAction.getStart_workflow().getCorrelationId(), startWorkflowAction.getStart_workflow().getInput(), null, event, null);

WorkflowDef workflowDef = new WorkflowDef();
workflowDef.setName(startWorkflowAction.getStart_workflow().getName());
Expand Down
Expand Up @@ -213,6 +213,7 @@ public EventHandlerPb.EventHandler.StartWorkflow toProto(EventHandler.StartWorkf
if (from.getInputMessage() != null) {
to.setInputMessage( toProto( from.getInputMessage() ) );
}
to.putAllTaskToDomain( from.getTaskToDomain() );
return to.build();
}

Expand All @@ -229,6 +230,7 @@ public EventHandler.StartWorkflow fromProto(EventHandlerPb.EventHandler.StartWor
if (from.hasInputMessage()) {
to.setInputMessage( fromProto( from.getInputMessage() ) );
}
to.setTaskToDomain( from.getTaskToDomainMap() );
return to;
}

Expand Down
1 change: 1 addition & 0 deletions grpc/src/main/proto/model/eventhandler.proto
Expand Up @@ -15,6 +15,7 @@ message EventHandler {
string correlation_id = 3;
map<string, google.protobuf.Value> input = 4;
google.protobuf.Any input_message = 5;
map<string, string> task_to_domain = 6;
}
message TaskDetails {
string workflow_id = 1;
Expand Down

0 comments on commit ee34e7b

Please sign in to comment.