From c74b4d51ea4dd7a7b41b6f687eb3ddc82133c0b6 Mon Sep 17 00:00:00 2001 From: casionone Date: Wed, 8 Dec 2021 16:37:01 +0800 Subject: [PATCH 1/8] fix ddl bugs (cherry picked from commit c6265bd676127025008d98752e2d8eba0aa2fe97) # Conflicts: # db/linkis_ddl.sql # linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/com/webank/wedatasphere/linkis/manager/am/service/engine/DefaultEngineConnStatusCallbackService.scala fix ddl bugs:add AUTO_INCREMENT PRIMARY KEY id for some tables --- db/linkis_ddl.sql | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/db/linkis_ddl.sql b/db/linkis_ddl.sql index 20c9530ba3..8861684b1b 100644 --- a/db/linkis_ddl.sql +++ b/db/linkis_ddl.sql @@ -174,8 +174,10 @@ CREATE TABLE `linkis_ps_udf_tree` ( -- ---------------------------- DROP TABLE IF EXISTS `linkis_ps_udf_user_load_info`; CREATE TABLE `linkis_ps_udf_user_load_info` ( + `id` bigint(20) NOT NULL AUTO_INCREMENT, `udf_id` int(11) NOT NULL, - `user_name` varchar(50) NOT NULL + `user_name` varchar(50) NOT NULL, + PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; @@ -530,7 +532,7 @@ create table if not exists linkis_ps_bml_project_user( `id` int(10) NOT NULL AUTO_INCREMENT, `project_id` int(10) NOT NULL, `username` varchar(64) DEFAULT NULL, - `priv` int(10) not null default 7, + `priv` int(10) not null default 7, -- rwx 421 相加, 8是管理员,可以为其他用户授权 `creator` varchar(128) not null, `create_time` datetime DEFAULT now(), `expire_time` datetime default null, @@ -564,11 +566,13 @@ CREATE TABLE `linkis_ps_instance_label` ( DROP TABLE IF EXISTS `linkis_ps_instance_label_value_relation`; CREATE TABLE `linkis_ps_instance_label_value_relation` ( + `id` int(20) NOT NULL AUTO_INCREMENT, `label_value_key` varchar(255) COLLATE utf8_bin NOT NULL COMMENT 'value key', `label_value_content` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT 'value content', `label_id` int(20) DEFAULT NULL COMMENT 'id reference linkis_ps_instance_label -> id', `update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'update unix timestamp', `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'create unix timestamp', + PRIMARY KEY (`id`), UNIQUE KEY `label_value_key_label_id` (`label_value_key`,`label_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; @@ -688,11 +692,13 @@ CREATE TABLE `linkis_cg_manager_label` ( DROP TABLE IF EXISTS `linkis_cg_manager_label_value_relation`; CREATE TABLE `linkis_cg_manager_label_value_relation` ( + `id` int(20) NOT NULL AUTO_INCREMENT, `label_value_key` varchar(255) COLLATE utf8_bin NOT NULL, `label_value_content` varchar(255) COLLATE utf8_bin DEFAULT NULL, `label_id` int(20) DEFAULT NULL, `update_time` datetime DEFAULT CURRENT_TIMESTAMP, `create_time` datetime DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), UNIQUE KEY `label_value_key_label_id` (`label_value_key`,`label_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; @@ -732,6 +738,7 @@ CREATE TABLE `linkis_cg_manager_label_user` ( DROP TABLE IF EXISTS `linkis_cg_manager_metrics_history`; CREATE TABLE `linkis_cg_manager_metrics_history` ( + `id` int(20) NOT NULL AUTO_INCREMENT, `instance_status` int(20) DEFAULT NULL, `overload` varchar(255) COLLATE utf8_bin DEFAULT NULL, `heartbeat_msg` varchar(255) COLLATE utf8_bin DEFAULT NULL, @@ -740,7 +747,8 @@ CREATE TABLE `linkis_cg_manager_metrics_history` ( `creator` varchar(255) COLLATE utf8_bin DEFAULT NULL, `ticketID` varchar(255) COLLATE utf8_bin DEFAULT NULL, `serviceName` varchar(255) COLLATE utf8_bin DEFAULT NULL, - `instance` varchar(255) COLLATE utf8_bin DEFAULT NULL + `instance` varchar(255) COLLATE utf8_bin DEFAULT NULL, + PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; DROP TABLE IF EXISTS `linkis_cg_manager_service_instance_metrics`; @@ -769,8 +777,4 @@ CREATE TABLE `linkis_cg_engine_conn_plugin_bml_resources` ( `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'created time', `last_update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'updated time', PRIMARY KEY (`id`) -) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4; - - - - +) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4; \ No newline at end of file From 6ea91a16107b4fb4ea31a3b321c3d77358ededaa Mon Sep 17 00:00:00 2001 From: casionone Date: Wed, 8 Dec 2021 16:58:35 +0800 Subject: [PATCH 2/8] fix the bug of adding engine in configuration global settings --- .../apache/linkis/configuration/conf/Configuration.scala | 4 ++++ .../linkis/configuration/service/CategoryService.scala | 6 +++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/linkis-public-enhancements/linkis-publicservice/linkis-configuration/src/main/scala/org/apache/linkis/configuration/conf/Configuration.scala b/linkis-public-enhancements/linkis-publicservice/linkis-configuration/src/main/scala/org/apache/linkis/configuration/conf/Configuration.scala index badc0ee2d6..6407f01515 100644 --- a/linkis-public-enhancements/linkis-publicservice/linkis-configuration/src/main/scala/org/apache/linkis/configuration/conf/Configuration.scala +++ b/linkis-public-enhancements/linkis-publicservice/linkis-configuration/src/main/scala/org/apache/linkis/configuration/conf/Configuration.scala @@ -28,4 +28,8 @@ object Configuration { val ENGINE_TYPE = CommonVars.apply("wds.linkis.configuration.engine.type", EngineType.getAllEngineTypes.asScala.mkString(",")) val MANAGER_SPRING_NAME = CommonVars("wds.linkis.engineconn.manager.name", "linkis-cg-linkismanager") + + val GLOBAL_CONF_CHN_NAME = "全局设置" + + val GLOBAL_CONF_CHN_OLDNAME = "通用设置" } diff --git a/linkis-public-enhancements/linkis-publicservice/linkis-configuration/src/main/scala/org/apache/linkis/configuration/service/CategoryService.scala b/linkis-public-enhancements/linkis-publicservice/linkis-configuration/src/main/scala/org/apache/linkis/configuration/service/CategoryService.scala index bd58c7bca0..6362d8e3be 100644 --- a/linkis-public-enhancements/linkis-publicservice/linkis-configuration/src/main/scala/org/apache/linkis/configuration/service/CategoryService.scala +++ b/linkis-public-enhancements/linkis-publicservice/linkis-configuration/src/main/scala/org/apache/linkis/configuration/service/CategoryService.scala @@ -27,8 +27,8 @@ import org.apache.linkis.configuration.util.LabelEntityParser import org.apache.linkis.manager.label.builder.CombinedLabelBuilder import org.apache.linkis.manager.label.entity.CombinedLabel import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel} -import org.apache.linkis.manager.label.utils.LabelUtils import org.apache.commons.lang.StringUtils +import org.apache.linkis.configuration.conf.Configuration import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional @@ -158,8 +158,8 @@ class CategoryService extends Logging{ throw new ConfigurationException(s"${engineType}-${version} is exist, cannot be created(${engineType}-${version}已经存在,无法创建)") } val creator = categoryList.getCategoryName match { - case "全局设置" => LabelUtils.COMMON_VALUE - case "通用设置" => LabelUtils.COMMON_VALUE + case Configuration.GLOBAL_CONF_CHN_NAME | Configuration.GLOBAL_CONF_CHN_OLDNAME => + throw new ConfigurationException("Global setting do not allow the configuration of engines to be added(全局设置不允许添加引擎配置!)") case _ => categoryList.getCategoryName } val combinedLabel = configurationService.generateCombinedLabel(engineType,version,null,creator) From d665d39fcbc0aadab4c5702c6959edf82662dbd0 Mon Sep 17 00:00:00 2001 From: casionone Date: Wed, 8 Dec 2021 17:18:08 +0800 Subject: [PATCH 3/8] Add fixed EM routing --- .../am/service/em/DefaultEMEngineService.scala | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala index 57d7049125..b0c78966ce 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala @@ -33,6 +33,7 @@ import org.apache.linkis.manager.label.entity.{EngineNodeLabel, Label} import org.apache.linkis.manager.label.service.NodeLabelService import org.apache.linkis.manager.service.common.label.LabelFilter import org.apache.commons.collections.MapUtils +import org.apache.linkis.manager.label.entity.em.EMInstanceLabel import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service @@ -91,12 +92,21 @@ class DefaultEMEngineService extends EMEngineService with Logging { if (MapUtils.isEmpty(instanceAndLabels)) { new AMErrorException(AMConstant.EM_ERROR_CODE, "No corresponding EM") } - val nodes = getEMNodes(instanceAndLabels.keys.toArray) + // TODO add em select rule to do this + val emInstanceLabelOption = labels.find(_.isInstanceOf[EMInstanceLabel]) + val filterInstanceAndLabel = if (emInstanceLabelOption.isDefined) { + val emInstanceLabel = emInstanceLabelOption.get.asInstanceOf[EMInstanceLabel] + info(s"use emInstanceLabel , will be route to ${emInstanceLabel.getServiceInstance}") + instanceAndLabels.filter(_._1.getServiceInstance.equals(emInstanceLabel.getServiceInstance)) + } else { + instanceAndLabels.toMap + } + val nodes = getEMNodes(filterInstanceAndLabel.keys.toArray) if (null == nodes) { return null } nodes.foreach { node => - val persistenceLabel = instanceAndLabels.find(_._1.getServiceInstance.equals(node.getServiceInstance)).map(_._2) + val persistenceLabel = filterInstanceAndLabel.find(_._1.getServiceInstance.equals(node.getServiceInstance)).map(_._2) persistenceLabel.foreach(labelList => node.setLabels(labelList.map(ManagerUtils.persistenceLabelToRealLabel))) } nodes From a9834cddbfa0d5903e3b9a7d6fdcb1ea03cbde1c Mon Sep 17 00:00:00 2001 From: casionone Date: Wed, 8 Dec 2021 17:21:03 +0800 Subject: [PATCH 4/8] Optimize the logic of tag update and delete, and solve the problem of deleting public tags --- .../impl/DefaultNodeLabelService.scala | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/label-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.scala b/linkis-computation-governance/linkis-manager/label-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.scala index 3d6e9a4ece..765c630393 100644 --- a/linkis-computation-governance/linkis-manager/label-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.scala +++ b/linkis-computation-governance/linkis-manager/label-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.scala @@ -119,17 +119,29 @@ class DefaultNodeLabelService extends NodeLabelService with Logging { if(!CollectionUtils.isEmpty(willBeDelete)){ nodeLabels.foreach(nodeLabel => { if(modifiableKeyList.contains(nodeLabel.getLabelKey) && willBeDelete.contains(nodeLabel.getLabelKey)){ - labelManagerPersistence.removeLabel(nodeLabel.getId) + val labelIds = new util.ArrayList[Integer]() + labelIds.add(nodeLabel.getId) + labelManagerPersistence.removeNodeLabels(instance, labelIds) } }) } - if(!CollectionUtils.isEmpty(willBeUpdate)){ + /** + * update step: + * 1.delete relations of old labels + * 2.add new relation between new labels and instance + */ + if (!CollectionUtils.isEmpty(willBeUpdate)) { labels.foreach(label => { if(modifiableKeyList.contains(label.getLabelKey) && willBeUpdate.contains(label.getLabelKey)){ nodeLabels.filter(_.getLabelKey.equals(label.getLabelKey)).foreach(oldLabel => { val persistenceLabel = LabelManagerUtils.convertPersistenceLabel(label) - persistenceLabel.setId(oldLabel.getId) - labelManagerPersistence.updateLabel(persistenceLabel.getId, persistenceLabel) + val labelIds = new util.ArrayList[Integer]() + labelIds.add(oldLabel.getId) + labelManagerPersistence.removeNodeLabels(instance, labelIds) + val newLabelId = tryToAddLabel(persistenceLabel) + labelIds.remove(oldLabel.getId) + labelIds.add(newLabelId) + labelManagerPersistence.addLabelToNode(instance, labelIds) }) } }) From a0c420e2c3b0e5ab78085c5b3fcc172c6ad3aa83 Mon Sep 17 00:00:00 2001 From: casionone Date: Wed, 8 Dec 2021 17:37:10 +0800 Subject: [PATCH 5/8] fix NPE for engine plugin --- .../python/executor/PythonEngineConnExecutor.scala | 3 +++ .../engineplugin/shell/executor/ShellEngineConnExecutor.scala | 3 +++ 2 files changed, 6 insertions(+) diff --git a/linkis-engineconn-plugins/engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala index 30c455c137..32e312b7a2 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonEngineConnExecutor.scala @@ -81,6 +81,9 @@ class PythonEngineConnExecutor(id: Int, pythonSession: PythonSession, outputPrin override def getProgressInfo: Array[JobProgressInfo] = { val jobProgressInfo = new ArrayBuffer[JobProgressInfo]() + if (null == this.engineExecutionContext) { + return jobProgressInfo.toArray + } if (0.0f == progress()) { jobProgressInfo += JobProgressInfo(engineExecutionContext.getJobId.getOrElse(""), 1, 1, 0, 0) } else { diff --git a/linkis-engineconn-plugins/engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala index 932f32cc8a..21002c7a4f 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala @@ -109,6 +109,9 @@ class ShellEngineConnExecutor(id: Int) extends ComputationExecutor with Logging override def getProgressInfo: Array[JobProgressInfo] = { val jobProgressInfo = new ArrayBuffer[JobProgressInfo]() + if (null == this.engineExecutionContext) { + return jobProgressInfo.toArray + } if (0.0f == progress()) { jobProgressInfo += JobProgressInfo(engineExecutionContext.getJobId.getOrElse(""), 1, 1, 0, 0) } else { From 3de644bb8adaa7217ff31f6b9e8d046741fcf19b Mon Sep 17 00:00:00 2001 From: casionone Date: Wed, 8 Dec 2021 17:49:05 +0800 Subject: [PATCH 6/8] Increase the rpc method of configuration --- .../protocol/conf/RequestQueryGlobalConfig.scala | 9 ++++++++- .../configuration/receiver/ConfigurationReceiver.scala | 3 ++- .../configuration/service/ConfigurationService.scala | 10 ++++++++++ 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/RequestQueryGlobalConfig.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/RequestQueryGlobalConfig.scala index c409880687..42a1b5d51b 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/RequestQueryGlobalConfig.scala +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/RequestQueryGlobalConfig.scala @@ -32,10 +32,17 @@ case class RequestQueryGlobalConfig(username: String) extends CacheableProtocol case class RequestQueryEngineConfig(userCreatorLabel: UserCreatorLabel, engineTypeLabel: EngineTypeLabel, filter: String = null) extends CacheableProtocol with RetryableProtocol with ConfigProtocol{ override def toString: String = { - RequestQueryGlobalConfig.getClass.getName + "," + userCreatorLabel.getStringValue + "," + engineTypeLabel.getStringValue + RequestQueryEngineConfig.getClass.getName + "," + userCreatorLabel.getStringValue + "," + engineTypeLabel.getStringValue } } +case class RequestQueryEngineConfigWithGlobalConfig(userCreatorLabel: UserCreatorLabel, engineTypeLabel: EngineTypeLabel, filter: String = null) extends CacheableProtocol with RetryableProtocol with ConfigProtocol{ + override def toString: String = { + RequestQueryEngineConfigWithGlobalConfig.getClass.getName + "," + userCreatorLabel.getStringValue + "," + engineTypeLabel.getStringValue + } +} + + case class RequestQueryEngineTypeDefault(engineTypeLabel: EngineTypeLabel) extends CacheableProtocol with RetryableProtocol with ConfigProtocol case class RequestConfigByLabel(labels: java.util.List[Label[_]], isMerge: Boolean = true) extends CacheableProtocol with RetryableProtocol with ConfigProtocol diff --git a/linkis-public-enhancements/linkis-publicservice/linkis-configuration/src/main/scala/org/apache/linkis/configuration/receiver/ConfigurationReceiver.scala b/linkis-public-enhancements/linkis-publicservice/linkis-configuration/src/main/scala/org/apache/linkis/configuration/receiver/ConfigurationReceiver.scala index 118e4780a2..bb6107967c 100644 --- a/linkis-public-enhancements/linkis-publicservice/linkis-configuration/src/main/scala/org/apache/linkis/configuration/receiver/ConfigurationReceiver.scala +++ b/linkis-public-enhancements/linkis-publicservice/linkis-configuration/src/main/scala/org/apache/linkis/configuration/receiver/ConfigurationReceiver.scala @@ -18,7 +18,7 @@ package org.apache.linkis.configuration.receiver import org.apache.linkis.configuration.service.ConfigurationService -import org.apache.linkis.governance.common.protocol.conf.{RequestConfigByLabel, RequestQueryEngineConfig, RequestQueryEngineTypeDefault, RequestQueryGlobalConfig} +import org.apache.linkis.governance.common.protocol.conf.{RequestConfigByLabel, RequestQueryEngineConfig, RequestQueryEngineConfigWithGlobalConfig, RequestQueryEngineTypeDefault, RequestQueryGlobalConfig} import org.apache.linkis.rpc.{Receiver, Sender} import scala.concurrent.duration.Duration @@ -37,6 +37,7 @@ class ConfigurationReceiver extends Receiver{ override def receiveAndReply(message: Any, sender: Sender): Any = message match { case RequestQueryGlobalConfig(username) => configurationService.queryGlobalConfig(username) case RequestQueryEngineTypeDefault(engineType) => configurationService.queryDefaultEngineConfig(engineType) + case RequestQueryEngineConfigWithGlobalConfig(userCreatorLabel,engineTypeLabel,filter) => configurationService.queryConfigWithGlobal(userCreatorLabel,engineTypeLabel,filter) case RequestQueryEngineConfig(userCreatorLabel,engineTypeLabel,filter) => configurationService.queryConfig(userCreatorLabel,engineTypeLabel,filter) case RequestConfigByLabel(labelList,isMerge) => configurationService.queryConfigByLabel(labelList, isMerge) } diff --git a/linkis-public-enhancements/linkis-publicservice/linkis-configuration/src/main/scala/org/apache/linkis/configuration/service/ConfigurationService.scala b/linkis-public-enhancements/linkis-publicservice/linkis-configuration/src/main/scala/org/apache/linkis/configuration/service/ConfigurationService.scala index 889f2e94c3..a63fc2149e 100644 --- a/linkis-public-enhancements/linkis-publicservice/linkis-configuration/src/main/scala/org/apache/linkis/configuration/service/ConfigurationService.scala +++ b/linkis-public-enhancements/linkis-publicservice/linkis-configuration/src/main/scala/org/apache/linkis/configuration/service/ConfigurationService.scala @@ -379,6 +379,16 @@ class ConfigurationService extends Logging { queryConfigByLabel(labelList, true,filter) } + def queryConfigWithGlobal(userCreatorLabel: UserCreatorLabel, engineTypeLabel: EngineTypeLabel, filter: String): ResponseQueryConfig ={ + val globalConfig = queryGlobalConfig(userCreatorLabel.getUser) + val engineConfig = queryConfig(userCreatorLabel, engineTypeLabel, filter) + globalConfig.getKeyAndValue.asScala.foreach(keyAndValue => { + if(!engineConfig.getKeyAndValue.containsKey(keyAndValue._1)){ + engineConfig.getKeyAndValue.put(keyAndValue._1, keyAndValue._2) + } + }) + engineConfig + } private def getMap(all: util.List[ConfigTree], user: util.List[ConfigTree], filter: String = null): util.Map[String, String] = { val map = new util.HashMap[String, String]() From 36c202ff55fec7bcf8f0a1dbad0bf038ab7776cb Mon Sep 17 00:00:00 2001 From: casionone Date: Wed, 8 Dec 2021 17:52:06 +0800 Subject: [PATCH 7/8] fix the problem of the upper limit of user concurrent tasks --- .../EntranceUserParallelOrchestratorPlugin.scala | 10 ++++++++-- .../entrance/scheduler/EntranceGroupFactory.scala | 4 ++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/orchestrator/plugin/EntranceUserParallelOrchestratorPlugin.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/orchestrator/plugin/EntranceUserParallelOrchestratorPlugin.scala index e36758f1d5..50ce2c2c5b 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/orchestrator/plugin/EntranceUserParallelOrchestratorPlugin.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/orchestrator/plugin/EntranceUserParallelOrchestratorPlugin.scala @@ -24,7 +24,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import org.apache.linkis.common.conf.Configuration import org.apache.linkis.common.utils.{Logging, Utils} import org.apache.linkis.entrance.conf.EntranceConfiguration -import org.apache.linkis.governance.common.protocol.conf.{RequestQueryEngineConfig, ResponseQueryConfig} +import org.apache.linkis.governance.common.protocol.conf.{RequestQueryEngineConfig, RequestQueryEngineConfigWithGlobalConfig, ResponseQueryConfig} import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext import org.apache.linkis.manager.label.constant.LabelKeyConstant import org.apache.linkis.manager.label.entity.Label @@ -32,6 +32,7 @@ import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreat import org.apache.linkis.orchestrator.plugin.UserParallelOrchestratorPlugin import org.apache.linkis.rpc.Sender import org.apache.commons.lang.StringUtils +import org.apache.linkis.server.BDPJettyServerHelper import scala.collection.JavaConverters._ @@ -52,14 +53,19 @@ class EntranceUserParallelOrchestratorPlugin extends UserParallelOrchestratorPlu override def load(key: String): Integer = { val (userCreatorLabel, engineTypeLabel) = fromKeyGetLabels(key) val keyAndValue = Utils.tryAndWarnMsg { - sender.ask(RequestQueryEngineConfig(userCreatorLabel, engineTypeLabel)).asInstanceOf[ResponseQueryConfig].getKeyAndValue + sender.ask(RequestQueryEngineConfigWithGlobalConfig(userCreatorLabel, engineTypeLabel)).asInstanceOf[ResponseQueryConfig].getKeyAndValue }("Get user configurations from configuration server failed! Next use the default value to continue.") + if(null == keyAndValue || !keyAndValue.containsKey(EntranceConfiguration.WDS_LINKIS_INSTANCE.key)){ + error(s"cannot found user configuration key:${EntranceConfiguration.WDS_LINKIS_INSTANCE.key}," + + s"will use default value ${EntranceConfiguration.WDS_LINKIS_INSTANCE.getValue}。All config map: ${BDPJettyServerHelper.gson.toJson(keyAndValue)}") + } val maxRunningJobs = EntranceConfiguration.WDS_LINKIS_INSTANCE.getValue(keyAndValue) maxRunningJobs } }) + override def getUserMaxRunningJobs(user: String, labels: util.List[Label[_]]): Int = { diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala index b5720fa182..6e925711f0 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala @@ -24,7 +24,7 @@ import org.apache.linkis.common.utils.{Logging, Utils} import org.apache.linkis.entrance.conf.EntranceConfiguration import org.apache.linkis.entrance.exception.{EntranceErrorCode, EntranceErrorException} import org.apache.linkis.entrance.execute.EntranceJob -import org.apache.linkis.governance.common.protocol.conf.{RequestQueryEngineConfig, ResponseQueryConfig} +import org.apache.linkis.governance.common.protocol.conf.{RequestQueryEngineConfig, RequestQueryEngineConfigWithGlobalConfig, ResponseQueryConfig} import org.apache.linkis.manager.label.entity.Label import org.apache.linkis.manager.label.entity.engine.{ConcurrentEngineConnLabel, EngineTypeLabel, UserCreatorLabel} import org.apache.linkis.protocol.constants.TaskConstant @@ -76,7 +76,7 @@ class EntranceGroupFactory extends GroupFactory with Logging { } info(s"Getting user configurations for $groupName(正在为 $groupName 获取参数) userCreatorLabel: ${userCreatorLabel.getStringValue}, engineTypeLabel:${engineTypeLabel.getStringValue}.") val keyAndValue = Utils.tryAndWarnMsg { - sender.ask(RequestQueryEngineConfig(userCreatorLabel, engineTypeLabel)).asInstanceOf[ResponseQueryConfig].getKeyAndValue + sender.ask(RequestQueryEngineConfigWithGlobalConfig(userCreatorLabel, engineTypeLabel)).asInstanceOf[ResponseQueryConfig].getKeyAndValue }("Get user configurations from configuration server failed! Next use the default value to continue.") val maxRunningJobs = EntranceConfiguration.WDS_LINKIS_INSTANCE.getValue(keyAndValue) val initCapacity = GROUP_INIT_CAPACITY.getValue(keyAndValue) From cd52dfc8197762edf61803a7ab9183da8e3d3df9 Mon Sep 17 00:00:00 2001 From: casionone Date: Wed, 15 Dec 2021 15:45:39 +0800 Subject: [PATCH 8/8] ddl:annotations adjusted to English --- db/linkis_ddl.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/linkis_ddl.sql b/db/linkis_ddl.sql index 8861684b1b..0d345f3d92 100644 --- a/db/linkis_ddl.sql +++ b/db/linkis_ddl.sql @@ -532,7 +532,7 @@ create table if not exists linkis_ps_bml_project_user( `id` int(10) NOT NULL AUTO_INCREMENT, `project_id` int(10) NOT NULL, `username` varchar(64) DEFAULT NULL, - `priv` int(10) not null default 7, -- rwx 421 相加, 8是管理员,可以为其他用户授权 + `priv` int(10) not null default 7, -- rwx 421 The permission value is 7. 8 is the administrator, which can authorize other users `creator` varchar(128) not null, `create_time` datetime DEFAULT now(), `expire_time` datetime default null,