From 1cb04a638781943af15975590c4cabb6995fafd7 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Fri, 7 Apr 2017 15:35:25 -0700 Subject: [PATCH] DataflowPipelineJob: gracefully handle cancellation concurrent with termination This is a backport of BEAM-1880 https://github.com/apache/beam/pull/2428 --- .../sdk/runners/DataflowPipelineJob.java | 32 ++++++++++++++++-- .../sdk/runners/DataflowPipelineJobTest.java | 33 +++++++++++++++++++ 2 files changed, 62 insertions(+), 3 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java index 4a68755565..ced2759561 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java @@ -292,9 +292,35 @@ public void cancel() throws IOException { content.setProjectId(projectId); content.setId(jobId); content.setRequestedState("JOB_STATE_CANCELLED"); - dataflowClient.projects().jobs() - .update(projectId, jobId, content) - .execute(); + try { + dataflowClient.projects().jobs() + .update(projectId, jobId, content) + .execute(); + } catch (IOException e) { + State state = getState(); + if (state.isTerminal()) { + LOG.warn("Cancel failed because job {} is already terminated in state {}.", jobId, state); + } else if (e.getMessage().contains("has terminated")) { + // This handles the case where the getState() call above returns RUNNING but the cancel + // was rejected because the job is in fact done. Hopefully, someday we can delete this + // code if there is better consistency between the State and whether Cancel succeeds. + // + // Example message: + // Workflow modification failed. Causes: (7603adc9e9bff51e): Cannot perform + // operation 'cancel' on Job: 2017-04-01_22_50_59-9269855660514862348. 
Job has + // terminated in state SUCCESS: Workflow job: 2017-04-01_22_50_59-9269855660514862348 + // succeeded. + LOG.warn("Cancel failed because job {} is already terminated.", jobId, e); + } else { + String errorMsg = String.format( + "Failed to cancel job in state %s, " + + "please go to the Developers Console to cancel it manually: %s", + state, + MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId())); + LOG.warn(errorMsg); + throw new IOException(errorMsg, e); + } + } } @Override diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJobTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJobTest.java index 1d6ccc66ab..9d1172bb7b 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJobTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJobTest.java @@ -26,6 +26,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -42,6 +43,7 @@ import com.google.api.services.dataflow.model.MetricUpdate; import com.google.cloud.dataflow.sdk.PipelineResult.State; import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms; +import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; import com.google.cloud.dataflow.sdk.testing.FastNanoClockAndSleeper; import com.google.cloud.dataflow.sdk.transforms.Aggregator; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; @@ -91,6 +93,9 @@ public class DataflowPipelineJobTest { @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule + public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowPipelineJob.class); + @Before public void setup() { MockitoAnnotations.initMocks(this); @@ -193,6 
+198,34 @@ public void testWaitToFinishCancelled() throws Exception { assertEquals(State.CANCELLED, mockWaitToFinishInState(State.CANCELLED)); } + /** + * Test that {@link DataflowPipelineJob#cancel} doesn't throw if the Dataflow service returns + * non-terminal state even though the cancel API call failed, which can happen in practice. + * + *

TODO: delete this code if the API calls become consistent. + */ + @Test + public void testCancelTerminatedJobWithStaleState() throws IOException { + Dataflow.Projects.Jobs.Get statusRequest = + mock(Dataflow.Projects.Jobs.Get.class); + + Job statusResponse = new Job(); + statusResponse.setCurrentState("JOB_STATE_RUNNING"); + when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(statusRequest); + when(statusRequest.execute()).thenReturn(statusResponse); + + Dataflow.Projects.Jobs.Update update = mock( + Dataflow.Projects.Jobs.Update.class); + when(mockJobs.update(eq(PROJECT_ID), eq(JOB_ID), any(Job.class))) + .thenReturn(update); + when(update.execute()).thenThrow(new IOException("Job has terminated in state SUCCESS")); + + DataflowPipelineJob job = new DataflowPipelineJob( + PROJECT_ID, JOB_ID, mockWorkflowClient, null); + job.cancel(); + expectedLogs.verifyWarn("Cancel failed because job " + JOB_ID + " is already terminated."); + } + /** * Tests that the {@link DataflowPipelineJob} understands that the {@link State#FAILED FAILED} * state is terminal.