Skip to content

Commit

Permalink
Merge 2c9babb into 9151676
Browse files Browse the repository at this point in the history
  • Loading branch information
peihe committed Feb 16, 2017
2 parents 9151676 + 2c9babb commit 38c53d8
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 2 deletions.
Expand Up @@ -225,6 +225,13 @@ public static String globToRegexp(String globExp) {
return dst.toString();
}

/**
 * Tells whether the given {@code spec} contains a glob.
 *
 * <p>Only the object component of the path is inspected; it must match {@code GLOB_PREFIX}
 * in its entirety for the path to be treated as a glob.
 *
 * @param spec the GCS path to inspect
 * @return {@code true} if the object name of {@code spec} matches {@code GLOB_PREFIX}
 */
public static boolean isGlob(GcsPath spec) {
  final String objectName = spec.getObject();
  return GLOB_PREFIX.matcher(objectName).matches();
}

private GcsUtil(
Storage storageClient,
HttpRequestInitializer httpRequestInitializer,
Expand All @@ -250,7 +257,7 @@ public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
checkArgument(isGcsPatternSupported(gcsPattern.getObject()));
Pattern p = null;
String prefix = null;
if (!GLOB_PREFIX.matcher(gcsPattern.getObject()).matches()) {
if (!isGlob(gcsPattern)) {
// Not a glob.
try {
// Use a get request to fetch the metadata of the object, and ignore the return value.
Expand Down
Expand Up @@ -18,19 +18,22 @@
package org.apache.beam.sdk.io.gcp.storage;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -60,7 +63,39 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {

/**
 * Matches every spec against GCS and returns one {@link MatchResult} per spec, in the same
 * order as {@code specs}.
 *
 * <p>Specs are split into globs and non-globs (see {@code GcsUtil.isGlob}); each group is
 * resolved by its own helper ({@code matchGlobs} / {@code matchNonGlobs}) and the two result
 * streams are then interleaved back into the original spec order.
 *
 * @param specs the GCS URIs or glob patterns to match
 * @return the match results, positionally aligned with {@code specs}
 * @throws IOException declared by the overridden method
 * @throws IllegalStateException if a helper returns a different number of results than the
 *     number of paths it was given
 */
@Override
protected List<MatchResult> match(List<String> specs) throws IOException {
  List<GcsPath> gcsPaths = toGcsPaths(specs);

  List<GcsPath> globs = Lists.newArrayList();
  List<GcsPath> nonGlobs = Lists.newArrayList();
  // Remembers, per input position, which of the two groups the spec was routed to so the
  // results can be merged back in the caller's order.
  List<Boolean> isGlobBooleans = Lists.newArrayList();

  for (GcsPath path : gcsPaths) {
    boolean isGlob = GcsUtil.isGlob(path);
    if (isGlob) {
      globs.add(path);
    } else {
      nonGlobs.add(path);
    }
    isGlobBooleans.add(isGlob);
  }

  Iterator<MatchResult> globsMatchResults = matchGlobs(globs).iterator();
  Iterator<MatchResult> nonGlobsMatchResults = matchNonGlobs(nonGlobs).iterator();

  ImmutableList.Builder<MatchResult> ret = ImmutableList.builder();
  for (Boolean isGlob : isGlobBooleans) {
    if (isGlob) {
      checkState(globsMatchResults.hasNext(), "Expect globsMatchResults has next.");
      ret.add(globsMatchResults.next());
    } else {
      checkState(nonGlobsMatchResults.hasNext(), "Expect nonGlobsMatchResults has next.");
      ret.add(nonGlobsMatchResults.next());
    }
  }
  // Both helpers must have produced exactly one result per path they were given.
  checkState(!globsMatchResults.hasNext(), "Expect no more elements in globsMatchResults.");
  checkState(!nonGlobsMatchResults.hasNext(), "Expect no more elements in nonGlobsMatchResults.");
  return ret.build();
}

@Override
Expand Down Expand Up @@ -93,6 +128,21 @@ protected void copy(List<GcsResourceId> srcResourceIds, List<GcsResourceId> dest
options.getGcsUtil().copy(toFilenames(srcResourceIds), toFilenames(destResourceIds));
}

/**
 * Expands each glob in {@code globs} into a {@link MatchResult}, producing one result per
 * glob in input order.
 *
 * <p>An {@link IOException} thrown while expanding a glob is not propagated; it is recorded
 * as an {@code ERROR} result for that glob instead.
 *
 * @param globs the glob paths to expand
 * @return an immutable list of results, positionally aligned with {@code globs}
 */
private List<MatchResult> matchGlobs(List<GcsPath> globs) {
  // TODO: Executes in parallel, address https://issues.apache.org/jira/browse/BEAM-1503.
  ImmutableList.Builder<MatchResult> results = ImmutableList.builder();
  for (GcsPath glob : globs) {
    MatchResult result;
    try {
      result = expand(glob);
    } catch (IOException e) {
      // Keep going: a failure on one glob becomes that glob's result.
      result = MatchResult.create(Status.ERROR, e);
    }
    results.add(result);
  }
  return results.build();
}

/**
* Expands a pattern into {@link MatchResult}.
*
Expand Down Expand Up @@ -179,4 +229,14 @@ public String apply(GcsResourceId resource) {
}})
.toList();
}

/**
 * Converts each spec string into a {@link GcsPath} via {@code GcsPath.fromUri}, preserving
 * iteration order.
 *
 * @param specs the URI strings to convert
 * @return an immutable list holding one {@link GcsPath} per spec
 */
private List<GcsPath> toGcsPaths(Collection<String> specs) {
  ImmutableList.Builder<GcsPath> paths = ImmutableList.builder();
  for (String spec : specs) {
    paths.add(GcsPath.fromUri(spec));
  }
  return paths.build();
}
}
Expand Up @@ -73,6 +73,56 @@ public void setUp() {
gcsFileSystem = new GcsFileSystem(gcsOptions);
}

/**
 * Verifies that {@code match} handles a mix of one glob and two plain paths, returning one
 * result per spec in spec order: the glob's matches, a NOT_FOUND, and a single-file match.
 */
@Test
public void testMatch() throws Exception {
  // The specs under test: one glob followed by two non-glob paths.
  List<String> specs = ImmutableList.of(
      "gs://testbucket/testdirectory/file[1-3]*",
      "gs://testbucket/testdirectory/non-exist-file",
      "gs://testbucket/testdirectory/otherfile");

  // Stub the bucket listing: a directory entry followed by the files inside it.
  List<StorageObject> listedObjects = new ArrayList<>();
  listedObjects.add(new StorageObject().setBucket("testbucket").setName("testdirectory/"));
  listedObjects.add(
      createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */));
  listedObjects.add(
      createStorageObject("gs://testbucket/testdirectory/file2name", 2L /* fileSize */));
  listedObjects.add(
      createStorageObject("gs://testbucket/testdirectory/file3name", 3L /* fileSize */));
  listedObjects.add(
      createStorageObject("gs://testbucket/testdirectory/file4name", 4L /* fileSize */));
  listedObjects.add(
      createStorageObject("gs://testbucket/testdirectory/otherfile", 5L /* fileSize */));
  listedObjects.add(
      createStorageObject("gs://testbucket/testdirectory/anotherfile", 6L /* fileSize */));

  Objects listing = new Objects();
  listing.setItems(listedObjects);
  when(mockGcsUtil.listObjects(eq("testbucket"), anyString(), isNull(String.class)))
      .thenReturn(listing);

  // Stub the batched metadata lookup for the two non-glob paths: the first is missing,
  // the second resolves to an object.
  List<GcsPath> nonGlobPaths = ImmutableList.of(
      GcsPath.fromUri("gs://testbucket/testdirectory/non-exist-file"),
      GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"));
  when(mockGcsUtil.getObjects(eq(nonGlobPaths))).thenReturn(
      ImmutableList.of(
          StorageObjectOrIOException.create(new FileNotFoundException()),
          StorageObjectOrIOException.create(
              createStorageObject("gs://testbucket/testdirectory/otherfile", 4L))));

  List<MatchResult> matchResults = gcsFileSystem.match(specs);
  assertEquals(3, matchResults.size());

  // Result 0: the glob matches file1name..file3name only.
  MatchResult globResult = matchResults.get(0);
  assertEquals(Status.OK, globResult.status());
  assertThat(
      ImmutableList.of(
          "gs://testbucket/testdirectory/file1name",
          "gs://testbucket/testdirectory/file2name",
          "gs://testbucket/testdirectory/file3name"),
      contains(toFilenames(globResult).toArray()));

  // Result 1: the missing object is reported as NOT_FOUND.
  MatchResult missingResult = matchResults.get(1);
  assertEquals(Status.NOT_FOUND, missingResult.status());

  // Result 2: the existing object matches itself.
  MatchResult existingResult = matchResults.get(2);
  assertEquals(Status.OK, existingResult.status());
  assertThat(
      ImmutableList.of("gs://testbucket/testdirectory/otherfile"),
      contains(toFilenames(existingResult).toArray()));
}

@Test
public void testGlobExpansion() throws IOException {
Objects modelObjects = new Objects();
Expand Down

0 comments on commit 38c53d8

Please sign in to comment.