Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,17 @@ public List<FlowStatus> getFlowStatusesAcrossGroup(String flowGroup, int countPe
* @param flowGroup
* @return true, if any jobs of the flow are RUNNING.
*/
public boolean isFlowRunning(String flowName, String flowGroup) {
public boolean isFlowRunning(String flowName, String flowGroup, long flowExecutionId) {
List<FlowStatus> flowStatusList = getLatestFlowStatus(flowName, flowGroup, 1, null);
if (flowStatusList == null || flowStatusList.isEmpty()) {
return false;
} else {
FlowStatus flowStatus = flowStatusList.get(0);
ExecutionStatus flowExecutionStatus = flowStatus.getFlowExecutionStatus();
// If the latest flow status is the current job about to get kicked off, we should ignore this check
if (flowStatus.getFlowExecutionId() == flowExecutionId) {
return false;
}
return !FINISHED_STATUSES.contains(flowExecutionStatus.name());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,61 @@
public class FlowStatusGeneratorTest {

@Test
public void testIsFlowRunning() {
public void testIsFlowRunningFirstExecution() {
JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
String flowName = "testName";
String flowGroup = "testGroup";
long currFlowExecutionId = 1234L;
when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(null);

FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, currFlowExecutionId));
}

//If a flow is COMPILED, isFlowRunning() should return true.
@Test
public void testIsFlowRunningCompiledPastExecution() {
JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
String flowName = "testName";
String flowGroup = "testGroup";
long flowExecutionId = 1234L;
when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(
Lists.newArrayList(flowExecutionId));
JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(
jobStatusIterator);
FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
// Block the next execution if the prior one is in compiled as it's considered still running
Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId + 1));
}

@Test
public void skipFlowConcurrentCheckSameFlowExecutionId() {
JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
String flowName = "testName";
String flowGroup = "testGroup";
long flowExecutionId = 1234L;
when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(
Lists.newArrayList(flowExecutionId));
JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(
jobStatusIterator);
FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
// If the flow is compiled but the flow execution status is the same as the one about to be kicked off, do not consider it as running.
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId));
}

@Test
public void testIsFlowRunningJobExecutionIgnored() {
String flowName = "testName";
String flowGroup = "testGroup";
long flowExecutionId = 1234L;
JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(
Lists.newArrayList(flowExecutionId));
//JobStatuses should be ignored, only the flow level status matters.
String job1 = "job1";
String job2 = "job2";
Expand All @@ -69,15 +105,17 @@ public void testIsFlowRunning() {
.jobName(job3).eventName("CANCELLED").build();
JobStatus flowStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("CANCELLED").build();
jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, flowStatus).iterator();
Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, flowStatus).iterator();
FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);

when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId));

flowStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("RUNNING").build();
jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, flowStatus).iterator();
when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId+1));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
Optional<Dag<JobExecutionPlan>> compiledDagOptional =
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup,
flowName);
flowName, flowMetadata);

if (!compiledDagOptional.isPresent()) {
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec flowS
specCompiler.awaitHealthy();

TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, flowName);
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, flowName, flowMetadata);

if (!jobExecutionPlanDagOptional.isPresent()) {
return Optional.absent();
Expand All @@ -89,7 +89,6 @@ public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec flowS
return Optional.absent();
}

addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get());
flowCompilationTimer.stop(flowMetadata);
return jobExecutionPlanDagOptional;
}
Expand All @@ -101,13 +100,18 @@ public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec flowS
* @throws IOException
*/
public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Config flowConfig, FlowSpec flowSpec,
String flowGroup, String flowName) throws IOException {
String flowGroup, String flowName, Map<String,String> flowMetadata) throws IOException {
boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, isFlowConcurrencyEnabled);

Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);
if (jobExecutionPlanDag.isEmpty()) {
return Optional.absent();
}
addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);

if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution)) {
if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution,
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)))) {
return Optional.fromNullable(jobExecutionPlanDag);
} else {
log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
Expand All @@ -121,9 +125,7 @@ public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Conf
quotaManager.releaseQuota(dagNode);
}
}

// Send FLOW_FAILED event
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent "
+ "executions are disabled. Set flow.allowConcurrentExecution to true in the flowSpec to change this behaviour.");
new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
Expand All @@ -140,8 +142,8 @@ public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Conf
* @return true if the {@link FlowSpec} allows concurrent executions or if no other instance of the flow is currently RUNNING.
*/
private boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowName, String flowGroup,
boolean allowConcurrentExecution) {
return allowConcurrentExecution || !flowStatusGenerator.isFlowRunning(flowName, flowGroup);
boolean allowConcurrentExecution, long flowExecutionId) {
return allowConcurrentExecution || !flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId);
}

/**
Expand Down