Skip to content

Commit

Permalink
Merge 4cbb167 into 570ddb7
Browse files Browse the repository at this point in the history
  • Loading branch information
peihe committed Feb 8, 2017
2 parents 570ddb7 + 4cbb167 commit 21502e1
Show file tree
Hide file tree
Showing 12 changed files with 823 additions and 1 deletion.
7 changes: 7 additions & 0 deletions pom.xml
Expand Up @@ -101,6 +101,7 @@
<beamSurefireArgline />

<!-- If updating dependencies, please update any relevant javadoc offlineLinks -->
<apache.commons.lang.version>3.5</apache.commons.lang.version>
<apex.kryo.version>2.24.0</apex.kryo.version>
<avro.version>1.8.1</avro.version>
<bigquery.version>v2-rev295-1.22.0</bigquery.version>
Expand Down Expand Up @@ -458,6 +459,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${apache.commons.lang.version}</version>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions runners/apex/pom.xml
Expand Up @@ -96,6 +96,11 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

<dependency>
<!-- javax.annotation.Nullable -->
<groupId>com.google.code.findbugs</groupId>
Expand Down
18 changes: 18 additions & 0 deletions sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
Expand Up @@ -166,6 +166,24 @@
<!-- Intentionally overriding parent name because inheritors should replace the parent. -->
</Match>

<Match>
<Class name="org.apache.beam.sdk.io.LocalResourceId"/>
<Method name="getCurrentDirectory" />
<Bug pattern="NP_NULL_PARAM_DEREF"/>
<!--
Path.getParent() may return null, but getCurrentDirectory() verifies that the resulting Path is not null before using it.
-->
</Match>

<Match>
<Class name="org.apache.beam.sdk.io.gcp.storage.GcsResourceId"/>
<Method name="getCurrentDirectory" />
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
<!--
GcsPath.getParent() may return null, but we check that the returned path is not null.
-->
</Match>

<Match>
<Class name="org.apache.beam.sdk.util.ZipFiles"/>
<Method name="zipDirectory" />
Expand Down
5 changes: 5 additions & 0 deletions sdks/java/core/pom.xml
Expand Up @@ -371,6 +371,11 @@
<version>1.9</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
Expand Down
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;
import java.util.UUID;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.commons.lang3.SystemUtils;

/**
 * {@link ResourceId} implementation for local files.
 *
 * <p>Holds a normalized {@link Path} together with a flag recording whether that path
 * denotes a directory. Both fields are final.
 */
class LocalResourceId implements ResourceId {

  private final Path path;
  private final boolean isDirectory;

  /**
   * Creates a {@link LocalResourceId} for the given local path.
   *
   * @param path the local path; it is normalized before being stored
   * @param isDirectory whether {@code path} denotes a directory
   * @throws NullPointerException if {@code path} is null
   */
  static LocalResourceId fromPath(Path path, boolean isDirectory) {
    checkNotNull(path, "path");
    return new LocalResourceId(path, isDirectory);
  }

  private LocalResourceId(Path path, boolean isDirectory) {
    this.path = path.normalize();
    this.isDirectory = isDirectory;
  }

  /**
   * Resolves {@code other} against this directory.
   *
   * @param other the child name to resolve; must not end with {@code '/'} when resolving a file
   * @param resolveOptions either {@link StandardResolveOptions#RESOLVE_FILE} or
   *     {@link StandardResolveOptions#RESOLVE_DIRECTORY}
   * @return the resolved {@link LocalResourceId}
   * @throws IllegalStateException if this resource is not a directory
   * @throws IllegalArgumentException if {@code resolveOptions} is not supported, or a file
   *     is requested with a name ending in {@code '/'}
   */
  @Override
  public ResourceId resolve(String other, ResolveOptions resolveOptions) {
    // Message templates are expanded by Preconditions only when a check fails.
    checkState(
        isDirectory,
        "Expected the path is a directory, but had [%s].", path);
    checkArgument(
        resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)
            || resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY),
        "ResolveOptions: [%s] is not supported.", resolveOptions);
    checkArgument(
        !(resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)
            && other.endsWith("/")),
        "The resolved file: [%s] should not end with '/'.", other);
    if (SystemUtils.IS_OS_WINDOWS) {
      return resolveLocalPathWindowsOS(other, resolveOptions);
    } else {
      return resolveLocalPath(other, resolveOptions);
    }
  }

  /**
   * Returns this resource if it is a directory, otherwise the directory containing it.
   *
   * @throws IllegalStateException if no containing directory can be determined
   */
  @Override
  public ResourceId getCurrentDirectory() {
    if (isDirectory) {
      return this;
    }
    Path parent = path.getParent();
    // A relative path with a single name element has no parent; use "." as its directory.
    if (parent == null && path.getNameCount() == 1) {
      parent = Paths.get(".");
    }
    checkState(
        parent != null,
        "Failed to get the current directory for path: [%s].", path);
    return fromPath(
        parent,
        true /* isDirectory */);
  }

  /** Resolves {@code other} directly with {@link Path#resolve(String)}. */
  private LocalResourceId resolveLocalPath(String other, ResolveOptions resolveOptions) {
    return new LocalResourceId(
        path.resolve(other),
        resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY));
  }

  /**
   * Resolves {@code other} on Windows.
   *
   * <p>Every {@code '*'} in both this path and {@code other} is temporarily swapped for a
   * random UUID while the paths are parsed and resolved, then swapped back in the result.
   * NOTE(review): presumably this is because Windows path parsing rejects {@code '*'} --
   * confirm.
   */
  private LocalResourceId resolveLocalPathWindowsOS(String other, ResolveOptions resolveOptions) {
    String uuid = UUID.randomUUID().toString();
    // Literal (non-regex) substitution: neither "*" nor the UUID is meant as a pattern.
    Path pathAsterisksReplaced = Paths.get(path.toString().replace("*", uuid));
    String otherAsterisksReplaced = other.replace("*", uuid);

    return new LocalResourceId(
        Paths.get(
            pathAsterisksReplaced.resolve(otherAsterisksReplaced)
                .toString()
                .replace(uuid, "*")),
        resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY));
  }

  @Override
  public String getScheme() {
    return LocalFileSystemRegistrar.LOCAL_FILE_SCHEME;
  }

  @Override
  public String toString() {
    return String.format("LocalResourceId: [%s]", path);
  }

  /** Two instances are equal when both the normalized path and the directory flag match. */
  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof LocalResourceId)) {
      return false;
    }
    LocalResourceId other = (LocalResourceId) obj;
    return this.path.equals(other.path)
        && this.isDirectory == other.isDirectory;
  }

  @Override
  public int hashCode() {
    return Objects.hash(path, isDirectory);
  }
}
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.fs;

