Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement](broker) Doris supports OBS broker load #13523

Merged
merged 4 commits into from
Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ public class ExportStmt extends StatementBase {
private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
private static final String DEFAULT_LINE_DELIMITER = "\n";
private static final String DEFAULT_COLUMNS = "";


private TableName tblName;
private List<String> partitions;
private Expr whereExpr;
Expand Down Expand Up @@ -235,10 +233,13 @@ public static String checkPath(String path, StorageBackend.StorageType type) thr
URI uri = URI.create(path);
String schema = uri.getScheme();
if (type == StorageBackend.StorageType.BROKER) {
if (schema == null || (!schema.equalsIgnoreCase("bos") && !schema.equalsIgnoreCase("afs")
&& !schema.equalsIgnoreCase("hdfs"))) {
throw new AnalysisException("Invalid export path. please use valid 'HDFS://', 'AFS://' or 'BOS://' " +
"path.");
if (schema == null || (!schema.equalsIgnoreCase("bos")
&& !schema.equalsIgnoreCase("afs")
&& !schema.equalsIgnoreCase("hdfs")
&& !schema.equalsIgnoreCase("ofs")
&& !schema.equalsIgnoreCase("obs"))) {
throw new AnalysisException("Invalid export path. please use valid 'HDFS://', 'AFS://' , 'BOS://', "
+ "or 'ofs://' or 'obs://' path.");
}
} else if (type == StorageBackend.StorageType.S3) {
if (schema == null || !schema.equalsIgnoreCase("s3")) {
Expand Down
28 changes: 23 additions & 5 deletions fs_brokers/apache_hdfs_broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ under the License.
<maven.compiler.target>1.8</maven.compiler.target>
<log4j2.version>2.18.0</log4j2.version>
<project.scm.id>github</project.scm.id>
<hadoop.version>2.8.3</hadoop.version>
</properties>
<profiles>
<!-- for custom internal repository -->
Expand Down Expand Up @@ -111,6 +112,11 @@ under the License.
<id>oracleReleases</id>
<url>http://download.oracle.com/maven</url>
</repository>
<!-- for huawei obs sdk -->
<repository>
<id>huawei-obs-sdk</id>
<url>https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/</url>
</repository>
</repositories>
<pluginRepositories>
<!-- for cup-maven-plugin -->
Expand Down Expand Up @@ -186,25 +192,25 @@ under the License.
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<version>2.7.3</version>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-auth -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.7.3</version>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.3</version>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.htrace/htrace-core -->
<dependency>
Expand Down Expand Up @@ -266,7 +272,19 @@ under the License.
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>2.7.3</version>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.huaweicloud/esdk-obs-java-bundle -->
<dependency>
<groupId>com.huaweicloud</groupId>
<artifactId>esdk-obs-java-bundle</artifactId>
<version>3.21.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-huaweicloud -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-huaweicloud</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public class FileSystemManager {
private static final String HDFS_SCHEME = "hdfs";
private static final String S3A_SCHEME = "s3a";
private static final String KS3_SCHEME = "ks3";
private static final String CHDFS_SCHEME = "ofs";
private static final String OBS_SCHEME = "obs";

private static final String USER_NAME_KEY = "username";
private static final String PASSWORD_KEY = "password";
Expand Down Expand Up @@ -98,6 +100,14 @@ public class FileSystemManager {
// This property is used like 'fs.hdfs.impl.disable.cache'
private static final String FS_S3A_IMPL_DISABLE_CACHE = "fs.s3a.impl.disable.cache";

// arguments for obs
private static final String FS_OBS_ACCESS_KEY = "fs.obs.access.key";
private static final String FS_OBS_SECRET_KEY = "fs.obs.secret.key";
private static final String FS_OBS_ENDPOINT = "fs.obs.endpoint";
// This property is used like 'fs.hdfs.impl.disable.cache'
private static final String FS_OBS_IMPL_DISABLE_CACHE = "fs.obs.impl.disable.cache";
private static final String FS_OBS_IMPL = "fs.obs.impl";

// arguments for ks3
private static final String FS_KS3_ACCESS_KEY = "fs.ks3.AccessKey";
private static final String FS_KS3_SECRET_KEY = "fs.ks3.AccessSecret";
Expand Down Expand Up @@ -164,7 +174,9 @@ public BrokerFileSystem getFileSystem(String path, Map<String, String> propertie
brokerFileSystem = getS3AFileSystem(path, properties);
} else if (scheme.equals(KS3_SCHEME)) {
brokerFileSystem = getKS3FileSystem(path, properties);
}else {
} else if (scheme.equals(OBS_SCHEME)) {
brokerFileSystem = getOBSFileSystem(path, properties);
} else {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
"invalid path. scheme is not supported");
}
Expand Down Expand Up @@ -437,6 +449,47 @@ public BrokerFileSystem getS3AFileSystem(String path, Map<String, String> proper
}
}


/**
* file system handle is cached, the identity is endpoint + bucket + accessKey_secretKey
* @param path
* @param properties
* @return
*/
public BrokerFileSystem getOBSFileSystem(String path, Map<String, String> properties) {
WildcardURI pathUri = new WildcardURI(path);
String accessKey = properties.getOrDefault(FS_OBS_ACCESS_KEY, "");
String secretKey = properties.getOrDefault(FS_OBS_SECRET_KEY, "");
String endpoint = properties.getOrDefault(FS_OBS_ENDPOINT, "");
String disableCache = properties.getOrDefault(FS_OBS_IMPL_DISABLE_CACHE, "true");
String host = OBS_SCHEME + "://" + endpoint + "/" + pathUri.getUri().getHost();
String obsUgi = accessKey + "," + secretKey;
FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, obsUgi);
cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity));
BrokerFileSystem fileSystem = cachedFileSystem.get(fileSystemIdentity);
fileSystem.getLock().lock();
try {
if (fileSystem.getDFSFileSystem() == null) {
logger.info("create file system for new path " + path);
// create a new filesystem
Configuration conf = new Configuration();
conf.set(FS_OBS_ACCESS_KEY, accessKey);
conf.set(FS_OBS_SECRET_KEY, secretKey);
conf.set(FS_OBS_ENDPOINT, endpoint);
conf.set(FS_OBS_IMPL, "org.apache.hadoop.fs.obs.OBSFileSystem");
conf.set(FS_OBS_IMPL_DISABLE_CACHE, disableCache);
FileSystem obsFileSystem = FileSystem.get(pathUri.getUri(), conf);
fileSystem.setFileSystem(obsFileSystem);
}
return fileSystem;
} catch (Exception e) {
logger.error("errors while connect to " + path, e);
throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
} finally {
fileSystem.getLock().unlock();
}
}

/**
* visible for test
* <p>
Expand Down