Skip to content
Permalink
Browse files

Add a UFS to read from files hosted using http/https protocols

#8574

pr-link: #8996
change-id: cid-cf2732558bbdc467c6c81a1e5a306708b0f5371f
  • Loading branch information...
IT-DONG authored and alluxio-bot committed May 15, 2019
1 parent 784fe19 commit fe853db776e29a685a0a02898728a05b098585cd
Showing with 783 additions and 17 deletions.
  1. +2 −0 core/base/src/main/java/alluxio/Constants.java
  2. +1 −1 core/common/src/main/java/alluxio/concurrent/ManagedBlockingUfsForwarder.java
  3. +34 −0 core/common/src/main/java/alluxio/conf/PropertyKey.java
  4. +1 −1 core/common/src/main/java/alluxio/underfs/ObjectUnderFileSystem.java
  5. +5 −1 core/common/src/main/java/alluxio/underfs/UnderFileSystem.java
  6. +1 −1 core/common/src/main/java/alluxio/underfs/UnderFileSystemWithLogging.java
  7. +8 −0 core/common/src/main/java/alluxio/util/UnderFileSystemUtils.java
  8. +105 −8 core/common/src/main/java/alluxio/util/network/HttpUtils.java
  9. +6 −0 core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java
  10. +1 −1 shell/src/main/java/alluxio/cli/fs/command/LsCommand.java
  11. +1 −1 tests/src/test/java/alluxio/testutils/underfs/delegating/DelegatingUnderFileSystem.java
  12. +1 −1 tests/src/test/java/alluxio/testutils/underfs/sleeping/SleepingUnderFileSystem.java
  13. +1 −1 underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java
  14. +1 −1 underfs/local/src/main/java/alluxio/underfs/local/LocalUnderFileSystem.java
  15. +1 −0 underfs/pom.xml
  16. +59 −0 underfs/web/pom.xml
  17. +362 −0 underfs/web/src/main/java/alluxio/underfs/web/WebUnderFileSystem.java
  18. +48 −0 underfs/web/src/main/java/alluxio/underfs/web/WebUnderFileSystemFactory.java
  19. +12 −0 underfs/web/src/main/resources/META-INF/services/alluxio.underfs.UnderFileSystemFactory
  20. +49 −0 underfs/web/src/test/java/alluxio/underfs/web/WebUnderFileSystemFactoryTest.java
  21. +56 −0 underfs/web/src/test/java/alluxio/underfs/web/WebUnderFileSystemTest.java
  22. +28 −0 underfs/web/src/test/resources/log4j.properties
@@ -72,6 +72,8 @@
// See https://cloud.google.com/storage/docs/cloud-console
public static final String HEADER_GCS = "gs://";
public static final String HEADER_COS = "cos://";
public static final String HEADER_HTTP = "http://";
public static final String HEADER_HTTPS = "https://";
public static final String HEADER_KODO = "kodo://";

public static final int MAX_PORT = 65535;
@@ -515,7 +515,7 @@ public Void execute() throws IOException {
}