/**
 * Options that configure how {@link ResourceId#resolve} resolves a child resource.
 */
public interface ResolveOptions {

  /**
   * The standard set of resolve options.
   */
  enum StandardResolveOptions implements ResolveOptions {
    /** Resolve the given name as a file. */
    RESOLVE_FILE,

    /**
     * Resolve the given name as a directory.
     *
     * <p>This requires the {@link ResourceId} implementation to append a delimiter.
     */
    RESOLVE_DIRECTORY
  }
}
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.fs;

import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;

/**
 * An identifier which represents a file-like resource.
 *
 * <p>A {@link ResourceId} is hierarchical and composed of a sequence of directory
 * and file name elements separated by a special separator or delimiter.
 *
 * <p>TODO: add examples for how ResourceId is constructed and used.
 */
public interface ResourceId {

  /**
   * Returns a child {@code ResourceId} under {@code this}.
   *
   * <p>In order to write file system agnostic code, callers should not include delimiters
   * in {@code other}, and should use {@link StandardResolveOptions} to specify
   * whether to resolve a file or a directory.
   *
   * <p>For example:
   *
   * <pre>{@code
   * ResourceId homeDir = ...;
   * ResourceId tempOutput = homeDir
   *     .resolve("tempDir", StandardResolveOptions.RESOLVE_DIRECTORY)
   *     .resolve("output", StandardResolveOptions.RESOLVE_FILE);
   * }</pre>
   *
   * <p>This {@link ResourceId} should represent a directory.
   *
   * <p>It is up to each file system to resolve in its own way.
   *
   * <p>Resolving special characters:
   * <ul>
   *   <li>{@code resourceId.resolve("..", StandardResolveOptions.RESOLVE_DIRECTORY)} returns
   *     the parent directory of this {@code ResourceId}.
   *   <li>{@code resourceId.resolve("*", StandardResolveOptions.RESOLVE_FILE)} returns
   *     a {@code ResourceId} which matches all files in this {@code ResourceId}.
   *   <li>{@code resourceId.resolve("*", StandardResolveOptions.RESOLVE_DIRECTORY)}
   *     returns a {@code ResourceId} which matches all directories in this {@code ResourceId}.
   * </ul>
   *
   * @param other the name of the child to resolve under this {@code ResourceId}
   * @param resolveOptions specifies whether {@code other} is resolved as a file or a directory
   * @return the resolved child {@code ResourceId}
   *
   * @throws IllegalStateException if this {@link ResourceId} is not a directory.
   *
   * @throws IllegalArgumentException if {@code other} contains illegal characters
   *     or is an illegal name. It is recommended that callers use common characters,
   *     such as {@code [_a-zA-Z0-9.-]}, in {@code other}.
   */
  ResourceId resolve(String other, ResolveOptions resolveOptions);

  /**
   * Returns the {@code ResourceId} that represents the current directory of
   * this {@code ResourceId}.
   *
   * <p>If it is already a directory, trivially returns this.
   *
   * @return the directory {@code ResourceId} for this resource
   */
  ResourceId getCurrentDirectory();

  /**
   * Returns the scheme which defines the namespace of the {@link ResourceId}.
   *
   * <p>The scheme is required to follow URI scheme syntax. See
   * <a href="https://www.ietf.org/rfc/rfc2396.txt">RFC 2396</a>.
   *
   * @return the scheme of this {@code ResourceId}
   */
  String getScheme();
}
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Apache Beam FileSystem interfaces and their default implementations.
*/
package org.apache.beam.sdk.io.fs;

0 comments on commit 21502e1

Please sign in to comment.