diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 695e196acfca..3976392de685 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -59,7 +59,6 @@ import org.apache.avro.reflect.Nullable; import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -81,10 +80,12 @@ import org.apache.beam.sdk.transforms.Watch; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; +import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.PCollection; @@ -307,25 +308,32 @@ public void testAvroIOContinuouslyWriteAndReadMultipleFilepatterns() throws Thro for (int i = 0; i < 7; ++i) { (i < 3 ? firstValues : secondValues).add(mapFn.apply((long) i)); } - writePipeline.apply( + // Configure windowing of the input so that it fires every time a new element is generated, + // so that files are written continuously. + Window window = Window.into(FixedWindows.of(Duration.millis(100))) + .withAllowedLateness(Duration.ZERO) + .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) + .discardingFiredPanes(); + readPipeline.apply( "Sequence first", GenerateSequence.from(0).to(3).withRate(1, Duration.millis(300))) + .apply("Window first", window) .apply("Map first", MapElements.via(mapFn)) .apply( "Write first", AvroIO.write(GenericClass.class) .to(tmpFolder.getRoot().getAbsolutePath() + "/first") - .withNumShards(2)); - writePipeline.apply( + .withNumShards(2).withWindowedWrites()); + readPipeline.apply( "Sequence second", GenerateSequence.from(3).to(7).withRate(1, Duration.millis(300))) + .apply("Window second", window) .apply("Map second", MapElements.via(mapFn)) .apply( "Write second", AvroIO.write(GenericClass.class) .to(tmpFolder.getRoot().getAbsolutePath() + "/second") - .withNumShards(3)); - PipelineResult writeRes = writePipeline.run(); + .withNumShards(3).withWindowedWrites()); // Test read(), readAll(), parse(), and parseAllGenericRecords() with watchForNewFiles(). PAssert.that( @@ -374,7 +382,6 @@ public void testAvroIOContinuouslyWriteAndReadMultipleFilepatterns() throws Thro .containsInAnyOrder(Iterables.concat(firstValues, secondValues)); readPipeline.run(); - writeRes.waitUntilFinish(); } @Test diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index f7bb12cf09b0..e4fca4776f8c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -60,7 +60,6 @@ import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; - import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; @@ -74,9 +73,14 @@ import org.apache.beam.sdk.testing.UsesSplittableParDo; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ToString; import org.apache.beam.sdk.transforms.Watch; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; @@ -827,50 +831,30 @@ public void testReadFiles() throws IOException { public void testReadWatchForNewFiles() throws IOException, InterruptedException { final Path basePath = tempFolder.getRoot().toPath().resolve("readWatch"); basePath.toFile().mkdir(); + + p.apply(GenerateSequence.from(0).to(10).withRate(1, Duration.millis(100))) + .apply( + Window.into(FixedWindows.of(Duration.millis(150))) + .withAllowedLateness(Duration.ZERO) + .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) + .discardingFiredPanes()) + .apply(ToString.elements()) + .apply( + TextIO.write() + .to(basePath.resolve("data").toString()) + .withNumShards(1) + .withWindowedWrites()); + PCollection lines = p.apply( TextIO.read() .from(basePath.resolve("*").toString()) - // Make sure that compression type propagates into readAll() - .withCompression(ZIP) .watchForNewFiles( Duration.millis(100), Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)))); - Thread writer = - new Thread() { - @Override - public void run() { - try { - Thread.sleep(1000); - writeToFile( - Arrays.asList("a.1", "a.2"), - tempFolder, - basePath.resolve("fileA").toString(), - ZIP); - Thread.sleep(300); - writeToFile( - Arrays.asList("b.1", "b.2"), - tempFolder, - basePath.resolve("fileB").toString(), - ZIP); - Thread.sleep(300); - writeToFile( - Arrays.asList("c.1", "c.2"), - tempFolder, - basePath.resolve("fileC").toString(), - ZIP); - } catch (IOException | InterruptedException e) { - throw new RuntimeException(e); - } - } - }; - writer.start(); - - PAssert.that(lines).containsInAnyOrder("a.1", "a.2", "b.1", "b.2", "c.1", "c.2"); + PAssert.that(lines).containsInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); p.run(); - - writer.join(); } } }