From 0cb835585006334a6d3f31c7acfbd926543a08d4 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Tue, 4 Apr 2017 15:33:21 -0700 Subject: [PATCH 1/3] DataflowPipelineJob: handle concurrent cancel and finish This makes job.cancel() not throw an exception if cancel() is called while job is finished. (Note that state.isTerminal() is not guaranteed to be up to date.) --- .../apache/beam/runners/dataflow/DataflowPipelineJob.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 5ad6f9f9131c..aea9ed56ac8a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -369,7 +369,10 @@ public State call() throws Exception { } catch (IOException e) { State state = getState(); if (state.isTerminal()) { - LOG.warn("Job is already terminated. State is {}", state); + LOG.warn("Cancel failed because job is already terminated. State is {}", state); + return state; + } else if (e.getMessage().contains("has terminated")) { + LOG.warn("Cancel failed because job is already terminated.", e); return state; } else { String errorMsg = String.format( From cbdf855da104d668bd3fa39c447b4f421bb1db13 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Wed, 5 Apr 2017 10:02:03 -0700 Subject: [PATCH 2/3] Fixup Includes dropping useless verification in failing tests. General pattern: thrown.expect... should be the next-to-last line in any test. (The failing line should be the immediate next line.) 
--- .../runners/dataflow/DataflowPipelineJob.java | 5 +++ .../dataflow/DataflowPipelineJobTest.java | 31 +++++++++++++------ 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index aea9ed56ac8a..96f6f586705a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -372,6 +372,11 @@ public State call() throws Exception { LOG.warn("Cancel failed because job is already terminated. State is {}", state); return state; } else if (e.getMessage().contains("has terminated")) { + // Example message: + // Workflow modification failed. Causes: (7603adc9e9bff51e): Cannot perform + // operation 'cancel' on Job: 2017-04-01_22_50_59-9269855660514862348. Job has + // terminated in state SUCCESS: Workflow job: 2017-04-01_22_50_59-9269855660514862348 + // succeeded. 
LOG.warn("Cancel failed because job is already terminated.", e); return state; } else { diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index 108baddc910d..43eb7698ae9b 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -618,7 +618,6 @@ public void testGetAggregatorValuesWithUnusedAggregatorThrowsException() thrown.expect(IllegalArgumentException.class); thrown.expectMessage("not used in this pipeline"); - job.getAggregatorValues(aggregator); } @@ -656,7 +655,6 @@ public void testGetAggregatorValuesWhenClientThrowsExceptionThrowsAggregatorRetr thrown.expectCause(is(cause)); thrown.expectMessage(aggregator.toString()); thrown.expectMessage("when retrieving Aggregator values for"); - job.getAggregatorValues(aggregator); } @@ -750,7 +748,7 @@ public void testCancelUnterminatedJobThatFails() throws IOException { Dataflow.Projects.Locations.Jobs.Update.class); when(mockJobs.update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), any(Job.class))) .thenReturn(update); - when(update.execute()).thenThrow(new IOException()); + when(update.execute()).thenThrow(new IOException("Some random IOException")); DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null); @@ -758,13 +756,28 @@ public void testCancelUnterminatedJobThatFails() throws IOException { thrown.expectMessage("Failed to cancel job in state RUNNING, " + "please go to the Developers Console to cancel it manually:"); job.cancel(); + } - Job content = new Job(); - content.setProjectId(PROJECT_ID); - content.setId(JOB_ID); - content.setRequestedState("JOB_STATE_CANCELLED"); - verify(mockJobs).update(eq(PROJECT_ID), eq(REGION_ID), 
eq(JOB_ID), eq(content)); - verify(mockJobs).get(PROJECT_ID, REGION_ID, JOB_ID); + @Test + public void testCancelTerminatedJobWithStaleState() throws IOException { + Dataflow.Projects.Locations.Jobs.Get statusRequest = + mock(Dataflow.Projects.Locations.Jobs.Get.class); + + Job statusResponse = new Job(); + statusResponse.setCurrentState("JOB_STATE_RUNNING"); + when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(statusRequest); + when(statusRequest.execute()).thenReturn(statusResponse); + + Dataflow.Projects.Locations.Jobs.Update update = mock( + Dataflow.Projects.Locations.Jobs.Update.class); + when(mockJobs.update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), any(Job.class))) + .thenReturn(update); + when(update.execute()).thenThrow(new IOException("Job has terminated in state SUCCESS")); + + DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null); + State returned = job.cancel(); + assertThat(returned, equalTo(State.RUNNING)); + expectedLogs.verifyWarn("Cancel failed because job is already terminated."); } @Test From 29d646154309533dd50379bcaca1fce0d08b9aae Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Wed, 5 Apr 2017 15:26:13 -0700 Subject: [PATCH 3/3] improve comments --- .../apache/beam/runners/dataflow/DataflowPipelineJob.java | 4 ++++ .../beam/runners/dataflow/DataflowPipelineJobTest.java | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 96f6f586705a..7cb0f0ed203c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -372,6 +372,10 @@ public State call() throws Exception { LOG.warn("Cancel failed because job is already 
terminated. State is {}", state); return state; } else if (e.getMessage().contains("has terminated")) { + // This handles the case where the getState() call above returns RUNNING but the cancel + // was rejected because the job is in fact done. Hopefully, someday we can delete this + // code if there is better consistency between the State and whether Cancel succeeds. + // // Example message: // Workflow modification failed. Causes: (7603adc9e9bff51e): Cannot perform // operation 'cancel' on Job: 2017-04-01_22_50_59-9269855660514862348. Job has diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index 43eb7698ae9b..e3d2e4ecf201 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -758,6 +758,12 @@ public void testCancelUnterminatedJobThatFails() throws IOException { job.cancel(); } + /** + * Test that {@link DataflowPipelineJob#cancel} doesn't throw if the Dataflow service returns + * non-terminal state even though the cancel API call failed, which can happen in practice. + * + *

<p>TODO: delete this code if the API calls become consistent. + */ @Test public void testCancelTerminatedJobWithStaleState() throws IOException { Dataflow.Projects.Locations.Jobs.Get statusRequest =