Skip to content

Commit

Permalink
This closes #2866
Browse files Browse the repository at this point in the history
  • Loading branch information
dhalperi committed May 11, 2017
2 parents 62ee275 + 316ff6b commit 15bd3a3
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected List<MatchResult> match(List<String> specs) throws IOException {
List<Boolean> isGlobBooleans = Lists.newArrayList();

for (GcsPath path : gcsPaths) {
if (GcsUtil.isGlob(path)) {
if (GcsUtil.isWildcard(path)) {
globs.add(path);
isGlobBooleans.add(true);
} else {
Expand Down Expand Up @@ -178,8 +178,8 @@ public MatchResult apply(GcsPath gcsPath) {
*/
@VisibleForTesting
MatchResult expand(GcsPath gcsPattern) throws IOException {
String prefix = GcsUtil.getGlobPrefix(gcsPattern.getObject());
Pattern p = Pattern.compile(GcsUtil.globToRegexp(gcsPattern.getObject()));
String prefix = GcsUtil.getNonWildcardPrefix(gcsPattern.getObject());
Pattern p = Pattern.compile(GcsUtil.wildcardToRegexp(gcsPattern.getObject()));

LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(),
prefix, p.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.gcsfs.GcsPath;

/**
Expand All @@ -47,8 +46,7 @@ public static GcsPathValidator fromOptions(PipelineOptions options) {
*/
@Override
public void validateInputFilePatternSupported(String filepattern) {
GcsPath gcsPath = getGcsPath(filepattern);
checkArgument(GcsUtil.isGcsPatternSupported(gcsPath.getObject()));
getGcsPath(filepattern);
verifyPath(filepattern);
verifyPathIsAccessible(filepattern, "Could not find file %s");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,6 @@ public static GcsUtil create(
/** Matches a glob containing a wildcard, capturing the portion before the first wildcard. */
private static final Pattern GLOB_PREFIX = Pattern.compile("(?<PREFIX>[^\\[*?]*)[\\[*?].*");

private static final String RECURSIVE_WILDCARD = "[*]{2}";

/**
* A {@link Pattern} for globs with a recursive wildcard.
*/
private static final Pattern RECURSIVE_GCS_PATTERN =
Pattern.compile(".*" + RECURSIVE_WILDCARD + ".*");

/**
* Maximum number of requests permitted in a GCS batch request.
*/
Expand All @@ -159,23 +151,10 @@ public static GcsUtil create(
// Exposed for testing.
final ExecutorService executorService;

/**
* Returns true if the given GCS pattern is supported otherwise fails with an
* exception.
*/
public static boolean isGcsPatternSupported(String gcsPattern) {
if (RECURSIVE_GCS_PATTERN.matcher(gcsPattern).matches()) {
throw new IllegalArgumentException("Unsupported wildcard usage in \"" + gcsPattern + "\": "
+ " recursive wildcards are not supported.");
}
return true;
}

/**
* Returns the prefix portion of the glob that doesn't contain wildcards.
*/
public static String getGlobPrefix(String globExp) {
checkArgument(isGcsPatternSupported(globExp));
public static String getNonWildcardPrefix(String globExp) {
Matcher m = GLOB_PREFIX.matcher(globExp);
checkArgument(
m.matches(),
Expand All @@ -189,15 +168,15 @@ public static String getGlobPrefix(String globExp) {
* @param globExp the glob expression to expand
* @return a string with the regular expression this glob expands to
*/
public static String globToRegexp(String globExp) {
public static String wildcardToRegexp(String globExp) {
StringBuilder dst = new StringBuilder();
char[] src = globExp.toCharArray();
char[] src = globExp.replace("**/*", "**").toCharArray();
int i = 0;
while (i < src.length) {
char c = src[i++];
switch (c) {
case '*':
dst.append("[^/]*");
dst.append(".*");
break;
case '?':
dst.append("[^/]");
Expand Down Expand Up @@ -226,9 +205,9 @@ public static String globToRegexp(String globExp) {
}

/**
* Returns true if the given {@code spec} contains glob.
* Returns true if the given {@code spec} contains wildcard.
*/
public static boolean isGlob(GcsPath spec) {
public static boolean isWildcard(GcsPath spec) {
return GLOB_PREFIX.matcher(spec.getObject()).matches();
}

Expand All @@ -254,11 +233,14 @@ protected void setStorageClient(Storage storageClient) {
* exists.
*/
public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
checkArgument(isGcsPatternSupported(gcsPattern.getObject()));
Pattern p = null;
String prefix = null;
if (!isGlob(gcsPattern)) {
// Not a glob.
if (isWildcard(gcsPattern)) {
// Part before the first wildcard character.
prefix = getNonWildcardPrefix(gcsPattern.getObject());
p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject()));
} else {
// Not a wildcard.
try {
// Use a get request to fetch the metadata of the object, and ignore the return value.
// The request has strong global consistency.
Expand All @@ -268,10 +250,6 @@ public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
// If the path was not found, return an empty list.
return ImmutableList.of();
}
} else {
// Part before the first wildcard character.
prefix = getGlobPrefix(gcsPattern.getObject());
p = Pattern.compile(globToRegexp(gcsPattern.getObject()));
}

LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,6 @@ public void testExpandNonGlob() throws Exception {
gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"));
}

// Patterns that contain recursive wildcards ('**') are not supported.
@Test
public void testRecursiveGlobExpansionFails() throws IOException {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Unsupported wildcard usage");
gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/test**"));
}

@Test
public void testMatchNonGlobs() throws Exception {
List<StorageObjectOrIOException> items = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,12 @@ public class GcsUtilTest {

@Test
public void testGlobTranslation() {
assertEquals("foo", GcsUtil.globToRegexp("foo"));
assertEquals("fo[^/]*o", GcsUtil.globToRegexp("fo*o"));
assertEquals("f[^/]*o\\.[^/]", GcsUtil.globToRegexp("f*o.?"));
assertEquals("foo-[0-9][^/]*", GcsUtil.globToRegexp("foo-[0-9]*"));
assertEquals("foo", GcsUtil.wildcardToRegexp("foo"));
assertEquals("fo.*o", GcsUtil.wildcardToRegexp("fo*o"));
assertEquals("f.*o\\.[^/]", GcsUtil.wildcardToRegexp("f*o.?"));
assertEquals("foo-[0-9].*", GcsUtil.wildcardToRegexp("foo-[0-9]*"));
assertEquals(".*.*foo", GcsUtil.wildcardToRegexp("**/*foo"));
assertEquals(".*.*foo", GcsUtil.wildcardToRegexp("**foo"));
}

private static GcsOptions gcsOptionsWithTestCredential() {
Expand Down Expand Up @@ -260,16 +262,51 @@ public void testGlobExpansion() throws IOException {
}
}

// Patterns that contain recursive wildcards ('**') are not supported.
@Test
public void testRecursiveGlobExpansionFails() throws IOException {
public void testRecursiveGlobExpansion() throws IOException {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
GcsPath pattern = GcsPath.fromUri("gs://testbucket/test**");

thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Unsupported wildcard usage");
gcsUtil.expand(pattern);
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);

Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
Storage.Objects.List mockStorageList = Mockito.mock(Storage.Objects.List.class);

Objects modelObjects = new Objects();
List<StorageObject> items = new ArrayList<>();
// A directory
items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/"));

// Files within the directory
items.add(new StorageObject().setBucket("testbucket").setName("test/directory/file1.txt"));
items.add(new StorageObject().setBucket("testbucket").setName("test/directory/file2.txt"));
items.add(new StorageObject().setBucket("testbucket").setName("test/directory/file3.txt"));
items.add(new StorageObject().setBucket("testbucket").setName("test/directory/otherfile"));
items.add(new StorageObject().setBucket("testbucket").setName("test/directory/anotherfile"));
items.add(new StorageObject().setBucket("testbucket").setName("test/file4.txt"));

modelObjects.setItems(items);

when(mockStorage.objects()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get("testbucket", "test/directory/otherfile")).thenReturn(
mockStorageGet);
when(mockStorageObjects.list("testbucket")).thenReturn(mockStorageList);
when(mockStorageGet.execute()).thenReturn(
new StorageObject().setBucket("testbucket").setName("test/directory/otherfile"));
when(mockStorageList.execute()).thenReturn(modelObjects);

{
GcsPath pattern = GcsPath.fromUri("gs://testbucket/test/**/*.txt");
List<GcsPath> expectedFiles = ImmutableList.of(
GcsPath.fromUri("gs://testbucket/test/directory/file1.txt"),
GcsPath.fromUri("gs://testbucket/test/directory/file2.txt"),
GcsPath.fromUri("gs://testbucket/test/directory/file3.txt"),
GcsPath.fromUri("gs://testbucket/test/file4.txt"));

assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
}
}

// GCSUtil.expand() should fail when matching a single object when that object does not exist.
Expand Down

0 comments on commit 15bd3a3

Please sign in to comment.