Skip to content

Commit

Permalink
[BEAM-59] Add FileSystems#matchNewResource
Browse files Browse the repository at this point in the history
The new FileSystems API needs a way to generate a ResourceId for a
resource that does not exist. This does not come up in sources, because
we typically are just matching existing files. However, sinks need the
ability to reference a new directory (say, in order to create it).

Couldn't think of anything better than a simple function that says
"treat this string as a full resource path with the specified type",
which is what FileSystems#matchNewResource is.
  • Loading branch information
dhalperi committed Apr 27, 2017
1 parent fdf2de9 commit f93a277
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 9 deletions.
Expand Up @@ -139,4 +139,16 @@ protected abstract void rename(
* to determine the state of the resources.
*/
protected abstract void delete(Collection<ResourceIdT> resourceIds) throws IOException;

/**
* Returns a new {@link ResourceId} for this filesystem that represents the named resource.
* The user supplies both the resource spec and whether it is a directory.
*
* <p>The supplied {@code singleResourceSpec} is expected to be in a proper format, including
* any necessary escaping, for this {@link FileSystem}.
*
* <p>This function may throw an {@link IllegalArgumentException} if given an invalid argument,
* such as when the specified {@code singleResourceSpec} is not a valid resource name.
*/
protected abstract ResourceIdT matchNewResource(String singleResourceSpec, boolean isDirectory);
}
Expand Up @@ -485,4 +485,19 @@ public String apply(@Nonnull FileSystemRegistrar input) {
}
}
}

/**
* Returns a new {@link ResourceId} that represents the named resource of a type corresponding
* to the resource type.
*
* <p>The supplied {@code singleResourceSpec} is expected to be in a proper format, including
* any necessary escaping, for the underlying {@link FileSystem}.
*
* <p>This function may throw an {@link IllegalArgumentException} if given an invalid argument,
* such as when the specified {@code singleResourceSpec} is not a valid resource name.
*/
public static ResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
return getFileSystemInternal(parseScheme(singleResourceSpec))
.matchNewResource(singleResourceSpec, isDirectory);
}
}
Expand Up @@ -34,6 +34,7 @@
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
Expand Down Expand Up @@ -164,6 +165,12 @@ protected void delete(Collection<LocalResourceId> resourceIds) throws IOExceptio
}
}

@Override
protected LocalResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
Path path = Paths.get(singleResourceSpec);
return LocalResourceId.fromPath(path, isDirectory);
}

