diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java index 001f596995825..76c5dc126fcb7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java @@ -139,4 +139,16 @@ protected abstract void rename( * to determine the state of the resources. */ protected abstract void delete(Collection resourceIds) throws IOException; + + /** + * Returns a new {@link ResourceId} for this filesystem that represents the named resource. + * The user supplies both the resource spec and whether it is a directory. + * + *

The supplied {@code singleResourceSpec} is expected to be in a proper format, including + * any necessary escaping, for this {@link FileSystem}. + * + *

This function may throw an {@link IllegalArgumentException} if given an invalid argument, + * such as when the specified {@code singleResourceSpec} is not a valid resource name. + */ + protected abstract ResourceIdT matchNewResource(String singleResourceSpec, boolean isDirectory); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index 0b50070453d44..b2904985a1c81 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -485,4 +485,19 @@ public String apply(@Nonnull FileSystemRegistrar input) { } } } + + /** + * Returns a new {@link ResourceId} that represents the named resource of a type corresponding + * to the resource type. + * + *

The supplied {@code singleResourceSpec} is expected to be in a proper format, including + * any necessary escaping, for the underlying {@link FileSystem}. + * + *

This function may throw an {@link IllegalArgumentException} if given an invalid argument, + * such as when the specified {@code singleResourceSpec} is not a valid resource name. + */ + public static ResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) { + return getFileSystemInternal(parseScheme(singleResourceSpec)) + .matchNewResource(singleResourceSpec, isDirectory); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java index 8349a35078af8..2d80ae4bf512c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java @@ -34,6 +34,7 @@ import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.PathMatcher; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; @@ -164,6 +165,12 @@ protected void delete(Collection resourceIds) throws IOExceptio } } + @Override + protected LocalResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) { + Path path = Paths.get(singleResourceSpec); + return LocalResourceId.fromPath(path, isDirectory); + } + private MatchResult matchOne(String spec) throws IOException { File file = Paths.get(spec).toFile(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java index b7859ca5e1931..26a21bbad1905 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java @@ -19,6 +19,7 @@ import java.io.Serializable; import org.apache.beam.sdk.io.FileSystem; +import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; /** @@ -27,7 +28,21 @@ *

{@link ResourceId} is hierarchical and composed of a sequence of directory * and file name elements separated by a special separator or delimiter. * - *

TODO: add examples for how ResourceId is constructed and used. + *

{@link ResourceId ResourceIds} are created using {@link FileSystems}. The two primary + * mechanisms are: + * + *

*/ public interface ResourceId extends Serializable { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java index d335974f1274e..ac4fe615609a2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java @@ -18,8 +18,10 @@ package org.apache.beam.sdk.io; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -301,6 +303,26 @@ public void testMatchWithoutParentDirectory() throws Exception { toFilenames(localFileSystem.match(ImmutableList.of(pattern.toString()))).isEmpty()); } + @Test + public void testMatchNewResource() throws Exception { + LocalResourceId fileResource = + localFileSystem + .matchNewResource("/some/test/resource/path", false /* isDirectory */); + LocalResourceId dirResource = + localFileSystem + .matchNewResource("/some/test/resource/path", true /* isDirectory */); + assertNotEquals(fileResource, dirResource); + assertThat( + fileResource.getCurrentDirectory().resolve( + "path", StandardResolveOptions.RESOLVE_DIRECTORY), + equalTo(dirResource.getCurrentDirectory())); + assertThat( + fileResource.getCurrentDirectory().resolve( + "path", StandardResolveOptions.RESOLVE_DIRECTORY), + equalTo(dirResource.getCurrentDirectory())); + assertThat(dirResource.toString(), equalTo("/some/test/resource/path/")); + } + private void createFileWithContent(Path path, String content) throws Exception { try (Writer writer = Channels.newWriter( localFileSystem.create( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java index 2ae6b7e4cadd6..1b0bd9d2f22fc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.storage; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; @@ -37,6 +38,7 @@ import java.util.LinkedList; import java.util.List; import java.util.regex.Pattern; +import javax.annotation.Nullable; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.io.FileSystem; import org.apache.beam.sdk.io.fs.CreateOptions; @@ -69,8 +71,7 @@ protected List match(List specs) throws IOException { List nonGlobs = Lists.newArrayList(); List isGlobBooleans = Lists.newArrayList(); - for (int i = 0; i < gcsPaths.size(); ++i) { - GcsPath path = gcsPaths.get(i); + for (GcsPath path : gcsPaths) { if (GcsUtil.isGlob(path)) { globs.add(path); isGlobBooleans.add(true); @@ -122,6 +123,22 @@ protected void delete(Collection resourceIds) throws IOException options.getGcsUtil().remove(toFilenames(resourceIds)); } + @Override + protected GcsResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) { + if (isDirectory) { + if (!singleResourceSpec.endsWith("/")) { + singleResourceSpec += '/'; + } + } else { + checkArgument( + !singleResourceSpec.endsWith("/"), + "Expected a file path, but [%s], ends with '/'. This is unsupported in GcsFileSystem.", + singleResourceSpec); + } + GcsPath path = GcsPath.fromUri(singleResourceSpec); + return GcsResourceId.fromGcsPath(path); + } + @Override protected void copy(List srcResourceIds, List destResourceIds) throws IOException { @@ -196,13 +213,15 @@ List matchNonGlobs(List gcsPaths) throws IOException { } private MatchResult toMatchResult(StorageObjectOrIOException objectOrException) { - if (objectOrException.ioException() instanceof FileNotFoundException) { - return MatchResult.create(Status.NOT_FOUND, objectOrException.ioException()); - } else if (objectOrException.ioException() != null) { - return MatchResult.create(Status.ERROR, objectOrException.ioException()); + @Nullable IOException exception = objectOrException.ioException(); + if (exception instanceof FileNotFoundException) { + return MatchResult.create(Status.NOT_FOUND, exception); + } else if (exception != null) { + return MatchResult.create(Status.ERROR, exception); } else { - return MatchResult.create( - Status.OK, new Metadata[]{toMetadata(objectOrException.storageObject())}); + StorageObject object = objectOrException.storageObject(); + assert object != null; // fix a warning; guaranteed by StorageObjectOrIOException semantics. + return MatchResult.create(Status.OK, new Metadata[]{toMetadata(object)}); } } diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java index f4e35ac95c0df..ca56a60be149b 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java @@ -68,4 +68,9 @@ protected void rename( protected void delete(Collection resourceIds) throws IOException { throw new UnsupportedOperationException(); } + + @Override + protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) { + throw new UnsupportedOperationException(); + } }