Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-59] Convert WriteFiles/FileBasedSink from IOChannelFactory to FileSystems #2779

Closed
wants to merge 2 commits into from

Conversation

dhalperi
Copy link
Contributor

@dhalperi dhalperi commented Apr 29, 2017

This converts FileBasedSink from IOChannelFactory to FileSystems, with
fallout changes on all existing Transforms that use WriteFiles.

We preserve the existing semantics of most transforms, simply adding the
ability for users to provide ResourceId in addition to String when
setting the outputPrefix.

Other changes:

  • Make DefaultFilenamePolicy its own top-level class and move
    IOChannelUtils#constructName into it. This the default FilenamePolicy
    used by FilebasedSource.

  • Rethink FilenamePolicy as a function from ResourceId (base directory)
    to ResourceId (output file), moving the base directory into the
    context. This way, FilenamePolicy logic is truly independent from the
    base directory. Using ResourceId#resolve, a filename policy can add
    multiple path components, say, base/YYYY/MM/DD/file.txt, in a
    fileystem independent way.

    (Also add an optional extension parameter to the function, enabling an
    owning transform to pass in the suffix from a separately-configured
    compression factory or similar.)


TODO:

  • I cleaned up TextIO and AvroIO, but XmlIO and TFRecordIO need more.
  • Review test coverage.
  • REALLY review testing and javadoc.

But getting this out to be able to look at the comprehensive diff.

CC: @davorbonaci @lukecwik @vikkyrk @jkff @reuvenlax @kennknowles

.matches(appliedWrite(withCustomSharding)),
is(false));
}
// @Test
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dhalperi todo

@kennknowles
Copy link
Member

This seems like a bunch of different changes in one commit. Can you split?

@dhalperi
Copy link
Contributor Author

dhalperi commented Apr 29, 2017 via email

// PTransformMatchers.writeWithRunnerDeterminedSharding()
// .matches(appliedWrite(withCustomSharding)),
// is(false));
// }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping when tests are done, and I will come back to this file

// public FileBasedWriteOperation<Object> createWriteOperation() {
// throw new IllegalArgumentException("Should not be used");
// }
// }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

