From c8c3af517d3e25906e1f56df06a2793d2f7a11a0 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Tue, 14 Jun 2016 14:03:41 -0700 Subject: [PATCH 1/2] Write: add support for setting a fixed number of shards And remove special support in Dataflow and Direct runners for it. --- .../direct/AvroIOShardedWriteFactory.java | 76 ----- .../beam/runners/direct/DirectRunner.java | 4 - .../runners/direct/ShardControlledWrite.java | 81 ----- .../direct/TextIOShardedWriteFactory.java | 78 ----- .../direct/AvroIOShardedWriteFactoryTest.java | 120 ------- .../direct/TextIOShardedWriteFactoryTest.java | 120 ------- .../beam/runners/dataflow/DataflowRunner.java | 258 --------------- .../java/org/apache/beam/sdk/io/AvroIO.java | 12 +- .../java/org/apache/beam/sdk/io/TextIO.java | 15 +- .../java/org/apache/beam/sdk/io/Write.java | 306 ++++++++++++++---- .../org/apache/beam/sdk/io/WriteTest.java | 132 +++++++- 11 files changed, 372 insertions(+), 830 deletions(-) delete mode 100644 runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java delete mode 100644 runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java delete mode 100644 runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java delete mode 100644 runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java delete mode 100644 runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java deleted file mode 100644 index 7422f273b7e5..000000000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.direct; - -import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.IOChannelUtils; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; - -class AvroIOShardedWriteFactory implements PTransformOverrideFactory { - @Override - public PTransform override( - PTransform transform) { - if (transform instanceof AvroIO.Write.Bound) { - @SuppressWarnings("unchecked") - AvroIO.Write.Bound originalWrite = (AvroIO.Write.Bound) transform; - if (originalWrite.getNumShards() > 1 - || (originalWrite.getNumShards() == 1 - && !"".equals(originalWrite.getShardNameTemplate()))) { - @SuppressWarnings("unchecked") - PTransform override = - (PTransform) new AvroIOShardedWrite(originalWrite); - return override; - } - } - return transform; - } - - private class AvroIOShardedWrite extends ShardControlledWrite { - private final AvroIO.Write.Bound initial; - - private AvroIOShardedWrite(AvroIO.Write.Bound initial) { - this.initial = initial; - } - - @Override - int getNumShards() { - return initial.getNumShards(); - } - - @Override - PTransform, PDone> getSingleShardTransform(int shardNum) { - String shardName = - IOChannelUtils.constructName( - initial.getFilenamePrefix(), - initial.getShardNameTemplate(), - initial.getFilenameSuffix(), - shardNum, - getNumShards()); - return initial.withoutSharding().to(shardName).withSuffix(""); - } - - @Override - protected PTransform, PDone> delegate() { - return initial; - } - } -} diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 25847398940a..7408c0bf56a3 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -24,8 +24,6 @@ import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.AggregatorPipelineExtractor; import org.apache.beam.sdk.runners.AggregatorRetrievalException; @@ -80,8 +78,6 @@ public class DirectRunner ImmutableMap., PTransformOverrideFactory>builder() .put(GroupByKey.class, new DirectGroupByKeyOverrideFactory()) .put(CreatePCollectionView.class, new ViewOverrideFactory()) - .put(AvroIO.Write.Bound.class, new AvroIOShardedWriteFactory()) - .put(TextIO.Write.Bound.class, new TextIOShardedWriteFactory()) .build(); /** diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java deleted file mode 100644 index 4687f85c1fc9..000000000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import static com.google.common.base.Preconditions.checkArgument; - -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.Partition; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.PDone; - -import java.util.concurrent.ThreadLocalRandom; - -/** - * A write that explicitly controls its number of output shards. - */ -abstract class ShardControlledWrite - extends ForwardingPTransform, PDone> { - @Override - public PDone apply(PCollection input) { - int numShards = getNumShards(); - checkArgument( - numShards >= 1, - "%s should only be applied if the output has a controlled number of shards (> 1); got %s", - getClass().getSimpleName(), - getNumShards()); - PCollectionList shards = - input.apply( - "PartitionInto" + numShards + "Shards", - Partition.of(getNumShards(), new RandomSeedPartitionFn())); - for (int i = 0; i < shards.size(); i++) { - PCollection shard = shards.get(i); - PTransform, PDone> writeShard = getSingleShardTransform(i); - shard.apply(String.format("%s(Shard:%s)", writeShard.getName(), i), writeShard); - } - return PDone.in(input.getPipeline()); - } - - /** - * Returns the number of shards this {@link PTransform} should write to. - */ - abstract int getNumShards(); - - /** - * Returns a {@link PTransform} that performs a write to the shard with the specified shard - * number. - * - *
This method will be called n times, where n is the value of {@link #getNumShards()}, for - * shard numbers {@code [0...n)}. - */ - abstract PTransform, PDone> getSingleShardTransform(int shardNum); - - private static class RandomSeedPartitionFn implements Partition.PartitionFn { - int nextPartition = -1; - @Override - public int partitionFor(T elem, int numPartitions) { - if (nextPartition < 0) { - nextPartition = ThreadLocalRandom.current().nextInt(numPartitions); - } - nextPartition++; - nextPartition %= numPartitions; - return nextPartition; - } - } -} diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java deleted file mode 100644 index be1bf186a865..000000000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.io.TextIO.Write.Bound; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.IOChannelUtils; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; - -class TextIOShardedWriteFactory implements PTransformOverrideFactory { - - @Override - public PTransform override( - PTransform transform) { - if (transform instanceof TextIO.Write.Bound) { - @SuppressWarnings("unchecked") - TextIO.Write.Bound originalWrite = (TextIO.Write.Bound) transform; - if (originalWrite.getNumShards() > 1 - || (originalWrite.getNumShards() == 1 - && !"".equals(originalWrite.getShardNameTemplate()))) { - @SuppressWarnings("unchecked") - PTransform override = - (PTransform) new TextIOShardedWrite(originalWrite); - return override; - } - } - return transform; - } - - private static class TextIOShardedWrite extends ShardControlledWrite { - private final TextIO.Write.Bound initial; - - private TextIOShardedWrite(Bound initial) { - this.initial = initial; - } - - @Override - int getNumShards() { - return initial.getNumShards(); - } - - @Override - PTransform, PDone> getSingleShardTransform(int shardNum) { - String shardName = - IOChannelUtils.constructName( - initial.getFilenamePrefix(), - initial.getShardTemplate(), - initial.getFilenameSuffix(), - shardNum, - getNumShards()); - return TextIO.Write.withCoder(initial.getCoder()).to(shardName).withoutSharding(); - } - - @Override - protected PTransform, PDone> delegate() { - return initial; - } - } -} diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java deleted file mode 100644 index d94113ac7d23..000000000000 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.theInstance; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.io.AvroIOTest; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; - -import org.hamcrest.Matchers; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.File; - -/** - * Tests for {@link AvroIOShardedWriteFactory}. 
- */ -@RunWith(JUnit4.class) -public class AvroIOShardedWriteFactoryTest { - - @Rule public TemporaryFolder tmp = new TemporaryFolder(); - private AvroIOShardedWriteFactory factory; - - @Before - public void setup() { - factory = new AvroIOShardedWriteFactory(); - } - - @Test - public void originalWithoutShardingReturnsOriginal() throws Exception { - File file = tmp.newFile("foo"); - PTransform, PDone> original = - AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withoutSharding(); - PTransform, PDone> overridden = factory.override(original); - - assertThat(overridden, theInstance(original)); - } - - @Test - public void originalShardingNotSpecifiedReturnsOriginal() throws Exception { - File file = tmp.newFile("foo"); - PTransform, PDone> original = - AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()); - PTransform, PDone> overridden = factory.override(original); - - assertThat(overridden, theInstance(original)); - } - - @Test - public void originalShardedToOneReturnsExplicitlySharded() throws Exception { - File file = tmp.newFile("foo"); - AvroIO.Write.Bound original = - AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withNumShards(1); - PTransform, PDone> overridden = factory.override(original); - - assertThat(overridden, not(Matchers., PDone>>equalTo(original))); - - Pipeline p = getPipeline(); - String[] elems = new String[] {"foo", "bar", "baz"}; - p.apply(Create.of(elems)).apply(overridden); - - file.delete(); - - p.run(); - AvroIOTest.assertTestOutputs(elems, 1, file.getAbsolutePath(), original.getShardNameTemplate()); - } - - @Test - public void originalShardedToManyReturnsExplicitlySharded() throws Exception { - File file = tmp.newFile("foo"); - AvroIO.Write.Bound original = - AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withNumShards(3); - PTransform, PDone> overridden = factory.override(original); - - assertThat(overridden, not(Matchers., PDone>>equalTo(original))); - - Pipeline p = getPipeline(); - String[] elems = new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"}; - p.apply(Create.of(elems)).apply(overridden); - - file.delete(); - p.run(); - AvroIOTest.assertTestOutputs(elems, 3, file.getAbsolutePath(), original.getShardNameTemplate()); - } - - private Pipeline getPipeline() { - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setRunner(DirectRunner.class); - return TestPipeline.fromOptions(options); - } -} diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java deleted file mode 100644 index 5ede931df1a7..000000000000 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.theInstance; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.io.TextIOTest; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; - -import org.hamcrest.Matchers; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.File; - -/** - * Tests for {@link TextIOShardedWriteFactory}. - */ -@RunWith(JUnit4.class) -public class TextIOShardedWriteFactoryTest { - @Rule public TemporaryFolder tmp = new TemporaryFolder(); - private TextIOShardedWriteFactory factory; - - @Before - public void setup() { - factory = new TextIOShardedWriteFactory(); - } - - @Test - public void originalWithoutShardingReturnsOriginal() throws Exception { - File file = tmp.newFile("foo"); - PTransform, PDone> original = - TextIO.Write.to(file.getAbsolutePath()).withoutSharding(); - PTransform, PDone> overridden = factory.override(original); - - assertThat(overridden, theInstance(original)); - } - - @Test - public void originalShardingNotSpecifiedReturnsOriginal() throws Exception { - File file = tmp.newFile("foo"); - PTransform, PDone> original = TextIO.Write.to(file.getAbsolutePath()); - PTransform, PDone> overridden = factory.override(original); - - assertThat(overridden, theInstance(original)); - } - - @Test - public void originalShardedToOneReturnsExplicitlySharded() throws Exception { - File file = tmp.newFile("foo"); - TextIO.Write.Bound original = - TextIO.Write.to(file.getAbsolutePath()).withNumShards(1); - PTransform, PDone> overridden = factory.override(original); - - assertThat(overridden, not(Matchers., PDone>>equalTo(original))); - - Pipeline p = getPipeline(); - String[] elems = new String[] {"foo", "bar", "baz"}; - p.apply(Create.of(elems)).apply(overridden); - - file.delete(); - - p.run(); - TextIOTest.assertOutputFiles( - elems, StringUtf8Coder.of(), 1, tmp, "foo", original.getShardNameTemplate()); - } - - @Test - public void originalShardedToManyReturnsExplicitlySharded() throws Exception { - File file = tmp.newFile("foo"); - TextIO.Write.Bound original = TextIO.Write.to(file.getAbsolutePath()).withNumShards(3); - PTransform, PDone> overridden = factory.override(original); - - assertThat(overridden, not(Matchers., PDone>>equalTo(original))); - - Pipeline p = getPipeline(); - String[] elems = new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"}; - p.apply(Create.of(elems)).apply(overridden); - - file.delete(); - p.run(); - TextIOTest.assertOutputFiles( - elems, StringUtf8Coder.of(), 3, tmp, "foo", original.getShardNameTemplate()); - } - 
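-  // With the runner-specific factories removed, fixed sharding is requested directly on
-  // Write (see runShardedWrite added to WriteTest later in this patch). A hypothetical,
-  // minimal equivalent of the test above against the new API:
-  //
-  //   Pipeline p = getPipeline();
-  //   p.apply(Create.of("foo", "bar", "baz", "spam", "ham", "eggs"))
-  //       .apply(Write.to(new TestSink()).withNumShards(3));
-  //   p.run();
-  //   // A sink that records its bundle results should then observe exactly 3 shards.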
- private Pipeline getPipeline() { - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setRunner(DirectRunner.class); - return TestPipeline.fromOptions(options); - } -} diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 33f97e664f3a..70dd94fed972 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -44,7 +44,6 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; @@ -66,7 +65,6 @@ import org.apache.beam.sdk.io.PubsubUnboundedSink; import org.apache.beam.sdk.io.PubsubUnboundedSource; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.ShardNameTemplate; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.Write; @@ -91,7 +89,6 @@ import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; @@ -376,8 +373,6 @@ && isNullOrEmpty(dataflowOptions.getStagingLocation())), builder.put(Read.Unbounded.class, UnsupportedIO.class); builder.put(Window.Bound.class, AssignWindows.class); builder.put(Write.Bound.class, BatchWrite.class); - builder.put(AvroIO.Write.Bound.class, BatchAvroIOWrite.class); - builder.put(TextIO.Write.Bound.class, BatchTextIOWrite.class); // In batch mode must use the custom Pubsub bounded source/sink. builder.put(PubsubUnboundedSource.class, UnsupportedIO.class); builder.put(PubsubUnboundedSink.class, UnsupportedIO.class); @@ -2047,52 +2042,6 @@ public Iterable apply(Iterable> input) { } } - /** - * A {@link PTransform} that uses shuffle to create a fusion break. This allows pushing - * parallelism limits such as sharding controls further down the pipeline. - */ - private static class ReshardForWrite extends PTransform, PCollection> { - @Override - public PCollection apply(PCollection input) { - return input - // TODO: This would need to be adapted to write per-window shards. 
- .apply( - Window.into(new GlobalWindows()) - .triggering(DefaultTrigger.of()) - .discardingFiredPanes()) - .apply( - "RandomKey", - ParDo.of( - new DoFn>() { - transient long counter, step; - - @Override - public void startBundle(Context c) { - counter = (long) (Math.random() * Long.MAX_VALUE); - step = 1 + 2 * (long) (Math.random() * Long.MAX_VALUE); - } - - @Override - public void processElement(ProcessContext c) { - counter += step; - c.output(KV.of(counter, c.element())); - } - })) - .apply(GroupByKey.create()) - .apply( - "Ungroup", - ParDo.of( - new DoFn>, T>() { - @Override - public void processElement(ProcessContext c) { - for (T item : c.element().getValue()) { - c.output(item); - } - } - })); - } - } - /** * Specialized implementation which overrides * {@link org.apache.beam.sdk.io.Write.Bound Write.Bound} to provide Google @@ -2121,213 +2070,6 @@ public PDone apply(PCollection input) { } } - /** - * Specialized implementation which overrides - * {@link org.apache.beam.sdk.io.TextIO.Write.Bound TextIO.Write.Bound} with - * a native sink instead of a custom sink as workaround until custom sinks - * have support for sharding controls. - */ - private static class BatchTextIOWrite extends PTransform, PDone> { - private final TextIO.Write.Bound transform; - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public BatchTextIOWrite(DataflowRunner runner, TextIO.Write.Bound transform) { - this.transform = transform; - } - - @Override - public PDone apply(PCollection input) { - if (transform.getNumShards() > 0) { - return input - .apply(new ReshardForWrite()) - .apply(new BatchTextIONativeWrite<>(transform)); - } else { - return transform.apply(input); - } - } - } - - /** - * This {@link PTransform} is used by the {@link DataflowPipelineTranslator} as a way - * to provide the native definition of the Text sink. - */ - private static class BatchTextIONativeWrite extends PTransform, PDone> { - private final TextIO.Write.Bound transform; - public BatchTextIONativeWrite(TextIO.Write.Bound transform) { - this.transform = transform; - } - - @Override - public PDone apply(PCollection input) { - return PDone.in(input.getPipeline()); - } - - static { - DataflowPipelineTranslator.registerTransformTranslator( - BatchTextIONativeWrite.class, new BatchTextIONativeWriteTranslator()); - } - } - - /** - * TextIO.Write.Bound support code for the Dataflow backend when applying parallelism limits - * through user requested sharding limits. - */ - private static class BatchTextIONativeWriteTranslator - implements TransformTranslator> { - @SuppressWarnings("unchecked") - @Override - public void translate(@SuppressWarnings("rawtypes") BatchTextIONativeWrite transform, - TranslationContext context) { - translateWriteHelper(transform, transform.transform, context); - } - - private void translateWriteHelper( - BatchTextIONativeWrite transform, - TextIO.Write.Bound originalTransform, - TranslationContext context) { - // Note that the original transform can not be used during add step/add input - // and is only passed in to get properties from it. - - checkState(originalTransform.getNumShards() > 0, - "Native TextSink is expected to only be used when sharding controls are required."); - - context.addStep(transform, "ParallelWrite"); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - - // TODO: drop this check when server supports alternative templates. 
- switch (originalTransform.getShardTemplate()) { - case ShardNameTemplate.INDEX_OF_MAX: - break; // supported by server - case "": - // Empty shard template allowed - forces single output. - checkArgument(originalTransform.getNumShards() <= 1, - "Num shards must be <= 1 when using an empty sharding template"); - break; - default: - throw new UnsupportedOperationException("Shard template " - + originalTransform.getShardTemplate() - + " not yet supported by Dataflow service"); - } - - // TODO: How do we want to specify format and - // format-specific properties? - context.addInput(PropertyNames.FORMAT, "text"); - context.addInput(PropertyNames.FILENAME_PREFIX, originalTransform.getFilenamePrefix()); - context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, - originalTransform.getShardNameTemplate()); - context.addInput(PropertyNames.FILENAME_SUFFIX, originalTransform.getFilenameSuffix()); - context.addInput(PropertyNames.VALIDATE_SINK, originalTransform.needsValidation()); - context.addInput(PropertyNames.NUM_SHARDS, (long) originalTransform.getNumShards()); - context.addEncodingInput( - WindowedValue.getValueOnlyCoder(originalTransform.getCoder())); - - } - } - - /** - * Specialized implementation which overrides - * {@link org.apache.beam.sdk.io.AvroIO.Write.Bound AvroIO.Write.Bound} with - * a native sink instead of a custom sink as workaround until custom sinks - * have support for sharding controls. - */ - private static class BatchAvroIOWrite extends PTransform, PDone> { - private final AvroIO.Write.Bound transform; - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public BatchAvroIOWrite(DataflowRunner runner, AvroIO.Write.Bound transform) { - this.transform = transform; - } - - @Override - public PDone apply(PCollection input) { - if (transform.getNumShards() > 0) { - return input - .apply(new ReshardForWrite()) - .apply(new BatchAvroIONativeWrite<>(transform)); - } else { - return transform.apply(input); - } - } - } - - /** - * This {@link PTransform} is used by the {@link DataflowPipelineTranslator} as a way - * to provide the native definition of the Avro sink. - */ - private static class BatchAvroIONativeWrite extends PTransform, PDone> { - private final AvroIO.Write.Bound transform; - public BatchAvroIONativeWrite(AvroIO.Write.Bound transform) { - this.transform = transform; - } - - @Override - public PDone apply(PCollection input) { - return PDone.in(input.getPipeline()); - } - - static { - DataflowPipelineTranslator.registerTransformTranslator( - BatchAvroIONativeWrite.class, new BatchAvroIONativeWriteTranslator()); - } - } - - /** - * AvroIO.Write.Bound support code for the Dataflow backend when applying parallelism limits - * through user requested sharding limits. - */ - private static class BatchAvroIONativeWriteTranslator - implements TransformTranslator> { - @SuppressWarnings("unchecked") - @Override - public void translate(@SuppressWarnings("rawtypes") BatchAvroIONativeWrite transform, - TranslationContext context) { - translateWriteHelper(transform, transform.transform, context); - } - - private void translateWriteHelper( - BatchAvroIONativeWrite transform, - AvroIO.Write.Bound originalTransform, - TranslationContext context) { - // Note that the original transform can not be used during add step/add input - // and is only passed in to get properties from it. 
- - checkState(originalTransform.getNumShards() > 0, - "Native AvroSink is expected to only be used when sharding controls are required."); - - context.addStep(transform, "ParallelWrite"); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - - // TODO: drop this check when server supports alternative templates. - switch (originalTransform.getShardTemplate()) { - case ShardNameTemplate.INDEX_OF_MAX: - break; // supported by server - case "": - // Empty shard template allowed - forces single output. - checkArgument(originalTransform.getNumShards() <= 1, - "Num shards must be <= 1 when using an empty sharding template"); - break; - default: - throw new UnsupportedOperationException("Shard template " - + originalTransform.getShardTemplate() - + " not yet supported by Dataflow service"); - } - - context.addInput(PropertyNames.FORMAT, "avro"); - context.addInput(PropertyNames.FILENAME_PREFIX, originalTransform.getFilenamePrefix()); - context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, originalTransform.getShardTemplate()); - context.addInput(PropertyNames.FILENAME_SUFFIX, originalTransform.getFilenameSuffix()); - context.addInput(PropertyNames.VALIDATE_SINK, originalTransform.needsValidation()); - context.addInput(PropertyNames.NUM_SHARDS, (long) originalTransform.getNumShards()); - context.addEncodingInput( - WindowedValue.getValueOnlyCoder( - AvroCoder.of(originalTransform.getType(), originalTransform.getSchema()))); - } - } - /** * Specialized (non-)implementation for * {@link org.apache.beam.sdk.io.Write.Bound Write.Bound} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 280cd1267bd6..718461a826b9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -647,14 +647,14 @@ public PDone apply(PCollection input) { throw new IllegalStateException("need to set the schema of an AvroIO.Write transform"); } - // Note that custom sinks currently do not expose sharding controls. - // Thus pipeline runner writers need to individually add support internally to - // apply user requested sharding limits. - return input.apply( - "Write", + org.apache.beam.sdk.io.Write.Bound write = org.apache.beam.sdk.io.Write.to( new AvroSink<>( - filenamePrefix, filenameSuffix, shardTemplate, AvroCoder.of(type, schema)))); + filenamePrefix, filenameSuffix, shardTemplate, AvroCoder.of(type, schema))); + if (getNumShards() > 0) { + write = write.withNumShards(getNumShards()); + } + return input.apply("Write", write); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 9dd367968be8..64db3f76311c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -109,7 +109,7 @@ * } * *
 * <p><h3>Permissions</h3>
- * <p>When run using the {@link DirectRunner}, your pipeline can read and write text files
+ * <p>
When run using the {@code DirectRunner}, your pipeline can read and write text files * on your local drive and remote text files on Google Cloud Storage that you have access to using * your {@code gcloud} credentials. When running in the Dataflow service, the pipeline can only * read and write files from GCS. For more information about permissions, see the Cloud Dataflow @@ -608,12 +608,13 @@ public PDone apply(PCollection input) { "need to set the filename prefix of a TextIO.Write transform"); } - // Note that custom sinks currently do not expose sharding controls. - // Thus pipeline runner writers need to individually add support internally to - // apply user requested sharding limits. - return input.apply("Write", org.apache.beam.sdk.io.Write.to( - new TextSink<>( - filenamePrefix, filenameSuffix, shardTemplate, coder))); + org.apache.beam.sdk.io.Write.Bound write = + org.apache.beam.sdk.io.Write.to( + new TextSink<>(filenamePrefix, filenameSuffix, shardTemplate, coder)); + if (getNumShards() > 0) { + write = write.withNumShards(getNumShards()); + } + return input.apply("Write", write); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java index df6e4d29d6d8..dc17a4c03d71 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io; +import static com.google.common.base.Preconditions.checkNotNull; + import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; @@ -26,54 +28,80 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; +import com.google.api.client.util.Lists; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; +import java.util.List; import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; /** * A {@link PTransform} that writes to a {@link Sink}. A write begins with a sequential global * initialization of a sink, followed by a parallel write, and ends with a sequential finalization - * of the write. The output of a write is {@link PDone}. In the case of an empty PCollection, only - * the global initialization and finalization will be performed. + * of the write. The output of a write is {@link PDone}. + * + *
+ * <p>By default, every bundle in the input {@link PCollection} will be processed by a
+ * {@link WriteOperation}, so the number of outputs will vary based on runner behavior, though at
+ * least 1 output will always be produced. The exact parallelism of the write stage can be
+ * controlled using {@link Write.Bound#withNumShards}, typically used to control how many files are
+ * produced or to globally limit the number of workers connecting to an external service. However,
+ * this option can often hurt performance: it adds an additional {@link GroupByKey} to the pipeline.
+ *
+ * <p>{@code Write} re-windows the data into the global window, so it is typically not well suited
+ * to use in streaming pipelines.
 *
- * <p>Currently, only batch workflows can contain Write transforms.
+ * <p>Example usage with runner-controlled sharding:
 *
- * <p>Example usage:
+ * <p>{@code p.apply(Write.to(new MySink(...)));}
+ *
+ * <p>Example usage with a fixed number of shards:
 *
- * <p>{@code p.apply(Write.to(new MySink(...)));}
+ * <p>{@code p.apply(Write.to(new MySink(...)).withNumShards(3));}
*/ @Experimental(Experimental.Kind.SOURCE_SINK) public class Write { private static final Logger LOG = LoggerFactory.getLogger(Write.class); /** - * Creates a Write transform that writes to the given Sink. + * Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner + * control how many different shards are produced. */ public static Bound to(Sink sink) { - return new Bound<>(sink); + checkNotNull(sink, "sink"); + return new Bound<>(sink, 0 /* runner-controlled sharding */); } /** - * A {@link PTransform} that writes to a {@link Sink}. See {@link Write} and {@link Sink} for - * documentation about writing to Sinks. + * A {@link PTransform} that writes to a {@link Sink}. See the class-level Javadoc for more + * information. + * + * @see Write + * @see Sink */ public static class Bound extends PTransform, PDone> { private final Sink sink; + private int numShards; - private Bound(Sink sink) { + private Bound(Sink sink, int numShards) { this.sink = sink; + this.numShards = numShards; } @Override @@ -92,6 +120,15 @@ public void populateDisplayData(DisplayData.Builder builder) { .include(sink); } + /** + * Returns the number of shards that will be produced in the output. + * + * @see Write for more information + */ + public int getNumShards() { + return numShards; + } + /** * Returns the {@link Sink} associated with this PTransform. */ @@ -99,6 +136,153 @@ public Sink getSink() { return sink; } + /** + * Returns a new {@link Write.Bound} that will write to the current {@link Sink} using the + * specified number of shards. + * + *
+ * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
+ * more information.
+ *
+ * <p>
A value less than or equal to 0 will be equivalent to the default behavior of + * runner-controlled sharding. + */ + public Bound withNumShards(int numShards) { + return new Bound<>(sink, Math.max(numShards, 0)); + } + + /** + * Writes all the elements in a bundle using a {@link Writer} produced by the + * {@link WriteOperation} associated with the {@link Sink}. + */ + private class WriteBundles extends DoFn { + // Writer that will write the records in this bundle. Lazily + // initialized in processElement. + private Writer writer = null; + private final PCollectionView> writeOperationView; + + WriteBundles(PCollectionView> writeOperationView) { + this.writeOperationView = writeOperationView; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + // Lazily initialize the Writer + if (writer == null) { + WriteOperation writeOperation = c.sideInput(writeOperationView); + LOG.info("Opening writer for write operation {}", writeOperation); + writer = writeOperation.createWriter(c.getPipelineOptions()); + writer.open(UUID.randomUUID().toString()); + LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView); + } + try { + writer.write(c.element()); + } catch (Exception e) { + // Discard write result and close the write. + try { + writer.close(); + // The writer does not need to be reset, as this DoFn cannot be reused. + } catch (Exception closeException) { + if (closeException instanceof InterruptedException) { + // Do not silently ignore interrupted state. + Thread.currentThread().interrupt(); + } + // Do not mask the exception that caused the write to fail. + e.addSuppressed(closeException); + } + throw e; + } + } + + @Override + public void finishBundle(Context c) throws Exception { + if (writer != null) { + WriteT result = writer.close(); + c.output(result); + // Reset state in case of reuse. + writer = null; + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + Write.Bound.this.populateDisplayData(builder); + } + } + + /** + * Like {@link WriteBundles}, but where the elements for each shard have been collected into + * a single iterable. + * + * @see WriteBundles + */ + private class WriteShardedBundles extends DoFn>, WriteT> { + private final PCollectionView> writeOperationView; + + WriteShardedBundles(PCollectionView> writeOperationView) { + this.writeOperationView = writeOperationView; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + // In a sharded write, single input element represents one shard. We can open and close + // the writer in each call to processElement. + WriteOperation writeOperation = c.sideInput(writeOperationView); + LOG.info("Opening writer for write operation {}", writeOperation); + Writer writer = writeOperation.createWriter(c.getPipelineOptions()); + writer.open(UUID.randomUUID().toString()); + LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView); + + try { + for (T t : c.element().getValue()) { + writer.write(t); + } + } catch (Exception e) { + try { + writer.close(); + } catch (Exception closeException) { + if (closeException instanceof InterruptedException) { + // Do not silently ignore interrupted state. + Thread.currentThread().interrupt(); + } + // Do not mask the exception that caused the write to fail. + e.addSuppressed(closeException); + } + throw e; + } + + // Close the writer; if this throws let the error propagate. 
+ WriteT result = writer.close(); + c.output(result); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + Write.Bound.this.populateDisplayData(builder); + } + } + + private static class ApplyShardingKey implements SerializableFunction { + private final int numShards; + private int shardNumber; + + ApplyShardingKey(int numShards) { + this.numShards = numShards; + shardNumber = -1; + } + + @Override + public Integer apply(T input) { + if (shardNumber == -1) { + // We want to desynchronize the first record sharding key for each instance of + // ApplyShardingKey, so records in a small PCollection will be statistically balanced. + shardNumber = ThreadLocalRandom.current().nextInt(numShards); + } else { + shardNumber = (shardNumber + 1) % numShards; + } + return shardNumber; + } + } + /** * A write is performed as sequence of three {@link ParDo}'s. * @@ -142,7 +326,7 @@ private PDone createWrite( // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize // the sink. PCollection> operationCollection = - p.apply(Create.>of(writeOperation).withCoder(operationCoder)); + p.apply(Create.of(writeOperation).withCoder(operationCoder)); // Initialize the resource in a do-once ParDo on the WriteOperation. operationCollection = operationCollection @@ -165,57 +349,32 @@ public void processElement(ProcessContext c) throws Exception { final PCollectionView> writeOperationView = operationCollection.apply(View.>asSingleton()); + // Re-window the data into the global window and remove any existing triggers. + PCollection inputInGlobalWindow = + input.apply( + Window.into(new GlobalWindows()) + .triggering(DefaultTrigger.of()) + .discardingFiredPanes()); + // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation // as a side input) and collect the results of the writes in a PCollection. // There is a dependency between this ParDo and the first (the WriteOperation PCollection // as a side input), so this will happen after the initial ParDo. - PCollection results = input - .apply(Window.into(new GlobalWindows())) - .apply("WriteBundles", ParDo.of(new DoFn() { - // Writer that will write the records in this bundle. Lazily - // initialized in processElement. - private Writer writer = null; - - @Override - public void processElement(ProcessContext c) throws Exception { - // Lazily initialize the Writer - if (writer == null) { - WriteOperation writeOperation = c.sideInput(writeOperationView); - LOG.info("Opening writer for write operation {}", writeOperation); - writer = writeOperation.createWriter(c.getPipelineOptions()); - writer.open(UUID.randomUUID().toString()); - LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView); - } - try { - writer.write(c.element()); - } catch (Exception e) { - // Discard write result and close the write. - try { - writer.close(); - // The writer does not need to be reset, as this DoFn cannot be reused - } catch (Exception closeException) { - // Do not mask the exception that caused the write to fail. 
- } - throw e; - } - } - - @Override - public void finishBundle(Context c) throws Exception { - if (writer != null) { - WriteT result = writer.close(); - c.output(result); - // Reset state in case of reuse - writer = null; - } - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - Write.Bound.this.populateDisplayData(builder); - } - }).withSideInputs(writeOperationView)) - .setCoder(writeOperation.getWriterResultCoder()); + PCollection results; + if (getNumShards() <= 0) { + results = inputInGlobalWindow + .apply("WriteBundles", + ParDo.of(new WriteBundles<>(writeOperationView)) + .withSideInputs(writeOperationView)); + } else { + results = inputInGlobalWindow + .apply("ApplyShardLabel", WithKeys.of(new ApplyShardingKey(getNumShards()))) + .apply("GroupIntoShards", GroupByKey.create()) + .apply("WriteShardedBundles", + ParDo.of(new WriteShardedBundles<>(writeOperationView)) + .withSideInputs(writeOperationView)); + } + results.setCoder(writeOperation.getWriterResultCoder()); final PCollectionView> resultsView = results.apply(View.asIterable()); @@ -231,17 +390,26 @@ public void populateDisplayData(DisplayData.Builder builder) { @Override public void processElement(ProcessContext c) throws Exception { WriteOperation writeOperation = c.element(); - LOG.info("Finalizing write operation {}", writeOperation); - Iterable results = c.sideInput(resultsView); - LOG.debug("Side input initialized to finalize write operation {}", writeOperation); - if (!results.iterator().hasNext()) { - LOG.info("No write results, creating a single empty output."); - Writer writer = writeOperation.createWriter(c.getPipelineOptions()); - writer.open(UUID.randomUUID().toString()); - WriteT emptyWrite = writer.close(); - results = Collections.singleton(emptyWrite); - LOG.debug("Done creating a single empty output."); + LOG.info("Finalizing write operation {}.", writeOperation); + List results = Lists.newArrayList(c.sideInput(resultsView)); + LOG.debug("Side input initialized to finalize write operation {}.", writeOperation); + + // We must always output at least 1 shard, and honor user-specified numShards if set. 
+ int minShardsNeeded = Math.max(1, getNumShards()); + int extraShardsNeeded = minShardsNeeded - results.size(); + if (extraShardsNeeded > 0) { + LOG.info( + "Creating {} empty output shards in addition to {} written for a total of {}.", + extraShardsNeeded, results.size(), minShardsNeeded); + for (int i = 0; i < extraShardsNeeded; ++i) { + Writer writer = writeOperation.createWriter(c.getPipelineOptions()); + writer.open(UUID.randomUUID().toString()); + WriteT emptyWrite = writer.close(); + results.add(emptyWrite); + } + LOG.debug("Done creating extra shards."); } + writeOperation.finalize(results, c.getPipelineOptions()); LOG.debug("Done finalizing write operation {}", writeOperation); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java index abda3a555343..abe255de4c83 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java @@ -19,9 +19,11 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom; + import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -52,7 +54,9 @@ import org.apache.beam.sdk.values.PCollection; import com.google.common.base.MoreObjects; +import com.google.common.base.Optional; +import org.hamcrest.Matchers; import org.joda.time.Duration; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -62,12 +66,14 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; /** * Tests for the Write PTransform. @@ -76,6 +82,10 @@ public class WriteTest { // Static store that can be accessed within the writer private static List sinkContents = new ArrayList<>(); + // Static count of output shards + private static AtomicInteger numShards = new AtomicInteger(0); + // Static counts of the number of records per shard. + private static List recordsPerShard = new ArrayList<>(); private static final MapElements IDENTITY_MAP = MapElements.via(new SimpleFunction() { @@ -128,6 +138,71 @@ public void testWrite() { runWrite(inputs, IDENTITY_MAP); } + /** + * Test that Write with an empty input still produces one shard. + */ + @Test + @Category(NeedsRunner.class) + public void testEmptyWrite() { + runWrite(Collections.emptyList(), IDENTITY_MAP); + // Note we did not request a sharded write, so runWrite will not validate the number of shards. + assertEquals(1, numShards.intValue()); + } + + /** + * Test that Write with a configured number of shards produces the desired number of shards even + * when there are many elements. 
+ */ + @Test + @Category(NeedsRunner.class) + public void testShardedWrite() { + runShardedWrite( + Arrays.asList("one", "two", "three", "four", "five", "six"), + IDENTITY_MAP, + Optional.of(1)); + } + + /** + * Test that Write with a configured number of shards produces the desired number of shards even + * when there are too few elements. + */ + @Test + @Category(NeedsRunner.class) + public void testExpandShardedWrite() { + runShardedWrite( + Arrays.asList("one", "two", "three", "four", "five", "six"), + IDENTITY_MAP, + Optional.of(20)); + } + + /** + * Tests that a Write can balance many elements. + */ + @Test + @Category(NeedsRunner.class) + public void testShardedWriteBalanced() { + int numElements = 1000; + List inputs = new ArrayList<>(numElements); + for (int i = 0; i < numElements; ++i) { + inputs.add(String.format("elt%04d", i)); + } + + runShardedWrite( + inputs, + new WindowAndReshuffle<>( + Window.into(Sessions.withGapDuration(Duration.millis(1)))), + Optional.of(10)); + + // Check that both the min and max number of results per shard are close to the expected. + int min = Integer.MAX_VALUE; + int max = Integer.MIN_VALUE; + for (Integer i : recordsPerShard) { + min = Math.min(min, i); + max = Math.max(max, i); + } + assertThat((double) min, Matchers.greaterThanOrEqualTo(max * 0.9)); + } + /** * Test a Write transform with an empty PCollection. */ @@ -147,7 +222,7 @@ public void testWriteWindowed() { List inputs = Arrays.asList("Critical canary", "Apprehensive eagle", "Intimidating pigeon", "Pedantic gull", "Frisky finch"); runWrite( - inputs, new WindowAndReshuffle(Window.into(FixedWindows.of(Duration.millis(2))))); + inputs, new WindowAndReshuffle<>(Window.into(FixedWindows.of(Duration.millis(2))))); } /** @@ -161,7 +236,22 @@ public void testWriteWithSessions() { runWrite( inputs, - new WindowAndReshuffle(Window.into(Sessions.withGapDuration(Duration.millis(1))))); + new WindowAndReshuffle<>( + Window.into(Sessions.withGapDuration(Duration.millis(1))))); + } + + @Test + public void testBuildWrite() { + Sink sink = new TestSink() {}; + Write.Bound write = Write.to(sink).withNumShards(3); + assertEquals(3, write.getNumShards()); + assertThat(write.getSink(), is(sink)); + + Write.Bound write2 = write.withNumShards(7); + assertEquals(7, write2.getNumShards()); + assertThat(write2.getSink(), is(sink)); + // original unchanged + assertEquals(3, write.getNumShards()); } @Test @@ -179,8 +269,6 @@ public void populateDisplayData(DisplayData.Builder builder) { assertThat(displayData, includesDisplayDataFrom(sink)); } - - /** * Performs a Write transform and verifies the Write transform calls the appropriate methods on * a test sink in the correct order, as well as verifies that the elements of a PCollection are @@ -188,6 +276,18 @@ public void populateDisplayData(DisplayData.Builder builder) { */ private static void runWrite( List inputs, PTransform, PCollection> transform) { + runShardedWrite(inputs, transform, Optional.absent()); + } + + /** + * Performs a Write transform with the desired number of shards. Verifies the Write transform + * calls the appropriate methods on a test sink in the correct order, as well as verifies that + * the elements of a PCollection are written to the sink. If numConfiguredShards is not null, also + * verifies that the output number of shards is correct. 
+ */ + private static void runShardedWrite( + List inputs, PTransform, PCollection> transform, + Optional numConfiguredShards) { // Flag to validate that the pipeline options are passed to the Sink WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class); options.setTestFlag("test_value"); @@ -195,6 +295,10 @@ private static void runWrite( // Clear the sink's contents. sinkContents.clear(); + // Reset the number of shards produced. + numShards.set(0); + // Reset the number of records in each shard. + recordsPerShard.clear(); // Prepare timestamps for the elements. List timestamps = new ArrayList<>(); @@ -203,13 +307,21 @@ private static void runWrite( } TestSink sink = new TestSink(); + Write.Bound write = Write.to(sink); + if (numConfiguredShards.isPresent()) { + write = write.withNumShards(numConfiguredShards.get()); + } p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of())) .apply(transform) - .apply(Write.to(sink)); + .apply(write); p.run(); assertThat(sinkContents, containsInAnyOrder(inputs.toArray())); assertTrue(sink.hasCorrectState()); + if (numConfiguredShards.isPresent()) { + assertEquals(numConfiguredShards.get().intValue(), numShards.intValue()); + assertEquals(numConfiguredShards.get().intValue(), recordsPerShard.size()); + } } // Test sink and associated write operation and writer. TestSink, TestWriteOperation, and @@ -246,10 +358,7 @@ private boolean hasCorrectState() { */ @Override public boolean equals(Object other) { - if (!(other instanceof TestSink)) { - return false; - } - return true; + return (other instanceof TestSink); } @Override @@ -314,6 +423,7 @@ public void finalize(Iterable bundleResults, PipelineOptions o idSet.add(result.uId); // Add the elements that were written to the sink's contents. sinkContents.addAll(result.elementsWritten); + recordsPerShard.add(result.elementsWritten.size()); } // Each result came from a unique id. assertEquals(resultCount, idSet.size()); @@ -398,6 +508,7 @@ public TestSinkWriteOperation getWriteOperation() { @Override public void open(String uId) throws Exception { + numShards.incrementAndGet(); this.uId = uId; assertEquals(State.INITIAL, state); state = State.OPENED; @@ -421,10 +532,9 @@ public TestWriterResult close() throws Exception { /** * Options for test, exposed for PipelineOptionsFactory. */ - public static interface WriteOptions extends TestPipelineOptions { + public interface WriteOptions extends TestPipelineOptions { @Description("Test flag and value") String getTestFlag(); - void setTestFlag(String value); } } From 3d5a931bb1ab7ba36a6e4d6dd84836240ee422ad Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Tue, 28 Jun 2016 09:24:01 -0700 Subject: [PATCH 2/2] fixup! 
Write: add support for setting a fixed number of shards --- .../main/java/org/apache/beam/sdk/io/Write.java | 8 +++++--- .../java/org/apache/beam/sdk/io/WriteTest.java | 15 +++++++++++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java index dc17a4c03d71..c48933b27055 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java @@ -115,9 +115,11 @@ public PDone apply(PCollection input) { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); builder - .add(DisplayData.item("sink", sink.getClass()) - .withLabel("Write Sink")) - .include(sink); + .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink")) + .include(sink) + .addIfNotDefault( + DisplayData.item("numShards", getNumShards()).withLabel("Fixed Number of Shards"), + 0); } /** diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java index abe255de4c83..56643f224783 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java @@ -269,6 +269,21 @@ public void populateDisplayData(DisplayData.Builder builder) { assertThat(displayData, includesDisplayDataFrom(sink)); } + @Test + public void testShardedDisplayData() { + TestSink sink = new TestSink() { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("foo", "bar")); + } + }; + Write.Bound write = Write.to(sink).withNumShards(1); + DisplayData displayData = DisplayData.from(write); + assertThat(displayData, hasDisplayItem("sink", sink.getClass())); + assertThat(displayData, includesDisplayDataFrom(sink)); + assertThat(displayData, hasDisplayItem("numShards", 1)); + } + /** * Performs a Write transform and verifies the Write transform calls the appropriate methods on * a test sink in the correct order, as well as verifies that the elements of a PCollection are
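Taken together, these two patches move fixed-shard support into Write itself: ApplyShardingKey assigns keys round-robin starting from a random shard, GroupIntoShards gathers each shard's elements, WriteShardedBundles writes one bundle per shard, and finalize pads the results with empty shards so at least max(1, numShards) outputs always exist. A minimal sketch of the resulting user-facing API, assuming a Beam build that includes these patches (the output path is illustrative):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class FixedShardsExample {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        p.apply(Create.of("one", "two", "three", "four", "five", "six"))
            // TextIO now forwards a positive shard count to Write.withNumShards, which
            // expands to ApplyShardLabel -> GroupIntoShards -> WriteShardedBundles
            // instead of relying on a runner-specific override.
            .apply(TextIO.Write.to("/tmp/fixed-shards/output").withNumShards(3));

        p.run();
      }
    }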