Skip to content

Commit

Permalink
Merge 3aa8ebb into d9a6311
Browse files Browse the repository at this point in the history
  • Loading branch information
sammcveety committed Dec 9, 2016
2 parents d9a6311 + 3aa8ebb commit cc3d338
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
Expand Down Expand Up @@ -205,6 +207,13 @@ public String getBaseOutputFilename() {
return baseOutputFilename.get();
}

/**
* Returns the base output filename for this file based sink.
*/
public ValueProvider<String> getBaseOutputFilenameProvider() {
return baseOutputFilename;
}

@Override
public void validate(PipelineOptions options) {}

Expand Down Expand Up @@ -290,7 +299,7 @@ public abstract static class FileBasedWriteOperation<T> extends WriteOperation<T
protected final FileBasedSink<T> sink;

/** Directory for temporary output files. */
protected final String tempDirectory;
protected final ValueProvider<String> tempDirectory;

/** Constructs a temporary file path given the temporary directory and a filename. */
protected static String buildTemporaryFilename(String tempDirectory, String filename)
Expand All @@ -308,22 +317,29 @@ protected static String buildTemporaryFilename(String tempDirectory, String file
* @param sink the FileBasedSink that will be used to configure this write operation.
*/
public FileBasedWriteOperation(FileBasedSink<T> sink) {
this(sink, buildTemporaryDirectoryName(sink.getBaseOutputFilename()));
this(sink, NestedValueProvider.of(
sink.getBaseOutputFilenameProvider(), new TemporaryDirectoryBuilder()));
}

private static String buildTemporaryDirectoryName(String baseOutputFilename) {
try {
IOChannelFactory factory = IOChannelUtils.getFactory(baseOutputFilename);
Path baseOutputPath = factory.toPath(baseOutputFilename);
return baseOutputPath
.resolveSibling(
"temp-beam-"
+ baseOutputPath.getFileName()
+ "-"
+ Instant.now().toString(DateTimeFormat.forPattern("yyyy-MM-DD_HH-mm-ss")))
.toString();
} catch (IOException e) {
throw new RuntimeException(e);
private static class TemporaryDirectoryBuilder
implements SerializableFunction<String, String> {
Instant now = Instant.now();

@Override
public String apply(String baseOutputFilename) {
try {
IOChannelFactory factory = IOChannelUtils.getFactory(baseOutputFilename);
Path baseOutputPath = factory.toPath(baseOutputFilename);
return baseOutputPath
.resolveSibling(
"temp-beam-"
+ baseOutputPath.getFileName()
+ "-"
+ now.toString(DateTimeFormat.forPattern("yyyy-MM-DD_HH-mm-ss")))
.toString();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

Expand All @@ -334,6 +350,10 @@ private static String buildTemporaryDirectoryName(String baseOutputFilename) {
* @param tempDirectory the base directory to be used for temporary output files.
*/
public FileBasedWriteOperation(FileBasedSink<T> sink, String tempDirectory) {
this(sink, StaticValueProvider.of(tempDirectory));
}

private FileBasedWriteOperation(FileBasedSink<T> sink, ValueProvider<String> tempDirectory) {
this.sink = sink;
this.tempDirectory = tempDirectory;
}
Expand Down Expand Up @@ -452,26 +472,27 @@ protected final List<String> generateDestinationFilenames(int numFiles) {
*/
protected final void removeTemporaryFiles(List<String> knownFiles, PipelineOptions options)
throws IOException {
LOG.debug("Removing temporary bundle output files in {}.", tempDirectory);
IOChannelFactory factory = IOChannelUtils.getFactory(tempDirectory);
LOG.debug("Removing temporary bundle output files in {}.", tempDirectory.get());
String tempDir = tempDirectory.get();
IOChannelFactory factory = IOChannelUtils.getFactory(tempDir);

// To partially mitigate the effects of filesystems with eventually-consistent
// directory matching APIs, we remove not only files that the filesystem says exist
// in the directory (which may be incomplete), but also files that are known to exist
// (produced by successfully completed bundles).
// This may still fail to remove temporary outputs of some failed bundles, but at least
// the common case (where all bundles succeed) is guaranteed to be fully addressed.
Collection<String> matches = factory.match(factory.resolve(tempDirectory, "*"));
Collection<String> matches = factory.match(factory.resolve(tempDir, "*"));
Set<String> allMatches = new HashSet<>(matches);
allMatches.addAll(knownFiles);
LOG.debug(
"Removing {} temporary files found under {} ({} matched glob, {} known files)",
allMatches.size(),
tempDirectory,
tempDir,
matches.size(),
allMatches.size() - matches.size());
factory.remove(allMatches);
factory.remove(ImmutableList.of(tempDirectory));
factory.remove(ImmutableList.of(tempDir));
}

/**
Expand Down Expand Up @@ -569,7 +590,7 @@ protected void writeFooter() throws Exception {}
public final void open(String uId) throws Exception {
this.id = uId;
filename = FileBasedWriteOperation.buildTemporaryFilename(
getWriteOperation().tempDirectory, uId);
getWriteOperation().tempDirectory.get(), uId);
LOG.debug("Opening {}.", filename);
final WritableByteChannelFactory factory =
getWriteOperation().getSink().writableByteChannelFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,9 @@ private void runFinalize(SimpleSink.SimpleWriteOperation writeOp, List<File> tem
assertFalse(temporaryFiles.get(i).exists());
}

assertFalse(new File(writeOp.tempDirectory).exists());
assertFalse(new File(writeOp.tempDirectory.get()).exists());
// Test that repeated requests of the temp directory return a stable result.
assertEquals(writeOp.tempDirectory.get(), writeOp.tempDirectory.get());
}

/**
Expand Down Expand Up @@ -487,7 +489,7 @@ public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception
final FileBasedWriter<String> writer =
writeOp.createWriter(null);
final String expectedFilename =
writeOp.tempDirectory + "/" + testUid;
writeOp.tempDirectory.get() + "/" + testUid;

final List<String> expected = new ArrayList<>();
expected.add("header");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
Expand Down Expand Up @@ -629,6 +630,24 @@ public void testBadWildcardRecursive() throws Exception {
pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz"));
}

/** Options for testing. */
public interface RuntimeTestOptions extends PipelineOptions {
ValueProvider<String> getInput();
void setInput(ValueProvider<String> value);

ValueProvider<String> getOutput();
void setOutput(ValueProvider<String> value);
}

@Test
public void testRuntimeOptionsNotCalledInApply() throws Exception {
Pipeline pipeline = TestPipeline.create();
RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
pipeline
.apply(TextIO.Read.from(options.getInput()).withoutValidation())
.apply(TextIO.Write.to(options.getOutput()).withoutValidation());
}

@Test
public void testReadWithoutValidationFlag() throws Exception {
TextIO.Read.Bound<String> read = TextIO.Read.from("gs://bucket/foo*/baz");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void testCreateWriteOperations() {
assertEquals(testRootElement, writeOp.getSink().rootElementName);
assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().extension);
Path outputPath = new File(testFilePrefix).toPath();
Path tempPath = new File(writeOp.tempDirectory).toPath();
Path tempPath = new File(writeOp.tempDirectory.get()).toPath();
assertEquals(outputPath.getParent(), tempPath.getParent());
assertThat(
tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName()));
Expand All @@ -163,7 +163,7 @@ public void testCreateWriter() throws Exception {
.createWriteOperation(options);
XmlWriter<Bird> writer = writeOp.createWriter(options);
Path outputPath = new File(testFilePrefix).toPath();
Path tempPath = new File(writer.getWriteOperation().tempDirectory).toPath();
Path tempPath = new File(writer.getWriteOperation().tempDirectory.get()).toPath();
assertEquals(outputPath.getParent(), tempPath.getParent());
assertThat(
tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName()));
Expand Down

0 comments on commit cc3d338

Please sign in to comment.