* {@link ResourceId#getFilename() filename} of the output resource will be used to set the file
* prefix. Files are additionally named by a shard identifier (see
* {@link Bound#withNumShards(int)}) and can be configured with a common suffix using
* {@link Bound#withSuffix(String)}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some of this (e.g. withSuffix) applies only to DefaultFilenamePolicy

final boolean windowedWrites;
FileBasedSink.FilenamePolicy filenamePolicy;
@Nullable final FileBasedSink.FilenamePolicy filenamePolicy;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why Nullable?

// /some/dir as the output directory and file- as the prefix.
//
// But if this fails, say for /some/dir/ , fallback to treating it as a directory.
return to(FileSystems.matchNewResource(outputPrefix, false /* isDirectory */));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT FileSystems.matchNewResource does not throw

@@ -694,11 +614,10 @@ private Read() {}
*/
public <X> Bound<X> withSchema(Class<X> type) {
return new Bound<>(
name,
filenamePrefix,
outputPrefix,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want to consider AutoValue (I believe jkff already has that coded up). Right now this feels a bit error prone, especially with multiple string arguments that are easy to mix up

@Override
public ResourceId apply(ResourceId input) {
return input.getCurrentDirectory();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe belongs somewhere more common? Seems like it might be useful elsewhere (e.g. TextIO)

.withLabel("Filename Pattern"));
}

private static class ExtractFilename implements SerializableFunction<ResourceId, String> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as ExtractDirectory - I feel like this could be somewhere more common

@@ -297,19 +253,19 @@ public int getNumShards() {
* well as sharding information. The policy must return unique and consistent filenames
* for different windows and panes.
*/
public abstract String windowedFilename(WindowedContext c);
public abstract ResourceId windowedFilename(
ResourceId outputDirectory, WindowedContext c, String extension);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain extension?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this can't be in WindowedContext? Where does extension come from?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, maybe this better belongs in DefaultFilenamePolicy

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FileBasedSink already has to know the CompressionFactory to open the channel. It doesn't make sense to make every policy support passing in the compression factory just to make it know about the extension. This is contextual information, in the sense that every sink needs it and it's not valid across different applications of the policy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe @kennknowles is deleting the contexts here; this PR simply makes that not-worse.

// The intent of the code is to have a consistent value of tempDirectory across
// all workers, which wouldn't happen if now() was called inline.
Instant now = Instant.now();
private final String timestamp = Instant.now().toString(TEMPDIR_TIMESTAMP);
private final Long tempId = TEMP_COUNT.getAndIncrement();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assume this is to handle the case of multiple FileBasedSinks that all get evaluated within the same millisecond at graph-construction time. If so, please add a comment to that effect.

// writerResults won't contain destination filenames, so we dynamically generate them here.
if (files.size() > 0) {
checkArgument(outputFilenames.isEmpty());
// Sort files for idempotence.
files = Ordering.natural().sortedCopy(files);
FilenamePolicy filenamePolicy = getSink().fileNamePolicy;
files = Ordering.usingToString().sortedCopy(files);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to inject a comparator here. People may in the future change toString without realizing that we depend on it for semantics

Copy link
Contributor Author

@dhalperi dhalperi Apr 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ResourceId#toString has documented semantics, so this is not a valid concern.

@@ -739,7 +614,7 @@ protected final void removeTemporaryFiles(Set<String> knownFiles,
* Provides a coder for {@link FileBasedSink.FileResult}.
*/
public final Coder<FileResult> getFileResultCoder() {
return FileResultCoder.of();
return SerializableCoder.of(FileResult.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you change this back? FileResultCoder was explicitly added, because SerializableCoder is somewhat broken in streaming.

/**
* A coder for FileResult objects.
*/
public static final class FileResultCoder extends CustomCoder<FileResult> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you delete this because of ResourceId? This will break streaming update in a fundamental way. Compatibility checks will succeed (as the coder hasn't changed), but Java may fail to deserialize objects after update.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't find any Beam documentation that mentions update compatibility. Can you link the spec? I'm not sure what guiding principles you're using here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great question!

"Update" is a feature of the Dataflow runner, that actually predates Beam. It allows updating a running streaming pipeline with new code, even allowing certain changes to the graph structure. This has not yet been proposed as part of the Beam model, though it could be and should be.

While update is not yet formally a part of Beam, we should endeavor not to gratuitously break runner features - especially features that are widely used. Similarly Flink supports resuming a pipeline from a savepoint, and we should try not to break that.

In this particular case "update compatibility" (which means ensuring that a new graph structure can resume from an old graph) is not the issue. The issue is that the Java Serializable structure for objects is extremely prone to changing. The user might make simple code changes that do not at all change the graph structure of the pipeline, yet will change the serialized format of the object. This means that the new pipeline will start crashing at runtime, because it will no longer be able to deserialize it's own stored data.

Supporting undocumented runner features is not sustainable. We really need to make sure all such things are explicitly captured in the Beam model.

private final String filename;
private final String destinationFilename;
public static final class FileResult implements Serializable {
private final ResourceId filename;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we resolve these into Strings before creating the FileResult? That way we can keep the coder.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternate idea - add a getCoder() method to ResourceId that subclasses must implement.

Copy link
Contributor Author

@dhalperi dhalperi Apr 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ResourceId does not have a coder as the type of resource is not known – and not knowable! – at construction time.

@dhalperi
Copy link
Contributor Author

dhalperi commented Apr 30, 2017

Thanks for initial comments, but as I said in the PR desc this is just CC: folks to let them know where things are at.

dhalperi added a commit to dhalperi/beam that referenced this pull request Apr 30, 2017
@dhalperi dhalperi force-pushed the convert-file-based-sink branch 3 times, most recently from 164bf51 to 435b21a Compare April 30, 2017 18:53
dhalperi added a commit to dhalperi/beam that referenced this pull request Apr 30, 2017
@dhalperi dhalperi force-pushed the convert-file-based-sink branch 2 times, most recently from a76c223 to f4ab836 Compare April 30, 2017 23:01
dhalperi added a commit to dhalperi/beam that referenced this pull request Apr 30, 2017
asfgit pushed a commit that referenced this pull request May 1, 2017
@coveralls
Copy link

Coverage Status

Coverage decreased (-0.3%) to 69.812% when pulling 93e2898 on dhalperi:convert-file-based-sink into 1197bef on apache:master.

@dhalperi
Copy link
Contributor Author

dhalperi commented May 1, 2017

retest this please

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.04%) to 70.349% when pulling 0f9db04 on dhalperi:convert-file-based-sink into cd813fb on apache:master.

@dhalperi
Copy link
Contributor Author

dhalperi commented May 2, 2017

Restored file result coder, rebased on top of AvroIO changes, and tests are green locally.

I'm still reviewing test coverage, but this is ready for a bigger look.

@coveralls
Copy link

Coverage Status

Changes Unknown when pulling 441b9d8 on dhalperi:convert-file-based-sink into ** on apache:master**.

@coveralls
Copy link

Coverage Status

Coverage decreased (-3.4%) to 66.499% when pulling 441b9d8 on dhalperi:convert-file-based-sink into 4c3174b on apache:master.

@@ -297,19 +253,19 @@ public int getNumShards() {
* well as sharding information. The policy must return unique and consistent filenames
* for different windows and panes.
*/
public abstract String windowedFilename(WindowedContext c);
public abstract ResourceId windowedFilename(
ResourceId outputDirectory, WindowedContext c, String extension);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this can't be in WindowedContext? Where does extension come from?


private static class ExtractDirectory implements SerializableFunction<ResourceId, ResourceId> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have a generic ResourceIdHelpers class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not currently in the business of providing users with reusable utilities. We can certainly do that in follow-up work if we want.

@@ -297,19 +253,19 @@ public int getNumShards() {
* well as sharding information. The policy must return unique and consistent filenames
* for different windows and panes.
*/
public abstract String windowedFilename(WindowedContext c);
public abstract ResourceId windowedFilename(
ResourceId outputDirectory, WindowedContext c, String extension);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, maybe this better belongs in DefaultFilenamePolicy

protected final void copyToOutputFiles(Map<String, String> filenames,
PipelineOptions options)
@VisibleForTesting
final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/filenames/resourceIds/

return FileResultCoder.of();
}

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we come up with a solution forFileResultCoder?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -173,9 +172,9 @@

private static <T> Write.Builder<T> defaultWriteBuilder() {
return new AutoValue_AvroIO_Write.Builder<T>()
.setFilenameSuffix("")
.setFilenameSuffix(null)
.setShardTemplate(null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change this to null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we know whether they're set.

/**
* Forces a single file as output.
* Forces a single file as output. This option is only compatible with unwindowed writes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why? I think it's compatible with windowed writes.

@coveralls
Copy link

Coverage Status

Coverage decreased (-3.4%) to 66.496% when pulling 81c0db4 on dhalperi:convert-file-based-sink into 4c3174b on apache:master.

@dhalperi dhalperi force-pushed the convert-file-based-sink branch 3 times, most recently from fcda6ab to c8c451b Compare May 3, 2017 01:02
@coveralls
Copy link

Coverage Status

Changes Unknown when pulling c8c451b on dhalperi:convert-file-based-sink into ** on apache:master**.

@dhalperi
Copy link
Contributor Author

dhalperi commented May 3, 2017

PTAL @jkff

Copy link
Contributor

@jkff jkff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks good. All my comments are minor enough that it's fine to self-merge after addressing them.

* the filename. The {@link WindowedContext} object gives access to the window and pane, as
* well as sharding information. The policy must return unique and consistent filenames
* the file {@link ResourceId resource} to be created given the base output directory and an
* optional extension from {@link FileBasedSink} configuration (e.g., {@link CompressionType}).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what sense is the extension optional: can it be null? if yes, mark nullable; if no, clarify

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed from optional to (possibly empty).

* @return The base filename for all output files.
* When a sink has not requested windowed or triggered output, this method will be invoked to
* return the file {@link ResourceId resource} to be created given the base output directory and
* an optional extension applied by additional {@link FileBasedSink} configuration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same about the extension

* <p>The {@link Context} object only provides sharding information, which is used by the policy
* to generate unique and consistent filenames.
*
* <p>Expected to be {@code null} when the {@link Context} has {@link Context#getNumShards()}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean this is expected to return null? Is there any symmetric expectation on windowedFilename?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deleted and solved in FileBasedSink itself. This was a wart from Reuven's earlier work; now we'll simply not call the function to compute filename if numShards <= 0.


public FilenamePolicy getFileNamePolicy() {
return fileNamePolicy;
public FilenamePolicy getFilenamePolicy() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final?

srcFiles.add(srcDestPair.getKey());
dstFiles.add(srcDestPair.getValue());
}
FileSystems.copy(srcFiles, dstFiles, StandardMoveOptions.IGNORE_MISSING_FILES);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was IGNORE_MISSING_FILES also the semantics of channelFactory.copy? Either way worth a comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it was; added a comment. Would like to clean up the logic here however to be more explicit and better support non-copying filesystems like HDFS.

*/
public Write to(String filenamePrefix) {
try {
ResourceId fileResource =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comments as in Avro.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

*
* <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
*
* <p>By default, a {@link DefaultFilenamePolicy} will be used built using the specified prefix
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: I'm generally not thrilled about this much javadoc duplication - it tends to rot and get inconsistent. Here and in other IOs, I think it'd be nice to have only one of the overloads be "authoritative" and have others refer to it like Like {@link #thatOtherOverload()}, but ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

@Override
public String windowedFilename(WindowedContext input) {
public ResourceId windowedFilename(
ResourceId outputDirectory, WindowedContext input, String extension) {
String filename = outputFilePrefix + "-" + input.getWindow().toString() + "-"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this and the unwindowed one could benefit from some String.format

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@Test
@Category(ValidatesRunner.class)
@Ignore("[BEAM-436] DirectRunner ValidatesRunner tempLocation configuration insufficient")
public void testPrimitiveWriteDisplayData() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this one deleted?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It used to be a test that display data is preserved by runners, exploiting the fact that p.apply() is intercepted by the runner. It never worked (hence it was @Ignored) and now with runner API the entire premise of the test is flawed.

return toBuilder().setFilenamePrefix(filenamePrefix).build();
public Write<T> to(String filenamePrefix) {
ResourceId resourceId;
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comments as Avro.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done with helper.

Copy link
Contributor Author

@dhalperi dhalperi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

/**
* A default {@link FilenamePolicy} for unwindowed files. This policy takes four parameters:
*
* <ul>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

getFilenamePrefix() != null,
"Unable to populate DisplayData for invalid AvroIO.Write (unset output prefix).");
String outputPrefixString = null;
if (getFilenamePrefix().isAccessible()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack. I know @swegner made an explicit choice here, though. Will look into again after.

* a common suffix (if supplied using {@link #withSuffix(String)}). This default can be
* overridden using {@link #withFilenamePolicy(FilenamePolicy)}.
*/
public Write<T> to(String outputPrefix) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. to(VP) and toResource(VP) since they need to have different names in type-erased Java or they're treated as the same function.

*/
public Write<T> to(String outputPrefix) {
try {
ResourceId fileResource =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done and pulled this logic into a single helper in FileBasedSink.

*
* <p>If provided, the suffix will be used; otherwise the files will have an empty suffix.
*/
public static DefaultFilenamePolicy constructUsingStandardParameters(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

*/
public Write to(String filenamePrefix) {
try {
ResourceId fileResource =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

*
* <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
*
* <p>By default, a {@link DefaultFilenamePolicy} will be used built using the specified prefix
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

@Override
public String windowedFilename(WindowedContext input) {
public ResourceId windowedFilename(
ResourceId outputDirectory, WindowedContext input, String extension) {
String filename = outputFilePrefix + "-" + input.getWindow().toString() + "-"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@Test
@Category(ValidatesRunner.class)
@Ignore("[BEAM-436] DirectRunner ValidatesRunner tempLocation configuration insufficient")
public void testPrimitiveWriteDisplayData() throws IOException {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It used to be a test that display data is preserved by runners, exploiting the fact that p.apply() is intercepted by the runner. It never worked (hence it was @Ignored) and now with runner API the entire premise of the test is flawed.

return toBuilder().setFilenamePrefix(filenamePrefix).build();
public Write<T> to(String filenamePrefix) {
ResourceId resourceId;
try {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done with helper.

This converts FileBasedSink from IOChannelFactory to FileSystems, with
fallout changes on all existing Transforms that use WriteFiles.

We preserve the existing semantics of most transforms, simply adding the
ability for users to provide ResourceId in addition to String when
setting the outputPrefix.

Other changes:

* Rethink FilenamePolicy as a function from ResourceId (base directory)
  to ResourceId (output file), moving the base directory into the
  context. This way, FilenamePolicy logic is truly independent from the
  base directory. Using ResourceId#resolve, a filename policy can add
  multiple path components, say, base/YYYY/MM/DD/file.txt, in a
  fileystem independent way.

  (Also add an optional extension parameter to the function, enabling an
  owning transform to pass in the suffix from a separately-configured
  compression factory or similar.)

* Make DefaultFilenamePolicy its own top-level class and move
  IOChannelUtils#constructName into it. This the default FilenamePolicy
  used by FileBasedSink.
@coveralls
Copy link

Coverage Status

Coverage increased (+0.09%) to 69.964% when pulling e190c7d on dhalperi:convert-file-based-sink into 2d22485 on apache:master.

@asfgit asfgit closed this in 1bc50d6 May 4, 2017
@coveralls
Copy link

Coverage Status

Coverage decreased (-0.08%) to 70.554% when pulling 7aaa59e on dhalperi:convert-file-based-sink into b4bafd0 on apache:master.

@dhalperi dhalperi deleted the convert-file-based-sink branch May 4, 2017 17:36
peihe pushed a commit to peihe/incubator-beam that referenced this pull request Jul 21, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants