From 42c2bc37d616e1a3fa0cffc46a8f51284fecdba7 Mon Sep 17 00:00:00 2001 From: Pei He Date: Wed, 1 Feb 2017 17:55:30 -0800 Subject: [PATCH 1/9] [BEAM-59] Beam FileSystem: ResourceIdentifier and its local and gcs implementation. --- sdks/java/core/pom.xml | 6 + .../apache/beam/sdk/io/LocalResourceId.java | 113 ++++++++++ .../org/apache/beam/sdk/io/fs/ResourceId.java | 52 +++++ .../apache/beam/sdk/io/fs/package-info.java | 22 ++ .../apache/beam/sdk/util/gcsfs/GcsPath.java | 2 +- .../beam/sdk/io/LocalResourceIdTest.java | 199 ++++++++++++++++++ .../gcp/storage/GcsFileSystemRegistrar.java | 4 +- .../sdk/io/gcp/storage/GcsResourceId.java | 84 ++++++++ .../sdk/io/gcp/storage/GcsResourceIdTest.java | 94 +++++++++ 9 files changed, 574 insertions(+), 2 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/package-info.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index bb019c1459ff3..4da8d35f5adc2 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -371,6 +371,12 @@ 1.9 + + org.apache.commons + commons-lang3 + 3.5 + + joda-time joda-time diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java new file mode 100644 index 0000000000000..d3cc24de26b03 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import static com.google.common.base.Preconditions.checkState; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Objects; +import java.util.UUID; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.commons.lang3.SystemUtils; + +/** + * {@link ResourceId} implementation for local files. + */ +class LocalResourceId implements ResourceId { + + private final Path path; + private final boolean isDirectory; + + static LocalResourceId fromPath(Path path, boolean isDirectory) { + return new LocalResourceId(path, isDirectory); + } + + private LocalResourceId(Path path, boolean isDirectory) { + this.path = path.normalize(); + this.isDirectory = isDirectory; + } + + @Override + public ResourceId resolve(String other) { + checkState( + isDirectory, + String.format("Expected the path is a directory, but had [%s].", path)); + if (SystemUtils.IS_OS_WINDOWS) { + return resolveLocalPathWindowsOS(other); + } else { + return resolveLocalPath(other); + } + } + + @Override + public ResourceId getCurrentDirectory() { + if (isDirectory) { + return this; + } else { + return fromPath(path.getParent(), true /* isDirectory */); + } + } + + private LocalResourceId resolveLocalPath(String other) { + return new LocalResourceId( + path.resolve(other), + other.endsWith("/")); + } + + private LocalResourceId resolveLocalPathWindowsOS(String other) { + checkState( + path.endsWith("\\"), + String.format("Expected the path is a directory, but had [%s].", path)); + String uuid = UUID.randomUUID().toString(); + Path pathAsterisksReplaced = Paths.get(path.toString().replaceAll("\\*", uuid)); + String otherAsterisksReplaced = other.replaceAll("\\*", uuid); + + return new LocalResourceId( + Paths.get( + pathAsterisksReplaced.resolve(otherAsterisksReplaced) + .toString() + .replaceAll(uuid, "\\*")), + other.endsWith("\\")); + } + + @Override + public String getScheme() { + return LocalFileSystemRegistrar.LOCAL_FILE_SCHEME; + } + + @Override + public String toString() { + return String.format("LocalResourceId: [%s]", path); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof LocalResourceId)) { + return false; + } + LocalResourceId other = (LocalResourceId) obj; + return this.path.equals(other.path) + && this.isDirectory == other.isDirectory; + } + + @Override + public int hashCode() { + return Objects.hash(path, isDirectory); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java new file mode 100644 index 0000000000000..bc2044e77252c --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fs; + +/** + * An identifier which represents a resource. + * + *

{@code ResourceId} is hierarchical and composed of a sequence of directory + * and file name elements separated by a special separator or delimiter. + */ +public interface ResourceId { + + /** + * Resolves the given {@code String} against this {@code ResourceId}. + * + *

This {@code ResourceId} should represents a directory. + * + *

It is up to each file system to resolve in their own way. + */ + ResourceId resolve(String other); + + /** + * Returns the {@code ResourceId} that represents the current directory of + * this {@code ResourceId}. + * + *

If it is already a directory, trivially returns this. + */ + ResourceId getCurrentDirectory(); + + /** + * Get the scheme which defines the namespace of the {@link ResourceId}. + * + *

The scheme is required to follow URI scheme syntax. + * @see RFC 2396 + */ + String getScheme(); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/package-info.java new file mode 100644 index 0000000000000..2310582c9af51 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Beam FileSystem interfaces and their default implementations. + */ +package org.apache.beam.sdk.io.fs; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java index 863b01b8b83aa..8ab42169baee4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java @@ -440,7 +440,7 @@ public Path resolveSibling(Path other) { } @Override - public Path resolveSibling(String other) { + public GcsPath resolveSibling(String other) { if (getNameCount() < 2) { throw new UnsupportedOperationException("Can't resolve the sibling of a root path: " + this); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java new file mode 100644 index 0000000000000..54162e71a893f --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import java.nio.file.Paths; +import org.apache.commons.lang3.SystemUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link LocalResourceId}. + */ +@RunWith(JUnit4.class) +public class LocalResourceIdTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testResolve() throws Exception { + if (SystemUtils.IS_OS_WINDOWS) { + // Skip tests + return; + } + // Tests for local files without the scheme. + assertEquals( + toResourceIdentifier("/root/tmp/aa"), + toResourceIdentifier("/root/tmp/").resolve("aa")); + assertEquals( + toResourceIdentifier("/root/tmp/aa/bb/cc/"), + toResourceIdentifier("/root/tmp/").resolve("aa/").resolve("bb/").resolve("cc/")); + + // Tests absolute path. + assertEquals( + toResourceIdentifier("/root/tmp/aa"), + toResourceIdentifier("/root/tmp/bb/").resolve("/root/tmp/aa")); + + // Tests empty authority and path. + assertEquals( + toResourceIdentifier("file:/aa"), + toResourceIdentifier("file:///").resolve("aa")); + + // Tests path with unicode + assertEquals( + toResourceIdentifier("/根目录/输出 文件01.txt"), + toResourceIdentifier("/根目录/").resolve("输出 文件01.txt")); + assertEquals( + toResourceIdentifier("file://根目录/输出 文件01.txt"), + toResourceIdentifier("file://根目录/").resolve("输出 文件01.txt")); + } + + @Test + public void testResolveNormalization() throws Exception { + if (SystemUtils.IS_OS_WINDOWS) { + // Skip tests + return; + } + // Tests normalization of "." and ".." + // + // Normalization is the implementation choice of LocalResourceId, + // and it is not required by ResourceId.resolve(). + assertEquals( + toResourceIdentifier("file://home/bb"), + toResourceIdentifier("file://root/../home/output/../") + .resolve("aa/") + .resolve("../") + .resolve("bb")); + assertEquals( + toResourceIdentifier("file://root/aa/bb"), + toResourceIdentifier("file://root/./") + .resolve("aa/") + .resolve("./") + .resolve("bb")); + assertEquals( + toResourceIdentifier("aa/bb"), + toResourceIdentifier("a/../") + .resolve("aa/") + .resolve("./") + .resolve("bb")); + assertEquals( + toResourceIdentifier("/aa/bb"), + toResourceIdentifier("/a/../") + .resolve("aa/") + .resolve("./") + .resolve("bb")); + + // Tests "./", "../", "~/". + assertEquals( + toResourceIdentifier("aa/bb"), + toResourceIdentifier("./") + .resolve("aa/") + .resolve("./") + .resolve("bb")); + assertEquals( + toResourceIdentifier("../aa/bb"), + toResourceIdentifier("../") + .resolve("aa/") + .resolve("./") + .resolve("bb")); + assertEquals( + toResourceIdentifier("~/aa/bb/"), + toResourceIdentifier("~/") + .resolve("aa/") + .resolve("./") + .resolve("bb/")); + } + + @Test + public void testResolveInvalidNotDirectory() throws Exception { + thrown.expect(IllegalStateException.class); + thrown.expectMessage("Expected the path is a directory, but had [/root/tmp]."); + toResourceIdentifier("/root/tmp").resolve("aa"); + } + + @Test + public void testResolveInWindowsOS() throws Exception { + if (!SystemUtils.IS_OS_WINDOWS) { + // Skip tests + return; + } + assertEquals( + toResourceIdentifier("C:\\my home\\out put"), + toResourceIdentifier("C:\\my home\\").resolve("out put")); + + assertEquals( + toResourceIdentifier("C:\\out put"), + toResourceIdentifier("C:\\my home\\") + .resolve("..\\") + .resolve(".\\") + .resolve("out put")); + + assertEquals( + toResourceIdentifier("C:\\my home\\**\\*"), + toResourceIdentifier("C:\\my home\\") + .resolve("**") + .resolve("*")); + } + + @Test + public void testGetCurrentDirectory() throws Exception { + // Tests for local files without the scheme. + assertEquals( + toResourceIdentifier("/root/tmp/"), + toResourceIdentifier("/root/tmp/").getCurrentDirectory()); + assertEquals( + toResourceIdentifier("/"), + toResourceIdentifier("/").getCurrentDirectory()); + + // Tests path with unicode + assertEquals( + toResourceIdentifier("/根目录/"), + toResourceIdentifier("/根目录/输出 文件01.txt").getCurrentDirectory()); + assertEquals( + toResourceIdentifier("file://根目录/"), + toResourceIdentifier("file://根目录/输出 文件01.txt").getCurrentDirectory()); + } + + @Test + public void testEquals() throws Exception { + assertEquals( + toResourceIdentifier("/root/tmp/"), + toResourceIdentifier("/root/tmp/")); + + assertNotEquals( + toResourceIdentifier("/root/tmp"), + toResourceIdentifier("/root/tmp/")); + } + + private LocalResourceId toResourceIdentifier(String str) throws Exception { + boolean isDirectory; + if (SystemUtils.IS_OS_WINDOWS) { + isDirectory = str.endsWith("\\"); + } else { + isDirectory = str.endsWith("/"); + } + return LocalResourceId.fromPath(Paths.get(str), isDirectory); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java index 10452a156d450..d7821e6e28b21 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java @@ -30,6 +30,8 @@ @AutoService(FileSystemRegistrar.class) public class GcsFileSystemRegistrar implements FileSystemRegistrar { + static final String GCS_SCHEME = "gs"; + @Override public FileSystem fromOptions(@Nonnull PipelineOptions options) { return new GcsFileSystem(options.as(GcsOptions.class)); @@ -37,6 +39,6 @@ public FileSystem fromOptions(@Nonnull PipelineOptions options) { @Override public String getScheme() { - return "gs"; + return GCS_SCHEME; } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java new file mode 100644 index 0000000000000..336eae0084ed8 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.storage; + +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.util.gcsfs.GcsPath; + +/** + * {@link ResourceId} implementation for Google Cloud Storage. + */ +public class GcsResourceId implements ResourceId { + + private final GcsPath gcsPath; + + static GcsResourceId fromGcsPath(GcsPath gcsPath) { + return new GcsResourceId(gcsPath); + } + + private GcsResourceId(GcsPath gcsPath) { + this.gcsPath = gcsPath; + } + + @Override + public ResourceId resolve(String other) { + checkState( + isDirectory(), + String.format("Expected the gcsPath is a directory, but had [%s].", gcsPath)); + return fromGcsPath(gcsPath.resolve(other)); + } + + @Override + public ResourceId getCurrentDirectory() { + if (isDirectory()) { + return this; + } else { + return fromGcsPath(gcsPath.getParent()); + } + } + + private boolean isDirectory() { + return gcsPath.endsWith("/"); + } + + @Override + public String getScheme() { + return GcsFileSystemRegistrar.GCS_SCHEME; + } + + @Override + public String toString() { + return String.format("GcsResourceId: [%s]", gcsPath); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof GcsResourceId)) { + return false; + } + GcsResourceId other = (GcsResourceId) obj; + return this.gcsPath.equals(other.gcsPath); + } + + @Override + public int hashCode() { + return gcsPath.hashCode(); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java new file mode 100644 index 0000000000000..b0080a5e23f31 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.storage; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link GcsResourceId}. + */ +@RunWith(JUnit4.class) +public class GcsResourceIdTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testResolve() throws Exception { + // Tests for local files without the scheme. + assertEquals( + toResourceIdentifier("gs://bucket/tmp/aa"), + toResourceIdentifier("gs://bucket/tmp/").resolve("aa")); + assertEquals( + toResourceIdentifier("gs://bucket/tmp/aa/bb/cc/"), + toResourceIdentifier("gs://bucket/tmp/").resolve("aa/").resolve("bb/").resolve("cc/")); + + // Tests absolute path. + assertEquals( + toResourceIdentifier("gs://bucket/tmp/aa"), + toResourceIdentifier("gs://bucket/tmp/bb/").resolve("gs://bucket/tmp/aa")); + + // Tests path with unicode + assertEquals( + toResourceIdentifier("gs://bucket/输出 目录/输出 文件01.txt"), + toResourceIdentifier("gs://bucket/输出 目录/").resolve("输出 文件01.txt")); + } + + @Test + public void testResolveInvalidNotDirectory() throws Exception { + thrown.expect(IllegalStateException.class); + thrown.expectMessage("Expected the gcsPath is a directory, but had [gs://my_bucket/tmp dir]."); + toResourceIdentifier("gs://my_bucket/tmp dir").resolve("aa"); + } + + @Test + public void testGetCurrentDirectory() throws Exception { + // Tests for local files without the scheme. + assertEquals( + toResourceIdentifier("gs://my_bucket/tmp dir/"), + toResourceIdentifier("gs://my_bucket/tmp dir/").getCurrentDirectory()); + + // Tests path with unicode + assertEquals( + toResourceIdentifier("gs://my_bucket/输出 目录/"), + toResourceIdentifier("gs://my_bucket/输出 目录/文件01.txt").getCurrentDirectory()); + } + + @Test + public void testEquals() throws Exception { + assertEquals( + toResourceIdentifier("gs://my_bucket/tmp/"), + toResourceIdentifier("gs://my_bucket/tmp/")); + + assertNotEquals( + toResourceIdentifier("gs://my_bucket/tmp"), + toResourceIdentifier("gs://my_bucket/tmp/")); + } + + private GcsResourceId toResourceIdentifier(String str) throws Exception { + return GcsResourceId.fromGcsPath(GcsPath.fromUri(str)); + } +} From 551b15a839230c79904e0aea1dcf0e2fa28104d1 Mon Sep 17 00:00:00 2001 From: Pei He Date: Thu, 2 Feb 2017 12:13:47 -0800 Subject: [PATCH 2/9] fixup! add ResolveOptions. --- .../apache/beam/sdk/io/LocalResourceId.java | 36 ++++-- .../apache/beam/sdk/io/fs/ResolveOptions.java | 41 +++++++ .../org/apache/beam/sdk/io/fs/ResourceId.java | 17 ++- .../beam/sdk/io/LocalResourceIdTest.java | 103 ++++++++++++------ .../sdk/io/gcp/storage/GcsResourceId.java | 28 ++++- .../sdk/io/gcp/storage/GcsResourceIdTest.java | 39 ++++++- 6 files changed, 210 insertions(+), 54 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResolveOptions.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java index d3cc24de26b03..f74d0980d840b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java @@ -17,12 +17,16 @@ */ package org.apache.beam.sdk.io; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Objects; import java.util.UUID; +import org.apache.beam.sdk.io.fs.ResolveOptions; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.commons.lang3.SystemUtils; @@ -35,6 +39,7 @@ class LocalResourceId implements ResourceId { private final boolean isDirectory; static LocalResourceId fromPath(Path path, boolean isDirectory) { + checkNotNull(path, "path"); return new LocalResourceId(path, isDirectory); } @@ -44,14 +49,22 @@ private LocalResourceId(Path path, boolean isDirectory) { } @Override - public ResourceId resolve(String other) { + public ResourceId resolve(String other, ResolveOptions resolveOptions) { checkState( isDirectory, String.format("Expected the path is a directory, but had [%s].", path)); + checkArgument( + resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE) + || resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY), + String.format("ResolveOptions: [%s] is not supported.", resolveOptions)); + checkArgument( + !(resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE) + && other.endsWith("/")), + "The resolved file: [%s] should not end with '/'.", other); if (SystemUtils.IS_OS_WINDOWS) { - return resolveLocalPathWindowsOS(other); + return resolveLocalPathWindowsOS(other, resolveOptions); } else { - return resolveLocalPath(other); + return resolveLocalPath(other, resolveOptions); } } @@ -60,20 +73,21 @@ public ResourceId getCurrentDirectory() { if (isDirectory) { return this; } else { - return fromPath(path.getParent(), true /* isDirectory */); + return fromPath( + checkNotNull( + path.getParent(), + String.format("Path: [%s] doesn't have the current directory.", path)), + true /* isDirectory */); } } - private LocalResourceId resolveLocalPath(String other) { + private LocalResourceId resolveLocalPath(String other, ResolveOptions resolveOptions) { return new LocalResourceId( path.resolve(other), - other.endsWith("/")); + resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY)); } - private LocalResourceId resolveLocalPathWindowsOS(String other) { - checkState( - path.endsWith("\\"), - String.format("Expected the path is a directory, but had [%s].", path)); + private LocalResourceId resolveLocalPathWindowsOS(String other, ResolveOptions resolveOptions) { String uuid = UUID.randomUUID().toString(); Path pathAsterisksReplaced = Paths.get(path.toString().replaceAll("\\*", uuid)); String otherAsterisksReplaced = other.replaceAll("\\*", uuid); @@ -83,7 +97,7 @@ private LocalResourceId resolveLocalPathWindowsOS(String other) { pathAsterisksReplaced.resolve(otherAsterisksReplaced) .toString() .replaceAll(uuid, "\\*")), - other.endsWith("\\")); + resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY)); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResolveOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResolveOptions.java new file mode 100644 index 0000000000000..3dded82869cb3 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResolveOptions.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fs; + +/** + * An object that configures {@link ResourceId#resolve}. + */ +public interface ResolveOptions { + + /** + * Defines the standard resolve options. + */ + enum StandardResolveOptions implements ResolveOptions { + /** + * Resolve a file. + */ + RESOLVE_FILE, + + /** + * Resolve a directory. + * + *

This requires {@link ResourceId} implementation to append a delimiter. + */ + RESOLVE_DIRECTORY, + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java index bc2044e77252c..1707016b050b9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.fs; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; + /** * An identifier which represents a resource. * @@ -28,11 +30,24 @@ public interface ResourceId { /** * Resolves the given {@code String} against this {@code ResourceId}. * + *

In order to write file system agnostic code, callers should not include delimiters + * in {@code other}, and should use {@link StandardResolveOptions} to specify + * whether to resolve a file or a directory. + * + *

For example: + * + *

{@code
+   * ResourceId homeDir = ...
+   * ResourceId tempOutput = homeDir
+   *     .resolve("tempDir", StandardResolveOptions.RESOLVE_DIRECTORY)
+   *     .resolve("output", StandardResolveOptions.RESOLVE_FILE);
+   * }
+ * *

This {@code ResourceId} should represents a directory. * *

It is up to each file system to resolve in their own way. */ - ResourceId resolve(String other); + ResourceId resolve(String other, ResolveOptions resolveOptions); /** * Returns the {@code ResourceId} that represents the current directory of diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java index 54162e71a893f..52f013f9be6e3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertNotEquals; import java.nio.file.Paths; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.commons.lang3.SystemUtils; import org.junit.Rule; import org.junit.Test; @@ -46,28 +48,36 @@ public void testResolve() throws Exception { // Tests for local files without the scheme. assertEquals( toResourceIdentifier("/root/tmp/aa"), - toResourceIdentifier("/root/tmp/").resolve("aa")); + toResourceIdentifier("/root/tmp/") + .resolve("aa", StandardResolveOptions.RESOLVE_FILE)); assertEquals( toResourceIdentifier("/root/tmp/aa/bb/cc/"), - toResourceIdentifier("/root/tmp/").resolve("aa/").resolve("bb/").resolve("cc/")); + toResourceIdentifier("/root/tmp/") + .resolve("aa", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve("bb", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve("cc", StandardResolveOptions.RESOLVE_DIRECTORY)); // Tests absolute path. assertEquals( toResourceIdentifier("/root/tmp/aa"), - toResourceIdentifier("/root/tmp/bb/").resolve("/root/tmp/aa")); + toResourceIdentifier("/root/tmp/bb/") + .resolve("/root/tmp/aa", StandardResolveOptions.RESOLVE_FILE)); // Tests empty authority and path. assertEquals( toResourceIdentifier("file:/aa"), - toResourceIdentifier("file:///").resolve("aa")); + toResourceIdentifier("file:///") + .resolve("aa", StandardResolveOptions.RESOLVE_FILE)); // Tests path with unicode assertEquals( toResourceIdentifier("/根目录/输出 文件01.txt"), - toResourceIdentifier("/根目录/").resolve("输出 文件01.txt")); + toResourceIdentifier("/根目录/") + .resolve("输出 文件01.txt", StandardResolveOptions.RESOLVE_FILE)); assertEquals( toResourceIdentifier("file://根目录/输出 文件01.txt"), - toResourceIdentifier("file://根目录/").resolve("输出 文件01.txt")); + toResourceIdentifier("file://根目录/") + .resolve("输出 文件01.txt", StandardResolveOptions.RESOLVE_FILE)); } @Test @@ -83,54 +93,71 @@ public void testResolveNormalization() throws Exception { assertEquals( toResourceIdentifier("file://home/bb"), toResourceIdentifier("file://root/../home/output/../") - .resolve("aa/") - .resolve("../") - .resolve("bb")); + .resolve("aa", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve("..", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve("bb", StandardResolveOptions.RESOLVE_FILE)); assertEquals( toResourceIdentifier("file://root/aa/bb"), toResourceIdentifier("file://root/./") - .resolve("aa/") - .resolve("./") - .resolve("bb")); + .resolve("aa", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve(".", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve("bb", StandardResolveOptions.RESOLVE_FILE)); assertEquals( toResourceIdentifier("aa/bb"), toResourceIdentifier("a/../") - .resolve("aa/") - .resolve("./") - .resolve("bb")); + .resolve("aa", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve(".", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve("bb", StandardResolveOptions.RESOLVE_FILE)); assertEquals( toResourceIdentifier("/aa/bb"), toResourceIdentifier("/a/../") - .resolve("aa/") - .resolve("./") - .resolve("bb")); + .resolve("aa", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve(".", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve("bb", StandardResolveOptions.RESOLVE_FILE)); // Tests "./", "../", "~/". assertEquals( toResourceIdentifier("aa/bb"), toResourceIdentifier("./") - .resolve("aa/") - .resolve("./") - .resolve("bb")); + .resolve("aa", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve(".", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve("bb", StandardResolveOptions.RESOLVE_FILE)); assertEquals( toResourceIdentifier("../aa/bb"), toResourceIdentifier("../") - .resolve("aa/") - .resolve("./") - .resolve("bb")); + .resolve("aa", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve(".", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve("bb", StandardResolveOptions.RESOLVE_FILE)); assertEquals( toResourceIdentifier("~/aa/bb/"), toResourceIdentifier("~/") - .resolve("aa/") - .resolve("./") - .resolve("bb/")); + .resolve("aa", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve(".", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve("bb", StandardResolveOptions.RESOLVE_DIRECTORY)); + } + + @Test + public void testResolveHandleBadInputs() throws Exception { + assertEquals( + toResourceIdentifier("/root/tmp/"), + toResourceIdentifier("/root/") + .resolve("tmp/", StandardResolveOptions.RESOLVE_DIRECTORY)); + } + + @Test + public void testResolveInvalidInputs() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("The resolved file: [tmp/] should not end with '/'."); + toResourceIdentifier("/root/").resolve("tmp/", StandardResolveOptions.RESOLVE_FILE); } @Test public void testResolveInvalidNotDirectory() throws Exception { + ResourceId tmp = toResourceIdentifier("/root/") + .resolve("tmp", StandardResolveOptions.RESOLVE_FILE); thrown.expect(IllegalStateException.class); thrown.expectMessage("Expected the path is a directory, but had [/root/tmp]."); - toResourceIdentifier("/root/tmp").resolve("aa"); + tmp.resolve("aa", StandardResolveOptions.RESOLVE_FILE); } @Test @@ -141,20 +168,21 @@ public void testResolveInWindowsOS() throws Exception { } assertEquals( toResourceIdentifier("C:\\my home\\out put"), - toResourceIdentifier("C:\\my home\\").resolve("out put")); + toResourceIdentifier("C:\\my home\\") + .resolve("out put", StandardResolveOptions.RESOLVE_FILE)); assertEquals( toResourceIdentifier("C:\\out put"), toResourceIdentifier("C:\\my home\\") - .resolve("..\\") - .resolve(".\\") - .resolve("out put")); + .resolve("..", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve(".", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve("out put", StandardResolveOptions.RESOLVE_FILE)); assertEquals( toResourceIdentifier("C:\\my home\\**\\*"), toResourceIdentifier("C:\\my home\\") - .resolve("**") - .resolve("*")); + .resolve("**", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve("*", StandardResolveOptions.RESOLVE_FILE)); } @Test @@ -176,6 +204,13 @@ public void testGetCurrentDirectory() throws Exception { toResourceIdentifier("file://根目录/输出 文件01.txt").getCurrentDirectory()); } + @Test + public void testGetCurrentDirectoryInvalid() throws Exception { + thrown.expect(IllegalStateException.class); + thrown.expectMessage("Path: [output] doesn't have the current directory."); + toResourceIdentifier("output").getCurrentDirectory(); + } + @Test public void testEquals() throws Exception { assertEquals( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java index 336eae0084ed8..d3c04089d437d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java @@ -17,8 +17,12 @@ */ package org.apache.beam.sdk.io.gcp.storage; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import org.apache.beam.sdk.io.fs.ResolveOptions; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.util.gcsfs.GcsPath; @@ -30,6 +34,7 @@ public class GcsResourceId implements ResourceId { private final GcsPath gcsPath; static GcsResourceId fromGcsPath(GcsPath gcsPath) { + checkNotNull(gcsPath, "gcsPath"); return new GcsResourceId(gcsPath); } @@ -38,11 +43,30 @@ private GcsResourceId(GcsPath gcsPath) { } @Override - public ResourceId resolve(String other) { + public ResourceId resolve(String other, ResolveOptions resolveOptions) { checkState( isDirectory(), String.format("Expected the gcsPath is a directory, but had [%s].", gcsPath)); - return fromGcsPath(gcsPath.resolve(other)); + checkArgument( + resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE) + || resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY), + String.format("ResolveOptions: [%s] is not supported.", resolveOptions)); + if (resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)) { + checkArgument( + !other.endsWith("/"), + "The resolved file: [%s] should not end with '/'.", other); + return fromGcsPath(gcsPath.resolve(other)); + } else { + // StandardResolveOptions.RESOLVE_DIRECTORY + if (other.endsWith("/")) { + // other already contains the delimiter for gcs. + // It is not recommended for callers to set the delimiter. + // However, we consider it as a valid input. + return fromGcsPath(gcsPath.resolve(other)); + } else { + return fromGcsPath(gcsPath.resolve(other + "/")); + } + } } @Override diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java index b0080a5e23f31..40a0b82debc96 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java @@ -20,6 +20,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.junit.Rule; import org.junit.Test; @@ -38,30 +40,55 @@ public class GcsResourceIdTest { @Test public void testResolve() throws Exception { - // Tests for local files without the scheme. + // Tests for common gcs paths. assertEquals( toResourceIdentifier("gs://bucket/tmp/aa"), - toResourceIdentifier("gs://bucket/tmp/").resolve("aa")); + toResourceIdentifier("gs://bucket/tmp/") + .resolve("aa", StandardResolveOptions.RESOLVE_FILE)); assertEquals( toResourceIdentifier("gs://bucket/tmp/aa/bb/cc/"), - toResourceIdentifier("gs://bucket/tmp/").resolve("aa/").resolve("bb/").resolve("cc/")); + toResourceIdentifier("gs://bucket/tmp/") + .resolve("aa", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve("bb", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve("cc", StandardResolveOptions.RESOLVE_DIRECTORY)); // Tests absolute path. assertEquals( toResourceIdentifier("gs://bucket/tmp/aa"), - toResourceIdentifier("gs://bucket/tmp/bb/").resolve("gs://bucket/tmp/aa")); + toResourceIdentifier("gs://bucket/tmp/bb/") + .resolve("gs://bucket/tmp/aa", StandardResolveOptions.RESOLVE_FILE)); // Tests path with unicode assertEquals( toResourceIdentifier("gs://bucket/输出 目录/输出 文件01.txt"), - toResourceIdentifier("gs://bucket/输出 目录/").resolve("输出 文件01.txt")); + toResourceIdentifier("gs://bucket/输出 目录/") + .resolve("输出 文件01.txt", StandardResolveOptions.RESOLVE_FILE)); + } + + @Test + public void testResolveHandleBadInputs() throws Exception { + assertEquals( + toResourceIdentifier("gs://my_bucket/tmp/"), + toResourceIdentifier("gs://my_bucket/") + .resolve("tmp/", StandardResolveOptions.RESOLVE_DIRECTORY)); + } + + @Test + public void testResolveInvalidInputs() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "The resolved file: [tmp/] should not end with '/'."); + toResourceIdentifier("gs://my_bucket/").resolve("tmp/", StandardResolveOptions.RESOLVE_FILE); } @Test public void testResolveInvalidNotDirectory() throws Exception { + ResourceId tmpDir = toResourceIdentifier("gs://my_bucket/") + .resolve("tmp dir", StandardResolveOptions.RESOLVE_FILE); + thrown.expect(IllegalStateException.class); thrown.expectMessage("Expected the gcsPath is a directory, but had [gs://my_bucket/tmp dir]."); - toResourceIdentifier("gs://my_bucket/tmp dir").resolve("aa"); + tmpDir.resolve("aa", StandardResolveOptions.RESOLVE_FILE); } @Test From 0a5094a3f0a1b8fef6ab72c7c2e3b8831019879c Mon Sep 17 00:00:00 2001 From: Pei He Date: Thu, 2 Feb 2017 13:00:57 -0800 Subject: [PATCH 3/9] fixup! address findbugs. --- .../main/resources/beam/findbugs-filter.xml | 18 ++++++++++++++++++ .../apache/beam/sdk/io/LocalResourceId.java | 8 +++++--- .../beam/sdk/io/gcp/storage/GcsResourceId.java | 6 +++++- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index 91ab9beb1430d..40f854e865e37 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -166,6 +166,24 @@ + + + + + + + + + + + + + + diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java index f74d0980d840b..c29a9c3adaf33 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java @@ -73,10 +73,12 @@ public ResourceId getCurrentDirectory() { if (isDirectory) { return this; } else { + Path parent = path.getParent(); + checkState( + parent != null, + String.format("Path: [%s] doesn't have the current directory.", path)); return fromPath( - checkNotNull( - path.getParent(), - String.format("Path: [%s] doesn't have the current directory.", path)), + parent, true /* isDirectory */); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java index d3c04089d437d..e0470fe1129ec 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java @@ -74,7 +74,11 @@ public ResourceId getCurrentDirectory() { if (isDirectory()) { return this; } else { - return fromGcsPath(gcsPath.getParent()); + GcsPath parent = gcsPath.getParent(); + checkState( + parent != null, + String.format("Path: [%s] doesn't have the current directory.", gcsPath)); + return fromGcsPath(parent); } } From 91d8a9b8462f6f1118e35aec3517b8f3885976e3 Mon Sep 17 00:00:00 2001 From: Pei He Date: Fri, 3 Feb 2017 14:58:40 -0800 Subject: [PATCH 4/9] fixup! addressed comments. --- pom.xml | 7 +++++++ sdks/java/core/pom.xml | 1 - .../org/apache/beam/sdk/io/fs/ResourceId.java | 13 +++++++++++++ .../apache/beam/sdk/util/gcsfs/GcsPath.java | 2 +- .../sdk/io/gcp/storage/GcsResourceIdTest.java | 19 +++++++++++++++---- 5 files changed, 36 insertions(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index 4ed676e469781..dd7e43ce12a54 100644 --- a/pom.xml +++ b/pom.xml @@ -101,6 +101,7 @@ + 3.5 2.24.0 1.8.1 v2-rev295-1.22.0 @@ -452,6 +453,12 @@ ${project.version} + + org.apache.commons + commons-lang3 + ${apache.commons.lang.version} + + io.grpc grpc-all diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index 4da8d35f5adc2..fd83dbe9e9ed5 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -374,7 +374,6 @@ org.apache.commons commons-lang3 - 3.5 diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java index 1707016b050b9..d607e4a06ebe0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java @@ -46,6 +46,19 @@ public interface ResourceId { *

This {@code ResourceId} should represents a directory. * *

It is up to each file system to resolve in their own way. + * + *

Resolving special characters: + *

+ * + * @throws IllegalArgumentException if other contains illegal characters or is an illegal name. + * It is recommended that callers use common characters, such as [_a-zA-Z0-9-], in {@code other}. */ ResourceId resolve(String other, ResolveOptions resolveOptions); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java index 8ab42169baee4..863b01b8b83aa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java @@ -440,7 +440,7 @@ public Path resolveSibling(Path other) { } @Override - public GcsPath resolveSibling(String other) { + public Path resolveSibling(String other) { if (getNameCount() < 2) { throw new UnsupportedOperationException("Can't resolve the sibling of a root path: " + this); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java index 40a0b82debc96..ee76ca5fc615a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java @@ -76,8 +76,7 @@ public void testResolveHandleBadInputs() throws Exception { @Test public void testResolveInvalidInputs() throws Exception { thrown.expect(IllegalArgumentException.class); - thrown.expectMessage( - "The resolved file: [tmp/] should not end with '/'."); + thrown.expectMessage("The resolved file: [tmp/] should not end with '/'."); toResourceIdentifier("gs://my_bucket/").resolve("tmp/", StandardResolveOptions.RESOLVE_FILE); } @@ -93,15 +92,27 @@ public void testResolveInvalidNotDirectory() throws Exception { @Test public void testGetCurrentDirectory() throws Exception { - // Tests for local files without the scheme. + // Tests gcs paths. assertEquals( toResourceIdentifier("gs://my_bucket/tmp dir/"), toResourceIdentifier("gs://my_bucket/tmp dir/").getCurrentDirectory()); - // Tests path with unicode + // Tests path with unicode. assertEquals( toResourceIdentifier("gs://my_bucket/输出 目录/"), toResourceIdentifier("gs://my_bucket/输出 目录/文件01.txt").getCurrentDirectory()); + + // Tests bucket with no ending '/'. + assertEquals( + toResourceIdentifier("gs://my_bucket/"), + toResourceIdentifier("gs://my_bucket").getCurrentDirectory()); + } + + @Test + public void testInvalidGcsPath() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid GCS URI: gs://"); + toResourceIdentifier("gs://"); } @Test From d02d1ea51a623f00e3aa40853cd8149cb2e95db4 Mon Sep 17 00:00:00 2001 From: Pei He Date: Fri, 3 Feb 2017 15:46:56 -0800 Subject: [PATCH 5/9] fixup! addressed comments. --- .../apache/beam/sdk/io/LocalResourceIdTest.java | 9 +++++++++ .../sdk/io/gcp/storage/GcsResourceIdTest.java | 15 +++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java index 52f013f9be6e3..213837c41460c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java @@ -211,6 +211,15 @@ public void testGetCurrentDirectoryInvalid() throws Exception { toResourceIdentifier("output").getCurrentDirectory(); } + @Test + public void testGetScheme() throws Exception { + // Tests for local files without the scheme. + assertEquals("file", toResourceIdentifier("/root/tmp/").getScheme()); + + // Tests path with unicode. + assertEquals("file", toResourceIdentifier("file://根目录/").getScheme()); + } + @Test public void testEquals() throws Exception { assertEquals( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java index ee76ca5fc615a..db282658e2a9c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java @@ -58,6 +58,12 @@ public void testResolve() throws Exception { toResourceIdentifier("gs://bucket/tmp/bb/") .resolve("gs://bucket/tmp/aa", StandardResolveOptions.RESOLVE_FILE)); + // Tests bucket with no ending '/'. + assertEquals( + toResourceIdentifier("gs://my_bucket/tmp"), + toResourceIdentifier("gs://my_bucket") + .resolve("tmp", StandardResolveOptions.RESOLVE_FILE)); + // Tests path with unicode assertEquals( toResourceIdentifier("gs://bucket/输出 目录/输出 文件01.txt"), @@ -115,6 +121,15 @@ public void testInvalidGcsPath() throws Exception { toResourceIdentifier("gs://"); } + @Test + public void testGetScheme() throws Exception { + // Tests gcs paths. + assertEquals("gs", toResourceIdentifier("gs://my_bucket/tmp dir/").getScheme()); + + // Tests bucket with no ending '/'. + assertEquals("gs", toResourceIdentifier("gs://my_bucket").getScheme()); + } + @Test public void testEquals() throws Exception { assertEquals( From 72cb08d0836a536058999031589057ebfbab9cda Mon Sep 17 00:00:00 2001 From: Pei He Date: Mon, 6 Feb 2017 14:34:53 -0800 Subject: [PATCH 6/9] fixup! addressed tests. --- runners/apex/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml index 002bf9aa01961..d81f85c85d36f 100644 --- a/runners/apex/pom.xml +++ b/runners/apex/pom.xml @@ -96,6 +96,11 @@
+ + org.apache.commons + commons-lang3 + + com.google.code.findbugs From 31df2b3089a26cf9467c74974e964e96f5df8035 Mon Sep 17 00:00:00 2001 From: Pei He Date: Tue, 7 Feb 2017 13:36:13 -0800 Subject: [PATCH 7/9] fixup! addressed comments. --- .../apache/beam/sdk/io/LocalResourceId.java | 5 +++- .../org/apache/beam/sdk/io/fs/ResourceId.java | 23 +++++++++++-------- .../apache/beam/sdk/io/fs/package-info.java | 2 +- .../beam/sdk/io/LocalResourceIdTest.java | 23 +++++++++++-------- .../sdk/io/gcp/storage/GcsResourceId.java | 2 +- 5 files changed, 33 insertions(+), 22 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java index c29a9c3adaf33..c0723c7f1898f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java @@ -74,9 +74,12 @@ public ResourceId getCurrentDirectory() { return this; } else { Path parent = path.getParent(); + if (parent == null && path.getNameCount() == 1) { + parent = Paths.get("."); + } checkState( parent != null, - String.format("Path: [%s] doesn't have the current directory.", path)); + String.format("Failed to get the current directory for path: [%s].", path)); return fromPath( parent, true /* isDirectory */); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java index d607e4a06ebe0..eed785565431c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java @@ -20,15 +20,17 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; /** - * An identifier which represents a resource. + * An identifier which represents a file-like resource. * - *

{@code ResourceId} is hierarchical and composed of a sequence of directory + *

{@link ResourceId} is hierarchical and composed of a sequence of directory * and file name elements separated by a special separator or delimiter. + * + * TODO: add examples for how ResourceId is constructed and used. */ public interface ResourceId { /** - * Resolves the given {@code String} against this {@code ResourceId}. + * Returns a child {@code ResourceId} under {@code this}. * *

In order to write file system agnostic code, callers should not include delimiters * in {@code other}, and should use {@link StandardResolveOptions} to specify @@ -37,13 +39,13 @@ public interface ResourceId { *

For example: * *

{@code
-   * ResourceId homeDir = ...
+   * ResourceId homeDir = ...;
    * ResourceId tempOutput = homeDir
    *     .resolve("tempDir", StandardResolveOptions.RESOLVE_DIRECTORY)
    *     .resolve("output", StandardResolveOptions.RESOLVE_FILE);
    * }
* - *

This {@code ResourceId} should represents a directory. + *

This {@link ResourceId} should represents a directory. * *

It is up to each file system to resolve in their own way. * @@ -57,8 +59,11 @@ public interface ResourceId { * returns a {@code ResourceId} which matches all directories in this {@code ResourceId}. * * - * @throws IllegalArgumentException if other contains illegal characters or is an illegal name. - * It is recommended that callers use common characters, such as [_a-zA-Z0-9-], in {@code other}. + * @throws IllegalStateException if this {@link ResourceId} is not a directory. + * + * @throws IllegalArgumentException if {@code other} contains illegal characters + * or is an illegal name. It is recommended that callers use common characters, + * such as {@code [_a-zA-Z0-9.-]}, in {@code other}. */ ResourceId resolve(String other, ResolveOptions resolveOptions); @@ -73,8 +78,8 @@ public interface ResourceId { /** * Get the scheme which defines the namespace of the {@link ResourceId}. * - *

The scheme is required to follow URI scheme syntax. - * @see RFC 2396 + *

The scheme is required to follow URI scheme syntax. See + * RFC 2396 */ String getScheme(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/package-info.java index 2310582c9af51..ef6d519ee79db 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/package-info.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/package-info.java @@ -17,6 +17,6 @@ */ /** - * Beam FileSystem interfaces and their default implementations. + * Apache Beam FileSystem interfaces and their default implementations. */ package org.apache.beam.sdk.io.fs; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java index 213837c41460c..94e7620afa228 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java @@ -40,7 +40,7 @@ public class LocalResourceIdTest { public ExpectedException thrown = ExpectedException.none(); @Test - public void testResolve() throws Exception { + public void testResolveInUnix() throws Exception { if (SystemUtils.IS_OS_WINDOWS) { // Skip tests return; @@ -81,7 +81,7 @@ public void testResolve() throws Exception { } @Test - public void testResolveNormalization() throws Exception { + public void testResolveNormalizationInUnix() throws Exception { if (SystemUtils.IS_OS_WINDOWS) { // Skip tests return; @@ -137,7 +137,11 @@ public void testResolveNormalization() throws Exception { } @Test - public void testResolveHandleBadInputs() throws Exception { + public void testResolveHandleBadInputsInUnix() throws Exception { + if (SystemUtils.IS_OS_WINDOWS) { + // Skip tests + return; + } assertEquals( toResourceIdentifier("/root/tmp/"), toResourceIdentifier("/root/") @@ -186,7 +190,7 @@ public void testResolveInWindowsOS() throws Exception { } @Test - public void testGetCurrentDirectory() throws Exception { + public void testGetCurrentDirectoryInUnix() throws Exception { // Tests for local files without the scheme. assertEquals( toResourceIdentifier("/root/tmp/"), @@ -202,13 +206,12 @@ public void testGetCurrentDirectory() throws Exception { assertEquals( toResourceIdentifier("file://根目录/"), toResourceIdentifier("file://根目录/输出 文件01.txt").getCurrentDirectory()); - } - @Test - public void testGetCurrentDirectoryInvalid() throws Exception { - thrown.expect(IllegalStateException.class); - thrown.expectMessage("Path: [output] doesn't have the current directory."); - toResourceIdentifier("output").getCurrentDirectory(); + + // Tests path without parent. + assertEquals( + toResourceIdentifier("./"), + toResourceIdentifier("output").getCurrentDirectory()); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java index e0470fe1129ec..a03bebf06f789 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java @@ -77,7 +77,7 @@ public ResourceId getCurrentDirectory() { GcsPath parent = gcsPath.getParent(); checkState( parent != null, - String.format("Path: [%s] doesn't have the current directory.", gcsPath)); + String.format("Failed to get the current directory for path: [%s].", gcsPath)); return fromGcsPath(parent); } } From 30ce18c86186adf9f8913760e71af4552c0b3f31 Mon Sep 17 00:00:00 2001 From: Pei He Date: Tue, 7 Feb 2017 15:30:36 -0800 Subject: [PATCH 8/9] fixup! addressed checkstyle. --- .../src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java index eed785565431c..2bdd6604e06be 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java @@ -25,7 +25,7 @@ *

{@link ResourceId} is hierarchical and composed of a sequence of directory * and file name elements separated by a special separator or delimiter. * - * TODO: add examples for how ResourceId is constructed and used. + *

TODO: add examples for how ResourceId is constructed and used. */ public interface ResourceId { From 4cbb1676812d80eda46d58944233efc1bf041533 Mon Sep 17 00:00:00 2001 From: Pei He Date: Tue, 7 Feb 2017 16:24:15 -0800 Subject: [PATCH 9/9] fixup! addressed findbugs. --- .../build-tools/src/main/resources/beam/findbugs-filter.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index 40f854e865e37..983b40229ddbe 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -169,7 +169,7 @@ - +