Commit

This closes #2447: Remove Sink in favor of FileBasedSink
kennknowles committed Apr 20, 2017
2 parents 33078d2 + 6a6a1a8 commit 4f8b1cc
Showing 24 changed files with 1,258 additions and 1,225 deletions.
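For users, the change is largely a mechanical rename: the Write transform becomes WriteFiles, and custom sinks now extend FileBasedSink directly, since the more general Sink base class is removed. A minimal before/after sketch of the migration; MyLineSink and MyWriteOperation are hypothetical names, while the FileBasedSink constructor and createWriteOperation signature follow the usages visible in the diffs below.

  // Before this commit (sketch): a custom sink extended Sink<T> and was
  // applied with Write.to(...):
  //   p.apply(Write.to(new MySink()));

  // After this commit (sketch): extend FileBasedSink<T>, apply with WriteFiles.to(...).
  import org.apache.beam.sdk.io.FileBasedSink;
  import org.apache.beam.sdk.io.WriteFiles;
  import org.apache.beam.sdk.options.PipelineOptions;

  class MyLineSink extends FileBasedSink<String> {
    MyLineSink(String baseOutputFilename, String extension) {
      // Per the tests in this commit, the constructor takes the output
      // prefix and the file extension.
      super(baseOutputFilename, extension);
    }

    @Override
    public FileBasedWriteOperation<String> createWriteOperation(PipelineOptions options) {
      return new MyWriteOperation(this); // hypothetical FileBasedWriteOperation subclass
    }
  }

  // Applying it in a pipeline:
  //   lines.apply(WriteFiles.to(new MyLineSink("/tmp/output", "txt")));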
PTransformMatchers.java

@@ -22,7 +22,7 @@
 import java.util.Set;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -268,8 +268,8 @@ public static PTransformMatcher writeWithRunnerDeterminedSharding() {
     return new PTransformMatcher() {
       @Override
       public boolean matches(AppliedPTransform<?, ?, ?> application) {
-        if (application.getTransform() instanceof Write) {
-          Write write = (Write) application.getTransform();
+        if (application.getTransform() instanceof WriteFiles) {
+          WriteFiles write = (WriteFiles) application.getTransform();
           return write.getSharding() == null && write.getNumShards() == null;
         }
         return false;
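For context on how this matcher is consumed: a runner pairs it with an override factory so that matching applications are replaced at pipeline construction time. A sketch, assuming the PTransformOverride API of this Beam version; the DirectRunner wires this matcher to the WriteWithShardingFactory that appears later in this commit.

  // Sketch: pairing the matcher with the DirectRunner's sharding factory.
  // Assumes org.apache.beam.sdk.runners.PTransformOverride from this era.
  PTransformOverride writeOverride =
      PTransformOverride.of(
          PTransformMatchers.writeWithRunnerDeterminedSharding(),
          new WriteWithShardingFactory());
  // Any WriteFiles with getSharding() == null and getNumShards() == null is
  // then swapped for one whose shard count the runner picks at run time.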
PTransformMatchersTest.java

@@ -30,7 +30,7 @@
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink;
-import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -499,8 +499,8 @@ public void flattenWithDuplicateInputsNonFlatten() {

   @Test
   public void writeWithRunnerDeterminedSharding() {
-    Write<Integer> write =
-        Write.to(
+    WriteFiles<Integer> write =
+        WriteFiles.to(
             new FileBasedSink<Integer>("foo", "bar") {
               @Override
               public FileBasedWriteOperation<Integer> createWriteOperation(
@@ -512,23 +512,23 @@ public FileBasedWriteOperation<Integer> createWriteOperation(
         PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)),
         is(true));

-    Write<Integer> withStaticSharding = write.withNumShards(3);
+    WriteFiles<Integer> withStaticSharding = write.withNumShards(3);
     assertThat(
         PTransformMatchers.writeWithRunnerDeterminedSharding()
             .matches(appliedWrite(withStaticSharding)),
         is(false));

-    Write<Integer> withCustomSharding =
+    WriteFiles<Integer> withCustomSharding =
         write.withSharding(Sum.integersGlobally().asSingletonView());
     assertThat(
         PTransformMatchers.writeWithRunnerDeterminedSharding()
             .matches(appliedWrite(withCustomSharding)),
         is(false));
   }

-  private AppliedPTransform<?, ?, ?> appliedWrite(Write<Integer> write) {
-    return AppliedPTransform.<PCollection<Integer>, PDone, Write<Integer>>of(
-        "Write",
+  private AppliedPTransform<?, ?, ?> appliedWrite(WriteFiles<Integer> write) {
+    return AppliedPTransform.<PCollection<Integer>, PDone, WriteFiles<Integer>>of(
+        "WriteFiles",
         Collections.<TupleTag<?>, PValue>emptyMap(),
         Collections.<TupleTag<?>, PValue>emptyMap(),
         write,
WriteWithShardingFactory.java

@@ -26,7 +26,7 @@
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
-import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Count;
@@ -43,18 +43,19 @@
 import org.apache.beam.sdk.values.TupleTag;

 /**
- * A {@link PTransformOverrideFactory} that overrides {@link Write} {@link PTransform PTransforms}
- * with an unspecified number of shards with a write with a specified number of shards. The number
- * of shards is the log base 10 of the number of input records, with up to 2 additional shards.
+ * A {@link PTransformOverrideFactory} that overrides {@link WriteFiles}
+ * {@link PTransform PTransforms} with an unspecified number of shards with a write with a
+ * specified number of shards. The number of shards is the log base 10 of the number of input
+ * records, with up to 2 additional shards.
  */
 class WriteWithShardingFactory<InputT>
-    implements PTransformOverrideFactory<PCollection<InputT>, PDone, Write<InputT>> {
+    implements PTransformOverrideFactory<PCollection<InputT>, PDone, WriteFiles<InputT>> {
   static final int MAX_RANDOM_EXTRA_SHARDS = 3;
   @VisibleForTesting static final int MIN_SHARDS_FOR_LOG = 3;

   @Override
   public PTransformReplacement<PCollection<InputT>, PDone> getReplacementTransform(
-      AppliedPTransform<PCollection<InputT>, PDone, Write<InputT>> transform) {
+      AppliedPTransform<PCollection<InputT>, PDone, WriteFiles<InputT>> transform) {

     return PTransformReplacement.of(
         PTransformReplacements.getSingletonMainInput(transform),
@@ -108,7 +109,7 @@ public void process(ProcessContext ctxt) {

   private int calculateShards(long totalRecords) {
     if (totalRecords == 0) {
-      // Write out at least one shard, even if there is no input.
+      // WriteFiles out at least one shard, even if there is no input.
       return 1;
     }
     // Windows get their own number of random extra shards. This is stored in a side input, so
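The body of calculateShards is truncated in this view. A self-contained sketch consistent with the class javadoc and the two constants above; since the committed code is cut off here, the details, in particular how extraShards is threaded in, are assumptions.

  static final int MAX_RANDOM_EXTRA_SHARDS = 3;
  static final int MIN_SHARDS_FOR_LOG = 3;

  // extraShards is assumed to be drawn once per window (0 to
  // MAX_RANDOM_EXTRA_SHARDS - 1) and stored in a side input, per the
  // comment in the diff above.
  static int calculateShards(long totalRecords, int extraShards) {
    if (totalRecords == 0) {
      return 1; // write at least one (empty) shard even with no input
    }
    // Roughly log base 10 of the record count, floored at MIN_SHARDS_FOR_LOG.
    int floorLogRecords = (int) Math.log10(totalRecords);
    return Math.max(floorLogRecords, MIN_SHARDS_FOR_LOG) + extraShards;
  }

  // For example, 10_000_000_000L records with 3 extra shards gives 10 + 3 = 13,
  // matching the containsInAnyOrder(13) assertion in the test file below.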
WriteWithShardingFactoryTest.java

@@ -39,9 +39,9 @@
 import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.Sink;
+import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -84,7 +84,7 @@ public void dynamicallyReshardedWrite() throws Exception {
     String fileName = "resharded_write";
     String outputPath = tmp.getRoot().getAbsolutePath();
     String targetLocation = IOChannelUtils.resolve(outputPath, fileName);
-    // TextIO is implemented in terms of the Write PTransform. When sharding is not specified,
+    // TextIO is implemented in terms of the WriteFiles PTransform. When sharding is not specified,
     // resharding should be automatically applied
     p.apply(Create.of(strs)).apply(TextIO.Write.to(targetLocation));

@@ -121,10 +121,10 @@

   @Test
   public void withNoShardingSpecifiedReturnsNewTransform() {
-    Write<Object> original = Write.to(new TestSink());
+    WriteFiles<Object> original = WriteFiles.to(new TestSink());
     PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));

-    AppliedPTransform<PCollection<Object>, PDone, Write<Object>> originalApplication =
+    AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication =
         AppliedPTransform.of(
             "write", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);

@@ -207,12 +207,16 @@ public void keyBasedOnCountFnManyElementsExtraShards() throws Exception {
     assertThat(shards, containsInAnyOrder(13));
   }

-  private static class TestSink extends Sink<Object> {
+  private static class TestSink extends FileBasedSink<Object> {
+    public TestSink() {
+      super("", "");
+    }
+
     @Override
     public void validate(PipelineOptions options) {}

     @Override
-    public WriteOperation<Object, ?> createWriteOperation(PipelineOptions options) {
+    public FileBasedWriteOperation<Object> createWriteOperation(PipelineOptions options) {
       throw new IllegalArgumentException("Should not be used");
     }
   }
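As the comment in dynamicallyReshardedWrite notes, TextIO.Write is built on WriteFiles, so the matcher and factory above apply to plain text writes as well. A minimal pipeline sketch under that assumption; the output path is illustrative.

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.io.TextIO;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.transforms.Create;

  Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
  // No withNumShards or withSharding here, so writeWithRunnerDeterminedSharding()
  // matches and the DirectRunner substitutes a runner-chosen shard count.
  p.apply(Create.of("a", "b", "c"))
   .apply(TextIO.Write.to("/tmp/resharded_write"));
  p.run();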

This file was deleted.
