From 238b6afbe23f84c8c8b5a86364b995bba4e0e530 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Thu, 29 Sep 2016 09:40:07 -0700 Subject: [PATCH] BoundedReadFromUnboundedSourceTest: fixes 1) Tests actually require non-splitting source (assert that the Key is 0). Use withoutSplitting. 2) Indentation and unused code (expectedOutput). 3) For these time-based tests, runners cannot guarantee we'll produce at least 3 elements, but we can expect there to be at least 1. These appear in Beam but were not backported: https://github.com/apache/incubator-beam/commit/fecd0362f5c1710c7fee0b2a03cc8549d1b8f75a https://github.com/apache/incubator-beam/commit/ee7272fe77d78240c6f8e0b79cdc93b460171b8c --- .../sdk/io/BoundedReadFromUnboundedSourceTest.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSourceTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSourceTest.java index f604c0bd9e..1dc11d3793 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSourceTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSourceTest.java @@ -100,7 +100,7 @@ public Void apply(Iterable> input) { values.add(kv.getValue()); } if (timeBound) { - assertTrue(values.size() > 2); + assertTrue(values.size() >= 1); } else if (dedup) { // Verify that at least some data came through. The chance of 90% of the input // being duplicates is essentially zero. @@ -111,7 +111,7 @@ public Void apply(Iterable> input) { Collections.sort(values); for (int i = 0; i < values.size(); i++) { assertEquals(i, (int) values.get(i)); - } + } if (finalizeTracker != null) { assertThat(finalizeTracker, containsInAnyOrder(values.size() - 1)); } @@ -126,7 +126,7 @@ private void test(boolean dedup, boolean timeBound) throws Exception { finalizeTracker = new ArrayList<>(); TestCountingSource.setFinalizeTracker(finalizeTracker); } - TestCountingSource source = new TestCountingSource(Integer.MAX_VALUE); + TestCountingSource source = new TestCountingSource(Integer.MAX_VALUE).withoutSplitting(); if (dedup) { source = source.withDedup(); } @@ -135,11 +135,6 @@ private void test(boolean dedup, boolean timeBound) throws Exception { ? p.apply(Read.from(source).withMaxReadTime(Duration.millis(200))) : p.apply(Read.from(source).withMaxNumRecords(NUM_RECORDS)); - List> expectedOutput = new ArrayList<>(); - for (int i = 0; i < NUM_RECORDS; i++) { - expectedOutput.add(KV.of(0, i)); - } - // Because some of the NUM_RECORDS elements read are dupes, the final output // will only have output from 0 to n where n < NUM_RECORDS. DataflowAssert.that(output).satisfies(new Checker(dedup, timeBound));