From 187fb984f36a090ab31a9aa38564688d5811b61f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Tue, 31 May 2016 18:04:52 +0200 Subject: [PATCH 1/8] [BEAM-316] Add file scheme support in TextIO --- .../main/java/org/apache/beam/sdk/util/IOChannelUtils.java | 1 + .../java/org/apache/beam/sdk/util/IOChannelUtilsTest.java | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java index efb0ff6ff8bb..913824d821a8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java @@ -63,6 +63,7 @@ public static void setIOFactory(String scheme, IOChannelFactory factory) { */ public static void registerStandardIOFactories(PipelineOptions options) { setIOFactory("gs", new GcsIOChannelFactory(options.as(GcsOptions.class))); + setIOFactory("file", new FileIOChannelFactory()); } /** diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java index 12312d13885b..9168fd681b84 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java @@ -79,6 +79,13 @@ public void testLargeShardCount() { 100, 5000)); } + @Test + public void testFilePrefix() throws Exception { + IOChannelUtils.getFactory("file://tmp"); + IOChannelUtils.getFactory("file:/tmp"); + IOChannelUtils.getFactory("file:tmp"); + } + @Test public void testGetSizeBytes() throws Exception { String data = "TestData"; From b469989d8dd3d9c6e432bbd5bdbc65d998847725 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Mon, 18 Jul 2016 15:35:06 -0700 Subject: [PATCH 2/8] [BEAM-316] Add file URI handling * Register FileIOChannelFactory for file scheme * Modify FileIOChannelFactory to dynamically remove the file:// scheme string. --- .../beam/sdk/util/FileIOChannelFactory.java | 29 ++++++++++++++----- .../sdk/util/FileIOChannelFactoryTest.java | 23 +++++++++++++-- .../beam/sdk/util/IOChannelUtilsTest.java | 9 +++--- 3 files changed, 48 insertions(+), 13 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java index 7bc09e9c95ef..c395a7169964 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java @@ -36,24 +36,38 @@ import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.NoSuchFileException; +import java.nio.file.Path; import java.nio.file.PathMatcher; -import java.nio.file.Paths; import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.regex.Matcher; /** - * Implements IOChannelFactory for local files. + * Implements {@link IOChannelFactory} for local files. */ public class FileIOChannelFactory implements IOChannelFactory { private static final Logger LOG = LoggerFactory.getLogger(FileIOChannelFactory.class); + /** + * Converts the given file spec to a java {@link File}. If {@code spec} is actually a URI with + * the {@code file://} prefix, then this function will ensure that the returned {@link File} + * has the correct path. + */ + private static File specToFile(String spec) { + // Strip the "file://" prefix from the spec, if present. + if (spec.toLowerCase().startsWith("file://")) { + return new File(spec.substring("file://".length())); + } else { + return new File(spec); + } + } + // This implementation only allows for wildcards in the file name. // The directory portion must exist as-is. @Override public Collection match(String spec) throws IOException { - File file = new File(spec); + File file = specToFile(spec); File parent = file.getAbsoluteFile().getParentFile(); if (!parent.exists()) { @@ -95,7 +109,7 @@ public boolean apply(File input) { public ReadableByteChannel open(String spec) throws IOException { LOG.debug("opening file {}", spec); @SuppressWarnings("resource") // The caller is responsible for closing the channel. - FileInputStream inputStream = new FileInputStream(spec); + FileInputStream inputStream = new FileInputStream(specToFile(spec)); // Use this method for creating the channel (rather than new FileChannel) so that we get // regular FileNotFoundException. Closing the underyling channel will close the inputStream. return inputStream.getChannel(); @@ -105,7 +119,7 @@ public ReadableByteChannel open(String spec) throws IOException { public WritableByteChannel create(String spec, String mimeType) throws IOException { LOG.debug("creating file {}", spec); - File file = new File(spec); + File file = specToFile(spec); if (file.getAbsoluteFile().getParentFile() != null && !file.getAbsoluteFile().getParentFile().exists() && !file.getAbsoluteFile().getParentFile().mkdirs()) { @@ -118,7 +132,7 @@ public WritableByteChannel create(String spec, String mimeType) @Override public long getSizeBytes(String spec) throws IOException { try { - return Files.size(FileSystems.getDefault().getPath(spec)); + return Files.size(specToFile(spec).toPath()); } catch (NoSuchFileException e) { throw new FileNotFoundException(e.getReason()); } @@ -131,6 +145,7 @@ public boolean isReadSeekEfficient(String spec) throws IOException { @Override public String resolve(String path, String other) throws IOException { - return Paths.get(path).resolve(other).toString(); + Path p = specToFile(path).toPath(); + return p.resolve(other).toString(); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java index b510408fe960..73086c36a008 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.util; +import static org.apache.beam.sdk.util.IOChannelUtils.resolve; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -52,6 +54,7 @@ public class FileIOChannelFactoryTest { private void testCreate(Path path) throws Exception { String expected = "my test string"; + // First with the path string try (Writer writer = Channels.newWriter( factory.create(path.toString(), MimeTypes.TEXT), StandardCharsets.UTF_8.name())) { writer.write(expected); @@ -59,6 +62,18 @@ private void testCreate(Path path) throws Exception { assertThat( Files.readLines(path.toFile(), StandardCharsets.UTF_8), containsInAnyOrder(expected)); + + // Delete the file before trying as URI + assertTrue("Unable to delete file " + path, path.toFile().delete()); + + // Second with the path URI + try (Writer writer = Channels.newWriter( + factory.create(path.toUri().toString(), MimeTypes.TEXT), StandardCharsets.UTF_8.name())) { + writer.write(expected); + } + assertThat( + Files.readLines(path.toFile(), StandardCharsets.UTF_8), + containsInAnyOrder(expected)); } @Test @@ -194,8 +209,12 @@ public void testMatchWithDirectoryFiltersOutDirectory() throws Exception { @Test public void testResolve() throws Exception { - String expected = temporaryFolder.getRoot().toPath().resolve("aa").toString(); - assertEquals(expected, factory.resolve(temporaryFolder.getRoot().toString(), "aa")); + Path rootPath = temporaryFolder.getRoot().toPath(); + String rootString = rootPath.toString(); + + String expected = rootPath.resolve("aa").toString(); + assertEquals(expected, factory.resolve(rootString, "aa")); + assertEquals(expected, factory.resolve("file://" + rootString, "aa")); } @Test diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java index 9168fd681b84..8a7eb0216954 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java @@ -17,7 +17,9 @@ */ package org.apache.beam.sdk.util; +import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import com.google.common.io.Files; @@ -80,10 +82,9 @@ public void testLargeShardCount() { } @Test - public void testFilePrefix() throws Exception { - IOChannelUtils.getFactory("file://tmp"); - IOChannelUtils.getFactory("file:/tmp"); - IOChannelUtils.getFactory("file:tmp"); + public void testHandlerNoScheme() throws Exception { + String pathToTempFolder = tmpFolder.getRoot().getAbsolutePath(); + assertThat(IOChannelUtils.getFactory(pathToTempFolder), instanceOf(FileIOChannelFactory.class)); } @Test From 4349e7a8b38abd76a299ff1bd5f38207734e6151 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Mon, 18 Jul 2016 15:53:47 -0700 Subject: [PATCH 3/8] fixup! [BEAM-316] Add file URI handling --- .../java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java index 73086c36a008..8d429d5ecd53 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.util; -import static org.apache.beam.sdk.util.IOChannelUtils.resolve; - import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; From 44b841de9e2fb70d2659eeff989b0eb04c6ab7b0 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Wed, 20 Jul 2016 23:28:48 -0700 Subject: [PATCH 4/8] fixup! [BEAM-316] Add file URI handling --- .../beam/sdk/util/FileIOChannelFactory.java | 18 +++++++++++++----- .../sdk/util/FileIOChannelFactoryTest.java | 4 ++-- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java index 95e86def127c..c9123f7d1f00 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.util; +import static com.google.common.base.Preconditions.checkArgument; + import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.Iterables; @@ -30,6 +32,7 @@ import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URI; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; @@ -38,6 +41,7 @@ import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.PathMatcher; +import java.nio.file.Paths; import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -51,14 +55,18 @@ public class FileIOChannelFactory implements IOChannelFactory { /** * Converts the given file spec to a java {@link File}. If {@code spec} is actually a URI with - * the {@code file://} prefix, then this function will ensure that the returned {@link File} + * the {@code file} scheme, then this function will ensure that the returned {@link File} * has the correct path. */ private static File specToFile(String spec) { - // Strip the "file://" prefix from the spec, if present. - if (spec.toLowerCase().startsWith("file://")) { - return new File(spec.substring("file://".length())); - } else { + try { + // Handle URI. + URI uri = URI.create(spec); + checkArgument( + "file".equals(uri.getScheme()), "Expected a file:// scheme, but got %s", uri.getScheme()); + return Paths.get(uri).toFile(); + } catch (IllegalArgumentException e) { + // Fall back to assuming this is actually a file. return new File(spec); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java index 8d429d5ecd53..0f70636316b0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java @@ -217,13 +217,13 @@ public void testResolve() throws Exception { @Test public void testResolveOtherIsFullPath() throws Exception { - String expected = temporaryFolder.getRoot().getPath().toString(); + String expected = temporaryFolder.getRoot().getPath(); assertEquals(expected, factory.resolve(expected, expected)); } @Test public void testResolveOtherIsEmptyPath() throws Exception { - String expected = temporaryFolder.getRoot().getPath().toString(); + String expected = temporaryFolder.getRoot().getPath(); assertEquals(expected, factory.resolve(expected, "")); } From e1cebeb01b256efbaa08b738163e6dc50fcf94e8 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Wed, 20 Jul 2016 23:31:00 -0700 Subject: [PATCH 5/8] fixup! [BEAM-316] Add file URI handling --- .../java/org/apache/beam/sdk/util/FileIOChannelFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java index c9123f7d1f00..e91697e9acec 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java @@ -67,7 +67,7 @@ private static File specToFile(String spec) { return Paths.get(uri).toFile(); } catch (IllegalArgumentException e) { // Fall back to assuming this is actually a file. - return new File(spec); + return Paths.get(spec).toFile(); } } From 66ef1371047eed71ace08c0a6f98ed009fe129fd Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Wed, 20 Jul 2016 23:31:58 -0700 Subject: [PATCH 6/8] fixup! [BEAM-316] Add file URI handling --- .../java/org/apache/beam/sdk/util/FileIOChannelFactory.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java index e91697e9acec..c171a925e607 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java @@ -62,8 +62,6 @@ private static File specToFile(String spec) { try { // Handle URI. URI uri = URI.create(spec); - checkArgument( - "file".equals(uri.getScheme()), "Expected a file:// scheme, but got %s", uri.getScheme()); return Paths.get(uri).toFile(); } catch (IllegalArgumentException e) { // Fall back to assuming this is actually a file. From 95e84a6e5849018c818a6ca72e188f14a7b29fd2 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Thu, 21 Jul 2016 14:43:27 -0700 Subject: [PATCH 7/8] fixup! [BEAM-316] Add file URI handling --- .../java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java index 0f70636316b0..16fcd712719c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java @@ -41,6 +41,7 @@ import java.nio.channels.Channels; import java.nio.charset.StandardCharsets; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.List; /** Tests for {@link FileIOChannelFactory}. */ @@ -212,6 +213,7 @@ public void testResolve() throws Exception { String expected = rootPath.resolve("aa").toString(); assertEquals(expected, factory.resolve(rootString, "aa")); + assertEquals(expected, factory.resolve("file:" + rootString, "aa")); assertEquals(expected, factory.resolve("file://" + rootString, "aa")); } From e533e47b300da20a772242e0a963660920e19fe6 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Thu, 21 Jul 2016 15:25:26 -0700 Subject: [PATCH 8/8] fixup! [BEAM-316] Add file URI handling --- .../java/org/apache/beam/sdk/util/FileIOChannelFactory.java | 2 -- .../java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java | 1 - 2 files changed, 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java index c171a925e607..92f351bb3af2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.util; -import static com.google.common.base.Preconditions.checkArgument; - import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.Iterables; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java index 16fcd712719c..79e6e5cd6b0c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java @@ -41,7 +41,6 @@ import java.nio.channels.Channels; import java.nio.charset.StandardCharsets; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.List; /** Tests for {@link FileIOChannelFactory}. */