Skip to content

Commit

Permalink
HDFS-13934. Multipart uploaders to be created through FileSystem/File…
Browse files Browse the repository at this point in the history
…Context.

Contributed by Steve Loughran.

Change-Id: Iebd34140c1a0aa71f44a3f4d0fee85f6bdf123a3
  • Loading branch information
steveloughran committed Jul 13, 2020
1 parent b97fea6 commit b9fa5e0
Show file tree
Hide file tree
Showing 45 changed files with 2,603 additions and 794 deletions.
Expand Up @@ -1382,4 +1382,34 @@ public boolean hasPathCapability(final Path path,
return false;
}
}

/**
 * Create a multipart uploader.
 * <p>
 * This base implementation unconditionally rejects the call via
 * {@link #methodNotSupported()}; file system implementations that
 * support multipart uploads override this method.
 * @param basePath file path under which all files are uploaded
 * @return a MultipartUploaderBuilder object to build the uploader
 * @throws IOException if some early checks cause IO failures.
 * @throws UnsupportedOperationException if support is checked early.
 */
@InterfaceStability.Unstable
public MultipartUploaderBuilder createMultipartUploader(Path basePath)
    throws IOException {
  methodNotSupported();
  return null;  // unreachable in practice: methodNotSupported() always throws
}

/**
 * Raise an {@link UnsupportedOperationException} naming the
 * {@link FileSystem} method from which this helper was invoked.
 */
protected final void methodNotSupported() {
  // Stack layout at the moment getStackTrace() executes
  // (top to bottom):
  //   [0] java.lang.Thread.getStackTrace
  //   [1] this helper (methodNotSupported)
  //   [2] the unsupported FileSystem method
  // so element 2 identifies the method to report.
  // NOTE: getStackTrace() must be called directly from this method;
  // extracting it into another helper would shift the frame index.
  final StackTraceElement caller =
      Thread.currentThread().getStackTrace()[2];
  throw new UnsupportedOperationException(
      getClass().getCanonicalName()
          + " does not support method "
          + caller.getMethodName());
}
}
Expand Up @@ -131,4 +131,12 @@ private CommonPathCapabilities() {
@InterfaceStability.Unstable
public static final String FS_EXPERIMENTAL_BATCH_LISTING =
"fs.capability.batch.listing";

/**
* Does the store support multipart uploading?
* Value: {@value}.
*/
public static final String FS_MULTIPART_UPLOADER =
"fs.capability.multipart.uploader";

}
Expand Up @@ -2957,4 +2957,31 @@ public boolean hasPathCapability(Path path, String capability)
(fs, p) -> fs.hasPathCapability(p, capability));
}

/**
 * Return a set of server default configuration values based on path.
 * Relative paths are qualified before the link resolution is applied.
 * @param path path to fetch server defaults
 * @return server default configuration values for path
 * @throws IOException an I/O error occurred
 */
public FsServerDefaults getServerDefaults(final Path path)
    throws IOException {
  final Path absolutePath = fixRelativePart(path);
  return FsLinkResolution.resolve(this, absolutePath,
      (targetFs, resolved) -> targetFs.getServerDefaults(resolved));
}

/**
 * Create a multipart uploader.
 * The request is routed to the file system which resolves the
 * (qualified) base path.
 * @param basePath file path under which all files are uploaded
 * @return a MultipartUploaderBuilder object to build the uploader
 * @throws IOException if some early checks cause IO failures.
 * @throws UnsupportedOperationException if support is checked early.
 */
