Skip to content

Commit

Permalink
Merge pull request #1203 from casionone/bugfix_0
Browse files Browse the repository at this point in the history
optimize the logic of tag update/delete and fix ddl bug
  • Loading branch information
peacewong committed Dec 15, 2021
2 parents 04a5253 + cd33e61 commit 1db0bd6
Show file tree
Hide file tree
Showing 12 changed files with 76 additions and 23 deletions.
20 changes: 12 additions & 8 deletions db/linkis_ddl.sql
Expand Up @@ -174,8 +174,10 @@ CREATE TABLE `linkis_ps_udf_tree` (
-- ----------------------------
DROP TABLE IF EXISTS `linkis_ps_udf_user_load_info`;
CREATE TABLE `linkis_ps_udf_user_load_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`udf_id` int(11) NOT NULL,
`user_name` varchar(50) NOT NULL
`user_name` varchar(50) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


Expand Down Expand Up @@ -530,7 +532,7 @@ create table if not exists linkis_ps_bml_project_user(
`id` int(10) NOT NULL AUTO_INCREMENT,
`project_id` int(10) NOT NULL,
`username` varchar(64) DEFAULT NULL,
`priv` int(10) not null default 7,
`priv` int(10) not null default 7, -- rwx 421 The permission value is 7. 8 is the administrator, which can authorize other users
`creator` varchar(128) not null,
`create_time` datetime DEFAULT now(),
`expire_time` datetime default null,
Expand Down Expand Up @@ -564,11 +566,13 @@ CREATE TABLE `linkis_ps_instance_label` (

DROP TABLE IF EXISTS `linkis_ps_instance_label_value_relation`;
CREATE TABLE `linkis_ps_instance_label_value_relation` (
`id` int(20) NOT NULL AUTO_INCREMENT,
`label_value_key` varchar(255) COLLATE utf8_bin NOT NULL COMMENT 'value key',
`label_value_content` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT 'value content',
`label_id` int(20) DEFAULT NULL COMMENT 'id reference linkis_ps_instance_label -> id',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'update unix timestamp',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'create unix timestamp',
PRIMARY KEY (`id`),
UNIQUE KEY `label_value_key_label_id` (`label_value_key`,`label_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

Expand Down Expand Up @@ -688,11 +692,13 @@ CREATE TABLE `linkis_cg_manager_label` (
DROP TABLE IF EXISTS `linkis_cg_manager_label_value_relation`;

CREATE TABLE `linkis_cg_manager_label_value_relation` (
`id` int(20) NOT NULL AUTO_INCREMENT,
`label_value_key` varchar(255) COLLATE utf8_bin NOT NULL,
`label_value_content` varchar(255) COLLATE utf8_bin DEFAULT NULL,
`label_id` int(20) DEFAULT NULL,
`update_time` datetime DEFAULT CURRENT_TIMESTAMP,
`create_time` datetime DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `label_value_key_label_id` (`label_value_key`,`label_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

Expand Down Expand Up @@ -732,6 +738,7 @@ CREATE TABLE `linkis_cg_manager_label_user` (
DROP TABLE IF EXISTS `linkis_cg_manager_metrics_history`;

CREATE TABLE `linkis_cg_manager_metrics_history` (
`id` int(20) NOT NULL AUTO_INCREMENT,
`instance_status` int(20) DEFAULT NULL,
`overload` varchar(255) COLLATE utf8_bin DEFAULT NULL,
`heartbeat_msg` varchar(255) COLLATE utf8_bin DEFAULT NULL,
Expand All @@ -740,7 +747,8 @@ CREATE TABLE `linkis_cg_manager_metrics_history` (
`creator` varchar(255) COLLATE utf8_bin DEFAULT NULL,
`ticketID` varchar(255) COLLATE utf8_bin DEFAULT NULL,
`serviceName` varchar(255) COLLATE utf8_bin DEFAULT NULL,
`instance` varchar(255) COLLATE utf8_bin DEFAULT NULL
`instance` varchar(255) COLLATE utf8_bin DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

DROP TABLE IF EXISTS `linkis_cg_manager_service_instance_metrics`;
Expand Down Expand Up @@ -769,8 +777,4 @@ CREATE TABLE `linkis_cg_engine_conn_plugin_bml_resources` (
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'created time',
`last_update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'updated time',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;




) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
Expand Up @@ -42,8 +42,8 @@ case class RequestQueryEngineConfigWithGlobalConfig(userCreatorLabel: UserCreato
}
}

case class RequestQueryEngineTypeDefault(engineTypeLabel: EngineTypeLabel) extends CacheableProtocol with RetryableProtocol with ConfigProtocol

case class RequestQueryEngineTypeDefault(engineTypeLabel: EngineTypeLabel) extends CacheableProtocol with RetryableProtocol with ConfigProtocol

case class RequestConfigByLabel(labels: java.util.List[Label[_]], isMerge: Boolean = true) extends CacheableProtocol with RetryableProtocol with ConfigProtocol

Expand Up @@ -24,14 +24,15 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.governance.common.protocol.conf.{RequestQueryEngineConfig, ResponseQueryConfig}
import org.apache.linkis.governance.common.protocol.conf.{RequestQueryEngineConfig, RequestQueryEngineConfigWithGlobalConfig, ResponseQueryConfig}
import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext
import org.apache.linkis.manager.label.constant.LabelKeyConstant
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel}
import org.apache.linkis.orchestrator.plugin.UserParallelOrchestratorPlugin
import org.apache.linkis.rpc.Sender
import org.apache.commons.lang.StringUtils
import org.apache.linkis.server.BDPJettyServerHelper

import scala.collection.JavaConverters._

Expand All @@ -52,14 +53,19 @@ class EntranceUserParallelOrchestratorPlugin extends UserParallelOrchestratorPlu
override def load(key: String): Integer = {
val (userCreatorLabel, engineTypeLabel) = fromKeyGetLabels(key)
val keyAndValue = Utils.tryAndWarnMsg {
sender.ask(RequestQueryEngineConfig(userCreatorLabel, engineTypeLabel)).asInstanceOf[ResponseQueryConfig].getKeyAndValue
sender.ask(RequestQueryEngineConfigWithGlobalConfig(userCreatorLabel, engineTypeLabel)).asInstanceOf[ResponseQueryConfig].getKeyAndValue
}("Get user configurations from configuration server failed! Next use the default value to continue.")
if(null == keyAndValue || !keyAndValue.containsKey(EntranceConfiguration.WDS_LINKIS_INSTANCE.key)){
error(s"cannot found user configuration key:${EntranceConfiguration.WDS_LINKIS_INSTANCE.key}," +
s"will use default value ${EntranceConfiguration.WDS_LINKIS_INSTANCE.getValue}。All config map: ${BDPJettyServerHelper.gson.toJson(keyAndValue)}")
}
val maxRunningJobs = EntranceConfiguration.WDS_LINKIS_INSTANCE.getValue(keyAndValue)
maxRunningJobs
}
})



