Skip to content

Commit

Permalink
Merge 12e6d90 into 0c740f4
Browse files Browse the repository at this point in the history
  • Loading branch information
dhalperi committed Apr 27, 2017
2 parents 0c740f4 + 12e6d90 commit 27ad9f8
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 8 deletions.
Expand Up @@ -139,4 +139,16 @@ protected abstract void rename(
* to determine the state of the resources.
*/
protected abstract void delete(Collection<ResourceIdT> resourceIds) throws IOException;

/**
* Returns a new {@link ResourceId} for this filesystem that represents the named resource.
* The user supplies both the resource spec and whether it is a directory.
*
* <p>The supplied {@code singleResourceSpec} is expected to be in a proper format, including
* any necessary escaping, for this {@link FileSystem}.
*
* <p>This function may throw an {@link IllegalArgumentException} if given an invalid argument,
* such as when the specified {@code singleResourceSpec} is not a valid resource name.
*/
protected abstract ResourceIdT matchNewResource(String singleResourceSpec, boolean isDirectory);
}
Expand Up @@ -485,4 +485,19 @@ public String apply(@Nonnull FileSystemRegistrar input) {
}
}
}

/**
* Returns a new {@link ResourceId} that represents the named resource of a type corresponding
* to the resource type.
*
* <p>The supplied {@code singleResourceSpec} is expected to be in a proper format, including
* any necessary escaping, for the underlying {@link FileSystem}.
*
* <p>This function may throw an {@link IllegalArgumentException} if given an invalid argument,
* such as when the specified {@code singleResourceSpec} is not a valid resource name.
*/
public static ResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
return getFileSystemInternal(parseScheme(singleResourceSpec))
.matchNewResource(singleResourceSpec, isDirectory);
}
}
Expand Up @@ -34,6 +34,7 @@
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
Expand Down Expand Up @@ -164,6 +165,12 @@ protected void delete(Collection<LocalResourceId> resourceIds) throws IOExceptio
}
}

@Override
protected LocalResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
Path path = Paths.get(singleResourceSpec);
return LocalResourceId.fromPath(path, isDirectory);
}

private MatchResult matchOne(String spec) throws IOException {
File file = Paths.get(spec).toFile();

Expand Down
Expand Up @@ -18,8 +18,10 @@
package org.apache.beam.sdk.io;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -301,6 +303,26 @@ public void testMatchWithoutParentDirectory() throws Exception {
toFilenames(localFileSystem.match(ImmutableList.of(pattern.toString()))).isEmpty());
}

@Test
public void testMatchNewResource() throws Exception {
LocalResourceId fileResource =
localFileSystem
.matchNewResource("/some/test/resource/path", false /* isDirectory */);
LocalResourceId dirResource =
localFileSystem
.matchNewResource("/some/test/resource/path", true /* isDirectory */);
assertNotEquals(fileResource, dirResource);
assertThat(
fileResource.getCurrentDirectory().resolve(
"path", StandardResolveOptions.RESOLVE_DIRECTORY),
equalTo(dirResource.getCurrentDirectory()));
assertThat(
fileResource.getCurrentDirectory().resolve(
"path", StandardResolveOptions.RESOLVE_DIRECTORY),
equalTo(dirResource.getCurrentDirectory()));
assertThat(dirResource.toString(), equalTo("/some/test/resource/path/"));
}

private void createFileWithContent(Path path, String content) throws Exception {
try (Writer writer = Channels.newWriter(
localFileSystem.create(
Expand Down
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.storage;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

Expand All @@ -37,6 +38,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
Expand Down Expand Up @@ -69,8 +71,7 @@ protected List<MatchResult> match(List<String> specs) throws IOException {
List<GcsPath> nonGlobs = Lists.newArrayList();
List<Boolean> isGlobBooleans = Lists.newArrayList();

for (int i = 0; i < gcsPaths.size(); ++i) {
GcsPath path = gcsPaths.get(i);
for (GcsPath path : gcsPaths) {
if (GcsUtil.isGlob(path)) {
globs.add(path);
isGlobBooleans.add(true);
Expand Down Expand Up @@ -122,6 +123,22 @@ protected void delete(Collection<GcsResourceId> resourceIds) throws IOException
options.getGcsUtil().remove(toFilenames(resourceIds));
}

@Override
protected GcsResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
if (isDirectory) {
if (!singleResourceSpec.endsWith("/")) {
singleResourceSpec += '/';
}
} else {
checkArgument(
!singleResourceSpec.endsWith("/"),
"Expected a file path, but [%s], ends with '/'. This is unsupported in GcsFileSystem.",
singleResourceSpec);
}
GcsPath path = GcsPath.fromUri(singleResourceSpec);
return GcsResourceId.fromGcsPath(path);
}

@Override
protected void copy(List<GcsResourceId> srcResourceIds, List<GcsResourceId> destResourceIds)
throws IOException {
Expand Down Expand Up @@ -196,13 +213,15 @@ List<MatchResult> matchNonGlobs(List<GcsPath> gcsPaths) throws IOException {
}

private MatchResult toMatchResult(StorageObjectOrIOException objectOrException) {
if (objectOrException.ioException() instanceof FileNotFoundException) {
return MatchResult.create(Status.NOT_FOUND, objectOrException.ioException());
} else if (objectOrException.ioException() != null) {
return MatchResult.create(Status.ERROR, objectOrException.ioException());
@Nullable IOException exception = objectOrException.ioException();
if (exception instanceof FileNotFoundException) {
return MatchResult.create(Status.NOT_FOUND, exception);
} else if (exception != null) {
return MatchResult.create(Status.ERROR, exception);
} else {
return MatchResult.create(
Status.OK, new Metadata[]{toMetadata(objectOrException.storageObject())});
StorageObject object = objectOrException.storageObject();
assert object != null; // fix a warning; guaranteed by StorageObjectOrIOException semantics.
return MatchResult.create(Status.OK, new Metadata[]{toMetadata(object)});
}
}

Expand Down
Expand Up @@ -68,4 +68,9 @@ protected void rename(
protected void delete(Collection<HadoopResourceId> resourceIds) throws IOException {
throw new UnsupportedOperationException();
}

@Override
protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
throw new UnsupportedOperationException();
}
}

0 comments on commit 27ad9f8

Please sign in to comment.