diff --git a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala index 2d2cf556b0..250e20fa6f 100644 --- a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala +++ b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala @@ -63,28 +63,7 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin override def applyMark(markReq: MarkReq): Mark = { if (null == markReq) return null - val mark = MARK_CACHE_LOCKER.synchronized { - val markCache = getMarkCache().asScala.keys - val maybeMark = markCache.find(_.getMarkReq.equals(markReq)) - maybeMark.orNull - } - if (null == mark) { - if (markReq.getLabels.containsKey(LabelKeyConstant.BIND_ENGINE_KEY)) { - val bindEngineLabel = MarkReq.getLabelBuilderFactory.createLabel[BindEngineLabel]( - LabelKeyConstant.BIND_ENGINE_KEY, - markReq.getLabels.get(LabelKeyConstant.BIND_ENGINE_KEY) - ) - if (!bindEngineLabel.getIsJobGroupHead) { - val msg = - s"Cannot find mark related to bindEngineLabel : ${bindEngineLabel.getStringValue}" - logger.error(msg) - throw new ECMPluginErrorException(ECMPluginConf.ECM_MARK_CACHE_ERROR_CODE, msg) - } - } - createMark(markReq) - } else { - mark - } + createMark(markReq) } override def createMark(markReq: MarkReq): Mark = { diff --git a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala index 128a915b1f..de996a3532 100644 --- a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala +++ b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala @@ -26,9 +26,12 @@ import org.apache.linkis.orchestrator.ecm.entity.{Mark, MarkReq, Policy} import org.apache.linkis.orchestrator.ecm.exception.ECMPluginErrorException import org.apache.linkis.orchestrator.ecm.service.EngineConnExecutor +import org.apache.commons.collections.CollectionUtils + import java.util import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer /** */ @@ -97,9 +100,8 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging private val engineConnExecutorCache = new util.HashMap[ServiceInstance, EngineConnExecutor]() - private val markCache = new util.HashMap[Mark, util.List[ServiceInstance]]() - - protected val MARK_CACHE_LOCKER = new Object() + private val markCache: util.Map[Mark, util.List[ServiceInstance]] = + new util.concurrent.ConcurrentHashMap[Mark, util.List[ServiceInstance]]() override def setEngineConnApplyAttempts(attemptNumber: Int): Unit = this.attemptNumber = attemptNumber @@ -148,7 +150,7 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging val executors = Utils.tryAndWarn { instances.asScala.map(getEngineConnExecutorCache().get(_)).filter(null != _).sortBy { executor => - if (null == executor.getRunningTaskCount) { + if (executor.getRunningTaskCount < 0) { 0 } else { executor.getRunningTaskCount @@ -162,7 +164,6 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging return Some(executors.headOption.get) } for (executor <- executors) { - // todo check if (executor.useEngineConn) { logger.info( s"mark ${mark.getMarkId()} Finished to getAvailableEngineConnExecutor by reuse" @@ -189,31 +190,28 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging } protected def addMark(mark: Mark, instances: util.List[ServiceInstance]): Unit = - MARK_CACHE_LOCKER.synchronized { - if (null != mark && !getMarkCache().containsKey(mark)) { - logger.info(s"add mark ${mark.getMarkId()}") - getMarkCache().put(mark, instances) - } + if (null != mark && !getMarkCache().containsKey(mark)) { + logger.info(s"add mark ${mark.getMarkId()}") + getMarkCache().put(mark, instances) } - protected def getInstances(mark: Mark): util.List[ServiceInstance] = - MARK_CACHE_LOCKER.synchronized { - if (null != mark && getMarkCache().containsKey(mark)) { - getMarkCache().get(mark) - } else { - null - } - } + protected def getInstances(mark: Mark): util.List[ServiceInstance] = if (null != mark) { + getMarkCache().get(mark) + } else { + null + } - protected def getMarksByInstance(serviceInstance: ServiceInstance): Array[Mark] = - MARK_CACHE_LOCKER.synchronized { - getMarkCache().asScala - .filter { keyValue => - keyValue._2.asScala.exists(serviceInstance.equals(_)) - } - .keys - .toArray + protected def getMarksByInstance(serviceInstance: ServiceInstance): Array[Mark] = { + val markAndInstance = getMarkCache().entrySet().iterator() + val buffer = new ArrayBuffer[Mark]() + while (markAndInstance.hasNext) { + val next = markAndInstance.next() + if (next.getValue != null && next.getValue.indexOf(serviceInstance) >= 0) { + buffer.append(next.getKey) + } } + buffer.toArray + } override def releaseEngineConnExecutor( engineConnExecutor: EngineConnExecutor, @@ -251,8 +249,8 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging } } - protected def removeMark(mark: Mark): Unit = MARK_CACHE_LOCKER.synchronized { - if (null != mark && getMarkCache().containsKey(mark)) { + protected def removeMark(mark: Mark): Unit = { + if (null != mark) { getMarkCache().remove(mark) } } diff --git a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala index c19692ed65..622c8813b5 100644 --- a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala +++ b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala @@ -228,7 +228,7 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit } } - protected def getAllInstances(): Array[String] = MARK_CACHE_LOCKER.synchronized { + protected def getAllInstances(): Array[String] = { val instances = new ArrayBuffer[String] getMarkCache() .values()