diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSourceTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSourceTest.java index f604c0bd9e..1dc11d3793 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSourceTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSourceTest.java @@ -100,7 +100,7 @@ public Void apply(Iterable> input) { values.add(kv.getValue()); } if (timeBound) { - assertTrue(values.size() > 2); + assertTrue(values.size() >= 1); } else if (dedup) { // Verify that at least some data came through. The chance of 90% of the input // being duplicates is essentially zero. @@ -111,7 +111,7 @@ public Void apply(Iterable> input) { Collections.sort(values); for (int i = 0; i < values.size(); i++) { assertEquals(i, (int) values.get(i)); - } + } if (finalizeTracker != null) { assertThat(finalizeTracker, containsInAnyOrder(values.size() - 1)); } @@ -126,7 +126,7 @@ private void test(boolean dedup, boolean timeBound) throws Exception { finalizeTracker = new ArrayList<>(); TestCountingSource.setFinalizeTracker(finalizeTracker); } - TestCountingSource source = new TestCountingSource(Integer.MAX_VALUE); + TestCountingSource source = new TestCountingSource(Integer.MAX_VALUE).withoutSplitting(); if (dedup) { source = source.withDedup(); } @@ -135,11 +135,6 @@ private void test(boolean dedup, boolean timeBound) throws Exception { ? p.apply(Read.from(source).withMaxReadTime(Duration.millis(200))) : p.apply(Read.from(source).withMaxNumRecords(NUM_RECORDS)); - List> expectedOutput = new ArrayList<>(); - for (int i = 0; i < NUM_RECORDS; i++) { - expectedOutput.add(KV.of(0, i)); - } - // Because some of the NUM_RECORDS elements read are dupes, the final output // will only have output from 0 to n where n < NUM_RECORDS. DataflowAssert.that(output).satisfies(new Checker(dedup, timeBound));