From ed8bd62652126d8d0cf054cee5cc79dda88e3415 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 26 Apr 2017 17:55:29 -0700 Subject: [PATCH] Do not rely on metrics in streaming TestDataflowRunner The Dataflow service automatically shuts down jobs when their input watermarks reach infinity, and other functionality can be restored when the feature is restored to the Dataflow service. --- .../dataflow/testing/TestDataflowRunner.java | 181 ++++----- .../testing/TestDataflowRunnerTest.java | 358 ++++++------------ 2 files changed, 195 insertions(+), 344 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index dc32466ceb5e..ba9d971646b3 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -26,13 +26,9 @@ import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Strings; -import com.google.common.base.Throwables; import java.io.IOException; import java.math.BigDecimal; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowClient; import org.apache.beam.runners.dataflow.DataflowPipelineJob; @@ -59,13 +55,6 @@ */ public class TestDataflowRunner extends PipelineRunner { private static final String TENTATIVE_COUNTER = "tentative"; - // See https://issues.apache.org/jira/browse/BEAM-1170 - // we need to either fix the API or pipe the DRAINED signal through - @VisibleForTesting - static final String LEGACY_WATERMARK_METRIC_SUFFIX = "windmill-data-watermark"; - @VisibleForTesting - static final String WATERMARK_METRIC_SUFFIX = "DataWatermark"; - private static final long MAX_WATERMARK_VALUE = -2L; private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class); private final TestDataflowPipelineOptions options; @@ -73,9 +62,9 @@ public class TestDataflowRunner extends PipelineRunner { private final DataflowRunner runner; private int expectedNumberOfAssertions = 0; - TestDataflowRunner(TestDataflowPipelineOptions options) { + TestDataflowRunner(TestDataflowPipelineOptions options, DataflowClient client) { this.options = options; - this.dataflowClient = DataflowClient.create(options); + this.dataflowClient = client; this.runner = DataflowRunner.fromOptions(options); } @@ -91,7 +80,14 @@ public static TestDataflowRunner fromOptions(PipelineOptions options) { "results"); dataflowOptions.setTempLocation(tempLocation); - return new TestDataflowRunner(dataflowOptions); + return new TestDataflowRunner( + dataflowOptions, DataflowClient.create(options.as(DataflowPipelineOptions.class))); + } + + @VisibleForTesting + static TestDataflowRunner fromOptionsAndClient( + TestDataflowPipelineOptions options, DataflowClient client) { + return new TestDataflowRunner(options, client); } @Override @@ -115,40 +111,34 @@ DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) { new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler()); try { - final Optional success; + Optional result = Optional.absent(); if (options.isStreaming()) { - Future> resultFuture = options.getExecutorService().submit( - new Callable>() { - @Override - public Optional call() throws Exception { - try { - for (;;) { - JobMetrics metrics = getJobMetrics(job); - Optional success = checkForPAssertSuccess(job, metrics); - if (messageHandler.hasSeenError()) { - return Optional.of(false); - } - - if (success.isPresent() && (!success.get() || atMaxWatermark(job, metrics))) { - // It's possible that the streaming pipeline doesn't use PAssert. - // So checkForSuccess() will return true before job is finished. - // atMaxWatermark() will handle this case. - return success; - } - Thread.sleep(10000L); - } - } finally { - if (!job.getState().isTerminal()) { - LOG.info("Cancelling Dataflow job {}", job.getJobId()); - job.cancel(); - } + // In streaming, there are infinite retries, so rather than timeout + // we try to terminate early by polling and canceling if we see + // an error message + while (true) { + State state = job.waitUntilFinish(Duration.standardSeconds(3), messageHandler); + if (state != null && state.isTerminal()) { + break; + } + + if (messageHandler.hasSeenError()) { + if (!job.getState().isTerminal()) { + LOG.info("Cancelling Dataflow job {}", job.getJobId()); + job.cancel(); } + break; } - }); + } + + // Whether we canceled or not, this gets the final state of the job or times out State finalState = job.waitUntilFinish( Duration.standardSeconds(options.getTestTimeoutSeconds()), messageHandler); + + // Getting the final state timed out; it may not indicate a failure. + // This cancellation may be the second if (finalState == null || finalState == State.RUNNING) { LOG.info( "Dataflow job {} took longer than {} seconds to complete, cancelling.", @@ -156,15 +146,28 @@ public Optional call() throws Exception { options.getTestTimeoutSeconds()); job.cancel(); } - success = resultFuture.get(); + + if (messageHandler.hasSeenError()) { + result = Optional.of(false); + } } else { job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler); - success = checkForPAssertSuccess(job, getJobMetrics(job)); + result = checkForPAssertSuccess(job); } - if (!success.isPresent()) { - throw new IllegalStateException( - "The dataflow did not output a success or failure metric."); - } else if (!success.get()) { + + if (!result.isPresent()) { + if (options.isStreaming()) { + LOG.warn( + "Dataflow job {} did not output a success or failure metric." + + " In rare situations, some PAsserts may not have run." + + " This is a known limitation of Dataflow in streaming.", + job.getJobId()); + } else { + throw new IllegalStateException( + String.format( + "Dataflow job %s did not output a success or failure metric.", job.getJobId())); + } + } else if (!result.get()) { throw new AssertionError( Strings.isNullOrEmpty(messageHandler.getErrorMessage()) ? String.format( @@ -177,9 +180,6 @@ public Optional call() throws Exception { } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); - } catch (ExecutionException e) { - Throwables.propagateIfPossible(e.getCause()); - throw new RuntimeException(e.getCause()); } catch (IOException e) { throw new RuntimeException(e); } @@ -200,22 +200,24 @@ void updatePAssertCount(Pipeline pipeline) { /** * Check that PAssert expectations were met. * - *

If the pipeline is not in a failed/cancelled state and no PAsserts were used - * within the pipeline, then this method will state that all PAsserts succeeded. + *

If the pipeline is not in a failed/cancelled state and no PAsserts were used within the + * pipeline, then this method will state that all PAsserts succeeded. * - * @return Optional.of(false) if the job failed, was cancelled or if any PAssert - * expectation was not met, true if all the PAssert expectations passed, - * Optional.absent() if the metrics were inconclusive. + * @return Optional.of(false) if we are certain a PAssert or some other critical thing has failed, + * Optional.of(true) if we are certain all PAsserts passed, and Optional.absent() if the + * evidence is inconclusive. */ @VisibleForTesting - Optional checkForPAssertSuccess(DataflowPipelineJob job, @Nullable JobMetrics metrics) - throws IOException { + Optional checkForPAssertSuccess(DataflowPipelineJob job) throws IOException { + + // If the job failed, this is a definite failure. We only cancel jobs when they fail. State state = job.getState(); if (state == State.FAILED || state == State.CANCELLED) { - LOG.info("The pipeline failed"); + LOG.info("Dataflow job {} terminated in failure state {}", job.getJobId(), state); return Optional.of(false); } + JobMetrics metrics = getJobMetrics(job); if (metrics == null || metrics.getMetrics() == null) { LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId()); return Optional.absent(); @@ -238,66 +240,31 @@ Optional checkForPAssertSuccess(DataflowPipelineJob job, @Nullable JobM } if (failures > 0) { - LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of " + LOG.info("Failure result for Dataflow job {}. Found {} success, {} failures out of " + "{} expected assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions); return Optional.of(false); } else if (successes >= expectedNumberOfAssertions) { - LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of " - + "{} expected assertions.", job.getJobId(), successes, failures, + LOG.info( + "Success result for Dataflow job {}." + + " Found {} success, {} failures out of {} expected assertions.", + job.getJobId(), + successes, + failures, expectedNumberOfAssertions); return Optional.of(true); } - LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected " - + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions); + LOG.info( + "Inconclusive results for Dataflow job {}." + + " Found {} success, {} failures out of {} expected assertions.", + job.getJobId(), + successes, + failures, + expectedNumberOfAssertions); return Optional.absent(); } - /** - * Checks wether a metric is a streaming watermark. - * - * @return true if the metric is a watermark. - */ - boolean isWatermark(MetricUpdate metric) { - if (metric.getName() == null || metric.getName().getName() == null) { - return false; // no name -> shouldn't happen, not the watermark - } - if (metric.getScalar() == null) { - return false; // no scalar value -> not the watermark - } - String name = metric.getName().getName(); - return name.endsWith(LEGACY_WATERMARK_METRIC_SUFFIX) - || name.endsWith(WATERMARK_METRIC_SUFFIX); - } - - /** - * Check watermarks of the streaming job. At least one watermark metric must exist. - * - * @return true if all watermarks are at max, false otherwise. - */ - @VisibleForTesting - boolean atMaxWatermark(DataflowPipelineJob job, JobMetrics metrics) { - boolean hasMaxWatermark = false; - for (MetricUpdate metric : metrics.getMetrics()) { - if (!isWatermark(metric)) { - continue; - } - BigDecimal watermark = (BigDecimal) metric.getScalar(); - hasMaxWatermark = watermark.longValue() == MAX_WATERMARK_VALUE; - if (!hasMaxWatermark) { - LOG.info("Found a non-max watermark metric {} in job {}", metric.getName().getName(), - job.getJobId()); - return false; - } - } - - if (hasMaxWatermark) { - LOG.info("All watermarks are at max. JobID: {}", job.getJobId()); - } - return hasMaxWatermark; - } - @Nullable @VisibleForTesting JobMetrics getJobMetrics(DataflowPipelineJob job) { diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java index e4fa788e4fbc..307393ca5589 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java @@ -17,32 +17,21 @@ */ package org.apache.beam.runners.dataflow.testing; -import static org.apache.beam.runners.dataflow.testing.TestDataflowRunner.LEGACY_WATERMARK_METRIC_SUFFIX; -import static org.apache.beam.runners.dataflow.testing.TestDataflowRunner.WATERMARK_METRIC_SUFFIX; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.api.client.http.LowLevelHttpResponse; -import com.google.api.client.json.Json; -import com.google.api.client.testing.http.MockHttpTransport; -import com.google.api.client.testing.http.MockLowLevelHttpRequest; -import com.google.api.client.testing.http.MockLowLevelHttpResponse; -import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.model.JobMessage; import com.google.api.services.dataflow.model.JobMetrics; import com.google.api.services.dataflow.model.MetricStructuredName; @@ -55,6 +44,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import org.apache.beam.runners.dataflow.DataflowClient; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.util.MonitoringUtil; @@ -69,7 +59,6 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.NoopPathValidator; import org.apache.beam.sdk.util.TestCredential; import org.apache.beam.sdk.util.Transport; @@ -94,21 +83,13 @@ @RunWith(JUnit4.class) public class TestDataflowRunnerTest { @Rule public ExpectedException expectedException = ExpectedException.none(); - @Mock private MockHttpTransport transport; - @Mock private MockLowLevelHttpRequest request; - @Mock private GcsUtil mockGcsUtil; - - private static final BigDecimal DEFAULT_MAX_WATERMARK = new BigDecimal(-2); + @Mock private DataflowClient mockClient; private TestDataflowPipelineOptions options; - private Dataflow service; @Before public void setUp() throws Exception { MockitoAnnotations.initMocks(this); - when(transport.buildRequest(anyString(), anyString())).thenReturn(request); - doCallRealMethod().when(request).getContentAsString(); - service = new Dataflow(transport, Transport.getJsonFactory(), null); options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class); options.setAppName("TestAppName"); @@ -116,7 +97,6 @@ public void setUp() throws Exception { options.setTempLocation("gs://test/temp/location"); options.setTempRoot("gs://test"); options.setGcpCredential(new TestCredential()); - options.setDataflowClient(service); options.setRunner(TestDataflowRunner.class); options.setPathValidatorClass(NoopPathValidator.class); } @@ -124,12 +104,12 @@ public void setUp() throws Exception { @Test public void testToString() { assertEquals("TestDataflowRunner#TestAppName", - new TestDataflowRunner(options).toString()); + TestDataflowRunner.fromOptions(options).toString()); } @Test public void testRunBatchJobThatSucceeds() throws Exception { - Pipeline p = TestPipeline.create(options); + Pipeline p = Pipeline.create(options); PCollection pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); @@ -141,9 +121,9 @@ public void testRunBatchJobThatSucceeds() throws Exception { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */, - true /* tentative */, null /* additionalMetrics */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); assertEquals(mockJob, runner.run(p, mockRunner)); } @@ -161,9 +141,9 @@ public void testRunBatchJobThatFails() throws Exception { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */, - false /* tentative */, null /* additionalMetrics */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(false /* success */, false /* tentative */)); try { runner.run(p, mockRunner); } catch (AssertionError expected) { @@ -201,9 +181,9 @@ public State answer(InvocationOnMock invocation) { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */, - true /* tentative */, null /* additionalMetrics */)); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); try { runner.run(p, mockRunner); } catch (AssertionError expected) { @@ -216,6 +196,9 @@ public State answer(InvocationOnMock invocation) { fail("AssertionError expected"); } + /** + * A streaming job that terminates with no error messages is a success. + */ @Test public void testRunStreamingJobUsingPAssertThatSucceeds() throws Exception { options.setStreaming(true); @@ -224,17 +207,18 @@ public void testRunStreamingJobUsingPAssertThatSucceeds() throws Exception { PAssert.that(pc).containsInAnyOrder(1, 2, 3); DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getState()).thenReturn(State.RUNNING); + when(mockJob.getState()).thenReturn(State.DONE); + when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) + .thenReturn(State.DONE); when(mockJob.getProjectId()).thenReturn("test-project"); when(mockJob.getJobId()).thenReturn("test-job"); DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - when(request.execute()) - .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */, - ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); runner.run(p, mockRunner); } @@ -245,60 +229,37 @@ public void testRunStreamingJobNotUsingPAssertThatSucceeds() throws Exception { p.apply(Create.of(1, 2, 3)); DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getState()).thenReturn(State.RUNNING); + when(mockJob.getState()).thenReturn(State.DONE); + when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) + .thenReturn(State.DONE); when(mockJob.getProjectId()).thenReturn("test-project"); when(mockJob.getJobId()).thenReturn("test-job"); DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - when(request.execute()) - .thenReturn(generateMockStreamingMetricResponse( - ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockStreamingMetricResponse(ImmutableMap.of())); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); runner.run(p, mockRunner); } + /** + * Tests that a streaming job with a false {@link PAssert} fails. + * + *

Currently, this failure is indistinguishable from a non-{@link PAssert} failure, because it + * is detected only by failure job messages. With fuller metric support, this can detect a PAssert + * failure via metrics and raise an {@link AssertionError} in just that case. + */ @Test public void testRunStreamingJobThatFails() throws Exception { - options.setStreaming(true); - Pipeline p = TestPipeline.create(options); - PCollection pc = p.apply(Create.of(1, 2, 3)); - PAssert.that(pc).containsInAnyOrder(1, 2, 3); - - DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getState()).thenReturn(State.RUNNING); - when(mockJob.getProjectId()).thenReturn("test-project"); - when(mockJob.getJobId()).thenReturn("test-job"); - - DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); - when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - - when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */, - true /* tentative */, null /* additionalMetrics */)); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - try { - runner.run(p, mockRunner); - } catch (AssertionError expected) { - return; - } - // Note that fail throws an AssertionError which is why it is placed out here - // instead of inside the try-catch block. - fail("AssertionError expected"); + testStreamingPipelineFailsIfException(); } - private LowLevelHttpResponse generateMockMetricResponse(boolean success, boolean tentative, - Map additionalMetrics) + private JobMetrics generateMockMetricResponse(boolean success, boolean tentative) throws Exception { - MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); - response.setContentType(Json.MEDIA_TYPE); List metrics = generateMockMetrics(success, tentative); - if (additionalMetrics != null && !additionalMetrics.isEmpty()) { - metrics.addAll(generateMockStreamingMetrics(additionalMetrics)); - } - JobMetrics jobMetrics = buildJobMetrics(metrics); - response.setContent(jobMetrics.toPrettyString()); - return response; + return buildJobMetrics(metrics); } private List generateMockMetrics(boolean success, boolean tentative) { @@ -313,13 +274,9 @@ private List generateMockMetrics(boolean success, boolean tentativ return Lists.newArrayList(metric); } - private LowLevelHttpResponse generateMockStreamingMetricResponse(Map metricMap) throws IOException { - MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); - response.setContentType(Json.MEDIA_TYPE); - JobMetrics jobMetrics = buildJobMetrics(generateMockStreamingMetrics(metricMap)); - response.setContent(jobMetrics.toPrettyString()); - return response; + return buildJobMetrics(generateMockStreamingMetrics(metricMap)); } private List generateMockStreamingMetrics(Map metricMap) { @@ -344,6 +301,10 @@ private JobMetrics buildJobMetrics(List metricList) { return jobMetrics; } + /** + * Tests that a tentative {@code true} from metrics indicates that every {@link PAssert} has + * succeeded. + */ @Test public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception { DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); @@ -351,13 +312,18 @@ public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception { PCollection pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(buildJobMetrics(generateMockMetrics(true /* success */, true /* tentative */))); + + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); doReturn(State.DONE).when(job).getState(); - JobMetrics metrics = buildJobMetrics( - generateMockMetrics(true /* success */, true /* tentative */)); - assertEquals(Optional.of(true), runner.checkForPAssertSuccess(job, metrics)); + assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(true))); } + /** + * Tests that when we just see a tentative failure for a {@link PAssert} it is considered a + * conclusive failure. + */ @Test public void testCheckingForSuccessWhenPAssertFails() throws Exception { DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); @@ -365,11 +331,13 @@ public void testCheckingForSuccessWhenPAssertFails() throws Exception { PCollection pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + when(mockClient.getJobMetrics(anyString())) + .thenReturn( + buildJobMetrics(generateMockMetrics(false /* success */, true /* tentative */))); + + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); doReturn(State.DONE).when(job).getState(); - JobMetrics metrics = buildJobMetrics( - generateMockMetrics(false /* success */, true /* tentative */)); - assertEquals(Optional.of(false), runner.checkForPAssertSuccess(job, metrics)); + assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(false))); } @Test @@ -379,108 +347,20 @@ public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception { PCollection pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - runner.updatePAssertCount(p); - doReturn(State.RUNNING).when(job).getState(); - JobMetrics metrics = buildJobMetrics( - generateMockMetrics(true /* success */, false /* tentative */)); - assertEquals(Optional.absent(), runner.checkForPAssertSuccess(job, metrics)); - } - - @Test - public void testCheckMaxWatermarkWithNoWatermarkMetric() throws IOException { - DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); - Pipeline p = TestPipeline.create(options); - p.apply(Create.of(1, 2, 3)); - - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( - ImmutableMap.of("no-watermark", new BigDecimal(100)))); - doReturn(State.RUNNING).when(job).getState(); - assertFalse(runner.atMaxWatermark(job, metrics)); - } - - @Test - public void testCheckMaxWatermarkWithSingleWatermarkAtMax() throws IOException { - DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); - Pipeline p = TestPipeline.create(options); - p.apply(Create.of(1, 2, 3)); - - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( - ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); - doReturn(State.RUNNING).when(job).getState(); - assertTrue(runner.atMaxWatermark(job, metrics)); - } - - @Test - public void testCheckMaxWatermarkWithLegacyWatermarkAtMax() throws IOException { - DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); - Pipeline p = TestPipeline.create(options); - p.apply(Create.of(1, 2, 3)); - - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( - ImmutableMap.of(LEGACY_WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); - doReturn(State.RUNNING).when(job).getState(); - assertTrue(runner.atMaxWatermark(job, metrics)); - } - - @Test - public void testCheckMaxWatermarkWithSingleWatermarkNotAtMax() throws IOException { - DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); - Pipeline p = TestPipeline.create(options); - p.apply(Create.of(1, 2, 3)); + when(mockClient.getJobMetrics(anyString())) + .thenReturn( + buildJobMetrics(generateMockMetrics(true /* success */, false /* tentative */))); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics - (ImmutableMap.of(WATERMARK_METRIC_SUFFIX, new BigDecimal(100)))); - doReturn(State.RUNNING).when(job).getState(); - assertFalse(runner.atMaxWatermark(job, metrics)); - } - - @Test - public void testCheckMaxWatermarkWithMultipleWatermarksAtMax() throws IOException { - DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); - Pipeline p = TestPipeline.create(options); - p.apply(Create.of(1, 2, 3)); - - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( - ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK, - "two" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); - doReturn(State.RUNNING).when(job).getState(); - assertTrue(runner.atMaxWatermark(job, metrics)); - } - - @Test - public void testCheckMaxWatermarkWithMultipleMaxAndNotMaxWatermarks() throws IOException { - DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); - Pipeline p = TestPipeline.create(options); - p.apply(Create.of(1, 2, 3)); - - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( - ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK, - "two" + WATERMARK_METRIC_SUFFIX, new BigDecimal(100)))); - doReturn(State.RUNNING).when(job).getState(); - assertFalse(runner.atMaxWatermark(job, metrics)); - } - - @Test - public void testCheckMaxWatermarkIgnoresUnrelatedMatrics() throws IOException { - DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); - Pipeline p = TestPipeline.create(options); - p.apply(Create.of(1, 2, 3)); - - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( - ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK, - "no-watermark", new BigDecimal(100)))); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + runner.updatePAssertCount(p); doReturn(State.RUNNING).when(job).getState(); - assertTrue(runner.atMaxWatermark(job, metrics)); + assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.absent())); } + /** + * Tests that if a streaming pipeline terminates with FAIL that the check for PAssert + * success is a conclusive failure. + */ @Test public void testStreamingPipelineFailsIfServiceFails() throws Exception { DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); @@ -488,16 +368,23 @@ public void testStreamingPipelineFailsIfServiceFails() throws Exception { PCollection pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); doReturn(State.FAILED).when(job).getState(); - assertEquals(Optional.of(false), runner.checkForPAssertSuccess(job, null /* metrics */)); + assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(false))); } + /** + * Tests that if a streaming pipeline crash loops for a non-assertion reason that the test run + * throws an {@link AssertionError}. + * + *

