Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-59] Convert WriteFiles/FileBasedSink from IOChannelFactory to FileSystems #2779

Closed
wants to merge 2 commits into the base branch from the head branch (branch names omitted in this capture)
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
*/
package org.apache.beam.examples.common;

import static com.google.common.base.Verify.verifyNotNull;

import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
Expand All @@ -47,9 +50,21 @@ public WriteOneFilePerWindow(String filenamePrefix) {

@Override
public PDone expand(PCollection<String> input) {
// filenamePrefix may contain a directory and a filename component. Pull out only the filename
// component from that path for the PerWindowFiles.
String prefix = "";
ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
if (!resource.isDirectory()) {
prefix = verifyNotNull(
resource.getFilename(),
"A non-directory resource should have a non-null filename: %s",
resource);
}

return input.apply(
TextIO.write()
.to(new PerWindowFiles(filenamePrefix))
.to(resource.getCurrentDirectory())
.withFilenamePolicy(new PerWindowFiles(prefix))
.withWindowedWrites()
.withNumShards(3));
}
Expand All @@ -62,32 +77,31 @@ public PDone expand(PCollection<String> input) {
*/
public static class PerWindowFiles extends FilenamePolicy {

private final String output;
private final String prefix;

public PerWindowFiles(String output) {
this.output = output;
}

@Override
public ValueProvider<String> getBaseOutputFilenameProvider() {
return StaticValueProvider.of(output);
public PerWindowFiles(String prefix) {
this.prefix = prefix;
}

public String filenamePrefixForWindow(IntervalWindow window) {
return String.format(
"%s-%s-%s", output, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
public String filenamePrefixForWindow(IntervalWindow window) {
return String.format("%s-%s-%s",
prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
}

@Override
public String windowedFilename(WindowedContext context) {
public ResourceId windowedFilename(
ResourceId outputDirectory, WindowedContext context, String extension) {
IntervalWindow window = (IntervalWindow) context.getWindow();
return String.format(
"%s-%s-of-%s",
filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards());
String filename = String.format(
"%s-%s-of-%s%s",
filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
extension);
return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
}

@Override
public String unwindowedFilename(Context context) {
public ResourceId unwindowedFilename(
ResourceId outputDirectory, Context context, String extension) {
throw new UnsupportedOperationException("Unsupported.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,13 @@
import java.util.Collections;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.LocalResources;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
Expand Down Expand Up @@ -498,9 +503,12 @@ public void flattenWithDuplicateInputsNonFlatten() {

@Test
public void writeWithRunnerDeterminedSharding() {
ResourceId outputDirectory = LocalResources.fromString("/foo/bar", true /* isDirectory */);
FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");
WriteFiles<Integer> write =
WriteFiles.to(
new FileBasedSink<Integer>("foo", "bar") {
new FileBasedSink<Integer>(StaticValueProvider.of(outputDirectory), policy) {
@Override
public FileBasedWriteOperation<Integer> createWriteOperation() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,14 @@
import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.LocalResources;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
Expand Down Expand Up @@ -121,7 +125,17 @@ public void dynamicallyReshardedWrite() throws Exception {

@Test
public void withNoShardingSpecifiedReturnsNewTransform() {
WriteFiles<Object> original = WriteFiles.to(new TestSink());
ResourceId outputDirectory = LocalResources.fromString("/foo", true /* isDirectory */);
FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");
WriteFiles<Object> original = WriteFiles.to(
new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy) {
@Override
public FileBasedWriteOperation<Object> createWriteOperation() {
throw new IllegalArgumentException("Should not be used");
}
});
@SuppressWarnings("unchecked")
PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));

AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication =
Expand Down Expand Up @@ -206,18 +220,4 @@ public void keyBasedOnCountFnManyElementsExtraShards() throws Exception {
List<Integer> shards = fnTester.processBundle((long) count);
assertThat(shards, containsInAnyOrder(13));
}

private static class TestSink extends FileBasedSink<Object> {
public TestSink() {
super("", "");
}

@Override
public void validate(PipelineOptions options) {}

@Override
public FileBasedWriteOperation<Object> createWriteOperation() {
throw new IllegalArgumentException("Should not be used");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,15 @@
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
Expand Down Expand Up @@ -844,11 +847,12 @@ public BatchWrite(DataflowRunner runner, WriteFiles<T> transform) {

@Override
public PDone expand(PCollection<T> input) {
FileBasedSink<T> sink = transform.getSink();
if (sink.getBaseOutputFilenameProvider().isAccessible()) {
ValueProvider<ResourceId> outputDirectory =
transform.getSink().getBaseOutputDirectoryProvider();
if (outputDirectory.isAccessible()) {
PathValidator validator = runner.options.getPathValidator();
validator.validateOutputFilePrefixSupported(
sink.getBaseOutputFilenameProvider().get());
validator.validateOutputResourceSupported(
outputDirectory.get().resolve("some-file", StandardResolveOptions.RESOLVE_FILE));
}
return transform.expand(input);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ private DataflowPipelineOptions buildPipelineOptions() throws IOException {
options.setDataflowClient(buildMockDataflow());
options.setGcsUtil(mockGcsUtil);
options.setGcpCredential(new TestCredential());

// Configure the FileSystem registrar to use these options.
FileSystems.setDefaultConfigInWorkers(options);

return options;
}

Expand Down