Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testMinimalWordCountJava8() throws Exception {
.apply(MapElements
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
.withOutputType(new TypeDescriptor<String>() {}))
.apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
.apply(TextIO.Write.to("gs://your-output-bucket/and-output-prefix"));
}

private GcsUtil buildMockGcsUtil() throws IOException {
Expand Down
158 changes: 104 additions & 54 deletions sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,26 @@
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.cloud.dataflow.sdk.util.MimeTypes;

import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* Abstract {@link Sink} for file-based output. An implementation of FileBasedSink writes file-based
Expand All @@ -66,6 +72,8 @@
* @param <T> the type of values written to the sink.
*/
public abstract class FileBasedSink<T> extends Sink<T> {
private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);

/**
* Base filename for final output files.
*/
Expand Down Expand Up @@ -168,11 +176,11 @@ private static String getFileExtension(String usersExtension) {
* FileBasedSinkWriter.
*
* <h2>Temporary and Output File Naming:</h2> During the write, bundles are written to temporary
* files using the baseTemporaryFilename that can be provided via the constructor of
* files using the tempDirectory that can be provided via the constructor of
* FileBasedWriteOperation. These temporary files will be named
* {@code {baseTemporaryFilename}-temp-{bundleId}}, where bundleId is the unique id of the bundle.
* For example, if baseTemporaryFilename is "gs://my-bucket/my_temp_output", the output for a
* bundle with bundle id 15723 will be "gs://my-bucket/my_temp_output-temp-15723".
* {@code {tempDirectory}/{bundleId}}, where bundleId is the unique id of the bundle.
* For example, if tempDirectory is "gs://my-bucket/my_temp_output", the output for a
* bundle with bundle id 15723 will be "gs://my-bucket/my_temp_output/15723".
*
* <p>Final output files are written to baseOutputFilename with the format
* {@code {baseOutputFilename}-0000i-of-0000n.{extension}} where n is the total number of bundles
Expand All @@ -195,8 +203,6 @@ private static String getFileExtension(String usersExtension) {
* @param <T> the type of values written to the sink.
*/
public abstract static class FileBasedWriteOperation<T> extends WriteOperation<T, FileResult> {
private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriteOperation.class);

/**
* Options for handling of temporary output files.
*/
Expand All @@ -215,56 +221,65 @@ public enum TemporaryFileRetention {
*/
protected final TemporaryFileRetention temporaryFileRetention;

/**
* Base filename used for temporary output files. Default is the baseOutputFilename.
*/
protected final String baseTemporaryFilename;

/**
* Name separator for temporary files. Temporary files will be named
* {@code {baseTemporaryFilename}-temp-{bundleId}}.
*/
protected static final String TEMPORARY_FILENAME_SEPARATOR = "-temp-";
/** Directory for temporary output files. */
protected final String tempDirectory;

/**
* Build a temporary filename using the temporary filename separator with the given prefix and
* suffix.
*/
protected static final String buildTemporaryFilename(String prefix, String suffix) {
return prefix + FileBasedWriteOperation.TEMPORARY_FILENAME_SEPARATOR + suffix;
/** Constructs a temporary file path given the temporary directory and a filename. */
protected static String buildTemporaryFilename(String tempDirectory, String filename)
throws IOException {
return IOChannelUtils.getFactory(tempDirectory).resolve(tempDirectory, filename);
}

/**
* Construct a FileBasedWriteOperation using the same base filename for both temporary and
* output files.
* Constructs a FileBasedWriteOperation using the default strategy for generating a temporary
* directory from the base output filename.
*
* <p>Default is a uniquely named sibling of baseOutputFilename, e.g. if baseOutputFilename is
* /path/to/foo, the temporary directory will be /path/to/temp-beam-foo-$date.
*
* @param sink the FileBasedSink that will be used to configure this write operation.
*/
public FileBasedWriteOperation(FileBasedSink<T> sink) {
this(sink, sink.baseOutputFilename);
this(sink, buildTemporaryDirectoryName(sink.getBaseOutputFilename()));
}

private static String buildTemporaryDirectoryName(String baseOutputFilename) {
try {
IOChannelFactory factory = IOChannelUtils.getFactory(baseOutputFilename);
Path baseOutputPath = factory.toPath(baseOutputFilename);
return baseOutputPath
.resolveSibling(
"temp-beam-"
+ baseOutputPath.getFileName()
+ "-"
+ Instant.now().toString(DateTimeFormat.forPattern("yyyy-MM-DD_HH-mm-ss")))
.toString();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/**
* Construct a FileBasedWriteOperation.
*
* @param sink the FileBasedSink that will be used to configure this write operation.
* @param baseTemporaryFilename the base filename to be used for temporary output files.
* @param tempDirectory the base directory to be used for temporary output files.
*/
public FileBasedWriteOperation(FileBasedSink<T> sink, String baseTemporaryFilename) {
this(sink, baseTemporaryFilename, TemporaryFileRetention.REMOVE);
public FileBasedWriteOperation(FileBasedSink<T> sink, String tempDirectory) {
this(sink, tempDirectory, TemporaryFileRetention.REMOVE);
}

/**
* Create a new FileBasedWriteOperation.
*
* @param sink the FileBasedSink that will be used to configure this write operation.
* @param baseTemporaryFilename the base filename to be used for temporary output files.
* @param tempDirectory the base directory to be used for temporary output files.
* @param temporaryFileRetention defines how temporary files are handled.
*/
public FileBasedWriteOperation(FileBasedSink<T> sink, String baseTemporaryFilename,
public FileBasedWriteOperation(FileBasedSink<T> sink, String tempDirectory,
TemporaryFileRetention temporaryFileRetention) {
this.sink = sink;
this.baseTemporaryFilename = baseTemporaryFilename;
this.tempDirectory = tempDirectory;
this.temporaryFileRetention = temporaryFileRetention;
}

Expand Down Expand Up @@ -312,7 +327,12 @@ public void finalize(Iterable<FileResult> writerResults, PipelineOptions options

// Optionally remove temporary files.
if (temporaryFileRetention == TemporaryFileRetention.REMOVE) {
removeTemporaryFiles(options);
// We remove the entire temporary directory, rather than specifically removing the files
// from writerResults, because writerResults includes only successfully completed bundles,
// and we'd like to clean up the failed ones too.
// Note that due to GCS eventual consistency, matching files in the temp directory is also
// currently non-perfect and may fail to delete some files.
removeTemporaryFiles(files, options);
}
}

Expand Down Expand Up @@ -370,21 +390,18 @@ protected final List<String> generateDestinationFilenames(int numFiles) {
}

/**
* Removes temporary output files. Uses the temporary filename to find files to remove.
* Removes temporary output files. Uses the temporary directory to find files to remove.
*
* <p>Can be called from subclasses that override {@link FileBasedWriteOperation#finalize}.
* <b>Note:</b>If finalize is overridden and does <b>not</b> rename or otherwise finalize
* temporary files, this method will remove them.
*/
protected final void removeTemporaryFiles(PipelineOptions options) throws IOException {
String pattern = buildTemporaryFilename(baseTemporaryFilename, "*");
LOG.debug("Finding temporary bundle output files matching {}.", pattern);
FileOperations fileOperations = FileOperationsFactory.getFileOperations(pattern, options);
IOChannelFactory factory = IOChannelUtils.getFactory(pattern);
Collection<String> matches = factory.match(pattern);
LOG.debug("{} temporary files matched {}", matches.size(), pattern);
LOG.debug("Removing {} files.", matches.size());
fileOperations.remove(matches);
protected final void removeTemporaryFiles(List<String> knownFiles, PipelineOptions options)
Copy link
Contributor

@dhalperi dhalperi Nov 11, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not backwards-compatible. Please be on-guard for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rest of the current change is technically also not backward-compatible (e.g. the set of protected member variables has changed) for someone who was overriding FileBasedSink/FileBasedWriteOperation/... in a highly non-trivial way (e.g. trying to construct filenames manually using the protected members).

Note that, of the file-based sinks I've seen, none would be broken by this change, because they don't peek into these protected variables.

However, as for those that potentially do, I don't think there's a way to make the current change backward-compatible for them. There's a choice:

  • Make the current change, with a (I think) very small chance of breaking somebody, but in return, fix deletion of files and data loss/duplication issues.
  • Abandon the current change and say that the problem is only fixed in the Beam SDK
  • Do something much weaker but backward-compatible instead: e.g., simply add code for removing all known files, but do not place temp files in a subdirectory. As an unfortunate side effect, this will cause a divergence in code and behavior between Dataflow and Beam SDKs.

Which would you prefer? Are there other options?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#3 is the default choice, unless very strong guarantees can be made regarding #1.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Beam and Dataflow SDKs have diverged on several points already and are expected to keep diverging. Maintaining consistency between the two is not a goal, providing the best experience with Beam SDK is, maintaining backwards compatibility in Dataflow SDK is.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lukecwik, you said it so nicely ;-)

throws IOException {
LOG.debug("Removing temporary bundle output files in {}.", tempDirectory);
FileOperations fileOperations =
FileOperationsFactory.getFileOperations(tempDirectory, options);
fileOperations.removeDirectoryAndFiles(tempDirectory, knownFiles);
}

/**
Expand Down Expand Up @@ -429,9 +446,7 @@ public abstract static class FileBasedWriter<T> extends Writer<T, FileResult> {
private String id;

/**
* The filename of the output bundle. Equal to the
* {@link FileBasedSink.FileBasedWriteOperation#TEMPORARY_FILENAME_SEPARATOR} and id appended to
* the baseName.
* The filename of the output bundle - $tempDirectory/$id.
*/
private String filename;

Expand Down Expand Up @@ -484,7 +499,7 @@ protected void writeFooter() throws Exception {}
public final void open(String uId) throws Exception {
this.id = uId;
filename = FileBasedWriteOperation.buildTemporaryFilename(
getWriteOperation().baseTemporaryFilename, uId);
getWriteOperation().tempDirectory, uId);
LOG.debug("Opening {}.", filename);
channel = IOChannelUtils.create(filename, mimeType);
try {
Expand Down Expand Up @@ -566,7 +581,7 @@ public static FileOperations getFileOperations(String spec, PipelineOptions opti
if (factory instanceof GcsIOChannelFactory) {
return new GcsOperations(options);
} else if (factory instanceof FileIOChannelFactory) {
return new LocalFileOperations();
return new LocalFileOperations(factory);
} else {
throw new IOException("Unrecognized file system.");
}
Expand All @@ -590,9 +605,16 @@ private interface FileOperations {
void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException;

/**
* Remove a collection of files.
* Removes a directory and the files in it (but not subdirectories).
*
* <p>Additionally, to partially mitigate the effects of filesystems with eventually-consistent
* directory matching APIs, takes a list of files that are known to exist - i.e. removes the
* union of the known files and files that the filesystem says exist in the directory.
*
* <p>Assumes that, if directory listing had been strongly consistent, it would have matched
* all of knownFiles - i.e. on a strongly consistent filesystem, knownFiles can be ignored.
*/
void remove(Collection<String> filenames) throws IOException;
void removeDirectoryAndFiles(String directory, List<String> knownFiles) throws IOException;
}

/**
Expand All @@ -601,7 +623,7 @@ private interface FileOperations {
private static class GcsOperations implements FileOperations {
private final GcsUtil gcsUtil;

public GcsOperations(PipelineOptions options) {
GcsOperations(PipelineOptions options) {
gcsUtil = new GcsUtilFactory().create(options);
}

Expand All @@ -611,8 +633,21 @@ public void copy(List<String> srcFilenames, List<String> destFilenames) throws I
}

@Override
public void remove(Collection<String> filenames) throws IOException {
gcsUtil.remove(filenames);
public void removeDirectoryAndFiles(String directory, List<String> knownFiles)
throws IOException {
IOChannelFactory factory = IOChannelUtils.getFactory(directory);
Collection<String> matches = factory.match(directory + "/*");
Set<String> allMatches = new HashSet<>(matches);
allMatches.addAll(knownFiles);
LOG.debug(
"Removing {} temporary files found under {} ({} matched glob, {} additional known files)",
allMatches.size(),
directory,
matches.size(),
allMatches.size() - matches.size());
gcsUtil.remove(allMatches);
// No need to remove the directory itself: GCS doesn't have directories, so if the directory
// is empty, then it already doesn't exist.
}
}

Expand All @@ -622,6 +657,12 @@ public void remove(Collection<String> filenames) throws IOException {
private static class LocalFileOperations implements FileOperations {
private static final Logger LOG = LoggerFactory.getLogger(LocalFileOperations.class);

private final IOChannelFactory factory;

LocalFileOperations(IOChannelFactory factory) {
this.factory = factory;
}

@Override
public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
checkArgument(
Expand Down Expand Up @@ -649,11 +690,20 @@ private void copyOne(String source, String destination) throws IOException {
}

@Override
public void remove(Collection<String> filenames) throws IOException {
for (String filename : filenames) {
public void removeDirectoryAndFiles(String directory, List<String> knownFiles)
throws IOException {
if (!new File(directory).exists()) {
LOG.debug("Directory {} already doesn't exist", directory);
return;
}
Collection<String> matches = factory.match(new File(directory, "*").getAbsolutePath());
LOG.debug("Removing {} temporary files found under {}", matches.size(), directory);
for (String filename : matches) {
LOG.debug("Removing file {}", filename);
removeOne(filename);
}
LOG.debug("Removing directory {}", directory);
removeOne(directory);
}

private void removeOne(String filename) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -56,7 +56,8 @@ public Collection<String> match(String spec) throws IOException {

File parent = file.getAbsoluteFile().getParentFile();
if (!parent.exists()) {
throw new IOException("Unable to find parent directory of " + spec);
throw new FileNotFoundException(
"Parent directory " + parent + " of " + spec + " does not exist");
}

// Method getAbsolutePath() on Windows platform may return something like
Expand Down Expand Up @@ -131,6 +132,11 @@ public boolean isReadSeekEfficient(String spec) throws IOException {

@Override
public String resolve(String path, String other) throws IOException {
return Paths.get(path).resolve(other).toString();
return toPath(path).resolve(other).toString();
}

@Override
public Path toPath(String path) {
return new File(path).toPath();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ public boolean isReadSeekEfficient(String spec) throws IOException {

@Override
public String resolve(String path, String other) throws IOException {
return GcsPath.fromUri(path).resolve(other).toString();
return toPath(path).resolve(other).toString();
}

@Override
public GcsPath toPath(String path) {
return GcsPath.fromUri(path);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.util.Collection;

/**
Expand Down Expand Up @@ -98,4 +99,7 @@ public interface IOChannelFactory {
* dependent and therefore unspecified.
*/
String resolve(String path, String other) throws IOException;

/** Converts the given string to a {@link Path}. */
Path toPath(String path);
}
Loading