@InterfaceStability.Unstable
public MultipartUploaderBuilder createMultipartUploader(Path basePath)
    throws IOException {
  final Path absolutePath = fixRelativePart(basePath);
  return FsLinkResolution.resolve(this, absolutePath,
      (targetFs, resolved) -> targetFs.createMultipartUploader(resolved));
}
}
Expand Up @@ -132,22 +132,35 @@
* New methods may be marked as Unstable or Evolving for their initial release,
* as a warning that they are new and may change based on the
* experience of use in applications.
* <p></p>
* <b>Important note for developers</b>
*
* If you're making changes here to the public API or protected methods,
* <p></p>
* If you are making changes here to the public API or protected methods,
* you must review the following subclasses and make sure that
* they are filtering/passing through new methods as appropriate.
* <p></p>
*
* {@link FilterFileSystem}: methods are passed through.
* {@link FilterFileSystem}: methods are passed through. If not,
* then {@code TestFilterFileSystem.MustNotImplement} must be
* updated with the unsupported interface.
* Furthermore, if the new API's support is probed for via
* {@link #hasPathCapability(Path, String)} then
* {@link FilterFileSystem#hasPathCapability(Path, String)}
* must return false, always.
* <p></p>
* {@link ChecksumFileSystem}: checksums are created and
* verified.
* <p></p>
* {@code TestHarFileSystem} will need its {@code MustNotImplement}
* interface updated.
* <p></p>
*
* There are some external places your changes will break things.
* Do co-ordinate changes here.
* <p></p>
*
* HBase: HBoss
* <p></p>
* Hive: HiveShim23
* {@code shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java}
*
Expand Down Expand Up @@ -4644,4 +4657,17 @@ public CompletableFuture<FSDataInputStream> build() throws IOException {

}

/**
 * Create a multipart uploader.
 * <p>
 * This base implementation unconditionally rejects the call via
 * {@link #methodNotSupported()}; file system implementations that
 * support multipart uploads override this method.
 * @param basePath file path under which all files are uploaded
 * @return a MultipartUploaderBuilder object to build the uploader
 * @throws IOException if some early checks cause IO failures.
 * @throws UnsupportedOperationException if support is checked early.
 */
@InterfaceStability.Unstable
public MultipartUploaderBuilder createMultipartUploader(Path basePath)
    throws IOException {
  methodNotSupported();
  return null;  // unreachable in practice: methodNotSupported() always throws
}
}
Expand Up @@ -41,6 +41,8 @@
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.Progressable;

import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;

/****************************************************************
* A <code>FilterFileSystem</code> contains
* some other file system, which it uses as
Expand Down Expand Up @@ -728,7 +730,16 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
/**
 * Probe for a path capability, hard-coding {@code false} for
 * features this filter class does not pass through to the wrapped
 * file system.
 * @param path path to query
 * @param capability capability (validated before the probe)
 * @return true if the capability is supported through the filter
 * @throws IOException raised by the wrapped file system's probe
 */
@Override
public boolean hasPathCapability(final Path path, final String capability)
    throws IOException {
  // DEFECT FIXED: the rendered diff had retained the removed
  // pre-image line "return fs.hasPathCapability(path, capability);"
  // ahead of the switch, making the switch unreachable — a compile
  // error. Only the switch form belongs in the post-image.
  switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
  case CommonPathCapabilities.FS_MULTIPART_UPLOADER:
  case CommonPathCapabilities.FS_EXPERIMENTAL_BATCH_LISTING:
    // operations known to be unsupported, irrespective of what
    // the wrapped class implements.
    return false;
  default:
    // delegate the probe to the wrapped file system.
    return fs.hasPathCapability(path, capability);
  }
}

}
Expand Up @@ -448,4 +448,10 @@ public boolean hasPathCapability(final Path path,
throws IOException {
return myFs.hasPathCapability(path, capability);
}

/**
 * Create a multipart uploader by delegating to the wrapped
 * {@code myFs} file system.
 * @param basePath file path under which all files are uploaded
 * @return the builder supplied by the wrapped file system
 * @throws IOException propagated from the wrapped file system
 */
@Override
public MultipartUploaderBuilder createMultipartUploader(final Path basePath)
    throws IOException {
  return myFs.createMultipartUploader(basePath);
}
}
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -15,26 +15,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemMultipartUploader;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.MultipartUploaderFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
package org.apache.hadoop.fs;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;


/**
* Support for HDFS multipart uploads, built on
* {@link FileSystem#concat(Path, Path[])}.
* This class allows access to package-scoped operations from classes
* in org.apache.hadoop.fs.impl and other file system implementations
* in the hadoop modules.
* This is absolutely not for use by any other application or library.
*/
public class DFSMultipartUploaderFactory extends MultipartUploaderFactory {
protected MultipartUploader createMultipartUploader(FileSystem fs,
Configuration conf) {
if (fs.getScheme().equals(HdfsConstants.HDFS_URI_SCHEME)) {
return new FileSystemMultipartUploader(fs);
}
return null;
@InterfaceAudience.Private
public class InternalOperations {

  /**
   * Invoke the deprecated package-scoped
   * {@code FileSystem#rename(Path, Path, Options.Rename...)} call.
   * @param fs target file system
   * @param src source path
   * @param dst destination path
   * @param options rename options (e.g. OVERWRITE)
   * @throws IOException raised by the underlying rename
   */
  @SuppressWarnings("deprecation") // rename w/ OVERWRITE
  public void rename(FileSystem fs, final Path src, final Path dst,
      final Options.Rename...options) throws IOException {
    fs.rename(src, dst, options);
  }
}

0 comments on commit b9fa5e0

Please sign in to comment.