Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-59] Add FileSystems#matchNewResource #2728

Merged
merged 1 commit into from Apr 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -139,4 +139,16 @@ protected abstract void rename(
* to determine the state of the resources.
*/
protected abstract void delete(Collection<ResourceIdT> resourceIds) throws IOException;

/**
* Returns a new {@link ResourceId} for this filesystem that represents the named resource.
* The user supplies both the resource spec and whether it is a directory.
*
* <p>The supplied {@code singleResourceSpec} is expected to be in a proper format, including
* any necessary escaping, for this {@link FileSystem}.
*
* <p>This function may throw an {@link IllegalArgumentException} if given an invalid argument,
* such as when the specified {@code singleResourceSpec} is not a valid resource name.
*/
protected abstract ResourceIdT matchNewResource(String singleResourceSpec, boolean isDirectory);
}
Expand Up @@ -485,4 +485,19 @@ public String apply(@Nonnull FileSystemRegistrar input) {
}
}
}

/**
* Returns a new {@link ResourceId} that represents the named resource of a type corresponding
* to the resource type.
*
* <p>The supplied {@code singleResourceSpec} is expected to be in a proper format, including
* any necessary escaping, for the underlying {@link FileSystem}.
*
* <p>This function may throw an {@link IllegalArgumentException} if given an invalid argument,
* such as when the specified {@code singleResourceSpec} is not a valid resource name.
*/
public static ResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
return getFileSystemInternal(parseScheme(singleResourceSpec))
.matchNewResource(singleResourceSpec, isDirectory);
}
}
Expand Up @@ -34,6 +34,7 @@
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
Expand Down Expand Up @@ -164,6 +165,12 @@ protected void delete(Collection<LocalResourceId> resourceIds) throws IOExceptio
}
}

@Override
protected LocalResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
Path path = Paths.get(singleResourceSpec);
return LocalResourceId.fromPath(path, isDirectory);
}

private MatchResult matchOne(String spec) throws IOException {
File file = Paths.get(spec).toFile();

Expand Down
Expand Up @@ -19,6 +19,7 @@

import java.io.Serializable;
import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;

/**
Expand All @@ -27,7 +28,21 @@
* <p>{@link ResourceId} is hierarchical and composed of a sequence of directory
* and file name elements separated by a special separator or delimiter.
*
* <p>TODO: add examples for how ResourceId is constructed and used.
* <p>{@link ResourceId ResourceIds} are created using {@link FileSystems}. The two primary
* mechanisms are:
*
* <ul>
* <li>{@link FileSystems#match(java.util.List)}, which takes a list of {@link String} resource
* names or globs, queries the {@link FileSystem} for resources matching these specifications,
* and returns a {@link MatchResult} for each glob. This is typically used when reading from
* files.
*
* <li>{@link FileSystems#matchNewResource(String, boolean)}, which takes a {@link String} full
* resource name and type (file or directory) and generates a {@link FileSystem}-specific
* {@code ResourceId} for that resource. This call does not verify the presence or absence of that
* resource in the file system. This call is typically used when creating new directories or files
* to generate {@link ResourceId ResourceIds} for resources that may not yet exist.
* </ul>
*/
public interface ResourceId extends Serializable {

Expand Down
Expand Up @@ -18,8 +18,10 @@
package org.apache.beam.sdk.io;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -301,6 +303,26 @@ public void testMatchWithoutParentDirectory() throws Exception {
toFilenames(localFileSystem.match(ImmutableList.of(pattern.toString()))).isEmpty());
}

@Test
public void testMatchNewResource() throws Exception {
LocalResourceId fileResource =
localFileSystem
.matchNewResource("/some/test/resource/path", false /* isDirectory */);
LocalResourceId dirResource =
localFileSystem
.matchNewResource("/some/test/resource/path", true /* isDirectory */);
assertNotEquals(fileResource, dirResource);
assertThat(
fileResource.getCurrentDirectory().resolve(
"path", StandardResolveOptions.RESOLVE_DIRECTORY),
equalTo(dirResource.getCurrentDirectory()));
assertThat(
fileResource.getCurrentDirectory().resolve(
"path", StandardResolveOptions.RESOLVE_DIRECTORY),
equalTo(dirResource.getCurrentDirectory()));
assertThat(dirResource.toString(), equalTo("/some/test/resource/path/"));
}

private void createFileWithContent(Path path, String content) throws Exception {
try (Writer writer = Channels.newWriter(
localFileSystem.create(
Expand Down
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.storage;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

Expand All @@ -37,6 +38,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.fs.CreateOptions;
Expand Down Expand Up @@ -69,8 +71,7 @@ protected List<MatchResult> match(List<String> specs) throws IOException {
List<GcsPath> nonGlobs = Lists.newArrayList();
List<Boolean> isGlobBooleans = Lists.newArrayList();

for (int i = 0; i < gcsPaths.size(); ++i) {
GcsPath path = gcsPaths.get(i);
for (GcsPath path : gcsPaths) {
if (GcsUtil.isGlob(path)) {
globs.add(path);
isGlobBooleans.add(true);
Expand Down Expand Up @@ -122,6 +123,22 @@ protected void delete(Collection<GcsResourceId> resourceIds) throws IOException
options.getGcsUtil().remove(toFilenames(resourceIds));
}

@Override
protected GcsResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
if (isDirectory) {
if (!singleResourceSpec.endsWith("/")) {
singleResourceSpec += '/';
}
} else {
checkArgument(
!singleResourceSpec.endsWith("/"),
"Expected a file path, but [%s], ends with '/'. This is unsupported in GcsFileSystem.",
singleResourceSpec);
}
GcsPath path = GcsPath.fromUri(singleResourceSpec);
return GcsResourceId.fromGcsPath(path);
}

@Override
protected void copy(List<GcsResourceId> srcResourceIds, List<GcsResourceId> destResourceIds)
throws IOException {
Expand Down Expand Up @@ -196,13 +213,15 @@ List<MatchResult> matchNonGlobs(List<GcsPath> gcsPaths) throws IOException {
}

private MatchResult toMatchResult(StorageObjectOrIOException objectOrException) {
if (objectOrException.ioException() instanceof FileNotFoundException) {
return MatchResult.create(Status.NOT_FOUND, objectOrException.ioException());
} else if (objectOrException.ioException() != null) {
return MatchResult.create(Status.ERROR, objectOrException.ioException());
@Nullable IOException exception = objectOrException.ioException();
if (exception instanceof FileNotFoundException) {
return MatchResult.create(Status.NOT_FOUND, exception);
} else if (exception != null) {
return MatchResult.create(Status.ERROR, exception);
} else {
return MatchResult.create(
Status.OK, new Metadata[]{toMetadata(objectOrException.storageObject())});
StorageObject object = objectOrException.storageObject();
assert object != null; // fix a warning; guaranteed by StorageObjectOrIOException semantics.
return MatchResult.create(Status.OK, new Metadata[]{toMetadata(object)});
}
}

Expand Down
Expand Up @@ -68,4 +68,9 @@ protected void rename(
protected void delete(Collection<HadoopResourceId> resourceIds) throws IOException {
throw new UnsupportedOperationException();
}

@Override
protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
throw new UnsupportedOperationException();
}
}