Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Updating ParameterUtils replace to not modify inputParams inplace. Im…
Browse files Browse the repository at this point in the history
…proving logs to trace SimpleActionProcessor bug when starting workflow with incorrect payload.
  • Loading branch information
kishorebanala committed Apr 14, 2020
1 parent a9c8633 commit 798b1bc
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ private Map<String, Object> startWorkflow(Action action, Object payload, String
Map<String, Object> output = new HashMap<>();
try {
Map<String, Object> inputParams = params.getInput();
logger.debug("Executing start workflow for event: {} for message: {} with inputParams: {}", event, messageId, inputParams);
Map<String, Object> workflowInput = parametersUtils.replace(inputParams, payload);

Map<String, Object> paramsMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public Map<String, Object> replace(Map<String, Object> input, Object json) {
}
Configuration option = Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS);
DocumentContext documentContext = JsonPath.parse(doc, option);
return replace(input, documentContext, null);
return replace(new HashMap<>(input), documentContext, null);
}

public Object replace(String paramString) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyMapOf;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -147,7 +151,7 @@ public void testEventProcessor() {
doAnswer((Answer<String>) invocation -> {
started.set(true);
return id;
}).when(workflowExecutor).startWorkflow(startWorkflowAction.getStart_workflow().getName(), startWorkflowAction.getStart_workflow().getVersion(), startWorkflowAction.getStart_workflow().getCorrelationId(), startWorkflowAction.getStart_workflow().getInput(), null, event, taskToDomain);
}).when(workflowExecutor).startWorkflow(eq(startWorkflowAction.getStart_workflow().getName()), eq(startWorkflowAction.getStart_workflow().getVersion()), eq(startWorkflowAction.getStart_workflow().getCorrelationId()), anyMap(), eq(null), eq(event), anyMap());

AtomicBoolean completed = new AtomicBoolean(false);
doAnswer((Answer<String>) invocation -> {
Expand Down Expand Up @@ -219,7 +223,7 @@ public void testEventHandlerWithCondition() {
doAnswer((Answer<String>) invocation -> {
started.set(true);
return id;
}).when(workflowExecutor).startWorkflow(startWorkflowAction.getStart_workflow().getName(), startWorkflowAction.getStart_workflow().getVersion(), startWorkflowAction.getStart_workflow().getCorrelationId(), startWorkflowAction.getStart_workflow().getInput(), null, event, null);
}).when(workflowExecutor).startWorkflow(eq(startWorkflowAction.getStart_workflow().getName()), eq(startWorkflowAction.getStart_workflow().getVersion()), eq(startWorkflowAction.getStart_workflow().getCorrelationId()), anyMap(), eq(null), eq(event), eq(null));

WorkflowDef workflowDef = new WorkflowDef();
workflowDef.setName(startWorkflowAction.getStart_workflow().getName());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.netflix.conductor.core.execution;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.conductor.common.utils.JsonMapperProvider;
import com.netflix.conductor.core.utils.JsonUtils;
Expand All @@ -10,6 +11,11 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
Expand Down Expand Up @@ -96,4 +102,43 @@ public void testReplaceWithMapExpand() {
assertEquals("conductor", replaced.get("k4"));
assertEquals(2, replaced.get("k5"));
}

@Test
public void testReplaceConcurrent() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);

AtomicReference<String> generatedId = new AtomicReference<>("test-0");
Map<String, Object> input = new HashMap<>();
Map<String, Object> payload = new HashMap<>();
payload.put("event", "conductor:TEST_EVENT");
payload.put("someId", generatedId);
input.put("payload", payload);
input.put("name", "conductor");
input.put("version", 2);

Map<String, Object> inputParams = new HashMap<>();
inputParams.put("k1", "${payload.someId}");
inputParams.put("k2", "${name}");

CompletableFuture.runAsync(() -> {
for (int i = 0; i < 10000; i++) {
generatedId.set("test-" + i);
payload.put("someId", generatedId.get());
Object jsonObj = null;
try {
jsonObj = objectMapper.readValue(objectMapper.writeValueAsString(input), Object.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
return;
}
Map<String, Object> replaced = parametersUtils.replace(inputParams, jsonObj);
assertNotNull(replaced);
assertEquals(generatedId.get(), replaced.get("k1"));
assertEquals("conductor", replaced.get("k2"));
assertNull(replaced.get("k3"));
}
}, executorService).get();

executorService.shutdown();
}
}

0 comments on commit 798b1bc

Please sign in to comment.