This is a known limitation/bug of the runner that it does not distinguish the two modes of + * failure. + */ @Test public void testStreamingPipelineFailsIfException() throws Exception { options.setStreaming(true); - Pipeline p = TestPipeline.create(options); - PCollection pc = p.apply(Create.of(1, 2, 3)); + Pipeline pipeline = TestPipeline.create(options); + PCollection pc = pipeline.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); @@ -521,18 +408,15 @@ public State answer(InvocationOnMock invocation) { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */, - true /* tentative */, null /* additionalMetrics */)); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + try { - runner.run(p, mockRunner); - } catch (AssertionError expected) { - assertThat(expected.getMessage(), containsString("FooException")); - verify(mockJob, atLeastOnce()).cancel(); + runner.run(pipeline, mockRunner); + } catch (AssertionError exc) { return; } - // Note that fail throws an AssertionError which is why it is placed out here - // instead of inside the try-catch block. fail("AssertionError expected"); } @@ -542,9 +426,9 @@ public void testGetJobMetricsThatSucceeds() throws Exception { Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); - when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */, - true /* tentative */, null /* additionalMetrics */)); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); JobMetrics metrics = runner.getJobMetrics(job); assertEquals(1, metrics.getMetrics().size()); @@ -558,8 +442,8 @@ public void testGetJobMetricsThatFailsForException() throws Exception { Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); - when(request.execute()).thenThrow(new IOException()); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + when(mockClient.getJobMetrics(anyString())).thenThrow(new IOException()); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); assertNull(runner.getJobMetrics(job)); } @@ -577,12 +461,12 @@ public void testBatchOnCreateMatcher() throws Exception { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); p.getOptions().as(TestPipelineOptions.class) .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); - when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */, - true /* tentative */, null /* additionalMetrics */)); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); runner.run(p, mockRunner); } @@ -601,16 +485,16 @@ public void testStreamingOnCreateMatcher() throws Exception { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); p.getOptions().as(TestPipelineOptions.class) .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) .thenReturn(State.DONE); - when(request.execute()) - .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */, - ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */ + )); runner.run(p, mockRunner); } @@ -628,15 +512,20 @@ public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); p.getOptions().as(TestPipelineOptions.class) .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); - when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */, - true /* tentative */, null /* additionalMetrics */)); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); runner.run(p, mockRunner); } + /** + * Tests that when a streaming pipeline terminates and doesn't fail due to {@link PAssert} that + * the {@link TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is + * invoked. + */ @Test public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception { options.setStreaming(true); @@ -652,16 +541,15 @@ public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); p.getOptions().as(TestPipelineOptions.class) .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) .thenReturn(State.DONE); - when(request.execute()) - .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */, - ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); runner.run(p, mockRunner); } @@ -679,12 +567,12 @@ public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); p.getOptions().as(TestPipelineOptions.class) .setOnSuccessMatcher(new TestFailureMatcher()); - when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */, - true /* tentative */, null /* additionalMetrics */)); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */)); try { runner.run(p, mockRunner); } catch (AssertionError expected) { @@ -695,6 +583,11 @@ public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception { fail("Expected an exception on pipeline failure."); } + /** + * Tests that when a streaming pipeline terminates in FAIL that the {@link + * TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is not + * invoked. + */ @Test public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception { options.setStreaming(true); @@ -710,24 +603,15 @@ public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); p.getOptions().as(TestPipelineOptions.class) .setOnSuccessMatcher(new TestFailureMatcher()); when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) .thenReturn(State.FAILED); - when(request.execute()).thenReturn( - generateMockMetricResponse(false /* success */, true /* tentative */, - ImmutableMap.of(WATERMARK_METRIC_SUFFIX, new BigDecimal(100)))); - try { - runner.run(p, mockRunner); - } catch (AssertionError expected) { - verify(mockJob, Mockito.times(1)).waitUntilFinish(any(Duration.class), - any(JobMessagesHandler.class)); - return; - } - fail("Expected an exception on pipeline failure."); + runner.run(p, mockRunner); + // If the onSuccessMatcher were invoked, it would have crashed here. } static class TestSuccessMatcher extends BaseMatcher implements