From f6c87eb4247bec29b1c74203d10e953ad0654983 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Wed, 8 Feb 2017 12:53:27 -0800 Subject: [PATCH 1/6] If a Write operation requests runner-determined sharding, make the Dataflow runner default to maxNumWorkers * 2 shards. --- .../beam/runners/dataflow/DataflowRunner.java | 30 +++++++++++++ .../runners/dataflow/DataflowRunnerTest.java | 44 +++++++++++++++++-- 2 files changed, 71 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index ea9db24ff638..fe6f37989c1b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -71,6 +71,7 @@ import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; import org.apache.beam.runners.dataflow.util.DataflowTemplateJob; import org.apache.beam.runners.dataflow.util.DataflowTransport; import org.apache.beam.runners.dataflow.util.MonitoringUtil; @@ -91,6 +92,7 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder; @@ -335,6 +337,10 @@ private List getOverrides(boolean streaming) { PTransformOverride.of( PTransformMatchers.classEqualTo(Read.Unbounded.class), new 
ReflectiveRootOverrideFactory(StreamingUnboundedRead.class, this))) + .add( + PTransformOverride.of( + PTransformMatchers.writeWithRunnerDeterminedSharding(), + new StreamingShardedWriteFactory(options))) .add( PTransformOverride.of( PTransformMatchers.classEqualTo(View.CreatePCollectionView.class), @@ -1428,6 +1434,30 @@ public Map mapOutputs( } } + @VisibleForTesting + static class StreamingShardedWriteFactory + implements PTransformOverrideFactory, PDone, WriteFiles> { + DataflowPipelineWorkerPoolOptions options; + + StreamingShardedWriteFactory(PipelineOptions options) { + this.options = options.as(DataflowPipelineWorkerPoolOptions.class); + } + + @Override + public PTransformReplacement, PDone> getReplacementTransform( + AppliedPTransform, PDone, WriteFiles> transform) { + return PTransformReplacement.of( + PTransformReplacements.getSingletonMainInput(transform), + transform.getTransform().withNumShards(options.getMaxNumWorkers() * 2)); + } + + @Override + public Map mapOutputs(Map, PValue> outputs, + PDone newOutput) { + return Collections.emptyMap(); + } + } + @VisibleForTesting static String getContainerImageForJob(DataflowPipelineOptions options) { String workerHarnessContainerImage = options.getWorkerHarnessContainerImage(); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 8f10b18d3eda..a32192f0fa25 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.startsWith; import static 
org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -62,21 +63,27 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; +import org.apache.beam.runners.dataflow.DataflowRunner.StreamingShardedWriteFactory; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory; import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator; +import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.testing.ExpectedLogs; @@ -87,7 +94,10 @@ import org.apache.beam.sdk.util.ReleaseInfo; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; import org.hamcrest.Description; import 
org.hamcrest.Matchers; @@ -823,7 +833,6 @@ public void testTempLocationAndNoGcpTempLocationSucceeds() throws Exception { DataflowRunner.fromOptions(options); } - @Test public void testValidProfileLocation() throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); @@ -1047,8 +1056,8 @@ public void testToString() { } /** - * Tests that the {@link DataflowRunner} with {@code --templateLocation} returns normally - * when the runner issuccessfully run. + * Tests that the {@link DataflowRunner} with {@code --templateLocation} returns normally when the + * runner is successfully run. */ @Test public void testTemplateRunnerFullCompletion() throws Exception { @@ -1127,4 +1136,33 @@ public void testWorkerHarnessContainerImage() { assertThat( getContainerImageForJob(options), equalTo("gcr.io/java/foo")); } + + @Test + public void testStreamingWriteWithNoShardingReturnsNewTransform() { + TestPipeline p = TestPipeline.create(); + StreamingShardedWriteFactory factory = new StreamingShardedWriteFactory(p.getOptions()); + WriteFiles original = WriteFiles.to(new TestSink(tmpFolder.toString())); + PCollection objs = (PCollection) p.apply(Create.empty(VoidCoder.of())); + AppliedPTransform, PDone, WriteFiles> originalApplication = + AppliedPTransform.of( + "writefiles", objs.expand(), Collections., PValue>emptyMap(), original, p); + + assertThat( + factory.getReplacementTransform(originalApplication).getTransform(), + not(equalTo((Object) original))); + } + + private static class TestSink extends FileBasedSink { + @Override + public void validate(PipelineOptions options) {} + + TestSink(String tmpFolder) { + super(StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)), + null); + } + @Override + public WriteOperation createWriteOperation() { + throw new IllegalArgumentException("Should not be used"); + } + } } From c9dedcb5bcb998e519565f975281389a5b60847c Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Fri, 12 May 2017 14:32:30 -0700 Subject: 
[PATCH 2/6] Address comments. --- .../beam/runners/dataflow/DataflowRunner.java | 7 ++++++- .../runners/dataflow/DataflowRunnerTest.java | 16 +++++++++++----- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index fe6f37989c1b..5ce18d3b1575 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1446,7 +1446,12 @@ static class StreamingShardedWriteFactory @Override public PTransformReplacement, PDone> getReplacementTransform( AppliedPTransform, PDone, WriteFiles> transform) { - return PTransformReplacement.of( + // By default, if numShards is not set WriteFiles will produce one file per bundle. In + // streaming, there are large numbers of small bundles, resulting in many tiny files. + // Instead we pick max workers * 2 to ensure full parallelism, but prevent too-many files. + // (current_num_workers * 2 might be a better choice, but that value is not easily available + // today). 
+ return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(transform), transform.getTransform().withNumShards(options.getMaxNumWorkers() * 2)); } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index a32192f0fa25..17fafaa72639 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -66,6 +66,7 @@ import org.apache.beam.runners.dataflow.DataflowRunner.StreamingShardedWriteFactory; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; @@ -1139,17 +1140,22 @@ public void testWorkerHarnessContainerImage() { @Test public void testStreamingWriteWithNoShardingReturnsNewTransform() { - TestPipeline p = TestPipeline.create(); - StreamingShardedWriteFactory factory = new StreamingShardedWriteFactory(p.getOptions()); + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.as(DataflowPipelineWorkerPoolOptions.class).setMaxNumWorkers(10); + TestPipeline p = TestPipeline.fromOptions(options); + + StreamingShardedWriteFactory factory = + new StreamingShardedWriteFactory<>(p.getOptions()); WriteFiles original = WriteFiles.to(new TestSink(tmpFolder.toString())); PCollection objs = (PCollection) p.apply(Create.empty(VoidCoder.of())); AppliedPTransform, PDone, WriteFiles> originalApplication = AppliedPTransform.of( "writefiles", objs.expand(), Collections., 
PValue>emptyMap(), original, p); - assertThat( - factory.getReplacementTransform(originalApplication).getTransform(), - not(equalTo((Object) original))); + WriteFiles replacement = (WriteFiles) + factory.getReplacementTransform(originalApplication).getTransform(); + assertThat(replacement, not(equalTo((Object) original))); + assertThat(replacement.getNumShards().get(), equalTo(20)); } private static class TestSink extends FileBasedSink { From b3e0cc717c640b1a754d0f232eb0567323e22c20 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Sat, 13 May 2017 01:15:17 -0700 Subject: [PATCH 3/6] Make sure not to set zero as the number of shards. --- .../beam/runners/dataflow/DataflowRunner.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 5ce18d3b1575..0599f1a4200f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1437,6 +1437,7 @@ public Map mapOutputs( @VisibleForTesting static class StreamingShardedWriteFactory implements PTransformOverrideFactory, PDone, WriteFiles> { + static final int DEFAULT_NUM_SHARDS = 10; DataflowPipelineWorkerPoolOptions options; StreamingShardedWriteFactory(PipelineOptions options) { @@ -1451,9 +1452,19 @@ public PTransformReplacement, PDone> getReplacementTransform( // Instead we pick max workers * 2 to ensure full parallelism, but prevent too-many files. // (current_num_workers * 2 might be a better choice, but that value is not easily available // today). + // If the user does not set either numWorkers or maxNumWorkers, default to 10 shards. 
+ int numShards; + if (options.getMaxNumWorkers() > 0) { + numShards = options.getMaxNumWorkers() * 2; + } else if (options.getNumWorkers() > 0) { + numShards = options.getNumWorkers() * 2; + } else { + numShards = DEFAULT_NUM_SHARDS; + } + return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(transform), - transform.getTransform().withNumShards(options.getMaxNumWorkers() * 2)); + transform.getTransform().withNumShards(numShards)); } @Override From 272a18ae515f208a9344feaaefc14f38052ec86a Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 16 May 2017 17:30:07 -0700 Subject: [PATCH 4/6] Address comments. --- .../apache/beam/runners/dataflow/DataflowRunner.java | 2 ++ .../beam/runners/dataflow/DataflowRunnerTest.java | 12 +++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 0599f1a4200f..13ebafadee62 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1437,6 +1437,8 @@ public Map mapOutputs( @VisibleForTesting static class StreamingShardedWriteFactory implements PTransformOverrideFactory, PDone, WriteFiles> { + // We pick 10 as a default, as it works well with the default number of workers started + // by Dataflow. 
static final int DEFAULT_NUM_SHARDS = 10; DataflowPipelineWorkerPoolOptions options; diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 17fafaa72639..aae21cffd4c0 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -1142,6 +1142,16 @@ public void testWorkerHarnessContainerImage() { public void testStreamingWriteWithNoShardingReturnsNewTransform() { PipelineOptions options = TestPipeline.testingPipelineOptions(); options.as(DataflowPipelineWorkerPoolOptions.class).setMaxNumWorkers(10); + testStreamingWriteOverride(options, 20); + } + + @Test + public void testStreamingWriteWithNoShardingReturnsNewTransformMaxWorkersUnset() { + PipelineOptions options = TestPipeline.testingPipelineOptions(); + testStreamingWriteOverride(options, StreamingShardedWriteFactory.DEFAULT_NUM_SHARDS); + } + + private void testStreamingWriteOverride(PipelineOptions options, int expectedNumShards) { TestPipeline p = TestPipeline.fromOptions(options); StreamingShardedWriteFactory factory = @@ -1155,7 +1165,7 @@ public void testStreamingWriteWithNoShardingReturnsNewTransform() { WriteFiles replacement = (WriteFiles) factory.getReplacementTransform(originalApplication).getTransform(); assertThat(replacement, not(equalTo((Object) original))); - assertThat(replacement.getNumShards().get(), equalTo(20)); + assertThat(replacement.getNumShards().get(), equalTo(expectedNumShards)); } private static class TestSink extends FileBasedSink { From 4f6bace46f44aeb22304bf93491a89f1f999931a Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Sun, 18 Jun 2017 07:18:35 -0700 Subject: [PATCH 5/6] Use WriteFilesTranslation properly. 
--- .../beam/runners/dataflow/DataflowRunner.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 13ebafadee62..4e73b0725808 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -67,6 +67,7 @@ import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource; import org.apache.beam.runners.core.construction.UnconsumedReads; +import org.apache.beam.runners.core.construction.WriteFilesTranslation; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; @@ -1464,9 +1465,17 @@ public PTransformReplacement, PDone> getReplacementTransform( numShards = DEFAULT_NUM_SHARDS; } - return PTransformReplacement.of( + try { + WriteFiles replacement = WriteFiles.to(WriteFilesTranslation.getSink(transform)); + if (WriteFilesTranslation.isWindowedWrites(transform)) { + replacement = replacement.withWindowedWrites(); + } + return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(transform), - transform.getTransform().withNumShards(numShards)); + replacement.withNumShards(numShards)); + } catch (Exception e) { + throw new RuntimeException(e); + } } @Override From 9b9feb1a72c14fc192c0baf6d8285f3be99aef98 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Mon, 19 Jun 2017 13:23:45 -0700 Subject: [PATCH 6/6] Reorder overrides. 
--- .../org/apache/beam/runners/dataflow/DataflowRunner.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 4e73b0725808..ab95f7bf7146 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -328,6 +328,10 @@ private List getOverrides(boolean streaming) { new StreamingFnApiCreateOverrideFactory())); } overridesBuilder + .add( + PTransformOverride.of( + PTransformMatchers.writeWithRunnerDeterminedSharding(), + new StreamingShardedWriteFactory(options))) .add( // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and // must precede it @@ -338,10 +342,6 @@ private List getOverrides(boolean streaming) { PTransformOverride.of( PTransformMatchers.classEqualTo(Read.Unbounded.class), new ReflectiveRootOverrideFactory(StreamingUnboundedRead.class, this))) - .add( - PTransformOverride.of( - PTransformMatchers.writeWithRunnerDeterminedSharding(), - new StreamingShardedWriteFactory(options))) .add( PTransformOverride.of( PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),