private MatchResult matchOne(String spec) throws IOException {
File file = Paths.get(spec).toFile();

Expand Down
Expand Up @@ -19,6 +19,7 @@

import java.io.Serializable;
import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;

/**
Expand All @@ -27,7 +28,21 @@
* <p>{@link ResourceId} is hierarchical and composed of a sequence of directory
* and file name elements separated by a special separator or delimiter.
*
* <p>TODO: add examples for how ResourceId is constructed and used.
* <p>{@link ResourceId ResourceIds} are created using {@link FileSystems}. The two primary
* mechanisms are:
*
* <ul>
* <li>{@link FileSystems#match(java.util.List)}, which takes a list of {@link String} resource
* names or globs, queries the {@link FileSystem} for resources matching these specifications,
* and returns a {@link MatchResult} for each glob. This is typically used when reading from
* files.
*
* <li>{@link FileSystems#matchNewResource(String, boolean)}, which takes a {@link String} full
* resource name and type (file or directory) and generates a {@link FileSystem}-specific
* {@code ResourceId} for that resource. This call does not verify the presence or absence of that
* resource in the file system. This call is typically used when creating new directories or files
* to generate {@link ResourceId ResourceIds} for resources that may not yet exist.
* </ul>
*/
public interface ResourceId extends Serializable {

Expand Down
Expand Up @@ -18,8 +18,10 @@
package org.apache.beam.sdk.io;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -301,6 +303,26 @@ public void testMatchWithoutParentDirectory() throws Exception {
toFilenames(localFileSystem.match(ImmutableList.of(pattern.toString()))).isEmpty());
}

@Test
public void testMatchNewResource() throws Exception {
LocalResourceId fileResource =
localFileSystem
.matchNewResource("/some/test/resource/path", false /* isDirectory */);
LocalResourceId dirResource =
localFileSystem
.matchNewResource("/some/test/resource/path", true /* isDirectory */);
assertNotEquals(fileResource, dirResource);
assertThat(
fileResource.getCurrentDirectory().resolve(
"path", StandardResolveOptions.RESOLVE_DIRECTORY),
equalTo(dirResource.getCurrentDirectory()));
assertThat(
fileResource.getCurrentDirectory().resolve(
"path", StandardResolveOptions.RESOLVE_DIRECTORY),
equalTo(dirResource.getCurrentDirectory()));
assertThat(dirResource.toString(), equalTo("/some/test/resource/path/"));
}

private void createFileWithContent(Path path, String content) throws Exception {
try (Writer writer = Channels.newWriter(
localFileSystem.create(
Expand Down
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.storage;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

Expand All @@ -37,6 +38,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.fs.CreateOptions;
Expand Down Expand Up @@ -69,8 +71,7 @@ protected List<MatchResult> match(List<String> specs) throws IOException {
List<GcsPath> nonGlobs = Lists.newArrayList();
List<Boolean> isGlobBooleans = Lists.newArrayList();

for (int i = 0; i < gcsPaths.size(); ++i) {
GcsPath path = gcsPaths.get(i);
for (GcsPath path : gcsPaths) {
if (GcsUtil.isGlob(path)) {
globs.add(path);
isGlobBooleans.add(true);
Expand Down Expand Up @@ -122,6 +123,22 @@ protected void delete(Collection<GcsResourceId> resourceIds) throws IOException
options.getGcsUtil().remove(toFilenames(resourceIds));
}

@Override
protected GcsResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
if (isDirectory) {
if (!singleResourceSpec.endsWith("/")) {
singleResourceSpec += '/';
}
} else {
checkArgument(
!singleResourceSpec.endsWith("/"),
"Expected a file path, but [%s], ends with '/'. This is unsupported in GcsFileSystem.",
singleResourceSpec);
}
GcsPath path = GcsPath.fromUri(singleResourceSpec);
return GcsResourceId.fromGcsPath(path);
}

@Override
protected void copy(List<GcsResourceId> srcResourceIds, List<GcsResourceId> destResourceIds)
throws IOException {
Expand Down Expand Up @@ -196,13 +213,15 @@ List<MatchResult> matchNonGlobs(List<GcsPath> gcsPaths) throws IOException {
}

private MatchResult toMatchResult(StorageObjectOrIOException objectOrException) {
if (objectOrException.ioException() instanceof FileNotFoundException) {
return MatchResult.create(Status.NOT_FOUND, objectOrException.ioException());
} else if (objectOrException.ioException() != null) {
return MatchResult.create(Status.ERROR, objectOrException.ioException());
@Nullable IOException exception = objectOrException.ioException();
if (exception instanceof FileNotFoundException) {
return MatchResult.create(Status.NOT_FOUND, exception);
} else if (exception != null) {
return MatchResult.create(Status.ERROR, exception);
} else {
return MatchResult.create(
Status.OK, new Metadata[]{toMetadata(objectOrException.storageObject())});
StorageObject object = objectOrException.storageObject();
assert object != null; // fix a warning; guaranteed by StorageObjectOrIOException semantics.
return MatchResult.create(Status.OK, new Metadata[]{toMetadata(object)});
}
}

Expand Down
Expand Up @@ -68,4 +68,9 @@ protected void rename(
protected void delete(Collection<HadoopResourceId> resourceIds) throws IOException {
throw new UnsupportedOperationException();
}

@Override
protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
throw new UnsupportedOperationException();
}
}

0 comments on commit f93a277

Please sign in to comment.