Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected Map<String, String> loadConfigFromFile(String resourceConfig) {

// Subclass can override this method to return the property name of resource config.
protected String getResourceConfigPropName() {
return "";
return null;
}

// This method will check if all required properties are set.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@

public class COSProperties extends AbstractObjectStorageProperties {

@ConnectorProperty(names = {"cos.endpoint"},
@ConnectorProperty(names = {"cos.endpoint", "s3.endpoint"},
required = false,
description = "The endpoint of COS.")
protected String cosEndpoint = "cos.ap-guangzhou.myqcloud.com";
protected String cosEndpoint = "";

@ConnectorProperty(names = {"cos.region"},
@ConnectorProperty(names = {"cos.region", "s3.region"},
required = false,
description = "The region of COS.")
protected String cosRegion = "";
protected String cosRegion = "ap-guangzhou";

@ConnectorProperty(names = {"cos.access_key"},
description = "The access key of S3.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,9 @@ private void loadFinalHdfsConfig(Map<String, String> origProps) {
return;
}
finalHdfsConfig = new HashMap<>();
Configuration configuration = new Configuration();
origProps.forEach((k, v) -> {
if (null != configuration.getTrimmed(k)) {
finalHdfsConfig.put(k, v);
origProps.forEach((key, value) -> {
if (key.startsWith("hadoop.") || key.startsWith("dfs.")) {
finalHdfsConfig.put(key, value);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

public class OBSProperties extends AbstractObjectStorageProperties {

@ConnectorProperty(names = {"obs.endpoint"}, required = false, description = "The endpoint of OBS.")
@ConnectorProperty(names = {"obs.endpoint", "s3.endpoint"}, required = false, description = "The endpoint of OBS.")
protected String obsEndpoint = "obs.cn-east-3.myhuaweicloud.com";

@ConnectorProperty(names = {"obs.access_key"}, description = "The access key of OBS.")
Expand All @@ -38,7 +38,8 @@ public class OBSProperties extends AbstractObjectStorageProperties {
protected String obsSecretKey = "";


private String region;
@ConnectorProperty(names = {"obs.region", "s3.region"}, required = false, description = "The region of OBS.")
protected String region;

public OBSProperties(Map<String, String> origProps) {
super(Type.OBS, origProps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.regex.Pattern;

public class OSSProperties extends AbstractObjectStorageProperties {
@ConnectorProperty(names = {"oss.endpoint"}, required = false, description = "The endpoint of OSS.")
@ConnectorProperty(names = {"oss.endpoint", "s3.endpoint"}, required = false, description = "The endpoint of OSS.")
protected String endpoint = "oss-cn-hangzhou.aliyuncs.com";

@ConnectorProperty(names = {"oss.access_key"}, description = "The access key of OSS.")
Expand All @@ -36,6 +36,7 @@ public class OSSProperties extends AbstractObjectStorageProperties {
@ConnectorProperty(names = {"oss.secret_key"}, description = "The secret key of OSS.")
protected String secretKey = "";

@ConnectorProperty(names = {"oss.region", "s3.region"}, required = false, description = "The region of OSS.")
protected String region;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.doris.datasource.property.metastore.AWSGlueProperties;
import org.apache.doris.datasource.property.metastore.AliyunDLFProperties;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.paimon.options.Options;
Expand All @@ -34,14 +33,12 @@
public class S3Properties extends AbstractObjectStorageProperties {

@ConnectorProperty(names = {"s3.endpoint", "AWS_ENDPOINT"},
required = false,
description = "The endpoint of S3.")
protected String s3Endpoint = "";

@ConnectorProperty(names = {"s3.region", "AWS_REGION"},
required = false,
description = "The region of S3.")
protected String s3Region = "";
protected String s3Region = "us-east-1";

@ConnectorProperty(names = {"s3.access_key", "AWS_ACCESS_KEY"},
description = "The access key of S3.")
Expand Down Expand Up @@ -96,11 +93,6 @@ public class S3Properties extends AbstractObjectStorageProperties {

public S3Properties(Map<String, String> origProps) {
super(Type.S3, origProps);
if (Strings.isNullOrEmpty(s3Region)) {
// Some object storage services do not have region concept, eg: minio.
// Use a default one.
s3Endpoint = "us-east-1";
}
}

/**
Expand All @@ -110,6 +102,9 @@ public S3Properties(Map<String, String> origProps) {
* @return
*/
public static boolean guessIsMe(Map<String, String> origProps) {
    // Presence of an S3-style access key is a definitive hint; otherwise fall back
    // to the generic identifier-field probe shared by all storage types.
    boolean hasAccessKey = origProps.containsKey("s3.access_key")
            || origProps.containsKey("AWS_ACCESS_KEY");
    return hasAccessKey
            || StorageProperties.checkIdentifierKey(origProps, getIdentifyFields());
}
Expand All @@ -118,10 +113,9 @@ private static List<Field> getIdentifyFields() {
List<Field> fields = Lists.newArrayList();
try {
//todo AliyunDlfProperties should in OSS storage type.
fields.add(S3Properties.class.getDeclaredField("s3Endpoint"));
fields.add(AliyunDLFProperties.class.getDeclaredField("dlfEndpoint"));
fields.add(AliyunDLFProperties.class.getDeclaredField("dlfRegion"));
fields.add(AWSGlueProperties.class.getDeclaredField("glueEndpoint"));
fields.add(S3Properties.class.getDeclaredField("s3AccessKey"));
fields.add(AliyunDLFProperties.class.getDeclaredField("dlfAccessKey"));
fields.add(AWSGlueProperties.class.getDeclaredField("glueAccessKey"));
return fields;
} catch (NoSuchFieldException e) {
// should not happen
Expand Down
46 changes: 46 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.common.util.S3URI;
import org.apache.doris.common.util.S3Util;
import org.apache.doris.datasource.property.storage.AbstractObjectStorageProperties;
import org.apache.doris.fs.remote.RemoteFile;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
Expand Down Expand Up @@ -59,6 +60,7 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class S3ObjStorage implements ObjStorage<S3Client> {
Expand Down Expand Up @@ -108,6 +110,50 @@ public S3Client getClient() throws UserException {
return client;
}



/**
 * Lists objects under an s3://bucket/path glob and appends every matching key to
 * {@code result}.
 *
 * @param remotePath   s3://bucket/key-glob; '*' matches any run of characters,
 *                     '?' matches a single character
 * @param result       receives a RemoteFile per matching object
 * @param fileNameOnly when true, only the last path segment is stored as the name
 * @return Status.OK on success, COMMON_ERROR wrapping the exception message otherwise
 */
public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
    try {
        URI uri = new URI(remotePath);
        String bucketName = uri.getHost();
        String prefix = uri.getPath().substring(1);
        // List with the longest literal prefix before the first wildcard to reduce
        // the keys S3 returns. ">= 0" (not "> 0") so a glob that STARTS with '*'
        // falls back to the empty prefix instead of listing with a literal '*'.
        int wildcardIndex = prefix.indexOf('*');
        String searchPrefix = wildcardIndex >= 0 ? prefix.substring(0, wildcardIndex) : prefix;
        Pattern pattern = Pattern.compile(globToRegex(prefix));
        try (S3Client s3 = getClient()) {
            // ListObjectsV2 caps each response at 1000 keys; follow the
            // continuation token so large buckets are not silently truncated.
            String continuationToken = null;
            do {
                ListObjectsV2Request.Builder builder = ListObjectsV2Request.builder()
                        .bucket(bucketName)
                        .prefix(searchPrefix);
                if (continuationToken != null) {
                    builder.continuationToken(continuationToken);
                }
                ListObjectsV2Response listResponse = s3.listObjectsV2(builder.build());
                listResponse.contents().stream()
                        .filter(obj -> pattern.matcher(obj.key()).matches())
                        .forEach(obj -> {
                            String fullPath = "s3://" + bucketName + "/" + obj.key();
                            result.add(new RemoteFile(
                                    fileNameOnly
                                            ? fullPath.substring(fullPath.lastIndexOf('/') + 1)
                                            : fullPath,
                                    true,
                                    obj.size(),
                                    -1,
                                    obj.lastModified().toEpochMilli()));
                        });
                continuationToken = Boolean.TRUE.equals(listResponse.isTruncated())
                        ? listResponse.nextContinuationToken()
                        : null;
            } while (continuationToken != null);
        }
        return Status.OK;
    } catch (Exception e) {
        LOG.warn("Errors while getting file status", e);
        return new Status(Status.ErrCode.COMMON_ERROR, "Errors while getting file status " + e.getMessage());
    }
}

// Translates a glob into a regex, quoting every character except the glob
// metacharacters '*' (any run) and '?' (single char), so keys containing
// regex metacharacters such as '+' or '(' match literally.
private static String globToRegex(String glob) {
    StringBuilder regex = new StringBuilder();
    for (int i = 0; i < glob.length(); i++) {
        char c = glob.charAt(i);
        if (c == '*') {
            regex.append(".*");
        } else if (c == '?') {
            regex.append('.');
        } else {
            regex.append(Pattern.quote(String.valueOf(c)));
        }
    }
    return regex.toString();
}

@Override
public Triple<String, String, String> getStsToken() throws DdlException {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,16 @@
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.datasource.property.storage.AbstractObjectStorageProperties;
import org.apache.doris.fs.obj.S3ObjStorage;

import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.Map;

public class S3FileSystem extends ObjFileSystem {

Expand Down Expand Up @@ -66,77 +58,18 @@ private void initFsProperties() {
this.properties.putAll(storageProperties.getOrigProps());
}


@Override
protected FileSystem nativeFileSystem(String remotePath) throws UserException {
//todo Extracting a common method to achieve logic reuse
if (closed.get()) {
throw new UserException("FileSystem is closed.");
}
if (dfsFileSystem == null) {
synchronized (this) {
if (closed.get()) {
throw new UserException("FileSystem is closed.");
}
if (dfsFileSystem == null) {
Configuration conf = s3Properties.getHadoopConfiguration();
System.setProperty("com.amazonaws.services.s3.enableV4", "true");
AuthenticationConfig authConfig = AuthenticationConfig.getSimpleAuthenticationConfig(conf);
HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig);
try {
dfsFileSystem = authenticator.doAs(() -> {
try {
return FileSystem.get(new Path(remotePath).toUri(), conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
this.authenticator = authenticator;
RemoteFSPhantomManager.registerPhantomReference(this);
} catch (Exception e) {
throw new UserException("Failed to get S3 FileSystem for " + e.getMessage(), e);
}
}
}
}
return dfsFileSystem;
return null;

}

// broker file pattern glob is too complex, so we use hadoop directly
@Override
public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
try {
FileSystem s3AFileSystem = nativeFileSystem(remotePath);
Path pathPattern = new Path(remotePath);
FileStatus[] files = s3AFileSystem.globStatus(pathPattern);
if (files == null) {
return Status.OK;
}
for (FileStatus fileStatus : files) {
RemoteFile remoteFile = new RemoteFile(
fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(),
!fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen(),
fileStatus.getBlockSize(), fileStatus.getModificationTime());
result.add(remoteFile);
}
} catch (FileNotFoundException e) {
LOG.info("file not found: " + e.getMessage());
return new Status(Status.ErrCode.NOT_FOUND, "file not found: " + e.getMessage());
} catch (Exception e) {
if (e.getCause() instanceof AmazonS3Exception) {
// process minio error msg
AmazonS3Exception ea = (AmazonS3Exception) e.getCause();
Map<String, String> callbackHeaders = ea.getHttpHeaders();
if (callbackHeaders != null && !callbackHeaders.isEmpty()) {
String minioErrMsg = callbackHeaders.get("X-Minio-Error-Desc");
if (minioErrMsg != null) {
return new Status(Status.ErrCode.COMMON_ERROR, "Minio request error: " + minioErrMsg);
}
}
}
LOG.error("errors while get file status ", e);
return new Status(Status.ErrCode.COMMON_ERROR, "errors while get file status " + e.getMessage());
}
return Status.OK;
S3ObjStorage objStorage = (S3ObjStorage) this.objStorage;
return objStorage.globList(remotePath, result, fileNameOnly);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.Map;

/**
Expand Down Expand Up @@ -73,7 +75,22 @@ public S3TableValuedFunction(Map<String, String> properties) throws AnalysisExce

S3URI s3uri = getS3Uri(uriStr, Boolean.parseBoolean(usePathStyle.toLowerCase()),
Boolean.parseBoolean(forceParsingByStandardUri.toLowerCase()));

String endpoint = constructEndpoint(otherProps, s3uri);
if (StringUtils.isNotBlank(endpoint)) {
otherProps.putIfAbsent(S3Properties.ENDPOINT, endpoint);
}
if (!otherProps.containsKey(S3Properties.REGION)) {
String region;
if (AzureProperties.checkAzureProviderPropertyExist(properties)) {
// Azure could run without region
region = s3uri.getRegion().orElse("DUMMY-REGION");
} else {
region = s3uri.getRegion().orElse(null);
}
if (StringUtils.isNotBlank(region)) {
otherProps.put(S3Properties.REGION, region);
}
}
// get endpoint first from properties, if not present, get it from s3 uri.
// If endpoint is missing, exception will be thrown.
locationProperties.put(PropertyConverter.USE_PATH_STYLE, usePathStyle);
Expand All @@ -82,7 +99,9 @@ public S3TableValuedFunction(Map<String, String> properties) throws AnalysisExce
// For Azure's compatibility, we need bucket to connect to the blob storage's container
locationProperties.put(S3Properties.BUCKET, s3uri.getBucket());
}
this.storageProperties = StorageProperties.createStorageProperties(properties);
Map<String, String> p = new HashMap<>(properties);
p.putAll(otherProps);
this.storageProperties = StorageProperties.createStorageProperties(p);
locationProperties.putAll(storageProperties.getBackendConfigProperties());
locationProperties.putAll(otherProps);

Expand All @@ -96,6 +115,26 @@ public S3TableValuedFunction(Map<String, String> properties) throws AnalysisExce
}
}

/**
 * Resolves the storage endpoint for this TVF.
 *
 * <p>For Azure (provider property present) the endpoint is derived from the account
 * name (passed as the access key) and the container (bucket) via the Azure endpoint
 * template. For every other provider the endpoint is taken from the properties first,
 * falling back to the one parsed from the URI; a blank result is tolerated here
 * because later property resolution may still supply it.
 *
 * @param properties mutable TVF properties; the endpoint key is consumed when read
 * @param s3uri      the parsed table URI
 * @return the resolved endpoint, possibly empty for non-Azure providers
 * @throws AnalysisException if the Azure path is taken and the access key is missing
 */
private String constructEndpoint(Map<String, String> properties, S3URI s3uri) throws AnalysisException {
    if (AzureProperties.checkAzureProviderPropertyExist(properties)) {
        String bucket = s3uri.getBucket();
        String accountName = properties.getOrDefault(S3Properties.ACCESS_KEY, "");
        if (accountName.isEmpty()) {
            throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.ACCESS_KEY));
        }
        return String.format(AzureProperties.AZURE_ENDPOINT_TEMPLATE, accountName, bucket);
    }
    // Non-Azure: properties win over the URI-derived endpoint. Intentionally no
    // "endpoint is required" check — some providers can run on region alone.
    return getOrDefaultAndRemove(properties, S3Properties.ENDPOINT, s3uri.getEndpoint().orElse(""));
}

private void forwardCompatibleDeprecatedKeys(Map<String, String> props) {
for (String deprecatedKey : DEPRECATED_KEYS) {
String value = props.remove(deprecatedKey);
Expand Down
Loading