Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,35 @@ public void cancel() throws IOException {
content.setProjectId(projectId);
content.setId(jobId);
content.setRequestedState("JOB_STATE_CANCELLED");
dataflowClient.projects().jobs()
.update(projectId, jobId, content)
.execute();
try {
dataflowClient.projects().jobs()
.update(projectId, jobId, content)
.execute();
} catch (IOException e) {
State state = getState();
if (state.isTerminal()) {
LOG.warn("Cancel failed because job {} is already terminated in state {}.", jobId, state);
} else if (e.getMessage().contains("has terminated")) {
// This handles the case where the getState() call above returns RUNNING but the cancel
// was rejected because the job is in fact done. Hopefully, someday we can delete this
// code if there is better consistency between the State and whether Cancel succeeds.
//
// Example message:
// Workflow modification failed. Causes: (7603adc9e9bff51e): Cannot perform
// operation 'cancel' on Job: 2017-04-01_22_50_59-9269855660514862348. Job has
// terminated in state SUCCESS: Workflow job: 2017-04-01_22_50_59-9269855660514862348
// succeeded.
LOG.warn("Cancel failed because job {} is already terminated.", jobId, e);
} else {
String errorMsg = String.format(
"Failed to cancel job in state %s, "
+ "please go to the Developers Console to cancel it manually: %s",
state,
MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId()));
LOG.warn(errorMsg);
throw new IOException(errorMsg, e);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -42,6 +43,7 @@
import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.cloud.dataflow.sdk.PipelineResult.State;
import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms;
import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
import com.google.cloud.dataflow.sdk.testing.FastNanoClockAndSleeper;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
Expand Down Expand Up @@ -91,6 +93,9 @@ public class DataflowPipelineJobTest {
@Rule
public ExpectedException thrown = ExpectedException.none();

@Rule
public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowPipelineJob.class);

@Before
public void setup() {
MockitoAnnotations.initMocks(this);
Expand Down Expand Up @@ -193,6 +198,34 @@ public void testWaitToFinishCancelled() throws Exception {
assertEquals(State.CANCELLED, mockWaitToFinishInState(State.CANCELLED));
}

/**
* Test that {@link DataflowPipelineJob#cancel} doesn't throw if the Dataflow service returns
* non-terminal state even though the cancel API call failed, which can happen in practice.
*
* <p>TODO: delete this code if the API calls become consistent.
*/
@Test
public void testCancelTerminatedJobWithStaleState() throws IOException {
Dataflow.Projects.Jobs.Get statusRequest =
mock(Dataflow.Projects.Jobs.Get.class);

Job statusResponse = new Job();
statusResponse.setCurrentState("JOB_STATE_RUNNING");
when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(statusRequest);
when(statusRequest.execute()).thenReturn(statusResponse);

Dataflow.Projects.Jobs.Update update = mock(
Dataflow.Projects.Jobs.Update.class);
when(mockJobs.update(eq(PROJECT_ID), eq(JOB_ID), any(Job.class)))
.thenReturn(update);
when(update.execute()).thenThrow(new IOException("Job has terminated in state SUCCESS"));

DataflowPipelineJob job = new DataflowPipelineJob(
PROJECT_ID, JOB_ID, mockWorkflowClient, null);
job.cancel();
expectedLogs.verifyWarn("Cancel failed because job " + JOB_ID + " is already terminated.");
}

/**
* Tests that the {@link DataflowPipelineJob} understands that the {@link State#FAILED FAILED}
* state is terminal.
Expand Down