From 6ce46b93f8476c66d5911fceb35a3804094e6c0f Mon Sep 17 00:00:00 2001 From: Michael Luckey Date: Wed, 1 Mar 2017 01:13:11 +0100 Subject: [PATCH] BEAM-1569 support file patterns containing spaces --- .../beam/sdk/io/hdfs/HDFSFileSource.java | 6 +++--- .../beam/sdk/io/hdfs/HDFSFileSourceTest.java | 19 +++++++++++++++++-- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java index 2a731fb12004..df7264379eef 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java @@ -30,7 +30,6 @@ import java.io.ObjectOutput; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.net.URI; import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.ListIterator; @@ -337,9 +336,10 @@ public void validate() { UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction() { @Override public Void run() throws Exception { - FileSystem fs = FileSystem.get(new URI(filepattern()), + final Path pathPattern = new Path(filepattern()); + FileSystem fs = FileSystem.get(pathPattern.toUri(), SerializableConfiguration.newConfiguration(serializableConfiguration())); - FileStatus[] fileStatuses = fs.globStatus(new Path(filepattern())); + FileStatus[] fileStatuses = fs.globStatus(pathPattern); checkState( fileStatuses != null && fileStatuses.length > 0, "Unable to find any files matching %s", filepattern()); diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java index ac6af40aa063..c821d9dc0077 100644 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java +++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java @@ -63,8 +63,23 @@ public void testFullyReadSingleFile() throws Exception { File file = createFileWithData("tmp.seq", expectedResults); HDFSFileSource, IntWritable, Text> source = - HDFSFileSource.from( - file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class); + HDFSFileSource.from( + file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class); + + assertEquals(file.length(), source.getEstimatedSizeBytes(null)); + + assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray())); + } + + @Test + public void testFullyReadSingleFileWithSpaces() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + List> expectedResults = createRandomRecords(3, 10, 0); + File file = createFileWithData("tmp data.seq", expectedResults); + + HDFSFileSource, IntWritable, Text> source = + HDFSFileSource.from( + file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class); assertEquals(file.length(), source.getEstimatedSizeBytes(null));