From e469a22a5db41766589257b0c49cd0cca1ecc4d7 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Thu, 10 Nov 2016 13:38:49 -0800 Subject: [PATCH] Backporting FileBasedSink changes from Beam https://github.com/apache/incubator-beam/pull/1050 https://github.com/apache/incubator-beam/pull/1278 --- .../examples/MinimalWordCountJava8Test.java | 2 +- .../cloud/dataflow/sdk/io/FileBasedSink.java | 158 ++++++++++++------ .../sdk/util/FileIOChannelFactory.java | 12 +- .../sdk/util/GcsIOChannelFactory.java | 7 +- .../dataflow/sdk/util/IOChannelFactory.java | 4 + .../dataflow/sdk/util/gcsfs/GcsPath.java | 13 +- .../dataflow/sdk/io/FileBasedSinkTest.java | 35 ++-- .../cloud/dataflow/sdk/io/XmlSinkTest.java | 16 +- .../dataflow/sdk/util/gcsfs/GcsPathTest.java | 25 +++ 9 files changed, 195 insertions(+), 77 deletions(-) diff --git a/examples/src/test/java8/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java b/examples/src/test/java8/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java index fcae41c6bb..2f464d192f 100644 --- a/examples/src/test/java8/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java +++ b/examples/src/test/java8/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java @@ -69,7 +69,7 @@ public void testMinimalWordCountJava8() throws Exception { .apply(MapElements .via((KV wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()) .withOutputType(new TypeDescriptor() {})) - .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX")); + .apply(TextIO.Write.to("gs://your-output-bucket/and-output-prefix")); } private GcsUtil buildMockGcsUtil() throws IOException { diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java index 4ed6c7216e..1dab3164f3 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java @@ -31,20 +31,26 @@ import com.google.cloud.dataflow.sdk.util.IOChannelUtils; import com.google.cloud.dataflow.sdk.util.MimeTypes; +import org.joda.time.Instant; +import org.joda.time.format.DateTimeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; import java.io.Serializable; import java.nio.channels.WritableByteChannel; import java.nio.file.Files; import java.nio.file.NoSuchFileException; +import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; /** * Abstract {@link Sink} for file-based output. An implementation of FileBasedSink writes file-based @@ -66,6 +72,8 @@ * @param the type of values written to the sink. */ public abstract class FileBasedSink extends Sink { + private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class); + /** * Base filename for final output files. */ @@ -168,11 +176,11 @@ private static String getFileExtension(String usersExtension) { * FileBasedSinkWriter. * *

Temporary and Output File Naming:

During the write, bundles are written to temporary - * files using the baseTemporaryFilename that can be provided via the constructor of + * files using the tempDirectory that can be provided via the constructor of * FileBasedWriteOperation. These temporary files will be named - * {@code {baseTemporaryFilename}-temp-{bundleId}}, where bundleId is the unique id of the bundle. - * For example, if baseTemporaryFilename is "gs://my-bucket/my_temp_output", the output for a - * bundle with bundle id 15723 will be "gs://my-bucket/my_temp_output-temp-15723". + * {@code {tempDirectory}/{bundleId}}, where bundleId is the unique id of the bundle. + * For example, if tempDirectory is "gs://my-bucket/my_temp_output", the output for a + * bundle with bundle id 15723 will be "gs://my-bucket/my_temp_output/15723". * *