@Override
public boolean supportsFlush() {
public boolean supportsFlush() throws IOException {
return mUfs.supportsFlush();
}

@@ -588,6 +588,34 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_WEB_HEADER_LAST_MODIFIED =
new Builder(Name.UNDERFS_WEB_HEADER_LAST_MODIFIED)
.setDefaultValue("EEE, dd MMM yyyy HH:mm:ss zzz")
.setDescription("Date format of last modified for a http response header.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_WEB_CONNECTION_TIMEOUT =
new Builder(Name.UNDERFS_WEB_CONNECTION_TIMEOUT)
.setDefaultValue("60s")
.setDescription("Default timeout for a http connection.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_WEB_PARENT_NAMES =
new Builder(Name.UNDERFS_WEB_PARENT_NAMES)
.setDefaultValue("Parent Directory,..,../")
.setDescription("The text of the http link for the parent directory.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_WEB_TITLES =
new Builder(Name.UNDERFS_WEB_TITLES)
.setDefaultValue("Index of ,Directory listing for ")
.setDescription("The title of the content for a http url.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey UNDERFS_OBJECT_STORE_MULTI_RANGE_CHUNK_SIZE =
new Builder(Name.UNDERFS_OBJECT_STORE_MULTI_RANGE_CHUNK_SIZE)
.setDefaultValue(String.format("${%s}", Name.USER_BLOCK_SIZE_BYTES_DEFAULT))
@@ -3566,6 +3594,12 @@ private static String javadocLink(String fullyQualifiedClassname) {
public static final String UNDERFS_HDFS_IMPL = "alluxio.underfs.hdfs.impl";
public static final String UNDERFS_HDFS_PREFIXES = "alluxio.underfs.hdfs.prefixes";
public static final String UNDERFS_HDFS_REMOTE = "alluxio.underfs.hdfs.remote";
public static final String UNDERFS_WEB_HEADER_LAST_MODIFIED =
"alluxio.underfs.web.header.last.modified";
public static final String UNDERFS_WEB_CONNECTION_TIMEOUT =
"alluxio.underfs.web.connnection.timeout";
public static final String UNDERFS_WEB_PARENT_NAMES = "alluxio.underfs.web.parent.names";
public static final String UNDERFS_WEB_TITLES = "alluxio.underfs.web.titles";
public static final String UNDERFS_VERSION = "alluxio.underfs.version";
public static final String UNDERFS_OBJECT_STORE_SERVICE_THREADS =
"alluxio.underfs.object.store.service.threads";
@@ -748,7 +748,7 @@ public boolean renameRenamableFile(String src, String dst) throws IOException {
}

@Override
public boolean supportsFlush() {
public boolean supportsFlush() throws IOException {
return false;
}

@@ -38,6 +38,7 @@
import java.util.List;
import java.util.Map;

import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

/**
@@ -321,6 +322,7 @@ public int getValue() {
* return null if ACL is unsupported or disabled
* @throws IOException if ACL is supported and enabled but cannot be retrieved
*/
@Nullable
Pair<AccessControlList, DefaultAccessControlList> getAclPair(String path) throws IOException;

/**
@@ -520,6 +522,7 @@ public int getValue() {
* this abstract pathname. The array will be empty if the directory is empty. Returns
* {@code null} if this abstract pathname does not denote a directory.
*/
@Nullable
UfsStatus[] listStatus(String path) throws IOException;

/**
@@ -542,6 +545,7 @@ public int getValue() {
* abstract pathname. The array will be empty if the directory is empty. Returns
* {@code null} if this abstract pathname does not denote a directory.
*/
@Nullable
UfsStatus[] listStatus(String path, ListOptions options) throws IOException;

/**
@@ -691,7 +695,7 @@ public int getValue() {
*
* @return true if this type of UFS supports flush, false otherwise
*/
boolean supportsFlush();
boolean supportsFlush() throws IOException;

/**
* Whether this type of UFS supports active sync.
@@ -825,7 +825,7 @@ public String toString() {
}

@Override
public boolean supportsFlush() {
public boolean supportsFlush() throws IOException {
return mUnderFileSystem.supportsFlush();
}

@@ -129,6 +129,14 @@ public static boolean isSwift(UnderFileSystem ufs) {
return "swift".equals(ufs.getUnderFSType());
}

/**
* @param ufs the {@link UnderFileSystem} implementation to check
* @return true if the implementation is a Http implementation
*/
public static boolean isWeb(UnderFileSystem ufs) {
return "web".equals(ufs.getUnderFSType());
}

/**
* @param uri the UFS path
* @return the bucket or container name of the object storage
@@ -12,12 +12,16 @@
package alluxio.util.network;

import com.google.common.base.Preconditions;
import org.apache.commons.httpclient.Header;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.HeadMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@@ -29,8 +33,7 @@
public final class HttpUtils {
private static final Logger LOG = LoggerFactory.getLogger(HttpUtils.class);

private HttpUtils() {
}
private HttpUtils() {}

/**
* Uses the post method to send a url with arguments by http, this method can call RESTful Api.
@@ -39,8 +42,7 @@ private HttpUtils() {
* @param timeout milliseconds to wait for the server to respond before giving up
* @param processInputStream the response body stream processor
*/
public static void post(String url, Integer timeout,
IProcessInputStream processInputStream)
public static void post(String url, Integer timeout, IProcessInputStream processInputStream)
throws IOException {
Preconditions.checkNotNull(timeout, "timeout");
Preconditions.checkNotNull(processInputStream, "processInputStream");
@@ -68,12 +70,10 @@ public static void post(String url, Integer timeout,
* @param timeout milliseconds to wait for the server to respond before giving up
* @return the response body stream as UTF-8 string if response status is OK or CREATED
*/
public static String post(String url, Integer timeout)
throws IOException {
public static String post(String url, Integer timeout) throws IOException {
final StringBuilder contentBuffer = new StringBuilder();
post(url, timeout, inputStream -> {
try (BufferedReader br = new BufferedReader(
new InputStreamReader(inputStream, "UTF-8"))) {
try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"))) {
String line;
while ((line = br.readLine()) != null) {
contentBuffer.append(line);
@@ -83,6 +83,103 @@ public static String post(String url, Integer timeout)
return contentBuffer.toString();
}

/**
* Uses the get method to send a url with arguments by http, this method can call RESTful Api.
*
* @param url the http url
* @param timeout milliseconds to wait for the server to respond before giving up
* @return the response body stream if response status is OK or CREATED
*/
public static InputStream getInputStream(String url, Integer timeout) throws IOException {
Preconditions.checkNotNull(url, "url");
Preconditions.checkNotNull(timeout, "timeout");
GetMethod getMethod = new GetMethod(url);
HttpClient httpClient = new HttpClient();
httpClient.getHttpConnectionManager().getParams().setConnectionTimeout(timeout);
httpClient.getHttpConnectionManager().getParams().setSoTimeout(timeout);
int statusCode = httpClient.executeMethod(getMethod);
if (statusCode != HttpStatus.SC_OK && statusCode != HttpStatus.SC_CREATED) {
throw new IOException("Failed to perform GET request. Status code: " + statusCode);
}
InputStream inputStream = getMethod.getResponseBodyAsStream();
return new BufferedInputStream(inputStream) {
@Override
public void close() throws IOException {
getMethod.releaseConnection();
}
};
}

/**
* Uses the get method to send a url with arguments by http, this method can call RESTful Api.
*
* @param url the http url
* @param timeout milliseconds to wait for the server to respond before giving up
* @param processInputStream the response body stream processor
*/
public static void get(String url, Integer timeout, IProcessInputStream processInputStream)
throws IOException {
Preconditions.checkNotNull(url, "url");
Preconditions.checkNotNull(timeout, "timeout");
Preconditions.checkNotNull(processInputStream, "processInputStream");

try (InputStream inputStream = getInputStream(url, timeout)) {
processInputStream.process(inputStream);
}
}

/**
* Uses the get method to send a url with arguments by http, this method can call RESTful Api.
*
* @param url the http url
* @param timeout milliseconds to wait for the server to respond before giving up
* @return the response content string if response status is OK or CREATED
*/
public static String get(String url, Integer timeout) throws IOException {
Preconditions.checkNotNull(url, "url");
Preconditions.checkNotNull(timeout, "timeout");
final StringBuilder contentBuffer = new StringBuilder();
get(url, timeout, inputStream -> {
try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"))) {
String line;
while ((line = br.readLine()) != null) {
contentBuffer.append(line);
}
}
});
return contentBuffer.toString();
}

/**
* Uses the head method to send a url with arguments by http, this method can call RESTful Api.
*
* @param url the http url
* @param timeout milliseconds to wait for the server to respond before giving up
* @return the response headers
*/
public static Header[] head(String url, Integer timeout) {
Preconditions.checkNotNull(url, "url");
Preconditions.checkNotNull(timeout, "timeout");
HeadMethod headMethod = new HeadMethod(url);
try {
HttpClient httpClient = new HttpClient();
httpClient.getHttpConnectionManager().getParams().setConnectionTimeout(timeout);
httpClient.getHttpConnectionManager().getParams().setSoTimeout(timeout);
int statusCode = httpClient.executeMethod(headMethod);
if (statusCode == HttpStatus.SC_OK || statusCode == HttpStatus.SC_CREATED) {
return headMethod.getResponseHeaders();
} else {
LOG.error("Failed to perform HEAD request. Status code: {}", statusCode);
}
} catch (Exception e) {
LOG.error("Failed to execute URL request: {}", url, e);
} finally {
headMethod.releaseConnection();
}

return null;
}

/**
* This interface should be implemented by the http response body stream processor.
*/
@@ -155,6 +155,7 @@
import alluxio.util.interfaces.Scoped;
import alluxio.util.io.PathUtils;
import alluxio.util.proto.ProtoUtils;
import alluxio.util.UnderFileSystemUtils;
import alluxio.wire.BlockInfo;
import alluxio.wire.BlockLocation;
import alluxio.wire.CommandType;
@@ -2660,6 +2661,7 @@ private void mountInternal(Supplier<JournalContext> journalContext, LockedInodeP
.setShared(context.getOptions().getShared())
.createMountSpecificConf(context.getOptions().getPropertiesMap()));
try {
MountPOptions.Builder mountOption = context.getOptions();
try (CloseableResource<UnderFileSystem> ufsResource =
mUfsManager.get(mountId).acquireUfsResource()) {
UnderFileSystem ufs = ufsResource.get();
@@ -2668,6 +2670,10 @@ private void mountInternal(Supplier<JournalContext> journalContext, LockedInodeP
throw new IOException(
ExceptionMessage.UFS_PATH_DOES_NOT_EXIST.getMessage(ufsPath.getPath()));
}

if (UnderFileSystemUtils.isWeb(ufs)) {
mountOption.setReadOnly(true);
}
}
// Check that the alluxioPath we're creating doesn't shadow a path in the parent UFS
MountTable.Resolution resolution = mMountTable.resolve(alluxioPath);
@@ -250,7 +250,7 @@ private void ls(AlluxioURI path, boolean recursive, boolean forceLoadMetadata, b
@Override
public void run() {
System.out.printf("Getting directory status of %s files or sub-directories "
+ "may take a while.", pathStatus.getLength());
+ "may take a while.%n", pathStatus.getLength());
}
}, 10000);
}
@@ -306,7 +306,7 @@ public void setMode(String path, short mode) throws IOException {
}

@Override
public boolean supportsFlush() {
public boolean supportsFlush() throws IOException {
return mUfs.supportsFlush();
}

@@ -219,7 +219,7 @@ public void setMode(String path, short mode) throws IOException {
}

@Override
public boolean supportsFlush() {
public boolean supportsFlush() throws IOException {
sleepIfNecessary(mOptions.getSupportsFlushMs());
return super.supportsFlush();
}
@@ -643,7 +643,7 @@ public void setMode(String path, short mode) throws IOException {
}

@Override
public boolean supportsFlush() {
public boolean supportsFlush() throws IOException {
return true;
}

@@ -409,7 +409,7 @@ public void connectFromWorker(String hostname) throws IOException {
}

@Override
public boolean supportsFlush() {
public boolean supportsFlush() throws IOException {
return true;
}

@@ -31,6 +31,7 @@
<module>s3a</module>
<module>swift</module>
<module>wasb</module>
<module>web</module>
<module>kodo</module>
</modules>

0 comments on commit fe853db

Please sign in to comment.
You can’t perform that action at this time.