Skip to content

Commit

Permalink
Performance optimization, remove the synchronization wait operation o…
Browse files Browse the repository at this point in the history
…f mark cache close apache#3717
  • Loading branch information
peacewong committed Oct 25, 2022
1 parent 885c9d3 commit faf9158
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 51 deletions.
Expand Up @@ -63,28 +63,7 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin

override def applyMark(markReq: MarkReq): Mark = {
if (null == markReq) return null
val mark = MARK_CACHE_LOCKER.synchronized {
val markCache = getMarkCache().asScala.keys
val maybeMark = markCache.find(_.getMarkReq.equals(markReq))
maybeMark.orNull
}
if (null == mark) {
if (markReq.getLabels.containsKey(LabelKeyConstant.BIND_ENGINE_KEY)) {
val bindEngineLabel = MarkReq.getLabelBuilderFactory.createLabel[BindEngineLabel](
LabelKeyConstant.BIND_ENGINE_KEY,
markReq.getLabels.get(LabelKeyConstant.BIND_ENGINE_KEY)
)
if (!bindEngineLabel.getIsJobGroupHead) {
val msg =
s"Cannot find mark related to bindEngineLabel : ${bindEngineLabel.getStringValue}"
logger.error(msg)
throw new ECMPluginErrorException(ECMPluginConf.ECM_MARK_CACHE_ERROR_CODE, msg)
}
}
createMark(markReq)
} else {
mark
}
createMark(markReq)
}

override def createMark(markReq: MarkReq): Mark = {
Expand Down
Expand Up @@ -26,9 +26,12 @@ import org.apache.linkis.orchestrator.ecm.entity.{Mark, MarkReq, Policy}
import org.apache.linkis.orchestrator.ecm.exception.ECMPluginErrorException
import org.apache.linkis.orchestrator.ecm.service.EngineConnExecutor

import org.apache.commons.collections.CollectionUtils

import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

/**
*/
Expand Down Expand Up @@ -97,9 +100,8 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging

private val engineConnExecutorCache = new util.HashMap[ServiceInstance, EngineConnExecutor]()

private val markCache = new util.HashMap[Mark, util.List[ServiceInstance]]()

protected val MARK_CACHE_LOCKER = new Object()
private val markCache: util.Map[Mark, util.List[ServiceInstance]] =
new util.concurrent.ConcurrentHashMap[Mark, util.List[ServiceInstance]]()

override def setEngineConnApplyAttempts(attemptNumber: Int): Unit = this.attemptNumber =
attemptNumber
Expand Down Expand Up @@ -148,7 +150,7 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging
val executors = Utils.tryAndWarn {
instances.asScala.map(getEngineConnExecutorCache().get(_)).filter(null != _).sortBy {
executor =>
if (null == executor.getRunningTaskCount) {
if (executor.getRunningTaskCount < 0) {
0
} else {
executor.getRunningTaskCount
Expand All @@ -162,7 +164,6 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging
return Some(executors.headOption.get)
}
for (executor <- executors) {
// todo check
if (executor.useEngineConn) {
logger.info(
s"mark ${mark.getMarkId()} Finished to getAvailableEngineConnExecutor by reuse"
Expand All @@ -189,31 +190,28 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging
}

protected def addMark(mark: Mark, instances: util.List[ServiceInstance]): Unit =
MARK_CACHE_LOCKER.synchronized {
if (null != mark && !getMarkCache().containsKey(mark)) {
logger.info(s"add mark ${mark.getMarkId()}")
getMarkCache().put(mark, instances)
}
if (null != mark && !getMarkCache().containsKey(mark)) {
logger.info(s"add mark ${mark.getMarkId()}")
getMarkCache().put(mark, instances)
}

protected def getInstances(mark: Mark): util.List[ServiceInstance] =
MARK_CACHE_LOCKER.synchronized {
if (null != mark && getMarkCache().containsKey(mark)) {
getMarkCache().get(mark)
} else {
null
}
}
protected def getInstances(mark: Mark): util.List[ServiceInstance] = if (null != mark) {
getMarkCache().get(mark)
} else {
null
}

protected def getMarksByInstance(serviceInstance: ServiceInstance): Array[Mark] =
MARK_CACHE_LOCKER.synchronized {
getMarkCache().asScala
.filter { keyValue =>
keyValue._2.asScala.exists(serviceInstance.equals(_))
}
.keys
.toArray
protected def getMarksByInstance(serviceInstance: ServiceInstance): Array[Mark] = {
val markAndInstance = getMarkCache().entrySet().iterator()
val buffer = new ArrayBuffer[Mark]()
while (markAndInstance.hasNext) {
val next = markAndInstance.next()
if (next.getValue != null && next.getValue.indexOf(serviceInstance) >= 0) {
buffer.append(next.getKey)
}
}
buffer.toArray
}

override def releaseEngineConnExecutor(
engineConnExecutor: EngineConnExecutor,
Expand Down Expand Up @@ -251,8 +249,8 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging
}
}

protected def removeMark(mark: Mark): Unit = MARK_CACHE_LOCKER.synchronized {
if (null != mark && getMarkCache().containsKey(mark)) {
protected def removeMark(mark: Mark): Unit = {
if (null != mark) {
getMarkCache().remove(mark)
}
}
Expand Down
Expand Up @@ -228,7 +228,7 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit
}
}

protected def getAllInstances(): Array[String] = MARK_CACHE_LOCKER.synchronized {
protected def getAllInstances(): Array[String] = {
val instances = new ArrayBuffer[String]
getMarkCache()
.values()
Expand Down

0 comments on commit faf9158

Please sign in to comment.