Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Broker](bos) support baidu bos object storage file system for broker #15448

Merged
merged 1 commit into from
Dec 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,16 @@ public static String checkPath(String path, StorageBackend.StorageType type) thr
URI uri = URI.create(path);
String schema = uri.getScheme();
if (type == StorageBackend.StorageType.BROKER) {
if (schema == null || (!schema.equalsIgnoreCase("hdfs")
if (schema == null || (!schema.equalsIgnoreCase("bos")
&& !schema.equalsIgnoreCase("afs")
&& !schema.equalsIgnoreCase("hdfs")
&& !schema.equalsIgnoreCase("ofs")
&& !schema.equalsIgnoreCase("obs")
&& !schema.equalsIgnoreCase("oss")
&& !schema.equalsIgnoreCase("s3a")
&& !schema.equalsIgnoreCase("cosn"))) {
throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'ofs://', 'obs://',"
+ "'oss://', 's3a://' or 'cosn://' path.");
throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'afs://' , 'bos://',"
+ " 'ofs://', 'obs://', 'oss://', 's3a://' or 'cosn://' path.");
}
} else if (type == StorageBackend.StorageType.S3) {
if (schema == null || !schema.equalsIgnoreCase("s3")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,19 @@ public class FileSystemIdentity {

private final String hostName;
private final String ugiInfo;

private final String extraInfo;

/**
 * Creates an identity that carries no extra info ({@code extraInfo} is {@code null}).
 *
 * @param hostName value stored as this identity's host name
 * @param ugiInfo  value stored as this identity's ugi info
 */
public FileSystemIdentity(String hostName, String ugiInfo) {
    this(hostName, ugiInfo, null);
}

/**
 * Creates an identity from all three components; each argument is stored as given,
 * without validation or copying.
 *
 * @param hostName  value stored as this identity's host name
 * @param ugiInfo   value stored as this identity's ugi info
 * @param extraInfo additional discriminator stored with the identity; may be {@code null}
 */
public FileSystemIdentity(String hostName, String ugiInfo, String extraInfo) {
    this.extraInfo = extraInfo;
    this.ugiInfo = ugiInfo;
    this.hostName = hostName;
}

@Override
Expand All @@ -34,6 +43,7 @@ public int hashCode() {
result = prime * result
+ ((hostName == null) ? 0 : hostName.hashCode());
result = prime * result + ((ugiInfo == null) ? 0 : ugiInfo.hashCode());
result = prime * result + ((extraInfo == null) ? 0 : extraInfo.hashCode());
return result;
}

Expand Down Expand Up @@ -63,6 +73,13 @@ public boolean equals(Object obj) {
} else if (!ugiInfo.equals(other.ugiInfo)) {
return false;
}
if (extraInfo == null) {
if (other.extraInfo != null) {
return false;
}
} else if (!extraInfo.equals(other.extraInfo)) {
return false;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -71,6 +72,8 @@ public class FileSystemManager {
// Path scheme names. getFileSystem compares the scheme of the requested path against
// these values to decide which get*FileSystem method builds the handle.
private static final String OBS_SCHEME = "obs";
private static final String OSS_SCHEME = "oss";
private static final String COS_SCHEME = "cosn";
// Scheme handled by getBOSFileSystem.
private static final String BOS_SCHEME = "bos";
// NOTE(review): no branch comparing against AFS_SCHEME is visible in the part of
// getFileSystem shown here - confirm that "afs" paths are routed to getAfsFileSystem.
private static final String AFS_SCHEME = "afs";

private static final String USER_NAME_KEY = "username";
private static final String PASSWORD_KEY = "password";
Expand Down Expand Up @@ -132,6 +135,23 @@ public class FileSystemManager {
private static final String FS_COS_IMPL = "fs.cosn.impl";
private static final String FS_COS_IMPL_DISABLE_CACHE = "fs.cosn.impl.disable.cache";

// Configuration keys for bos.
// getBOSFileSystem reads the access key, secret key, endpoint and multipart block size
// from the broker properties under these same names and copies them into the Hadoop
// Configuration; FS_BOS_IMPL is set there to the BOS FileSystem implementation class.
private static final String FS_BOS_ACCESS_KEY = "fs.bos.access.key";
private static final String FS_BOS_SECRET_KEY = "fs.bos.secret.access.key";
private static final String FS_BOS_ENDPOINT = "fs.bos.endpoint";
private static final String FS_BOS_IMPL = "fs.bos.impl";
private static final String FS_BOS_MULTIPART_UPLOADS_BLOCK_SIZE = "fs.bos.multipart.uploads.block.size";


// Configuration keys for afs (used by getAfsFileSystem).
// HADOOP_JOB_GROUP_NAME is read from the broker properties and also forms part of the
// cache identity; HADOOP_JOB_UGI is set to "username,password"; FS_DEFAULT_NAME is set
// to scheme://authority of the path; FS_AFS_IMPL names the FileSystem implementation class.
private static final String HADOOP_JOB_GROUP_NAME = "hadoop.job.group.name";
private static final String HADOOP_JOB_UGI = "hadoop.job.ugi";
private static final String FS_DEFAULT_NAME = "fs.default.name";
private static final String FS_AFS_IMPL = "fs.afs.impl";
// The three DFS_* keys are optional broker properties; getAfsFileSystem falls back to
// "20001" (agent port), "3" (auth method) and "300000" (rpc timeout) when they are absent.
private static final String DFS_AGENT_PORT = "dfs.agent.port";
private static final String DFS_CLIENT_AUTH_METHOD = "dfs.client.auth.method";
private static final String DFS_RPC_TIMEOUT = "dfs.rpc.timeout";

private ScheduledExecutorService handleManagementPool = Executors.newScheduledThreadPool(2);

private int readBufferSize = 128 << 10; // 128k
Expand Down Expand Up @@ -197,6 +217,8 @@ public BrokerFileSystem getFileSystem(String path, Map<String, String> propertie
brokerFileSystem = getOSSFileSystem(path, properties);
} else if (scheme.equals(COS_SCHEME)) {
brokerFileSystem = getCOSFileSystem(path, properties);
} else if (scheme.equals(BOS_SCHEME)) {
brokerFileSystem = getBOSFileSystem(path, properties);
} else {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
"invalid path. scheme is not supported");
Expand Down Expand Up @@ -548,8 +570,8 @@ public BrokerFileSystem getOSSFileSystem(String path, Map<String, String> proper
String endpoint = properties.getOrDefault(FS_OSS_ENDPOINT, "");
String disableCache = properties.getOrDefault(FS_OSS_IMPL_DISABLE_CACHE, "true");
String host = OSS_SCHEME + "://" + endpoint + "/" + pathUri.getUri().getHost();
String obsUgi = accessKey + "," + secretKey;
FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, obsUgi);
String ossUgi = accessKey + "," + secretKey;
FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, ossUgi);
cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity));
BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties);
fileSystem.getLock().lock();
Expand Down Expand Up @@ -608,11 +630,11 @@ public BrokerFileSystem getChdfsFileSystem(String path, Map<String, String> prop
} else if (properties.containsKey(KERBEROS_KEYTAB_CONTENT)) {
kerberosContent = properties.get(KERBEROS_KEYTAB_CONTENT);
} else {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
"keytab is required for kerberos authentication");
}
if (!properties.containsKey(KERBEROS_PRINCIPAL)) {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
"principal is required for kerberos authentication");
} else {
kerberosContent = kerberosContent + properties.get(KERBEROS_PRINCIPAL);
Expand Down Expand Up @@ -651,8 +673,8 @@ public BrokerFileSystem getChdfsFileSystem(String path, Map<String, String> prop
// pass kerberos keytab content use base64 encoding
// so decode it and write it to tmp path under /tmp
// because ugi api only accept a local path as argument
String keytab_content = properties.get(KERBEROS_KEYTAB_CONTENT);
byte[] base64decodedBytes = Base64.getDecoder().decode(keytab_content);
String keytabContent = properties.get(KERBEROS_KEYTAB_CONTENT);
byte[] base64decodedBytes = Base64.getDecoder().decode(keytabContent);
long currentTime = System.currentTimeMillis();
Random random = new Random(currentTime);
int randNumber = random.nextInt(10000);
Expand Down Expand Up @@ -737,6 +759,91 @@ public BrokerFileSystem getCOSFileSystem(String path, Map<String, String> proper
}
}

/**
 * Visible for test.
 * <p>
 * Returns the broker file system handle for a BOS path, creating the underlying Hadoop
 * {@code FileSystem} the first time the handle is used. The handle identity is
 * {@code bos://<endpoint>/<bucket>} together with {@code <accessKey>,<secretKey>}.
 *
 * @param path       BOS path; the host part of its URI is used as the bucket
 * @param properties broker properties; access key, secret key and endpoint default to the
 *                   empty string, the multipart upload block size defaults to "9437184"
 * @return the broker file system handle with its underlying file system set
 * @throws BrokerException with status NOT_AUTHORIZED when any exception is raised while
 *                         the underlying file system is looked up or created
 */
public BrokerFileSystem getBOSFileSystem(String path, Map<String, String> properties) {
    final WildcardURI bosUri = new WildcardURI(path);
    final String ak = properties.getOrDefault(FS_BOS_ACCESS_KEY, "");
    final String sk = properties.getOrDefault(FS_BOS_SECRET_KEY, "");
    final String bosEndpoint = properties.getOrDefault(FS_BOS_ENDPOINT, "");
    final String uploadBlockSize = properties.getOrDefault(FS_BOS_MULTIPART_UPLOADS_BLOCK_SIZE, "9437184");
    // The endpoint is the server host and the URI host is the bucket; both are part of the
    // identity because the FileSystem cache distinguishes on both.
    final String bucket = bosUri.getUri().getHost();
    final FileSystemIdentity identity =
            new FileSystemIdentity(BOS_SCHEME + "://" + bosEndpoint + "/" + bucket, ak + "," + sk);
    // NOTE(review): the OSS and AFS variants call cachedFileSystem.putIfAbsent(...) before
    // updateCachedFileSystem; this method does not. Confirm updateCachedFileSystem creates
    // the entry itself when it is missing.
    final BrokerFileSystem handle = updateCachedFileSystem(identity, properties);
    handle.getLock().lock();
    try {
        if (handle.getDFSFileSystem() != null) {
            // underlying file system already created by an earlier call
            return handle;
        }
        logger.info("could not find file system for path " + path + " create a new one");
        // build the Hadoop configuration for a brand-new file system
        Configuration bosConf = new Configuration();
        bosConf.set(FS_BOS_ACCESS_KEY, ak);
        bosConf.set(FS_BOS_SECRET_KEY, sk);
        bosConf.set(FS_BOS_ENDPOINT, bosEndpoint);
        bosConf.set(FS_BOS_IMPL, "org.apache.hadoop.fs.bos.BaiduBosFileSystem");
        bosConf.set(FS_BOS_MULTIPART_UPLOADS_BLOCK_SIZE, uploadBlockSize);
        handle.setFileSystem(FileSystem.get(bosUri.getUri(), bosConf));
        return handle;
    } catch (Exception e) {
        logger.error("errors while connect to " + path, e);
        throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
    } finally {
        handle.getLock().unlock();
    }
}

/**
 * Returns the cached broker file system handle for an AFS path, creating the underlying
 * Hadoop {@code FileSystem} the first time the handle is used. The cache identity is
 * {@code scheme://authority} of the path, {@code <username>,<password>} and the job
 * group name.
 *
 * @param path       AFS path; only its scheme and authority are used
 * @param properties broker properties; username, password and job group default to the
 *                   empty string, and the auth method, agent port and rpc timeout default
 *                   to "3", "20001" and "300000"
 * @return the cached handle, or {@code null} when the identity is no longer present in
 *         the cache once the handle lock is held
 * @throws BrokerException with status NOT_AUTHORIZED when any exception is raised inside
 *                         the locked section
 */
private BrokerFileSystem getAfsFileSystem(String path, Map<String, String> properties) {
    final URI afsUri = new WildcardURI(path).getUri();
    final String defaultFs = afsUri.getScheme() + "://" + afsUri.getAuthority();
    final String user = properties.getOrDefault(USER_NAME_KEY, "");
    final String passwd = properties.getOrDefault(PASSWORD_KEY, "");
    final String jobGroup = properties.getOrDefault(HADOOP_JOB_GROUP_NAME, "");
    final String jobUgi = user + "," + passwd;
    final FileSystemIdentity identity = new FileSystemIdentity(defaultFs, jobUgi, jobGroup);
    cachedFileSystem.putIfAbsent(identity, new BrokerFileSystem(identity));
    final BrokerFileSystem handle = updateCachedFileSystem(identity, properties);
    handle.getLock().lock();
    try {
        if (!cachedFileSystem.containsKey(identity)) {
            // corner case: the entry was removed (closed by the file system checker
            // thread) between registration above and acquiring the lock
            return null;
        }
        if (handle.getDFSFileSystem() != null) {
            // underlying file system already created by an earlier call
            return handle;
        }
        logger.info("could not find file system for path " + path + " create a new one");
        // build the Hadoop configuration for a brand-new file system
        Configuration afsConf = new Configuration();
        afsConf.set(HADOOP_JOB_UGI, jobUgi);
        afsConf.set(HADOOP_JOB_GROUP_NAME, jobGroup);
        afsConf.set(FS_DEFAULT_NAME, defaultFs);
        afsConf.set(FS_AFS_IMPL, "org.apache.hadoop.fs.DFileSystem");
        afsConf.set(DFS_CLIENT_AUTH_METHOD, properties.getOrDefault(DFS_CLIENT_AUTH_METHOD, "3"));
        afsConf.set(DFS_AGENT_PORT, properties.getOrDefault(DFS_AGENT_PORT, "20001"));
        afsConf.set(DFS_RPC_TIMEOUT, properties.getOrDefault(DFS_RPC_TIMEOUT, "300000"));
        handle.setFileSystem(FileSystem.get(URI.create(defaultFs), afsConf));
        return handle;
    } catch (Exception e) {
        logger.error("errors while connect to " + path, e);
        throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e, e.getMessage());
    } finally {
        handle.getLock().unlock();
    }
}

public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties) {
List<TBrokerFileStatus> resultFileStatus = null;
Expand Down