Final output files are written to baseOutputFilename with the format * {@code {baseOutputFilename}-0000i-of-0000n.{extension}} where n is the total number of bundles @@ -195,8 +203,6 @@ private static String getFileExtension(String usersExtension) { * @param the type of values written to the sink. */ public abstract static class FileBasedWriteOperation extends WriteOperation { - private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriteOperation.class); - /** * Options for handling of temporary output files. */ @@ -215,56 +221,65 @@ public enum TemporaryFileRetention { */ protected final TemporaryFileRetention temporaryFileRetention; - /** - * Base filename used for temporary output files. Default is the baseOutputFilename. - */ - protected final String baseTemporaryFilename; - - /** - * Name separator for temporary files. Temporary files will be named - * {@code {baseTemporaryFilename}-temp-{bundleId}}. - */ - protected static final String TEMPORARY_FILENAME_SEPARATOR = "-temp-"; + /** Directory for temporary output files. */ + protected final String tempDirectory; - /** - * Build a temporary filename using the temporary filename separator with the given prefix and - * suffix. - */ - protected static final String buildTemporaryFilename(String prefix, String suffix) { - return prefix + FileBasedWriteOperation.TEMPORARY_FILENAME_SEPARATOR + suffix; + /** Constructs a temporary file path given the temporary directory and a filename. */ + protected static String buildTemporaryFilename(String tempDirectory, String filename) + throws IOException { + return IOChannelUtils.getFactory(tempDirectory).resolve(tempDirectory, filename); } /** - * Construct a FileBasedWriteOperation using the same base filename for both temporary and - * output files. + * Constructs a FileBasedWriteOperation using the default strategy for generating a temporary + * directory from the base output filename. + * + *

Default is a uniquely named sibling of baseOutputFilename, e.g. if baseOutputFilename is + * /path/to/foo, the temporary directory will be /path/to/temp-beam-foo-$date. * * @param sink the FileBasedSink that will be used to configure this write operation. */ public FileBasedWriteOperation(FileBasedSink sink) { - this(sink, sink.baseOutputFilename); + this(sink, buildTemporaryDirectoryName(sink.getBaseOutputFilename())); + } + + private static String buildTemporaryDirectoryName(String baseOutputFilename) { + try { + IOChannelFactory factory = IOChannelUtils.getFactory(baseOutputFilename); + Path baseOutputPath = factory.toPath(baseOutputFilename); + return baseOutputPath + .resolveSibling( + "temp-beam-" + + baseOutputPath.getFileName() + + "-" + + Instant.now().toString(DateTimeFormat.forPattern("yyyy-MM-DD_HH-mm-ss"))) + .toString(); + } catch (IOException e) { + throw new RuntimeException(e); + } } /** * Construct a FileBasedWriteOperation. * * @param sink the FileBasedSink that will be used to configure this write operation. - * @param baseTemporaryFilename the base filename to be used for temporary output files. + * @param tempDirectory the base directory to be used for temporary output files. */ - public FileBasedWriteOperation(FileBasedSink sink, String baseTemporaryFilename) { - this(sink, baseTemporaryFilename, TemporaryFileRetention.REMOVE); + public FileBasedWriteOperation(FileBasedSink sink, String tempDirectory) { + this(sink, tempDirectory, TemporaryFileRetention.REMOVE); } /** * Create a new FileBasedWriteOperation. * * @param sink the FileBasedSink that will be used to configure this write operation. - * @param baseTemporaryFilename the base filename to be used for temporary output files. + * @param tempDirectory the base directory to be used for temporary output files. * @param temporaryFileRetention defines how temporary files are handled. */ - public FileBasedWriteOperation(FileBasedSink sink, String baseTemporaryFilename, + public FileBasedWriteOperation(FileBasedSink sink, String tempDirectory, TemporaryFileRetention temporaryFileRetention) { this.sink = sink; - this.baseTemporaryFilename = baseTemporaryFilename; + this.tempDirectory = tempDirectory; this.temporaryFileRetention = temporaryFileRetention; } @@ -312,7 +327,12 @@ public void finalize(Iterable writerResults, PipelineOptions options // Optionally remove temporary files. if (temporaryFileRetention == TemporaryFileRetention.REMOVE) { - removeTemporaryFiles(options); + // We remove the entire temporary directory, rather than specifically removing the files + // from writerResults, because writerResults includes only successfully completed bundles, + // and we'd like to clean up the failed ones too. + // Note that due to GCS eventual consistency, matching files in the temp directory is also + // currently non-perfect and may fail to delete some files. + removeTemporaryFiles(files, options); } } @@ -370,21 +390,18 @@ protected final List generateDestinationFilenames(int numFiles) { } /** - * Removes temporary output files. Uses the temporary filename to find files to remove. + * Removes temporary output files. Uses the temporary directory to find files to remove. * *

Can be called from subclasses that override {@link FileBasedWriteOperation#finalize}. * Note:If finalize is overridden and does not rename or otherwise finalize * temporary files, this method will remove them. */ - protected final void removeTemporaryFiles(PipelineOptions options) throws IOException { - String pattern = buildTemporaryFilename(baseTemporaryFilename, "*"); - LOG.debug("Finding temporary bundle output files matching {}.", pattern); - FileOperations fileOperations = FileOperationsFactory.getFileOperations(pattern, options); - IOChannelFactory factory = IOChannelUtils.getFactory(pattern); - Collection matches = factory.match(pattern); - LOG.debug("{} temporary files matched {}", matches.size(), pattern); - LOG.debug("Removing {} files.", matches.size()); - fileOperations.remove(matches); + protected final void removeTemporaryFiles(List knownFiles, PipelineOptions options) + throws IOException { + LOG.debug("Removing temporary bundle output files in {}.", tempDirectory); + FileOperations fileOperations = + FileOperationsFactory.getFileOperations(tempDirectory, options); + fileOperations.removeDirectoryAndFiles(tempDirectory, knownFiles); } /** @@ -429,9 +446,7 @@ public abstract static class FileBasedWriter extends Writer { private String id; /** - * The filename of the output bundle. Equal to the - * {@link FileBasedSink.FileBasedWriteOperation#TEMPORARY_FILENAME_SEPARATOR} and id appended to - * the baseName. + * The filename of the output bundle - $tempDirectory/$id. */ private String filename; @@ -484,7 +499,7 @@ protected void writeFooter() throws Exception {} public final void open(String uId) throws Exception { this.id = uId; filename = FileBasedWriteOperation.buildTemporaryFilename( - getWriteOperation().baseTemporaryFilename, uId); + getWriteOperation().tempDirectory, uId); LOG.debug("Opening {}.", filename); channel = IOChannelUtils.create(filename, mimeType); try { @@ -566,7 +581,7 @@ public static FileOperations getFileOperations(String spec, PipelineOptions opti if (factory instanceof GcsIOChannelFactory) { return new GcsOperations(options); } else if (factory instanceof FileIOChannelFactory) { - return new LocalFileOperations(); + return new LocalFileOperations(factory); } else { throw new IOException("Unrecognized file system."); } @@ -590,9 +605,16 @@ private interface FileOperations { void copy(List srcFilenames, List destFilenames) throws IOException; /** - * Remove a collection of files. + * Removes a directory and the files in it (but not subdirectories). + * + *

Additionally, to partially mitigate the effects of filesystems with eventually-consistent + * directory matching APIs, takes a list of files that are known to exist - i.e. removes the + * union of the known files and files that the filesystem says exist in the directory. + * + *

Assumes that, if directory listing had been strongly consistent, it would have matched + * all of knownFiles - i.e. on a strongly consistent filesystem, knownFiles can be ignored. */ - void remove(Collection filenames) throws IOException; + void removeDirectoryAndFiles(String directory, List knownFiles) throws IOException; } /** @@ -601,7 +623,7 @@ private interface FileOperations { private static class GcsOperations implements FileOperations { private final GcsUtil gcsUtil; - public GcsOperations(PipelineOptions options) { + GcsOperations(PipelineOptions options) { gcsUtil = new GcsUtilFactory().create(options); } @@ -611,8 +633,21 @@ public void copy(List srcFilenames, List destFilenames) throws I } @Override - public void remove(Collection filenames) throws IOException { - gcsUtil.remove(filenames); + public void removeDirectoryAndFiles(String directory, List knownFiles) + throws IOException { + IOChannelFactory factory = IOChannelUtils.getFactory(directory); + Collection matches = factory.match(directory + "/*"); + Set allMatches = new HashSet<>(matches); + allMatches.addAll(knownFiles); + LOG.debug( + "Removing {} temporary files found under {} ({} matched glob, {} additional known files)", + allMatches.size(), + directory, + matches.size(), + allMatches.size() - matches.size()); + gcsUtil.remove(allMatches); + // No need to remove the directory itself: GCS doesn't have directories, so if the directory + // is empty, then it already doesn't exist. } } @@ -622,6 +657,12 @@ public void remove(Collection filenames) throws IOException { private static class LocalFileOperations implements FileOperations { private static final Logger LOG = LoggerFactory.getLogger(LocalFileOperations.class); + private final IOChannelFactory factory; + + LocalFileOperations(IOChannelFactory factory) { + this.factory = factory; + } + @Override public void copy(List srcFilenames, List destFilenames) throws IOException { checkArgument( @@ -649,11 +690,20 @@ private void copyOne(String source, String destination) throws IOException { } @Override - public void remove(Collection filenames) throws IOException { - for (String filename : filenames) { + public void removeDirectoryAndFiles(String directory, List knownFiles) + throws IOException { + if (!new File(directory).exists()) { + LOG.debug("Directory {} already doesn't exist", directory); + return; + } + Collection matches = factory.match(new File(directory, "*").getAbsolutePath()); + LOG.debug("Removing {} temporary files found under {}", matches.size(), directory); + for (String filename : matches) { LOG.debug("Removing file {}", filename); removeOne(filename); } + LOG.debug("Removing directory {}", directory); + removeOne(directory); } private void removeOne(String filename) throws IOException { diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FileIOChannelFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FileIOChannelFactory.java index 443c5e240f..0d42f56dbe 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FileIOChannelFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/FileIOChannelFactory.java @@ -35,8 +35,8 @@ import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.NoSuchFileException; +import java.nio.file.Path; import java.nio.file.PathMatcher; -import java.nio.file.Paths; import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -56,7 +56,8 @@ public Collection match(String spec) throws IOException { File parent = file.getAbsoluteFile().getParentFile(); if (!parent.exists()) { - throw new IOException("Unable to find parent directory of " + spec); + throw new FileNotFoundException( + "Parent directory " + parent + " of " + spec + " does not exist"); } // Method getAbsolutePath() on Windows platform may return something like @@ -131,6 +132,11 @@ public boolean isReadSeekEfficient(String spec) throws IOException { @Override public String resolve(String path, String other) throws IOException { - return Paths.get(path).resolve(other).toString(); + return toPath(path).resolve(other).toString(); + } + + @Override + public Path toPath(String path) { + return new File(path).toPath(); } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GcsIOChannelFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GcsIOChannelFactory.java index ce933f563a..5f707b6ce7 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GcsIOChannelFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GcsIOChannelFactory.java @@ -81,6 +81,11 @@ public boolean isReadSeekEfficient(String spec) throws IOException { @Override public String resolve(String path, String other) throws IOException { - return GcsPath.fromUri(path).resolve(other).toString(); + return toPath(path).resolve(other).toString(); + } + + @Override + public GcsPath toPath(String path) { + return GcsPath.fromUri(path); } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/IOChannelFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/IOChannelFactory.java index 03822ff110..b27e66d037 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/IOChannelFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/IOChannelFactory.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; +import java.nio.file.Path; import java.util.Collection; /** @@ -98,4 +99,7 @@ public interface IOChannelFactory { * dependent and therefore unspecified. */ String resolve(String path, String other) throws IOException; + + /** Converts the given string to a {@link Path}. */ + Path toPath(String path); } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPath.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPath.java index df95b1eb56..0fd1779309 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPath.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPath.java @@ -263,7 +263,12 @@ public GcsPath getRoot() { @Override public GcsPath getFileName() { - throw new UnsupportedOperationException(); + int nameCount = getNameCount(); + if (nameCount < 2) { + throw new UnsupportedOperationException( + "Can't get filename from root path in the bucket: " + this); + } + return getName(nameCount - 1); } /** @@ -438,7 +443,11 @@ public Path resolveSibling(Path other) { @Override public Path resolveSibling(String other) { - throw new UnsupportedOperationException(); + if (getNameCount() < 2) { + throw new UnsupportedOperationException("Can't resolve the sibling of a root path: " + this); + } + GcsPath parent = getParent(); + return (parent == null) ? fromUri(other) : parent.resolve(other); } @Override diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/FileBasedSinkTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/FileBasedSinkTest.java index da23f3a562..83ac507771 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/FileBasedSinkTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/FileBasedSinkTest.java @@ -44,6 +44,7 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; /** @@ -55,7 +56,7 @@ public class FileBasedSinkTest { public TemporaryFolder tmpFolder = new TemporaryFolder(); private String baseOutputFilename = "output"; - private String baseTemporaryFilename = "temp"; + private String tempDirectory = "temp"; private String appendToTempFolder(String filename) { return Paths.get(tmpFolder.getRoot().getPath(), filename).toString(); @@ -65,8 +66,8 @@ private String getBaseOutputFilename() { return appendToTempFolder(baseOutputFilename); } - private String getBaseTempFilename() { - return appendToTempFolder(baseTemporaryFilename); + private String getBaseTempDirectory() { + return appendToTempFolder(tempDirectory); } /** @@ -77,7 +78,7 @@ private String getBaseTempFilename() { public void testWriter() throws Exception { String testUid = "testId"; String expectedFilename = - getBaseTempFilename() + FileBasedWriteOperation.TEMPORARY_FILENAME_SEPARATOR + testUid; + getBaseTempDirectory() + "/" + testUid; SimpleSink.SimpleWriter writer = buildWriter(); List values = Arrays.asList("sympathetic vulture", "boresome hummingbird"); @@ -129,7 +130,7 @@ private void writeFile(List lines, File file) throws Exception { */ @Test public void testRemoveWithTempFilename() throws Exception { - testRemoveTemporaryFiles(3, baseTemporaryFilename); + testRemoveTemporaryFiles(3, tempDirectory); } /** @@ -181,8 +182,8 @@ public void testFinalizeWithIntermediateState() throws Exception { runFinalize(writeOp, files, false); // create a temporary file - tmpFolder.newFile( - baseTemporaryFilename + FileBasedWriteOperation.TEMPORARY_FILENAME_SEPARATOR + "1"); + tmpFolder.newFolder(tempDirectory); + tmpFolder.newFile(tempDirectory + "/1"); runFinalize(writeOp, files, false); } @@ -204,8 +205,10 @@ private List generateTemporaryFilesForFinalize(int numFiles) throws Except List temporaryFiles = new ArrayList<>(); for (int i = 0; i < numFiles; i++) { String temporaryFilename = - FileBasedWriteOperation.buildTemporaryFilename(baseTemporaryFilename, "" + i); - File tmpFile = tmpFolder.newFile(temporaryFilename); + FileBasedWriteOperation.buildTemporaryFilename(tempDirectory, "" + i); + File tmpFile = new File(tmpFolder.getRoot(), temporaryFilename); + tmpFile.getParentFile().mkdirs(); + assertTrue(tmpFile.createNewFile()); temporaryFiles.add(tmpFile); } @@ -237,6 +240,10 @@ private void runFinalize(SimpleSink.SimpleWriteOperation writeOp, List tem assertTrue(outputFiles.get(i).exists()); assertEquals(retainTemporaryFiles, temporaryFiles.get(i).exists()); } + + if (!retainTemporaryFiles) { + assertFalse(new File(writeOp.tempDirectory).exists()); + } } /** @@ -251,14 +258,16 @@ private void testRemoveTemporaryFiles(int numFiles, String baseTemporaryFilename List temporaryFiles = new ArrayList<>(); List outputFiles = new ArrayList<>(); for (int i = 0; i < numFiles; i++) { - File tmpFile = tmpFolder.newFile( + File tmpFile = new File(tmpFolder.getRoot(), FileBasedWriteOperation.buildTemporaryFilename(baseTemporaryFilename, "" + i)); + tmpFile.getParentFile().mkdirs(); + assertTrue(tmpFile.createNewFile()); temporaryFiles.add(tmpFile); File outputFile = tmpFolder.newFile(baseOutputFilename + i); outputFiles.add(outputFile); } - writeOp.removeTemporaryFiles(options); + writeOp.removeTemporaryFiles(Collections.emptyList(), options); for (int i = 0; i < numFiles; i++) { assertFalse(temporaryFiles.get(i).exists()); @@ -482,7 +491,7 @@ private SimpleSink buildSink() { private SimpleSink.SimpleWriteOperation buildWriteOperation( TemporaryFileRetention fileRetention) { SimpleSink sink = buildSink(); - return new SimpleSink.SimpleWriteOperation(sink, getBaseTempFilename(), fileRetention); + return new SimpleSink.SimpleWriteOperation(sink, getBaseTempDirectory(), fileRetention); } /** @@ -499,7 +508,7 @@ private SimpleSink.SimpleWriteOperation buildWriteOperation(String baseTemporary private SimpleSink.SimpleWriteOperation buildWriteOperation() { SimpleSink sink = buildSink(); return new SimpleSink.SimpleWriteOperation( - sink, getBaseTempFilename(), TemporaryFileRetention.REMOVE); + sink, getBaseTempDirectory(), TemporaryFileRetention.REMOVE); } /** diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSinkTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSinkTest.java index 22c1b59f45..50c7c22df4 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSinkTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSinkTest.java @@ -19,6 +19,7 @@ import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.StringContains.containsString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -41,6 +42,7 @@ import java.io.FileOutputStream; import java.io.FileReader; import java.nio.channels.WritableByteChannel; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -61,7 +63,7 @@ public class XmlSinkTest { private Class testClass = Bird.class; private String testRootElement = "testElement"; - private String testFilePrefix = "testPrefix"; + private String testFilePrefix = "/path/to/testPrefix"; /** * An XmlWriter correctly writes objects as Xml elements with an enclosing root element. @@ -145,7 +147,11 @@ public void testCreateWriteOperations() { assertEquals(testFilePrefix, writeOp.getSink().baseOutputFilename); assertEquals(testRootElement, writeOp.getSink().rootElementName); assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().extension); - assertEquals(testFilePrefix, writeOp.baseTemporaryFilename); + Path outputPath = new File(testFilePrefix).toPath(); + Path tempPath = new File(writeOp.tempDirectory).toPath(); + assertEquals(outputPath.getParent(), tempPath.getParent()); + assertThat( + tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName())); } /** @@ -158,7 +164,11 @@ public void testCreateWriter() throws Exception { XmlSink.writeOf(testClass, testRootElement, testFilePrefix) .createWriteOperation(options); XmlWriter writer = writeOp.createWriter(options); - assertEquals(testFilePrefix, writer.getWriteOperation().baseTemporaryFilename); + Path outputPath = new File(testFilePrefix).toPath(); + Path tempPath = new File(writer.getWriteOperation().tempDirectory).toPath(); + assertEquals(outputPath.getParent(), tempPath.getParent()); + assertThat( + tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName())); assertEquals(testRootElement, writer.getWriteOperation().getSink().rootElementName); assertNotNull(writer.marshaller); } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPathTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPathTest.java index 3aefa6d0f0..5e48665895 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPathTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPathTest.java @@ -24,7 +24,9 @@ import org.hamcrest.Matchers; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -73,6 +75,9 @@ static final class TestCase { new TestCase("gs://bucket/", "bucket") ); + @Rule + public ExpectedException thrown = ExpectedException.none(); + @Test public void testGcsPathParsing() throws Exception { for (TestCase testCase : PATH_TEST_CASES) { @@ -236,6 +241,26 @@ public void testResolveOther() { assertEquals("a/b", b.getObject()); } + @Test + public void testGetFileName() { + assertEquals("foo", GcsPath.fromUri("gs://bucket/bar/foo").getFileName().toString()); + assertEquals("foo", GcsPath.fromUri("gs://bucket/foo").getFileName().toString()); + thrown.expect(UnsupportedOperationException.class); + GcsPath.fromUri("gs://bucket/").getFileName(); + } + + @Test + public void testResolveSibling() { + assertEquals( + "gs://bucket/bar/moo", + GcsPath.fromUri("gs://bucket/bar/foo").resolveSibling("moo").toString()); + assertEquals( + "gs://bucket/moo", + GcsPath.fromUri("gs://bucket/foo").resolveSibling("moo").toString()); + thrown.expect(UnsupportedOperationException.class); + GcsPath.fromUri("gs://bucket/").resolveSibling("moo"); + } + @Test public void testCompareTo() { GcsPath a = GcsPath.fromComponents("bucket", "a");