From 0566f58905e79536226dc7e0b715456d9a4d033e Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 14 Feb 2017 14:54:11 -0800 Subject: [PATCH 1/2] Re-enable UsesTimersInParDo tests in Dataflow runner --- runners/google-cloud-dataflow-java/pom.xml | 1 - .../test/java/org/apache/beam/sdk/transforms/ParDoTest.java | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index bfcb189232fb..4e11f8b316fc 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -126,7 +126,6 @@ org.apache.beam.sdk.testing.UsesGaugeMetrics, org.apache.beam.sdk.testing.UsesSetState, org.apache.beam.sdk.testing.UsesMapState, - org.apache.beam.sdk.testing.UsesTimersInParDo, org.apache.beam.sdk.testing.UsesSplittableParDo, org.apache.beam.sdk.testing.UsesUnboundedPCollections, org.apache.beam.sdk.testing.UsesTestStream, diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 1a976f22d088..15eec94559cb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -54,7 +54,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; @@ -2766,7 +2765,7 @@ public void onTimer(OnTimerContext context) {} }; PCollection output = pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn)); - thrown.expect(PipelineExecutionException.class); + thrown.expect(RuntimeException.class); // Note that runners can reasonably vary their message - this matcher should be flexible // and can be evolved. 
thrown.expectMessage("relative timers"); @@ -2796,7 +2795,7 @@ public void onTimer(OnTimerContext context) {} }; PCollection output = pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn)); - thrown.expect(PipelineExecutionException.class); + thrown.expect(RuntimeException.class); // Note that runners can reasonably vary their message - this matcher should be flexible // and can be evolved. thrown.expectMessage("event time timer"); From 7909a5efb9e2c07b8dfe6ce5e5b1b9ed74b90c21 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Sun, 30 Apr 2017 16:08:48 -0700 Subject: [PATCH 2/2] TestDataflowRunner: throw AssertionError only when assertion known failed --- .../dataflow/testing/TestDataflowRunner.java | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index ba9d971646b3..f41300bf06ed 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -111,7 +111,7 @@ DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) { new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler()); try { - Optional result = Optional.absent(); + Optional assertionResult = Optional.absent(); if (options.isStreaming()) { // In streaming, there are infinite retries, so rather than timeout @@ -148,14 +148,16 @@ DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) { } if (messageHandler.hasSeenError()) { - result = Optional.of(false); + // We know it is a failure, but there are no metrics in streaming + // and the message may be a non-assertion failure + assertionResult = Optional.absent(); } } 
else { job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler); - result = checkForPAssertSuccess(job); + assertionResult = checkForPAssertSuccess(job); } - if (!result.isPresent()) { + if (!assertionResult.isPresent()) { if (options.isStreaming()) { LOG.warn( "Dataflow job {} did not output a success or failure metric." @@ -167,7 +169,7 @@ DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) { String.format( "Dataflow job %s did not output a success or failure metric.", job.getJobId())); } - } else if (!result.get()) { + } else if (!assertionResult.get()) { throw new AssertionError( Strings.isNullOrEmpty(messageHandler.getErrorMessage()) ? String.format( @@ -203,20 +205,14 @@ void updatePAssertCount(Pipeline pipeline) { *

If the pipeline is not in a failed/cancelled state and no PAsserts were used within the * pipeline, then this method will state that all PAsserts succeeded. * - * @return Optional.of(false) if we are certain a PAssert or some other critical thing has failed, - * Optional.of(true) if we are certain all PAsserts passed, and Optional.absent() if the - * evidence is inconclusive. + * @return Optional.of(false) if we are certain a PAssert failed. + * Optional.of(true) if we are certain all PAsserts passed. + * Optional.absent() if the evidence is inconclusive, including when the pipeline may have + * failed for other reasons. */ @VisibleForTesting Optional checkForPAssertSuccess(DataflowPipelineJob job) throws IOException { - // If the job failed, this is a definite failure. We only cancel jobs when they fail. - State state = job.getState(); - if (state == State.FAILED || state == State.CANCELLED) { - LOG.info("Dataflow job {} terminated in failure state {}", job.getJobId(), state); - return Optional.of(false); - } - JobMetrics metrics = getJobMetrics(job); if (metrics == null || metrics.getMetrics() == null) { LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId()); @@ -255,6 +251,16 @@ Optional checkForPAssertSuccess(DataflowPipelineJob job) throws IOExcep return Optional.of(true); } + // A failed/cancelled job with no failed assertion is inconclusive, not an assertion failure. + State state = job.getState(); + if (state == State.FAILED || state == State.CANCELLED) { + LOG.info( + "Dataflow job {} terminated in failure state {} without reporting a failed assertion", + job.getJobId(), + state); + return Optional.absent(); + } + LOG.info( "Inconclusive results for Dataflow job {}." + " Found {} success, {} failures out of {} expected assertions.",