Convert WriteFiles/FileBasedSink from IOChannelFactory to FileSystems
This converts FileBasedSink from IOChannelFactory to FileSystems, with
fallout changes to all existing Transforms that use WriteFiles.

We preserve the existing semantics of most transforms, simply adding the
ability for users to provide a ResourceId in addition to a String when
setting the outputPrefix.
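
For example, a sketch of the two styles (lines is an assumed
PCollection<String>; TextIO.Write taking a ValueProvider<ResourceId>
is exercised by the RuntimeTestOptions test below, and LocalResources
and StaticValueProvider are used the same way in the updated tests):

    // Existing behavior: a String output prefix.
    lines.apply(TextIO.Write.to("/path/to/output"));

    // New: a ResourceId-based output prefix. LocalResources builds a
    // ResourceId for the local filesystem; any registered FileSystem
    // (e.g. gs://) resolves the same way.
    ValueProvider<ResourceId> prefix = StaticValueProvider.of(
        LocalResources.fromString("/path/to/output", false /* isDirectory */));
    lines.apply(TextIO.Write.to(prefix));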

Other changes:

* Make DefaultFilenamePolicy its own top-level class and move
  IOChannelUtils#constructName into it. This is the default FilenamePolicy
  used by FileBasedSink (see the snippet after this list for how the
  updated tests construct it).

* Rethink FilenamePolicy as a function from ResourceId (base directory)
  to ResourceId (output file), moving the base directory into the
  context. This way, FilenamePolicy logic is truly independent of the
  base directory. Using ResourceId#resolve, a filename policy can add
  multiple path components, say, base/YYYY/MM/DD/file.txt, in a
  filesystem-independent way (see the sketch after this list).

  (Also add an optional extension parameter to the function, enabling an
  owning transform to pass in the suffix from a separately-configured
  compression factory or similar.)
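
  For illustration, a hypothetical DatedFilenamePolicy (not part of this
  commit) written against the new FileBasedSink.FilenamePolicy signatures,
  nesting output under base/YYYY/MM/DD/ with filesystem-independent
  resolve() calls:

      public static class DatedFilenamePolicy extends FileBasedSink.FilenamePolicy {
        @Override
        public ResourceId unwindowedFilename(
            ResourceId baseDirectory, Context context, String extension) {
          // Each resolve() appends one path component, so the same policy
          // works on the local filesystem, gs://, hdfs://, etc.
          return baseDirectory
              .resolve("2017", StandardResolveOptions.RESOLVE_DIRECTORY)
              .resolve("05", StandardResolveOptions.RESOLVE_DIRECTORY)
              .resolve("02", StandardResolveOptions.RESOLVE_DIRECTORY)
              .resolve(
                  String.format("file-%s-of-%s%s",
                      context.getShardNumber(), context.getNumShards(), extension),
                  StandardResolveOptions.RESOLVE_FILE);
        }

        @Override
        public ResourceId windowedFilename(
            ResourceId baseDirectory, WindowedContext context, String extension) {
          throw new UnsupportedOperationException("Unwindowed writes only.");
        }
      }

  The default policy itself is constructed the way the updated tests
  below do:

      FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
          StaticValueProvider.of(outputDirectory),
          DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");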
dhalperi committed May 2, 2017
1 parent f9243e1 commit aa267d4
Showing 29 changed files with 1,248 additions and 1,131 deletions.
@@ -18,9 +18,10 @@
package org.apache.beam.examples.common;

import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -49,7 +50,8 @@ public WriteOneFilePerWindow(String filenamePrefix) {
  public PDone expand(PCollection<String> input) {
    return input.apply(
        TextIO.Write
-            .to(new PerWindowFiles(filenamePrefix))
+            .to(filenamePrefix)
+            .withFilenamePolicy(new PerWindowFiles(filenamePrefix))
            .withWindowedWrites()
            .withNumShards(3));
  }
@@ -62,32 +64,38 @@ public PDone expand(PCollection<String> input) {
   */
  public static class PerWindowFiles extends FilenamePolicy {

-    private final String output;
+    private final String filenamePrefix;

-    public PerWindowFiles(String output) {
-      this.output = output;
+    public PerWindowFiles(String filenamePrefix) {
+      String filePrefixOnly;
+      try {
+        ResourceId file = FileSystems.matchNewResource(filenamePrefix, false /* isDirectory */);
+        filePrefixOnly = file.getFilename();
+      } catch (Exception e) {
+        filePrefixOnly = "";
+      }
+      this.filenamePrefix = filePrefixOnly;
    }

-    @Override
-    public ValueProvider<String> getBaseOutputFilenameProvider() {
-      return StaticValueProvider.of(output);
-    }
-
-    public String filenamePrefixForWindow(IntervalWindow window) {
-      return String.format(
-          "%s-%s-%s", output, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
+    public String filenamePrefixForWindow(IntervalWindow window) {
+      return String.format("%s-%s-%s",
+          filenamePrefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
    }

    @Override
-    public String windowedFilename(WindowedContext context) {
+    public ResourceId windowedFilename(
+        ResourceId outputDirectory, WindowedContext context, String extension) {
      IntervalWindow window = (IntervalWindow) context.getWindow();
-      return String.format(
-          "%s-%s-of-%s",
-          filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards());
+      String filename = String.format(
+          "%s-%s-of-%s%s",
+          filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
+          extension);
+      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
    }

    @Override
-    public String unwindowedFilename(Context context) {
+    public ResourceId unwindowedFilename(
+        ResourceId outputDirectory, Context context, String extension) {
      throw new UnsupportedOperationException("Unsupported.");
    }
  }
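
(Worked example, illustrative only: with prefix "output", shard 2 of 3,
extension ".txt", and a window whose start and end FORMATTER prints as
2017-05-02T10-00 and 2017-05-02T10-05, windowedFilename now resolves to
<outputDirectory>/output-2017-05-02T10-00-2017-05-02T10-05-2-of-3.txt;
the exact timestamp text depends on FORMATTER. Previously the policy
returned only the flat filename string.)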
@@ -210,7 +210,7 @@ public static void main(String[] args) {
p.apply(TextIO.Read.from(options.getInput()))
.apply(MapElements.via(new ParseTableRowJson()))
.apply(new ComputeTopSessions(samplingThreshold))
.apply("Write", TextIO.Write.withoutSharding().to(options.getOutput()));
.apply("Write", TextIO.Write.to(options.getOutput()).withoutSharding());

p.run().waitUntilFinish();
}
@@ -29,8 +29,13 @@
import java.util.Collections;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.LocalResources;
import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -498,9 +503,12 @@ public void flattenWithDuplicateInputsNonFlatten() {

@Test
public void writeWithRunnerDeterminedSharding() {
+    ResourceId outputDirectory = LocalResources.fromString("/too", true /* isDirectory */);
+    FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
+        StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");
    WriteFiles<Integer> write =
        WriteFiles.to(
-            new FileBasedSink<Integer>("foo", "bar") {
+            new FileBasedSink<Integer>(StaticValueProvider.of(outputDirectory), policy) {
@Override
public FileBasedWriteOperation<Integer> createWriteOperation() {
return null;
@@ -39,10 +39,14 @@
import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.LocalResources;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.WriteFiles;
-import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
@@ -121,7 +125,16 @@ public void dynamicallyReshardedWrite() throws Exception {

@Test
public void withNoShardingSpecifiedReturnsNewTransform() {
-    WriteFiles<Object> original = WriteFiles.to(new TestSink());
+    ResourceId outputDirectory = LocalResources.fromString("/too", true /* isDirectory */);
+    FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
+        StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");
+    WriteFiles<Object> original = WriteFiles.to(
+        new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy) {
+          @Override
+          public FileBasedWriteOperation<Object> createWriteOperation() {
+            return null;
+          }
+        });
PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));

AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication =
@@ -206,18 +219,4 @@ public void keyBasedOnCountFnManyElementsExtraShards() throws Exception {
List<Integer> shards = fnTester.processBundle((long) count);
assertThat(shards, containsInAnyOrder(13));
}

-  private static class TestSink extends FileBasedSink<Object> {
-    public TestSink() {
-      super("", "");
-    }
-
-    @Override
-    public void validate(PipelineOptions options) {}
-
-    @Override
-    public FileBasedWriteOperation<Object> createWriteOperation() {
-      throw new IllegalArgumentException("Should not be used");
-    }
-  }
}
@@ -86,10 +86,13 @@
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -841,11 +844,12 @@ public BatchWrite(DataflowRunner runner, WriteFiles<T> transform) {

@Override
public PDone expand(PCollection<T> input) {
-      FileBasedSink<T> sink = transform.getSink();
-      if (sink.getBaseOutputFilenameProvider().isAccessible()) {
+      ValueProvider<ResourceId> outputDirectory =
+          transform.getSink().getBaseOutputDirectoryProvider();
+      if (outputDirectory.isAccessible()) {
        PathValidator validator = runner.options.getPathValidator();
-        validator.validateOutputFilePrefixSupported(
-            sink.getBaseOutputFilenameProvider().get());
+        validator.validateOutputResourceSupported(
+            outputDirectory.get().resolve("some-file", StandardResolveOptions.RESOLVE_FILE));
}
return transform.expand(input);
}
@@ -44,7 +44,6 @@
import org.apache.beam.sdk.coders.TextualIntegerCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoder;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;

@@ -79,7 +78,6 @@ public class DefaultCoderCloudObjectTranslatorRegistrar
ByteCoder.class,
DoubleCoder.class,
DurationCoder.class,
-FileResultCoder.class,
FooterCoder.class,
InstantCoder.class,
IsmShardCoder.class,
@@ -133,6 +133,10 @@ public boolean matches(Object o) {
private Pipeline buildPipeline(DataflowPipelineOptions options) {
options.setRunner(DataflowRunner.class);
Pipeline p = Pipeline.create(options);
+
+    // Enable the FileSystems API to know about gs:// URIs in this test.
+    FileSystems.setDefaultConfigInWorkers(options);
+
@@ -71,6 +71,7 @@
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.TextIO.Read;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -243,6 +244,10 @@ private DataflowPipelineOptions buildPipelineOptions() throws IOException {
options.setDataflowClient(buildMockDataflow());
options.setGcsUtil(mockGcsUtil);
options.setGcpCredential(new TestCredential());

+    // Configure the FileSystem registrar to use these options.
+    FileSystems.setDefaultConfigInWorkers(options);
+
return options;
}

@@ -325,8 +330,8 @@ public interface RuntimeTestOptions extends PipelineOptions {
ValueProvider<String> getInput();
void setInput(ValueProvider<String> value);

-    ValueProvider<String> getOutput();
-    void setOutput(ValueProvider<String> value);
+    ValueProvider<ResourceId> getOutput();
+    void setOutput(ValueProvider<ResourceId> value);
}

@Test
@@ -336,7 +341,7 @@ public void testTextIOWithRuntimeParameters() throws IOException {
Pipeline p = buildDataflowPipeline(dataflowOptions);
p
.apply(TextIO.Read.from(options.getInput()).withoutValidation())
-        .apply(TextIO.Write.to(options.getOutput()).withoutValidation());
+        .apply(TextIO.Write.to(options.getOutput()));
}

/**