override def getUserMaxRunningJobs(user: String, labels: util.List[Label[_]]): Int = {


Expand Down
Expand Up @@ -24,7 +24,7 @@ import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.entrance.exception.{EntranceErrorCode, EntranceErrorException}
import org.apache.linkis.entrance.execute.EntranceJob
import org.apache.linkis.governance.common.protocol.conf.{RequestQueryEngineConfig, ResponseQueryConfig}
import org.apache.linkis.governance.common.protocol.conf.{RequestQueryEngineConfig, RequestQueryEngineConfigWithGlobalConfig, ResponseQueryConfig}
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.{ConcurrentEngineConnLabel, EngineTypeLabel, UserCreatorLabel}
import org.apache.linkis.protocol.constants.TaskConstant
Expand Down Expand Up @@ -76,7 +76,7 @@ class EntranceGroupFactory extends GroupFactory with Logging {
}
info(s"Getting user configurations for $groupName(正在为 $groupName 获取参数) userCreatorLabel: ${userCreatorLabel.getStringValue}, engineTypeLabel:${engineTypeLabel.getStringValue}.")
val keyAndValue = Utils.tryAndWarnMsg {
sender.ask(RequestQueryEngineConfig(userCreatorLabel, engineTypeLabel)).asInstanceOf[ResponseQueryConfig].getKeyAndValue
sender.ask(RequestQueryEngineConfigWithGlobalConfig(userCreatorLabel, engineTypeLabel)).asInstanceOf[ResponseQueryConfig].getKeyAndValue
}("Get user configurations from configuration server failed! Next use the default value to continue.")
val maxRunningJobs = EntranceConfiguration.WDS_LINKIS_INSTANCE.getValue(keyAndValue)
val initCapacity = GROUP_INIT_CAPACITY.getValue(keyAndValue)
Expand Down
Expand Up @@ -119,17 +119,29 @@ class DefaultNodeLabelService extends NodeLabelService with Logging {
if(!CollectionUtils.isEmpty(willBeDelete)){
nodeLabels.foreach(nodeLabel => {
if(modifiableKeyList.contains(nodeLabel.getLabelKey) && willBeDelete.contains(nodeLabel.getLabelKey)){
labelManagerPersistence.removeLabel(nodeLabel.getId)
val labelIds = new util.ArrayList[Integer]()
labelIds.add(nodeLabel.getId)
labelManagerPersistence.removeNodeLabels(instance, labelIds)
}
})
}
if(!CollectionUtils.isEmpty(willBeUpdate)){
/**
* update step:
* 1.delete relations of old labels
* 2.add new relation between new labels and instance
*/
if (!CollectionUtils.isEmpty(willBeUpdate)) {
labels.foreach(label => {
if(modifiableKeyList.contains(label.getLabelKey) && willBeUpdate.contains(label.getLabelKey)){
nodeLabels.filter(_.getLabelKey.equals(label.getLabelKey)).foreach(oldLabel => {
val persistenceLabel = LabelManagerUtils.convertPersistenceLabel(label)
persistenceLabel.setId(oldLabel.getId)
labelManagerPersistence.updateLabel(persistenceLabel.getId, persistenceLabel)
val labelIds = new util.ArrayList[Integer]()
labelIds.add(oldLabel.getId)
labelManagerPersistence.removeNodeLabels(instance, labelIds)
val newLabelId = tryToAddLabel(persistenceLabel)
labelIds.remove(oldLabel.getId)
labelIds.add(newLabelId)
labelManagerPersistence.addLabelToNode(instance, labelIds)
})
}
})
Expand Down
Expand Up @@ -33,6 +33,7 @@ import org.apache.linkis.manager.label.entity.{EngineNodeLabel, Label}
import org.apache.linkis.manager.label.service.NodeLabelService
import org.apache.linkis.manager.service.common.label.LabelFilter
import org.apache.commons.collections.MapUtils
import org.apache.linkis.manager.label.entity.em.EMInstanceLabel
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service

Expand Down Expand Up @@ -91,12 +92,21 @@ class DefaultEMEngineService extends EMEngineService with Logging {
if (MapUtils.isEmpty(instanceAndLabels)) {
new AMErrorException(AMConstant.EM_ERROR_CODE, "No corresponding EM")
}
val nodes = getEMNodes(instanceAndLabels.keys.toArray)
// TODO add em select rule to do this
val emInstanceLabelOption = labels.find(_.isInstanceOf[EMInstanceLabel])
val filterInstanceAndLabel = if (emInstanceLabelOption.isDefined) {
val emInstanceLabel = emInstanceLabelOption.get.asInstanceOf[EMInstanceLabel]
info(s"use emInstanceLabel , will be route to ${emInstanceLabel.getServiceInstance}")
instanceAndLabels.filter(_._1.getServiceInstance.equals(emInstanceLabel.getServiceInstance))
} else {
instanceAndLabels.toMap
}
val nodes = getEMNodes(filterInstanceAndLabel.keys.toArray)
if (null == nodes) {
return null
}
nodes.foreach { node =>
val persistenceLabel = instanceAndLabels.find(_._1.getServiceInstance.equals(node.getServiceInstance)).map(_._2)
val persistenceLabel = filterInstanceAndLabel.find(_._1.getServiceInstance.equals(node.getServiceInstance)).map(_._2)
persistenceLabel.foreach(labelList => node.setLabels(labelList.map(ManagerUtils.persistenceLabelToRealLabel)))
}
nodes
Expand Down
Expand Up @@ -81,6 +81,9 @@ class PythonEngineConnExecutor(id: Int, pythonSession: PythonSession, outputPrin

override def getProgressInfo: Array[JobProgressInfo] = {
val jobProgressInfo = new ArrayBuffer[JobProgressInfo]()
if (null == this.engineExecutionContext) {
return jobProgressInfo.toArray
}
if (0.0f == progress()) {
jobProgressInfo += JobProgressInfo(engineExecutionContext.getJobId.getOrElse(""), 1, 1, 0, 0)
} else {
Expand Down
Expand Up @@ -110,6 +110,9 @@ class ShellEngineConnExecutor(id: Int) extends ComputationExecutor with Logging

override def getProgressInfo: Array[JobProgressInfo] = {
val jobProgressInfo = new ArrayBuffer[JobProgressInfo]()
if (null == this.engineExecutionContext) {
return jobProgressInfo.toArray
}
if (0.0f == progress()) {
jobProgressInfo += JobProgressInfo(engineExecutionContext.getJobId.getOrElse(""), 1, 1, 0, 0)
} else {
Expand Down
Expand Up @@ -28,4 +28,8 @@ object Configuration {
val ENGINE_TYPE = CommonVars.apply("wds.linkis.configuration.engine.type", EngineType.getAllEngineTypes.asScala.mkString(","))

val MANAGER_SPRING_NAME = CommonVars("wds.linkis.engineconn.manager.name", "linkis-cg-linkismanager")

val GLOBAL_CONF_CHN_NAME = "全局设置"

val GLOBAL_CONF_CHN_OLDNAME = "通用设置"
}
Expand Up @@ -18,7 +18,7 @@
package org.apache.linkis.configuration.receiver

import org.apache.linkis.configuration.service.ConfigurationService
import org.apache.linkis.governance.common.protocol.conf.{RequestConfigByLabel, RequestQueryEngineConfig, RequestQueryEngineTypeDefault, RequestQueryGlobalConfig}
import org.apache.linkis.governance.common.protocol.conf.{RequestConfigByLabel, RequestQueryEngineConfig, RequestQueryEngineConfigWithGlobalConfig, RequestQueryEngineTypeDefault, RequestQueryGlobalConfig}
import org.apache.linkis.rpc.{Receiver, Sender}

import scala.concurrent.duration.Duration
Expand All @@ -37,6 +37,7 @@ class ConfigurationReceiver extends Receiver{
override def receiveAndReply(message: Any, sender: Sender): Any = message match {
case RequestQueryGlobalConfig(username) => configurationService.queryGlobalConfig(username)
case RequestQueryEngineTypeDefault(engineType) => configurationService.queryDefaultEngineConfig(engineType)
case RequestQueryEngineConfigWithGlobalConfig(userCreatorLabel,engineTypeLabel,filter) => configurationService.queryConfigWithGlobal(userCreatorLabel,engineTypeLabel,filter)
case RequestQueryEngineConfig(userCreatorLabel,engineTypeLabel,filter) => configurationService.queryConfig(userCreatorLabel,engineTypeLabel,filter)
case RequestConfigByLabel(labelList,isMerge) => configurationService.queryConfigByLabel(labelList, isMerge)
}
Expand Down
Expand Up @@ -27,8 +27,8 @@ import org.apache.linkis.configuration.util.LabelEntityParser
import org.apache.linkis.manager.label.builder.CombinedLabelBuilder
import org.apache.linkis.manager.label.entity.CombinedLabel
import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel}
import org.apache.linkis.manager.label.utils.LabelUtils
import org.apache.commons.lang.StringUtils
import org.apache.linkis.configuration.conf.Configuration
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
Expand Down Expand Up @@ -158,8 +158,8 @@ class CategoryService extends Logging{
throw new ConfigurationException(s"${engineType}-${version} is exist, cannot be created(${engineType}-${version}已经存在,无法创建)")
}
val creator = categoryList.getCategoryName match {
case "全局设置" => LabelUtils.COMMON_VALUE
case "通用设置" => LabelUtils.COMMON_VALUE
case Configuration.GLOBAL_CONF_CHN_NAME | Configuration.GLOBAL_CONF_CHN_OLDNAME =>
throw new ConfigurationException("Global setting do not allow the configuration of engines to be added(全局设置不允许添加引擎配置!)")
case _ => categoryList.getCategoryName
}
val combinedLabel = configurationService.generateCombinedLabel(engineType,version,null,creator)
Expand Down
Expand Up @@ -379,6 +379,16 @@ class ConfigurationService extends Logging {
queryConfigByLabel(labelList, true,filter)
}

def queryConfigWithGlobal(userCreatorLabel: UserCreatorLabel, engineTypeLabel: EngineTypeLabel, filter: String): ResponseQueryConfig ={
val globalConfig = queryGlobalConfig(userCreatorLabel.getUser)
val engineConfig = queryConfig(userCreatorLabel, engineTypeLabel, filter)
globalConfig.getKeyAndValue.asScala.foreach(keyAndValue => {
if(!engineConfig.getKeyAndValue.containsKey(keyAndValue._1)){
engineConfig.getKeyAndValue.put(keyAndValue._1, keyAndValue._2)
}
})
engineConfig
}

private def getMap(all: util.List[ConfigTree], user: util.List[ConfigTree], filter: String = null): util.Map[String, String] = {
val map = new util.HashMap[String, String]()
Expand Down

0 comments on commit 1db0bd6

Please sign in to comment.