Convert WriteFiles/FileBasedSink from IOChannelFactory to FileSystems
This converts FileBasedSink from IOChannelFactory to FileSystems and
updates all existing transforms that use WriteFiles accordingly.

We preserve the existing semantics of most transforms, simply adding the
ability for users to provide a ResourceId in addition to a String when
setting the outputPrefix.

Other changes:

* Make DefaultFilenamePolicy its own top-level class and move
  IOChannelUtils#constructName into it. This is the default FilenamePolicy
  used by FileBasedSink. (A rough sketch of the template expansion
  follows this list.)

* Rethink FilenamePolicy as a function from ResourceId (base directory)
  to ResourceId (output file), moving the base directory into the
  context. This way, FilenamePolicy logic is truly independent of the
  base directory. Using ResourceId#resolve, a filename policy can add
  multiple path components, say, base/YYYY/MM/DD/file.txt, in a
  filesystem-independent way. (See the second sketch after this list.)

  (Also add an optional extension parameter to the function, enabling an
  owning transform to pass in the suffix from a separately-configured
  compression factory or similar.)

* Remove some old logic that disallowed certain specific filename
  patterns; it dated back to Cloud Dataflow SDK restrictions on
  implementations that are no longer used.
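
The shard-template expansion that constructName performs looks roughly
like the following. This is an illustrative reimplementation, not the
moved code; it assumes the S/N placeholder convention of
ShardNameTemplate (e.g. "-SSSSS-of-NNNNN"):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Expands runs of 'S' to the zero-padded shard number and runs of
    // 'N' to the zero-padded shard count, e.g.
    //   constructName("out", "-SSSSS-of-NNNNN", ".txt", 2, 10)
    //     -> "out-00002-of-00010.txt"
    static String constructName(
        String prefix, String shardTemplate, String suffix, int shard, int numShards) {
      StringBuilder sb = new StringBuilder(prefix);
      Matcher m = Pattern.compile("S+|N+").matcher(shardTemplate);
      int pos = 0;
      while (m.find()) {
        sb.append(shardTemplate, pos, m.start());
        int width = m.end() - m.start();
        int value = shardTemplate.charAt(m.start()) == 'S' ? shard : numShards;
        sb.append(String.format("%0" + width + "d", value));
        pos = m.end();
      }
      sb.append(shardTemplate.substring(pos));
      return sb.append(suffix).toString();
    }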
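
And a sketch of the path construction the new FilenamePolicy shape
enables: each resolve() call adds one path component, so no
filesystem-specific separators are needed. The dated layout and method
name here are illustrative, not part of this diff:

    import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
    import org.apache.beam.sdk.io.fs.ResourceId;

    // Builds base/YYYY/MM/DD/file.txt one component at a time.
    static ResourceId datedOutput(ResourceId baseDirectory, int year, int month, int day) {
      return baseDirectory
          .resolve(String.format("%04d", year), StandardResolveOptions.RESOLVE_DIRECTORY)
          .resolve(String.format("%02d", month), StandardResolveOptions.RESOLVE_DIRECTORY)
          .resolve(String.format("%02d", day), StandardResolveOptions.RESOLVE_DIRECTORY)
          .resolve("file.txt", StandardResolveOptions.RESOLVE_FILE);
    }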
dhalperi committed Apr 30, 2017
1 parent d07b4c3 commit 164bf51
Showing 27 changed files with 1,286 additions and 1,378 deletions.
@@ -19,8 +19,8 @@

import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -49,7 +49,8 @@ public WriteOneFilePerWindow(String filenamePrefix) {
public PDone expand(PCollection<String> input) {
return input.apply(
TextIO.Write
.to(new PerWindowFiles(filenamePrefix))
.to(filenamePrefix)
.withFilenamePolicy(new PerWindowFiles())
.withWindowedWrites()
.withNumShards(3));
}
@@ -62,32 +63,24 @@ public PDone expand(PCollection<String> input) {
*/
public static class PerWindowFiles extends FilenamePolicy {

private final String output;

public PerWindowFiles(String output) {
this.output = output;
}

@Override
public ValueProvider<String> getBaseOutputFilenameProvider() {
return StaticValueProvider.of(output);
}

public String filenamePrefixForWindow(IntervalWindow window) {
return String.format(
"%s-%s-%s", output, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
public String filenamePrefixForWindow(IntervalWindow window) {
return String.format("%s-%s", FORMATTER.print(window.start()), FORMATTER.print(window.end()));
}

@Override
public String windowedFilename(WindowedContext context) {
public ResourceId windowedFilename(
ResourceId outputDirectory, WindowedContext context, String extension) {
IntervalWindow window = (IntervalWindow) context.getWindow();
return String.format(
"%s-%s-of-%s",
filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards());
String filename = String.format(
"%s-%s-of-%s%s",
filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
extension);
return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
}

@Override
public String unwindowedFilename(Context context) {
public ResourceId unwindowedFilename(
ResourceId outputDirectory, Context context, String extension) {
throw new UnsupportedOperationException("Unsupported.");
}
}
@@ -210,7 +210,7 @@ public static void main(String[] args) {
p.apply(TextIO.Read.from(options.getInput()))
.apply(MapElements.via(new ParseTableRowJson()))
.apply(new ComputeTopSessions(samplingThreshold))
.apply("Write", TextIO.Write.withoutSharding().to(options.getOutput()));
.apply("Write", TextIO.Write.to(options.getOutput()).withoutSharding());

p.run().waitUntilFinish();
}
@@ -29,7 +29,6 @@
import java.util.Collections;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -41,7 +40,6 @@
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.ViewFn;
@@ -496,33 +494,33 @@ public void flattenWithDuplicateInputsNonFlatten() {
assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(false));
}

@Test
public void writeWithRunnerDeterminedSharding() {
WriteFiles<Integer> write =
WriteFiles.to(
new FileBasedSink<Integer>("foo", "bar") {
@Override
public FileBasedWriteOperation<Integer> createWriteOperation() {
return null;
}
});
assertThat(
PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)),
is(true));

WriteFiles<Integer> withStaticSharding = write.withNumShards(3);
assertThat(
PTransformMatchers.writeWithRunnerDeterminedSharding()
.matches(appliedWrite(withStaticSharding)),
is(false));

WriteFiles<Integer> withCustomSharding =
write.withSharding(Sum.integersGlobally().asSingletonView());
assertThat(
PTransformMatchers.writeWithRunnerDeterminedSharding()
.matches(appliedWrite(withCustomSharding)),
is(false));
}
// @Test
// public void writeWithRunnerDeterminedSharding() {
// WriteFiles<Integer> write =
// WriteFiles.to(
// new FileBasedSink<Integer>("foo", "bar") {
// @Override
// public FileBasedWriteOperation<Integer> createWriteOperation() {
// return null;
// }
// });
// assertThat(
// PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)),
// is(true));
//
// WriteFiles<Integer> withStaticSharding = write.withNumShards(3);
// assertThat(
// PTransformMatchers.writeWithRunnerDeterminedSharding()
// .matches(appliedWrite(withStaticSharding)),
// is(false));
//
// WriteFiles<Integer> withCustomSharding =
// write.withSharding(Sum.integersGlobally().asSingletonView());
// assertThat(
// PTransformMatchers.writeWithRunnerDeterminedSharding()
// .matches(appliedWrite(withCustomSharding)),
// is(false));
// }

private AppliedPTransform<?, ?, ?> appliedWrite(WriteFiles<Integer> write) {
return AppliedPTransform.<PCollection<Integer>, PDone, WriteFiles<Integer>>of(
@@ -20,11 +20,9 @@

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;

import java.io.File;
@@ -38,13 +36,8 @@
import java.util.UUID;
import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
@@ -54,9 +47,6 @@
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -119,19 +109,19 @@ public void dynamicallyReshardedWrite() throws Exception {
+ WriteWithShardingFactory.MAX_RANDOM_EXTRA_SHARDS)))));
}

@Test
public void withNoShardingSpecifiedReturnsNewTransform() {
WriteFiles<Object> original = WriteFiles.to(new TestSink());
PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));

AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication =
AppliedPTransform.of(
"write", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);

assertThat(
factory.getReplacementTransform(originalApplication).getTransform(),
not(equalTo((Object) original)));
}
// @Test
// public void withNoShardingSpecifiedReturnsNewTransform() {
// WriteFiles<Object> original = WriteFiles.to(new TestSink());
// PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
//
// AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication =
// AppliedPTransform.of(
// "write", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);
//
// assertThat(
// factory.getReplacementTransform(originalApplication).getTransform(),
// not(equalTo((Object) original)));
// }

@Test
public void keyBasedOnCountFnWithNoElements() throws Exception {
@@ -206,18 +196,18 @@ public void keyBasedOnCountFnManyElementsExtraShards() throws Exception {
List<Integer> shards = fnTester.processBundle((long) count);
assertThat(shards, containsInAnyOrder(13));
}

private static class TestSink extends FileBasedSink<Object> {
public TestSink() {
super("", "");
}

@Override
public void validate(PipelineOptions options) {}

@Override
public FileBasedWriteOperation<Object> createWriteOperation() {
throw new IllegalArgumentException("Should not be used");
}
}
//
// private static class TestSink extends FileBasedSink<Object> {
// public TestSink() {
// super("", "");
// }
//
// @Override
// public void validate(PipelineOptions options) {}
//
// @Override
// public FileBasedWriteOperation<Object> createWriteOperation() {
// throw new IllegalArgumentException("Should not be used");
// }
// }
}
@@ -86,10 +86,12 @@
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -837,11 +839,11 @@ public BatchWrite(DataflowRunner runner, WriteFiles<T> transform) {

@Override
public PDone expand(PCollection<T> input) {
FileBasedSink<T> sink = transform.getSink();
if (sink.getBaseOutputFilenameProvider().isAccessible()) {
ValueProvider<ResourceId> outputDirectory =
transform.getSink().getBaseOutputDirectoryProvider();
if (outputDirectory.isAccessible()) {
PathValidator validator = runner.options.getPathValidator();
validator.validateOutputFilePrefixSupported(
sink.getBaseOutputFilenameProvider().get());
validator.validateOutputResourceSupported(outputDirectory.get());
}
return transform.expand(input);
}
@@ -70,6 +70,7 @@
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.TextIO.Read;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -321,8 +322,8 @@ public interface RuntimeTestOptions extends PipelineOptions {
ValueProvider<String> getInput();
void setInput(ValueProvider<String> value);

ValueProvider<String> getOutput();
void setOutput(ValueProvider<String> value);
ValueProvider<ResourceId> getOutput();
void setOutput(ValueProvider<ResourceId> value);
}

@Test
@@ -332,7 +333,7 @@ public void testTextIOWithRuntimeParameters() throws IOException {
Pipeline p = buildDataflowPipeline(dataflowOptions);
p
.apply(TextIO.Read.from(options.getInput()).withoutValidation())
.apply(TextIO.Write.to(options.getOutput()).withoutValidation());
.apply(TextIO.Write.to(options.getOutput()));
}

/**