[feat] Add type local turn on resource center by default (#13303)
zhongjiajie committed Jan 31, 2023
1 parent 78ba71b commit 78e5569
Showing 10 changed files with 192 additions and 45 deletions.
17 changes: 13 additions & 4 deletions docs/docs/en/guide/resource/configuration.md
@@ -9,13 +9,22 @@

### Configure `common.properties`

If you deploy DolphinScheduler in `Cluster` or `Pseudo-Cluster` mode, you need to configure `api-server/conf/common.properties` and `worker-server/conf/common.properties`.
If you deploy DolphinScheduler in `Standalone` mode, you only need to configure `standalone-server/conf/common.properties` as follows:
The DolphinScheduler Resource Center uses the local file system by default and does not require any additional configuration.
However, if you need to modify the default value, please make sure to change the following configuration at the same time.

- If you deploy DolphinScheduler in `Cluster` or `Pseudo-Cluster` mode, you need to configure `api-server/conf/common.properties` and `worker-server/conf/common.properties`.
- If you deploy DolphinScheduler in `Standalone` mode, you only need to configure `standalone-server/conf/common.properties` as follows:

The configuration you may need to change:

- Change `resource.storage.upload.base.path` to your local directory path. Please make sure the tenant configured in `resource.hdfs.root.user` has read and write permissions for `resource.storage.upload.base.path`, e.g. `/tmp/dolphinscheduler`. DolphinScheduler will create the configured directory if it does not exist.
- Modify `resource.storage.type=HDFS` and `resource.hdfs.fs.defaultFS=file:///`.

> NOTE: Please modify the value of `resource.storage.upload.base.path` if you do not want to use the default value as the base path.
> NOTE:
> 1. LOCAL mode does not support reading and writing in distributed mode, which means you can only use your resources on one machine unless you use a shared file mount point.
> 2. Please modify the value of `resource.storage.upload.base.path` if you do not want to use the default value as the base path.
> 3. `resource.storage.type=LOCAL` is a shorthand that actually configures two settings, `resource.storage.type=HDFS` and `resource.hdfs.fs.defaultFS=file:///`.
> The single `resource.storage.type=LOCAL` value exists for user convenience and enables the local resource center by default (see the example after this note).
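
For illustration, a sketch of the relevant `common.properties` entries (not a complete file) showing the shorthand and what it expands to:

```properties
# Shorthand introduced by this change: enables the local resource center by default
resource.storage.type=LOCAL

# Equivalent explicit configuration (shown commented out, since only one storage type applies)
# resource.storage.type=HDFS
# resource.hdfs.fs.defaultFS=file:///
```
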
## Use HDFS or Remote Object Storage

19 changes: 13 additions & 6 deletions docs/docs/zh/guide/resource/configuration.md
@@ -9,13 +9,20 @@

### Configure the `common.properties` file

If you deploy DolphinScheduler in `Cluster` or `Pseudo-Cluster` mode, you need to configure the files at the following paths: `api-server/conf/common.properties` and `worker-server/conf/common.properties`
If you deploy DolphinScheduler in `Standalone` mode, you only need to configure `standalone-server/conf/common.properties` as follows:
The DolphinScheduler Resource Center uses the local file system and is enabled by default, requiring no additional configuration. However, when you need to modify the default configuration, please make sure to complete the following changes at the same time.

- If you deploy DolphinScheduler in `Cluster` or `Pseudo-Cluster` mode, you need to configure the files at the following paths: `api-server/conf/common.properties` and `worker-server/conf/common.properties`
- If you deploy DolphinScheduler in `Standalone` mode, you only need to configure `standalone-server/conf/common.properties` as follows:

The changes you may need to make:

- Change `resource.storage.upload.base.path` to your local storage path. Please make sure the user deploying DolphinScheduler has read and write permissions, e.g. `resource.storage.upload.base.path=/tmp/dolphinscheduler`. The directory will be created automatically if the path does not exist.
- Modify `resource.storage.type=HDFS` and `resource.hdfs.fs.defaultFS=file:///`.

> **Note**: If you do not want to use the default value as the base path of the resource center, please modify the value of `resource.storage.upload.base.path`.
> **Note**:
> 1. LOCAL mode does not support distributed reads and writes, which means uploaded resources can only be used on one machine unless a shared file mount point is used.
> 2. If you do not want to use the default value as the base path of the resource center, please modify the value of `resource.storage.upload.base.path`.
> 3. When you configure `resource.storage.type=LOCAL`, you are actually configuring two settings, `resource.storage.type=HDFS` and `resource.hdfs.fs.defaultFS=file:///`;
> the single `resource.storage.type=LOCAL` value exists for user convenience and enables the local resource center by default.
## Use HDFS or Remote Object Storage

@@ -46,8 +53,8 @@
# user data local directory path, please make sure the directory exists and have read write permissions
data.basedir.path=/tmp/dolphinscheduler

# resource storage type: HDFS, S3, OSS, NONE
resource.storage.type=HDFS
# resource storage type: LOCAL, HDFS, S3, OSS
resource.storage.type=LOCAL

# resource store on HDFS/S3/OSS path, resource file will store to this hadoop hdfs path, self configuration,
# please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
@@ -21,5 +21,5 @@
* data base types
*/
public enum ResUploadType {
HDFS, S3, OSS, NONE
LOCAL, HDFS, S3, OSS, NONE
}
6 changes: 4 additions & 2 deletions dolphinscheduler-common/src/main/resources/common.properties
@@ -21,8 +21,10 @@ data.basedir.path=/tmp/dolphinscheduler
# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js

# resource storage type: HDFS, S3, OSS, NONE
resource.storage.type=NONE
# resource storage type: LOCAL, HDFS, S3, OSS, NONE. LOCAL is a special case of HDFS with the "resource.hdfs.fs.defaultFS=file:///" configuration
# please note that LOCAL mode does not support reading and writing in distributed mode, which means you can only use your
# resources on one machine unless you use a shared file mount point
resource.storage.type=LOCAL
# resource store on HDFS/S3 path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
resource.storage.upload.base.path=/dolphinscheduler

@@ -31,6 +31,6 @@ public void getString() {

@Test
public void getResUploadStartupState() {
Assertions.assertFalse(PropertyUtils.getResUploadStartupState());
Assertions.assertTrue(PropertyUtils.getResUploadStartupState());
}
}
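
The flipped assertion follows directly from the new `LOCAL` default: the resource-upload startup state is now on out of the box. A rough, hypothetical sketch of the kind of check involved (the real `PropertyUtils` implementation may differ), reusing the `ResUploadType` enum shown above:

```java
// Hypothetical sketch only -- not the actual PropertyUtils code.
public final class ResUploadStateSketch {

    public static boolean getResUploadStartupState(String configuredType) {
        // Derive the upload state from the configured storage type.
        ResUploadType type = ResUploadType.valueOf(configuredType.toUpperCase());
        return type != ResUploadType.NONE; // LOCAL, HDFS, S3, OSS -> true; NONE -> false
    }

    public static void main(String[] args) {
        System.out.println(getResUploadStartupState("LOCAL")); // true with the new default
        System.out.println(getResUploadStartupState("NONE"));  // false with the old default
    }
}
```
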
@@ -21,9 +21,10 @@

public enum StorageType {

HDFS(0, "HDFS"),
OSS(1, "OSS"),
S3(2, "S3"),
LOCAL(0, "LOCAL"),
HDFS(1, "HDFS"),
OSS(2, "OSS"),
S3(3, "S3"),
;

private final int code;
@@ -77,22 +77,17 @@
public class HdfsStorageOperator implements Closeable, StorageOperate {

private static final Logger logger = LoggerFactory.getLogger(HdfsStorageOperator.class);
private String hdfsUser;
public static final String RM_HA_IDS = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
public static final String APP_ADDRESS = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
public static final String JOB_HISTORY_ADDRESS = PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS);
public static final int HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE =
PropertyUtils.getInt(Constants.HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT, 8088);
private static HdfsStorageProperties hdfsProperties;
private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY";

private static final LoadingCache<String, HdfsStorageOperator> cache = CacheBuilder
.newBuilder()
.expireAfterWrite(PropertyUtils.getInt(Constants.KERBEROS_EXPIRE_TIME, 2), TimeUnit.HOURS)
.expireAfterWrite(HdfsStorageProperties.getKerberosExpireTime(), TimeUnit.HOURS)
.build(new CacheLoader<String, HdfsStorageOperator>() {

@Override
public HdfsStorageOperator load(String key) throws Exception {
return new HdfsStorageOperator();
return new HdfsStorageOperator(hdfsProperties);
}
});
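
Operator instances are cached with Guava's `LoadingCache`, so a fresh `HdfsStorageOperator` is only rebuilt once the cached one expires. A minimal, self-contained sketch of that pattern (with a plain string standing in for the operator) under those assumptions:

```java
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class OperatorCacheDemo {

    private static final LoadingCache<String, String> CACHE = CacheBuilder.newBuilder()
            // entries are rebuilt after the expiry window, which is how the operator
            // above refreshes its (potentially Kerberos-bound) HDFS handle
            .expireAfterWrite(2, TimeUnit.HOURS)
            .build(new CacheLoader<String, String>() {

                @Override
                public String load(String key) {
                    // stands in for "new HdfsStorageOperator(hdfsProperties)"
                    return "operator-for-" + key;
                }
            });

    public static void main(String[] args) {
        // getUnchecked builds the value on first access and serves the cached one afterwards
        System.out.println(CACHE.getUnchecked("HADOOP_UTILS_KEY"));
    }
}
```
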

@@ -101,8 +96,13 @@ public HdfsStorageOperator load(String key) throws Exception {
private Configuration configuration;
private FileSystem fs;

private HdfsStorageOperator() {
hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER);
public HdfsStorageOperator() {
this(new HdfsStorageProperties());
}

public HdfsStorageOperator(HdfsStorageProperties hdfsStorageProperties) {
// Overwrite config with the passed-in hdfsStorageProperties
hdfsProperties = hdfsStorageProperties;
init();
initHdfsPath();
}
@@ -133,16 +133,12 @@ private void init() throws NullPointerException {
try {
configuration = new HdfsConfiguration();

String hdfsUser = hdfsProperties.getUser();
if (CommonUtils.loadKerberosConf(configuration)) {
hdfsUser = "";
}

String defaultFS = configuration.get(Constants.FS_DEFAULT_FS);

if (StringUtils.isBlank(defaultFS)) {
defaultFS = PropertyUtils.getString(Constants.FS_DEFAULT_FS);
}

String defaultFS = getDefaultFS();
// first get key from core-site.xml hdfs-site.xml ,if null ,then try to get from properties file
// the default is the local file system
if (StringUtils.isNotBlank(defaultFS)) {
@@ -189,7 +185,7 @@ public Configuration getConfiguration() {
public String getDefaultFS() {
String defaultFS = getConfiguration().get(Constants.FS_DEFAULT_FS);
if (StringUtils.isBlank(defaultFS)) {
defaultFS = PropertyUtils.getString(Constants.FS_DEFAULT_FS);
defaultFS = hdfsProperties.getDefaultFS();
}
return defaultFS;
}
@@ -207,18 +203,20 @@ public String getDefaultFS() {
public String getApplicationUrl(String applicationId) throws BaseException {

yarnEnabled = true;
String appUrl = StringUtils.isEmpty(RM_HA_IDS) ? APP_ADDRESS : getAppAddress(APP_ADDRESS, RM_HA_IDS);
String appUrl = StringUtils.isEmpty(hdfsProperties.getYarnResourceRmIds())
? hdfsProperties.getYarnAppStatusAddress()
: getAppAddress(hdfsProperties.getYarnAppStatusAddress(), hdfsProperties.getYarnResourceRmIds());
if (StringUtils.isBlank(appUrl)) {
throw new BaseException("yarn application url generation failed");
}
logger.debug("yarn application url:{}, applicationId:{}", appUrl, applicationId);
return String.format(appUrl, HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE, applicationId);
return String.format(appUrl, hdfsProperties.getHadoopResourceManagerHttpAddressPort(), applicationId);
}

public String getJobHistoryUrl(String applicationId) {
// eg:application_1587475402360_712719 -> job_1587475402360_712719
String jobId = applicationId.replace("application", "job");
return String.format(JOB_HISTORY_ADDRESS, jobId);
return String.format(hdfsProperties.getYarnJobHistoryStatusAddress(), jobId);
}

/**
@@ -601,7 +599,7 @@ public boolean isYarnEnabled() {
* @return data hdfs path
*/
public static String getHdfsDataBasePath() {
String defaultFS = PropertyUtils.getString(Constants.FS_DEFAULT_FS);
String defaultFS = hdfsProperties.getDefaultFS();
defaultFS = defaultFS.endsWith("/") ? StringUtils.chop(defaultFS) : defaultFS;
if (FOLDER_SEPARATOR.equals(RESOURCE_UPLOAD_PATH)) {
return defaultFS + "";
@@ -764,7 +762,7 @@ public static String getActiveRMName(String protocol, String rmIds) {

String[] rmIdArr = rmIds.split(Constants.COMMA);

String yarnUrl = protocol + "%s:" + HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE + "/ws/v1/cluster/info";
String yarnUrl =
protocol + "%s:" + hdfsProperties.getHadoopResourceManagerHttpAddressPort() + "/ws/v1/cluster/info";

try {

@@ -791,7 +790,7 @@ public static String getActiveRMName(String protocol, String rmIds) {
public static String getRMState(String url) {

String retStr = Boolean.TRUE
.equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false))
.equals(hdfsProperties.isHadoopSecurityAuthStartupState())
? KerberosHttpClient.get(url)
: HttpUtils.get(url);

@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.storage.hdfs;

import static org.apache.dolphinscheduler.common.constants.Constants.FS_DEFAULT_FS;
import static org.apache.dolphinscheduler.common.constants.Constants.HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT;
import static org.apache.dolphinscheduler.common.constants.Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE;
import static org.apache.dolphinscheduler.common.constants.Constants.HDFS_ROOT_USER;
import static org.apache.dolphinscheduler.common.constants.Constants.KERBEROS_EXPIRE_TIME;
import static org.apache.dolphinscheduler.common.constants.Constants.YARN_APPLICATION_STATUS_ADDRESS;
import static org.apache.dolphinscheduler.common.constants.Constants.YARN_JOB_HISTORY_STATUS_ADDRESS;
import static org.apache.dolphinscheduler.common.constants.Constants.YARN_RESOURCEMANAGER_HA_RM_IDS;

import org.apache.dolphinscheduler.common.utils.PropertyUtils;

import lombok.Data;

import org.springframework.context.annotation.Configuration;

@Data
@Configuration
public class HdfsStorageProperties {

/**
* HDFS storage user
*/
private String user = PropertyUtils.getString(HDFS_ROOT_USER);

/**
* HDFS default fs
*/
private String defaultFS = PropertyUtils.getString(FS_DEFAULT_FS);

/**
* YARN resource manager HA RM ids
*/
private String yarnResourceRmIds = PropertyUtils.getString(YARN_RESOURCEMANAGER_HA_RM_IDS);

/**
* YARN application status address
*/
private String yarnAppStatusAddress = PropertyUtils.getString(YARN_APPLICATION_STATUS_ADDRESS);

/**
* YARN job history status address
*/
private String yarnJobHistoryStatusAddress = PropertyUtils.getString(YARN_JOB_HISTORY_STATUS_ADDRESS);

/**
* Hadoop resource manager http address port
*/
private String hadoopResourceManagerHttpAddressPort =
PropertyUtils.getString(HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT);

/**
* Hadoop security authentication startup state
*/
private boolean hadoopSecurityAuthStartupState =
PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);

/**
* Kerberos expire time
*/
public static int getKerberosExpireTime() {
return PropertyUtils.getInt(KERBEROS_EXPIRE_TIME, 2);
}
}
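
Because the class is annotated with Lombok's `@Data`, setters are generated for every field, so callers can override individual defaults and hand the result to the operator, which is exactly what `LocalStorageOperatorFactory` below does. A minimal usage sketch (the wrapper class name is illustrative only):

```java
import org.apache.dolphinscheduler.plugin.storage.hdfs.HdfsStorageOperator;
import org.apache.dolphinscheduler.plugin.storage.hdfs.HdfsStorageProperties;

public class LocalHdfsOperatorSketch {

    public static void main(String[] args) {
        // Field defaults are read from common.properties; only the FS is overridden here.
        HdfsStorageProperties properties = new HdfsStorageProperties();
        properties.setDefaultFS("file:///"); // Lombok @Data generates this setter
        HdfsStorageOperator operator = new HdfsStorageOperator(properties);
        System.out.println(operator.getDefaultFS()); // typically prints file:///
    }
}
```
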
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.storage.hdfs;

import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperateFactory;
import org.apache.dolphinscheduler.plugin.storage.api.StorageType;

import com.google.auto.service.AutoService;

@AutoService(StorageOperateFactory.class)
public class LocalStorageOperatorFactory implements StorageOperateFactory {

private static final String LOCAL_DEFAULT_FS = "file:///";

@Override
public StorageOperate createStorageOperate() {
HdfsStorageProperties hdfsStorageProperties = new HdfsStorageProperties();
hdfsStorageProperties.setDefaultFS(LOCAL_DEFAULT_FS);
return new HdfsStorageOperator(hdfsStorageProperties);
}

@Override
public StorageType getStorageOperate() {
return StorageType.LOCAL;
}
}
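
`@AutoService` generates the `META-INF/services` registration at compile time, so the new factory is discoverable through Java's standard `ServiceLoader` mechanism; DolphinScheduler's actual plugin loader may differ, but a hedged lookup sketch looks like this:

```java
import java.util.ServiceLoader;

import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperateFactory;
import org.apache.dolphinscheduler.plugin.storage.api.StorageType;

public class StorageFactoryLookupSketch {

    public static void main(String[] args) {
        // Iterate every factory registered via @AutoService and pick the LOCAL one.
        for (StorageOperateFactory factory : ServiceLoader.load(StorageOperateFactory.class)) {
            if (factory.getStorageOperate() == StorageType.LOCAL) {
                StorageOperate storageOperate = factory.createStorageOperate();
                System.out.println("Loaded LOCAL storage: " + storageOperate.getClass().getSimpleName());
            }
        }
    }
}
```
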
@@ -39,34 +39,39 @@ public class HdfsStorageOperatorTest {

@Test
public void getHdfsTenantDir() {
logger.info(HdfsStorageOperator.getHdfsTenantDir("1234"));
HdfsStorageOperator hdfsStorageOperator = new HdfsStorageOperator();
logger.info(hdfsStorageOperator.getHdfsTenantDir("1234"));
Assertions.assertTrue(true);
}

@Test
public void getHdfsUdfFileName() {
logger.info(HdfsStorageOperator.getHdfsUdfFileName("admin", "file_name"));
HdfsStorageOperator hdfsStorageOperator = new HdfsStorageOperator();
logger.info(hdfsStorageOperator.getHdfsUdfFileName("admin", "file_name"));
Assertions.assertTrue(true);
}

@Test
public void getHdfsResourceFileName() {
logger.info(HdfsStorageOperator.getHdfsResourceFileName("admin", "file_name"));
HdfsStorageOperator hdfsStorageOperator = new HdfsStorageOperator();
logger.info(hdfsStorageOperator.getHdfsResourceFileName("admin", "file_name"));
Assertions.assertTrue(true);
}

@Test
public void getHdfsFileName() {
logger.info(HdfsStorageOperator.getHdfsFileName(ResourceType.FILE, "admin", "file_name"));
HdfsStorageOperator hdfsStorageOperator = new HdfsStorageOperator();
logger.info(hdfsStorageOperator.getHdfsFileName(ResourceType.FILE, "admin", "file_name"));
Assertions.assertTrue(true);
}

@Test
public void getAppAddress() {
HdfsStorageOperator hdfsStorageOperator = new HdfsStorageOperator();
try (MockedStatic<HttpUtils> mockedHttpUtils = Mockito.mockStatic(HttpUtils.class)) {
mockedHttpUtils.when(() -> HttpUtils.get("http://ds1:8088/ws/v1/cluster/info"))
.thenReturn("{\"clusterInfo\":{\"state\":\"STARTED\",\"haState\":\"ACTIVE\"}}");
logger.info(HdfsStorageOperator.getAppAddress("http://ds1:8088/ws/v1/cluster/apps/%s", "ds1,ds2"));
logger.info(hdfsStorageOperator.getAppAddress("http://ds1:8088/ws/v1/cluster/apps/%s", "ds1,ds2"));
Assertions.assertTrue(true);
}
}
