Skip to content

Commit

Permalink
Merge 52a1153 into 85c949a
Browse files Browse the repository at this point in the history
  • Loading branch information
adriana-corui committed May 22, 2020
2 parents 85c949a + 52a1153 commit 561599a
Show file tree
Hide file tree
Showing 15 changed files with 149 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,12 @@ public ExecutionStep createFinishExternalFlowStep(Long index,
boolean parallelLoop) {
throw new UnsupportedOperationException(EXTERNAL_STEPS_NOT_SUPPORTED);
}

@Override
public ExecutionStep createWorkerGroupExternalFlowStep(Long index,
Map<String, Serializable> preStepData,
String stepName,
String workerGroup) {
throw new UnsupportedOperationException(EXTERNAL_STEPS_NOT_SUPPORTED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public class ExecutionPlanBuilder {
private ExternalExecutionStepFactory externalStepFactory;

private static final String CLOUDSLANG_NAME = "CloudSlang";
private static final int NUMBER_OF_STEP_EXECUTION_STEPS = 2;
private static final int NUMBER_OF_PARALLEL_LOOP_EXECUTION_STEPS = 2;
private static final int NUMBER_OF_STEP_EXECUTION_STEPS = 3; // setWorkerGroupStep + beginStep + endStep
private static final int NUMBER_OF_PARALLEL_LOOP_EXECUTION_STEPS = 3; // setWorkerGroupStep + beginStep + endStep
private static final long FLOW_END_STEP_ID = 0L;
private static final long FLOW_PRECONDITION_STEP_ID = 1L;
private static final long FLOW_START_STEP_ID = 2L;
Expand Down Expand Up @@ -153,6 +153,9 @@ private List<ExecutionStep> buildStepExecutionSteps(
);
}

ExecutionStep workerStep = createWorkerGroupStep(currentId++, step, inheritWorkerGroupFromFlow(
step, compiledFlow));
stepExecutionSteps.add(workerStep);
ExecutionStep executionStep = createBeginStep(currentId++, step, inheritWorkerGroupFromFlow(
step, compiledFlow));
stepExecutionSteps.add(executionStep);
Expand Down Expand Up @@ -222,7 +225,8 @@ private long getCurrentId(Map<String, Long> stepReferences, Deque<Step> steps) {
currentId = max + NUMBER_OF_STEP_EXECUTION_STEPS;
} else {
//async step
currentId = max + NUMBER_OF_STEP_EXECUTION_STEPS + NUMBER_OF_PARALLEL_LOOP_EXECUTION_STEPS;
//the -1 is needed because setWorkerGroupStep is taken into consideration for both parallel and normal steps
currentId = max + NUMBER_OF_STEP_EXECUTION_STEPS + NUMBER_OF_PARALLEL_LOOP_EXECUTION_STEPS - 1;
}

return currentId;
Expand All @@ -247,6 +251,14 @@ private ExecutionStep createBeginStep(Long id, Step step, String workerGroup) {
step.getPreStepActionData(), step.getRefId(), step.getName(), workerGroup);
}

private ExecutionStep createWorkerGroupStep(Long id, Step step, String workerGroup) {
if (step instanceof ExternalStep) {
return externalStepFactory.createWorkerGroupExternalFlowStep(id, step.getPreStepActionData(),
step.getName(), workerGroup);
}
return stepFactory.createWorkerGroupStep(id, step.getPreStepActionData(), step.getName(), workerGroup);
}

public void setStepFactory(ExecutionStepFactory stepFactory) {
this.stepFactory = stepFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,20 @@ public ExecutionStep createBeginStepStep(Long index, List<Argument> stepInputs,
return createGeneralStep(index, STEP_EXECUTION_DATA_CLASS, "beginStep", actionData);
}

public ExecutionStep createWorkerGroupStep(Long index, Map<String, Serializable> preStepData, String stepName,
String workerGroup) {
Validate.notNull(preStepData, "preStepData is null");
Map<String, Serializable> actionData = new HashMap<>();
actionData.put(ScoreLangConstants.HOOKS, "TBD");
actionData.put(ScoreLangConstants.NODE_NAME_KEY, stepName);
actionData.put(ScoreLangConstants.NEXT_STEP_ID_KEY, index + 1);

if (workerGroup != null) {
actionData.put(ScoreLangConstants.WORKER_GROUP, preStepData.get(SlangTextualKeys.WORKER_GROUP));
}
return createGeneralStep(index, STEP_EXECUTION_DATA_CLASS, "setWorkerGroupStep", actionData);
}

public ExecutionStep createFinishStepStep(Long index, Map<String, Serializable> postStepData,
Map<String, ResultNavigation> navigationValues,
String stepName, String workerGroup, boolean parallelLoop) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ ExecutionStep createBeginExternalFlowStep(Long index, List<Argument> stepInputs,
ExecutionStep createFinishExternalFlowStep(Long index, Map<String, Serializable> postStepData,
Map<String, ResultNavigation> navigationValues,
String stepName, String workerGroup, boolean parallelLoop);

ExecutionStep createWorkerGroupExternalFlowStep(Long index, Map<String, Serializable> preStepData,
String stepName, String workerGroup);
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void testCompileFlowBasic() throws Exception {
CompilationArtifact compilationArtifact = compiler.compile(SlangSource.fromFile(flow), path);
ExecutionPlan executionPlan = compilationArtifact.getExecutionPlan();
Assert.assertNotNull("execution plan is null", executionPlan);
assertEquals("there is a different number of steps than expected", 5, executionPlan.getSteps().size());
assertEquals("there is a different number of steps than expected", 6, executionPlan.getSteps().size());
assertEquals("execution plan name is different than expected", "basic_flow", executionPlan.getName());
assertEquals("the dependencies size is not as expected", 1, compilationArtifact.getDependencies().size());
assertEquals("the inputs size is not as expected", 3, compilationArtifact.getInputs().size());
Expand All @@ -75,7 +75,7 @@ public void testCompileFlowWithData() throws Exception {
CompilationArtifact compilationArtifact = compiler.compile(SlangSource.fromFile(flow), path);
ExecutionPlan executionPlan = compilationArtifact.getExecutionPlan();
Assert.assertNotNull("execution plan is null", executionPlan);
assertEquals("there is a different number of steps than expected", 5, executionPlan.getSteps().size());
assertEquals("there is a different number of steps than expected", 6, executionPlan.getSteps().size());
assertEquals("execution plan name is different than expected", "flow_with_data", executionPlan.getName());
assertEquals("the dependencies size is not as expected", 1, compilationArtifact.getDependencies().size());

Expand All @@ -85,7 +85,7 @@ public void testCompileFlowWithData() throws Exception {
Assert.assertNotNull("inputs doesn't exist", inputs);
assertEquals("there is a different number of inputs than expected", 1, inputs.size());

ExecutionStep beginStepExecutionStep = executionPlan.getStep(3L);
ExecutionStep beginStepExecutionStep = executionPlan.getStep(4L);
@SuppressWarnings("unchecked")
List<Argument> stepInputs = (List<Argument>) beginStepExecutionStep.getActionData()
.get(ScoreLangConstants.STEP_INPUTS_KEY);
Expand All @@ -95,7 +95,7 @@ public void testCompileFlowWithData() throws Exception {
assertEquals("country", stepInputs.get(1).getName());
assertEquals("CheckWeather", beginStepExecutionStep.getActionData().get(ScoreLangConstants.NODE_NAME_KEY));

ExecutionStep endStepExecutionStep = executionPlan.getStep(4L);
ExecutionStep endStepExecutionStep = executionPlan.getStep(5L);
@SuppressWarnings("unchecked")
List<Output> publish = (List<Output>) endStepExecutionStep.getActionData()
.get(ScoreLangConstants.STEP_PUBLISH_KEY);
Expand Down Expand Up @@ -155,9 +155,9 @@ public void testCompileFlowNavigateDuplicateKeysFirstIsTaken() throws Exception
CompilationArtifact compilationArtifact = compiler.compile(SlangSource.fromFile(flow), path);
ExecutionPlan executionPlan = compilationArtifact.getExecutionPlan();
Assert.assertNotNull("execution plan is null", executionPlan);
assertEquals("there is a different number of steps than expected", 5, executionPlan.getSteps().size());
assertEquals("there is a different number of steps than expected", 6, executionPlan.getSteps().size());

ExecutionStep endStepExecutionStep = executionPlan.getStep(4L);
ExecutionStep endStepExecutionStep = executionPlan.getStep(5L);
@SuppressWarnings("unchecked") Map<String, ResultNavigation> actualNavigationValues =
(Map<String, ResultNavigation>) endStepExecutionStep.getActionData()
.get(ScoreLangConstants.STEP_NAVIGATION_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void subFlowRefId() throws Exception {
Assert.assertNotNull(executionPlan);
assertEquals("different number of dependencies than expected", 2,
compilationArtifact.getDependencies().size());
ExecutionStep secondStepStartExecutionStep = executionPlan.getStep(5L);
ExecutionStep secondStepStartExecutionStep = executionPlan.getStep(7L);
String refId = (String) secondStepStartExecutionStep.getActionData().get(ScoreLangConstants.REF_ID);
assertEquals("refId is not as expected", "user.flows.circular.child_flow", refId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ public void testBasicAlias() throws Exception {

ExecutionPlan executionPlan = compilationArtifact.getExecutionPlan();
Assert.assertNotNull("execution plan is null", executionPlan);
Assert.assertEquals("there is a different number of steps than expected", 5, executionPlan.getSteps().size());
Assert.assertEquals("there is a different number of steps than expected", 6, executionPlan.getSteps().size());

ExecutionStep beginStepExecutionStep = executionPlan.getStep(3L);
ExecutionStep beginStepExecutionStep = executionPlan.getStep(4L);
@SuppressWarnings("unchecked")
String referenceId = (String) beginStepExecutionStep.getActionData().get(ScoreLangConstants.REF_ID);

Expand All @@ -77,9 +77,9 @@ public void testDefaultNamespace() throws Exception {

ExecutionPlan executionPlan = compilationArtifact.getExecutionPlan();
Assert.assertNotNull("execution plan is null", executionPlan);
Assert.assertEquals("there is a different number of steps than expected", 5, executionPlan.getSteps().size());
Assert.assertEquals("there is a different number of steps than expected", 6, executionPlan.getSteps().size());

ExecutionStep beginStepExecutionStep = executionPlan.getStep(3L);
ExecutionStep beginStepExecutionStep = executionPlan.getStep(4L);
@SuppressWarnings("unchecked")
String referenceId = (String) beginStepExecutionStep.getActionData().get(ScoreLangConstants.REF_ID);

Expand All @@ -98,9 +98,9 @@ public void testShortFullPathNoExpanding() throws Exception {

ExecutionPlan executionPlan = compilationArtifact.getExecutionPlan();
Assert.assertNotNull("execution plan is null", executionPlan);
Assert.assertEquals("there is a different number of steps than expected", 5, executionPlan.getSteps().size());
Assert.assertEquals("there is a different number of steps than expected", 6, executionPlan.getSteps().size());

ExecutionStep beginStepExecutionStep = executionPlan.getStep(3L);
ExecutionStep beginStepExecutionStep = executionPlan.getStep(4L);
@SuppressWarnings("unchecked")
String referenceId = (String) beginStepExecutionStep.getActionData().get(ScoreLangConstants.REF_ID);

Expand All @@ -119,9 +119,9 @@ public void testLongFullPathNoExpanding() throws Exception {

ExecutionPlan executionPlan = compilationArtifact.getExecutionPlan();
Assert.assertNotNull("execution plan is null", executionPlan);
Assert.assertEquals("there is a different number of steps than expected", 5, executionPlan.getSteps().size());
Assert.assertEquals("there is a different number of steps than expected", 6, executionPlan.getSteps().size());

ExecutionStep beginStepExecutionStep = executionPlan.getStep(3L);
ExecutionStep beginStepExecutionStep = executionPlan.getStep(4L);
@SuppressWarnings("unchecked")
String referenceId = (String) beginStepExecutionStep.getActionData().get(ScoreLangConstants.REF_ID);

Expand All @@ -140,9 +140,9 @@ public void testLongFullPathWithExpanding() throws Exception {

ExecutionPlan executionPlan = compilationArtifact.getExecutionPlan();
Assert.assertNotNull("execution plan is null", executionPlan);
Assert.assertEquals("there is a different number of steps than expected", 5, executionPlan.getSteps().size());
Assert.assertEquals("there is a different number of steps than expected", 6, executionPlan.getSteps().size());

ExecutionStep beginStepExecutionStep = executionPlan.getStep(3L);
ExecutionStep beginStepExecutionStep = executionPlan.getStep(4L);
@SuppressWarnings("unchecked")
String referenceId = (String) beginStepExecutionStep.getActionData().get(ScoreLangConstants.REF_ID);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void testCompileFlowBasic() throws Exception {
CompilationArtifact compilationArtifact = compiler.compile(SlangSource.fromFile(flow), path);
ExecutionPlan executionPlan = compilationArtifact.getExecutionPlan();
Assert.assertNotNull("execution plan is null", executionPlan);
assertEquals("there is a different number of steps than expected", 11, executionPlan.getSteps().size());
assertEquals("there is a different number of steps than expected", 15, executionPlan.getSteps().size());
assertEquals("execution plan name is different than expected",
"flow_with_multiple_steps", executionPlan.getName());
assertEquals("the dependencies size is not as expected", 3, compilationArtifact.getDependencies().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,20 @@ public void testCompileOnFailureBasic() throws Exception {
CompilationArtifact compilationArtifact = compiler.compile(SlangSource.fromFile(flow), path);
ExecutionPlan executionPlan = compilationArtifact.getExecutionPlan();
assertNotNull("execution plan is null", executionPlan);
assertEquals("there is a different number of steps than expected", 9, executionPlan.getSteps().size());
assertEquals("there is a different number of steps than expected", 12, executionPlan.getSteps().size());
assertEquals("execution plan name is different than expected", "flow_with_on_failure", executionPlan.getName());
assertEquals("the dependencies size is not as expected", 1, compilationArtifact.getDependencies().size());
assertEquals("the inputs size is not as expected", 1, compilationArtifact.getInputs().size());

long firstOnFailureStep = 7L;
long firstOnFailureStep = 9L;
long endFlowStep = 0L;

ExecutionStep firstStep = executionPlan.getStep(4L);
ExecutionStep firstStep = executionPlan.getStep(5L);
assertEquals("first step didn't navigate to on failure",
firstOnFailureStep, getFailureNavigationStepId(firstStep));
ExecutionStep secondStep = executionPlan.getStep(6L);
ExecutionStep secondStep = executionPlan.getStep(8L);
assertEquals(endFlowStep, getFailureNavigationStepId(secondStep));
ExecutionStep firstOnFailStep = executionPlan.getStep(8L);
ExecutionStep firstOnFailStep = executionPlan.getStep(11L);
assertEquals(endFlowStep, getFailureNavigationStepId(firstOnFailStep));
}

Expand Down Expand Up @@ -136,18 +136,18 @@ public void testCompileNavigationWithRoi() throws Exception {
CompilationArtifact compilationArtifact = compiler.compile(SlangSource.fromFile(flow), path);
ExecutionPlan executionPlan = compilationArtifact.getExecutionPlan();
assertNotNull("execution plan is null", executionPlan);
assertEquals("there is a different number of steps than expected", 7, executionPlan.getSteps().size());
assertEquals("there is a different number of steps than expected", 9, executionPlan.getSteps().size());
assertEquals("execution plan name is different than expected", "flow_with_roi", executionPlan.getName());
assertEquals("the dependencies size is not as expected", 1, compilationArtifact.getDependencies().size());

final long step1 = 1;
final long step3 = 3;
final long step4 = 4;

ExecutionStep firstStep = executionPlan.getStep(step1);
assertTrue("navigation data is not empty", firstStep.getNavigationData() == null ||
firstStep.getNavigationData().isEmpty());

ExecutionStep thirdStep = executionPlan.getStep(step3);
ExecutionStep thirdStep = executionPlan.getStep(step4);
assertTrue("navigation data is empty", thirdStep.getNavigationData() != null &&
!thirdStep.getNavigationData().isEmpty());
List<NavigationOptions> optionsList = (List<NavigationOptions>) thirdStep.getNavigationData()
Expand Down
Loading

0 comments on commit 561599a

Please sign in to comment.