From d38bc0a947a93444da4a2135ea4dff27ccf93aaf Mon Sep 17 00:00:00 2001 From: peacewong Date: Thu, 27 Oct 2022 20:45:24 +0800 Subject: [PATCH] RM resource operation optimization lock bit segment lock close #3719 --- .../manager/rm/domain/RMLabelContainer.java | 18 +- .../exception/RMLockFailedRetryException.java | 27 + .../monitor/NodeHeartbeatMonitor.scala | 44 +- .../manager/rm/entity/LabelResourceMap.scala | 52 ++ .../rm/entity/ResourceOperationType.scala | 26 + .../rm/service/ResourceLockService.scala | 72 +- .../service/impl/DefaultResourceManager.scala | 793 +++++++++--------- .../manager/common/utils/ResourceUtils.scala | 88 +- .../linkis/manager/dao/LockManagerMapper.java | 13 +- .../dao/impl/ECResourceRecordMapper.xml | 2 +- .../manager/dao/impl/LockManagerMapper.xml | 89 +- .../persistence/LockManagerPersistence.java | 3 + .../impl/DefaultLockManagerPersistence.java | 87 +- .../manager/util/PersistenceManagerConf.java | 31 + .../manager/dao/LockManagerMapperTest.java | 6 +- 15 files changed, 767 insertions(+), 584 deletions(-) create mode 100644 linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/exception/RMLockFailedRetryException.java create mode 100644 linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/entity/LabelResourceMap.scala create mode 100644 linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/entity/ResourceOperationType.scala create mode 100644 linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/util/PersistenceManagerConf.java diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java index e7dbf59e94..22f88b9286 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java @@ -30,6 +30,8 @@ import org.apache.linkis.manager.label.utils.LabelUtils; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; @@ -74,9 +76,19 @@ public List> getLabels() { public List> getResourceLabels() { if (null != labels) { - return labels.stream() - .filter(label -> label instanceof ResourceLabel) - .collect(Collectors.toList()); + List> resourceLabels = + labels.stream() + .filter(label -> label instanceof ResourceLabel) + .sorted( + new Comparator>() { + @Override + public int compare(Label label1, Label label12) { + return label1.getLabelKey().compareTo(label12.getLabelKey()); + } + }) + .collect(Collectors.toList()); + Collections.reverse(resourceLabels); + return resourceLabels; } return new ArrayList<>(); } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/exception/RMLockFailedRetryException.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/exception/RMLockFailedRetryException.java new file mode 100644 index 0000000000..3776b79444 --- /dev/null +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/exception/RMLockFailedRetryException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.manager.rm.exception; + +import org.apache.linkis.common.exception.LinkisRetryException; + +public class RMLockFailedRetryException extends LinkisRetryException { + + public RMLockFailedRetryException(int errCode, String desc) { + super(errCode, desc); + } +} diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/monitor/NodeHeartbeatMonitor.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/monitor/NodeHeartbeatMonitor.scala index 08938ebcca..cdb5c033c4 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/monitor/NodeHeartbeatMonitor.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/monitor/NodeHeartbeatMonitor.scala @@ -46,6 +46,7 @@ import java.util import java.util.concurrent.ExecutorService import scala.collection.JavaConverters._ +import scala.collection.mutable @Component class NodeHeartbeatMonitor extends ManagerMonitor with Logging { @@ -140,14 +141,13 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging { */ private def dealECNodes(engineNodes: util.List[Node]): Unit = { val existingEngineInstances = Sender.getInstances(ecName) + val clearECSet = new mutable.HashSet[ServiceInstance]() engineNodes.asScala.foreach { engineNode => if (NodeStatus.isCompleted(engineNode.getNodeStatus)) { logger.info( s"${engineNode.getServiceInstance} is completed ${engineNode.getNodeStatus}, will be remove" ) - Utils.tryAndWarnMsg(clearEngineNode(engineNode.getServiceInstance))( - "clear engine node failed" - ) + clearECSet.add(engineNode.getServiceInstance) } else { val engineIsStarted = (System.currentTimeMillis() - engineNode.getStartTime.getTime) > maxCreateInterval @@ -162,18 +162,23 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging { logger.warn( s"Failed to find instance ${engineNode.getServiceInstance} from eureka prepare to kill, engineIsStarted" ) - Utils.tryAndWarnMsg(clearEngineNode(engineNode.getServiceInstance))( - "engineIsStarted clear failed" - ) + clearECSet.add(engineNode.getServiceInstance) } } else if (updateOverdue) { logger.warn(s" ${engineNode.getServiceInstance} heartbeat updateOverdue") - Utils.tryAndWarnMsg(clearEngineNode(engineNode.getServiceInstance))( - "updateOverdue clear failed" - ) + clearECSet.add(engineNode.getServiceInstance) } } } + clearECSet.foreach(clearEngineNode) + } + + private def updateMetrics(node: Node): Unit = { + val metric = nodeMetricManagerPersistence.getNodeMetrics(node) + if (null != metric) { + node.setNodeStatus(NodeStatus.values()(metric.getStatus)) + node.setUpdateTime(metric.getUpdateTime) + } } private def dealECMNotExistsInRegistry(ecmNodes: util.List[Node]): Unit = { @@ -186,12 +191,21 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging { } val updateOverdue = (System.currentTimeMillis() - updateTime) > ecmHeartBeatTime if (!existingECMInstances.contains(ecm.getServiceInstance) && updateOverdue) { - logger.warn( - s"Failed to find ecm instance ${ecm.getServiceInstance} from eureka Registry to kill" - ) - Utils.tryAndWarnMsg(triggerEMSuicide(ecm.getServiceInstance))( - s"ecm ${ecm.getServiceInstance} clear failed" - ) + Utils.tryAndWarn(updateMetrics(ecm)) + val isUpdateOverdue = if (null == ecm.getUpdateTime) { + (System.currentTimeMillis() - ecm.getStartTime.getTime) > ecmHeartBeatTime + } else { + (System.currentTimeMillis() - ecm.getUpdateTime.getTime) > ecmHeartBeatTime + } + val isExistingECMInstances = Sender.getInstances(ecmName).contains(ecm.getServiceInstance) + if (!isExistingECMInstances && isUpdateOverdue) { + logger.warn( + s"Failed to find ecm instance ${ecm.getServiceInstance} from eureka Registry to kill" + ) + Utils.tryAndWarnMsg(triggerEMSuicide(ecm.getServiceInstance))( + s"ecm ${ecm.getServiceInstance} clear failed" + ) + } } } } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/entity/LabelResourceMap.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/entity/LabelResourceMap.scala new file mode 100644 index 0000000000..30c49a2b1a --- /dev/null +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/entity/LabelResourceMap.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.manager.rm.entity + +import org.apache.linkis.manager.common.entity.resource.Resource +import org.apache.linkis.manager.label.entity.Label +import org.apache.linkis.manager.rm.entity.ResourceOperationType.ResourceOperationType + +class LabelResourceMapping( + label: Label[_], + resource: Resource, + resourceOperationType: ResourceOperationType +) { + + override def equals(obj: Any): Boolean = { + obj match { + case labelResourceMap: LabelResourceMapping => + labelResourceMap.getLabel().getStringValue.equals(label.getStringValue) + case _ => false + } + } + + override def hashCode(): Int = { + super.hashCode() + } + + def getLabel(): Label[_] = label + + def getResource(): Resource = resource + + def getResourceOperationType: ResourceOperationType = resourceOperationType + + override def toString: String = { + s"Label ${label.getStringValue} mapping resource ${resource}" + } + +} diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/entity/ResourceOperationType.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/entity/ResourceOperationType.scala new file mode 100644 index 0000000000..699816ca35 --- /dev/null +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/entity/ResourceOperationType.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.manager.rm.entity + +object ResourceOperationType extends Enumeration { + + type ResourceOperationType = Value + + val LOCK, USED, USED_RELEASE, LOCKER_RELEASE = Value + +} diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceLockService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceLockService.scala index 32550ef3b5..18d328fcbd 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceLockService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceLockService.scala @@ -17,11 +17,9 @@ package org.apache.linkis.manager.rm.service -import org.apache.linkis.common.utils.Logging +import org.apache.linkis.common.utils.{Logging, Utils} import org.apache.linkis.manager.common.entity.persistence.PersistenceLock -import org.apache.linkis.manager.label.entity.ResourceLabel import org.apache.linkis.manager.persistence.LockManagerPersistence -import org.apache.linkis.manager.rm.domain.RMLabelContainer import org.apache.commons.lang3.StringUtils @@ -40,72 +38,42 @@ class ResourceLockService extends Logging { @Autowired var lockManagerPersistence: LockManagerPersistence = _ - def tryLock(labelContainer: RMLabelContainer): Boolean = tryLock(labelContainer, Long.MaxValue) - - def tryLock(labelContainer: RMLabelContainer, timeout: Long): Boolean = { - if ( - StringUtils.isBlank(labelContainer.getCurrentLabel.getStringValue) - || !labelContainer.getCurrentLabel.isInstanceOf[ResourceLabel] - || labelContainer.getLockedLabels.contains(labelContainer.getCurrentLabel) - ) { + def tryLock(persistenceLock: PersistenceLock, timeout: Long): Boolean = { + if (StringUtils.isBlank(persistenceLock.getLockObject)) { return true } - val lockedBy = - if (labelContainer.getUserCreatorLabel == null) DEFAULT_LOCKED_BY - else labelContainer.getUserCreatorLabel.getUser - val persistenceLock = new PersistenceLock - persistenceLock.setLockObject(labelContainer.getCurrentLabel.getStringValue) - persistenceLock.setCreateTime(new Date) - persistenceLock.setCreator(lockedBy) - persistenceLock.setUpdateTime(new Date) - persistenceLock.setUpdator(lockedBy) - try { + Utils.tryCatch { val isLocked: Boolean = if (timeout > 0) { lockManagerPersistence.lock(persistenceLock, timeout) } else { lockManagerPersistence.lock(persistenceLock, Long.MaxValue) } if (isLocked) { - logger.info( - labelContainer.getCurrentLabel + " successfully locked label" + persistenceLock.getLockObject - ) - labelContainer.getLockedLabels.add(labelContainer.getCurrentLabel) + logger.info("successfully locked label" + persistenceLock.getLockObject) } isLocked - } catch { - case t: Throwable => - logger.error(s"failed to lock label [${persistenceLock.getLockObject}]", t) - false + } { case t: Throwable => + logger.error(s"failed to lock label [${persistenceLock.getLockObject}]", t) + false } } - def unLock(labelContainer: RMLabelContainer): Unit = { - val labelIterator = labelContainer.getLockedLabels.iterator - while (labelIterator.hasNext) { - val label = labelIterator.next - if (!StringUtils.isBlank(label.getStringValue)) { - val persistenceLock = new PersistenceLock - persistenceLock.setLockObject(label.getStringValue) - try { - lockManagerPersistence.unlock(persistenceLock) - logger.info("unlocked " + persistenceLock.getLockObject) - } catch { - case t: Throwable => - logger.error(s"failed to unlock label [${persistenceLock.getLockObject}]", t) - throw t - } - labelIterator.remove - } + def unLock(persistenceLock: PersistenceLock): Unit = { + Utils.tryCatch { + lockManagerPersistence.unlock(persistenceLock) + logger.info("unlocked " + persistenceLock.getLockObject) + } { case t: Throwable => + logger.error(s"failed to unlock label [${persistenceLock.getLockObject}]", t) } } def clearTimeoutLock(timeout: Long): Unit = { - val currentTime = System.currentTimeMillis - lockManagerPersistence.getAll.asScala.foreach { lock => - if (currentTime - lock.getCreateTime.getTime > timeout) { - lockManagerPersistence.unlock(lock) - logger.warn("timeout force unlock " + lock.getLockObject) - } + val endDate = new Date(System.currentTimeMillis() - timeout) + val locks = lockManagerPersistence.getTimeOutLocks(endDate) + if (null == locks) return + locks.asScala.foreach { lock => + Utils.tryAndWarn(lockManagerPersistence.unlock(lock)) + logger.warn("timeout force unlock " + lock.getLockObject) } } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala index 25cc082cd7..8358c21ec3 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala @@ -18,15 +18,19 @@ package org.apache.linkis.manager.rm.service.impl import org.apache.linkis.common.ServiceInstance -import org.apache.linkis.common.exception.LinkisRetryException import org.apache.linkis.common.utils.{Logging, Utils} import org.apache.linkis.governance.common.conf.GovernanceCommonConf +import org.apache.linkis.manager.am.service.engine.EngineStopService import org.apache.linkis.manager.common.conf.RMConfiguration import org.apache.linkis.manager.common.entity.enumeration.NodeStatus import org.apache.linkis.manager.common.entity.node.{AMEMNode, AMEngineNode, InfoRMNode} -import org.apache.linkis.manager.common.entity.persistence.{PersistenceLabel, PersistenceResource} +import org.apache.linkis.manager.common.entity.persistence.{ + PersistenceLabel, + PersistenceLock, + PersistenceResource +} import org.apache.linkis.manager.common.entity.resource._ -import org.apache.linkis.manager.common.errorcode.ManagerCommonErrorCodeSummary._ +import org.apache.linkis.manager.common.errorcode.ManagerCommonErrorCodeSummary import org.apache.linkis.manager.common.exception.{RMErrorException, RMWarnException} import org.apache.linkis.manager.common.utils.{ManagerUtils, ResourceUtils} import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext @@ -47,8 +51,9 @@ import org.apache.linkis.manager.rm.{ ResourceInfo, ResultResource } -import org.apache.linkis.manager.rm.domain.RMLabelContainer -import org.apache.linkis.manager.rm.exception.RMErrorCode +import org.apache.linkis.manager.rm.entity.{LabelResourceMapping, ResourceOperationType} +import org.apache.linkis.manager.rm.entity.ResourceOperationType.{LOCK, USED} +import org.apache.linkis.manager.rm.exception.{RMErrorCode, RMLockFailedRetryException} import org.apache.linkis.manager.rm.external.service.ExternalResourceService import org.apache.linkis.manager.rm.service.{ LabelResourceService, @@ -57,24 +62,26 @@ import org.apache.linkis.manager.rm.service.{ ResourceManager } import org.apache.linkis.manager.rm.utils.RMUtils -import org.apache.linkis.server.BDPJettyServerHelper import org.springframework.beans.factory.InitializingBean import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Component -import java.text.MessageFormat import java.util -import java.util.UUID +import java.util.{Date, UUID} import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import com.google.common.collect.Lists @Component class DefaultResourceManager extends ResourceManager with Logging with InitializingBean { + private val labelFactory = LabelBuilderFactoryContext.getLabelBuilderFactory + @Autowired private var resourceManagerPersistence: ResourceManagerPersistence = _ @@ -102,11 +109,10 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ @Autowired private var nodeLabelService: NodeLabelService = _ - private var requestResourceServices: Array[RequestResourceService] = _ - - private val gson = BDPJettyServerHelper.gson + @Autowired + private var engineStopService: EngineStopService = _ - private val labelFactory = LabelBuilderFactoryContext.getLabelBuilderFactory + private var requestResourceServices: Array[RequestResourceService] = _ override def afterPropertiesSet(): Unit = { requestResourceServices = Array( @@ -118,7 +124,10 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ new Runnable { override def run(): Unit = { logger.info("Start force release timeout locks") - resourceLockService.clearTimeoutLock(RMConfiguration.LOCK_RELEASE_TIMEOUT.getValue.toLong) + Utils.tryAndWarn( + resourceLockService + .clearTimeoutLock(RMConfiguration.LOCK_RELEASE_TIMEOUT.getValue.toLong) + ) } }, RMConfiguration.LOCK_RELEASE_CHECK_INTERVAL.getValue.toLong, @@ -137,55 +146,37 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(classOf[EMInstanceLabel]) eMInstanceLabel.setServiceName(serviceInstance.getApplicationName) eMInstanceLabel.setInstance(serviceInstance.getInstance) - val labelContainer = new RMLabelContainer(Lists.newArrayList(eMInstanceLabel)) - Utils.tryFinally { - // lock labels - labelContainer.getResourceLabels.asScala.foreach { - case label: Label[_] => - labelContainer.setCurrentLabel(label.asInstanceOf[Label[_]]) - resourceLockService.tryLock(labelContainer) - case _ => - } - val emResource = labelResourceService.getLabelResource(eMInstanceLabel) - if (emResource != null) { - logger.warn(s"${serviceInstance} has been registered, now update resource.") - if (!emResource.getResourceType.equals(resource.getResourceType)) { - throw new RMErrorException( - RMErrorCode.LABEL_DUPLICATED.getCode, - s"${serviceInstance} has been registered in ${emResource.getResourceType}, cannot be updated to ${resource.getResourceType}" - ) - } - } - // TODO get ID Label set label resource - Utils.tryCatch { - labelResourceService.setLabelResource( - eMInstanceLabel, - resource, - eMInstanceLabel.getStringValue + val emResource = labelResourceService.getLabelResource(eMInstanceLabel) + if (emResource != null) { + logger.warn(s"${serviceInstance} has been registered, now update resource.") + if (!emResource.getResourceType.equals(resource.getResourceType)) { + throw new RMErrorException( + RMErrorCode.LABEL_DUPLICATED.getCode, + s"${serviceInstance} has been registered in ${emResource.getResourceType}, cannot be updated to ${resource.getResourceType}" ) - resourceLogService.success( + } + } + val lock = tryLockOneLabel(eMInstanceLabel, -1, Utils.getJvmUser) + try { + labelResourceService.setLabelResource( + eMInstanceLabel, + resource, + eMInstanceLabel.getStringValue + ) + } catch { + case exception: Exception => + resourceLogService.failed( ChangeType.ECM_INIT, resource.getMaxResource, null, - eMInstanceLabel + eMInstanceLabel, + exception ) - } { - case exception: Exception => - resourceLogService.failed( - ChangeType.ECM_INIT, - resource.getMaxResource, - null, - eMInstanceLabel, - exception - ) - throw exception - case _ => - } - } { - // 5.Release lock(释放锁) - resourceLockService.unLock(labelContainer) - logger.info(s"finished processing registration of ${serviceInstance}") + throw exception + case _ => + } finally { + resourceLockService.unLock(lock) } } @@ -194,48 +185,36 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ * instance is offline. 该注册方法,主要是用于通知所有的RM节点(包括本节点),下线该实例 */ override def unregister(serviceInstance: ServiceInstance): Unit = { - // TODO get ID Label + val eMInstanceLabel = LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(classOf[EMInstanceLabel]) eMInstanceLabel.setServiceName(serviceInstance.getApplicationName) eMInstanceLabel.setInstance(serviceInstance.getInstance) - val labelContainer = new RMLabelContainer(Lists.newArrayList(eMInstanceLabel)) - - Utils.tryFinally { - Utils.tryCatch { - nodeManagerPersistence.getEngineNodeByEM(serviceInstance).asScala.foreach { node => - val engineInstanceLabel = LabelBuilderFactoryContext.getLabelBuilderFactory - .createLabel(classOf[EngineInstanceLabel]) - engineInstanceLabel.setInstance(node.getServiceInstance.getInstance) - engineInstanceLabel.setServiceName(node.getServiceInstance.getApplicationName) - labelResourceService.removeResourceByLabel(engineInstanceLabel) - } - labelResourceService.removeResourceByLabel(eMInstanceLabel) - labelContainer.setCurrentLabel(eMInstanceLabel) - resourceLockService.unLock(labelContainer) - resourceLogService.success( + val ecNodes = nodeManagerPersistence.getEngineNodeByEM(serviceInstance).asScala + val lock = tryLockOneLabel(eMInstanceLabel, -1, Utils.getJvmUser) + try { + labelResourceService.removeResourceByLabel(eMInstanceLabel) + } catch { + case exception: Exception => + resourceLogService.failed( ChangeType.ECM_CLEAR, Resource.initResource(ResourceType.LoadInstance), null, - eMInstanceLabel + eMInstanceLabel, + exception ) - } { - case exception: Exception => - resourceLogService.failed( - ChangeType.ECM_CLEAR, - Resource.initResource(ResourceType.LoadInstance), - null, - eMInstanceLabel, - exception - ) - case _ => + case _ => + } finally { + resourceLockService.unLock(lock) + logger.info(s"Finished to clear ecm resource:${serviceInstance}") + } + ecNodes.foreach { engineNode => + Utils.tryAndWarn { + engineNode.setLabels(nodeLabelService.getNodeLabels(engineNode.getServiceInstance)) + engineStopService.engineConnInfoClear(engineNode) } - - } { - logger.info( - s"ECMResourceClear:${serviceInstance}, usedResource:${Resource.initResource(ResourceType.Default).toJson}" - ) } + logger.info(s"Finished to clear ec for ecm ${serviceInstance}") } /** @@ -267,138 +246,121 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ wait: Long ): ResultResource = { val labelContainer = labelResourceService.enrichLabels(labels) - logger.debug( - "start processing request resource for labels [" + labelContainer + "] and resource [" + resource + "]" - ) + // check resource with lock + val requestResourceService = getRequestResourceService(resource.getResourceType) + resource.setLockedResource(resource.getMinResource) + val resourceLabels = labelContainer.getResourceLabels.asScala + val persistenceLocks = new ArrayBuffer[PersistenceLock]() + val emInstanceLabel = labelContainer.getEMInstanceLabel + val userCreatorEngineTypeLabel = labelContainer.getCombinedUserCreatorEngineTypeLabel Utils.tryFinally { - // lock labels - if (wait <= 0) { - tryLock(labelContainer) - } else { - tryLock(labelContainer, wait) - } - - // check resource with lock - val requestResourceService = getRequestResourceService(resource.getResourceType) - resource.setLockedResource(resource.getMinResource) - - val labelResourceList = new util.HashMap[String, NodeResource]() - labelContainer.getResourceLabels.asScala.foreach(label => { - // check all resource of label - Utils.tryCatch { - labelContainer.setCurrentLabel(label) - if (!requestResourceService.canRequest(labelContainer, resource)) { - return NotEnoughResource(s"Labels:${label.getStringValue} not enough resource") - } - } { case exception: RMWarnException => - return NotEnoughResource(exception.getMessage) + // check ecm resource if not enough return + Utils.tryCatch { + labelContainer.setCurrentLabel(emInstanceLabel) + if (!requestResourceService.canRequest(labelContainer, resource)) { + return NotEnoughResource(s"Labels:${emInstanceLabel.getStringValue} not enough resource") } - val usedResource = labelResourceService.getLabelResource(label) - if (usedResource == null) { - val msg = - s"Resource label: ${label.getStringValue} has no usedResource, please check, refuse request usedResource" - logger.info(msg) - throw new RMErrorException( - REFUSE_REQUEST.getErrorCode, - MessageFormat.format(REFUSE_REQUEST.getErrorDesc, label.getStringValue) + } { + case exception: RMWarnException => return NotEnoughResource(exception.getMessage) + case exception: Exception => + throw exception + } + // lock userCreatorEngineTypeLabel + persistenceLocks.append( + tryLockOneLabel( + userCreatorEngineTypeLabel, + wait, + labelContainer.getUserCreatorLabel.getUser + ) + ) + Utils.tryCatch { + labelContainer.setCurrentLabel(userCreatorEngineTypeLabel) + if (!requestResourceService.canRequest(labelContainer, resource)) { + return NotEnoughResource( + s"Labels:${userCreatorEngineTypeLabel.getStringValue} not enough resource" ) } - labelResourceList.put(label.getStringValue, usedResource) - }) - val tickedId = UUID.randomUUID().toString - labelContainer.getResourceLabels.asScala.foreach { - case label: Label[_] => - val labelResource = labelResourceList.get(label.getStringValue) - if (labelResource != null) { - labelResource.setLeftResource( - labelResource.getLeftResource - resource.getLockedResource - ) - labelResource.setLockedResource( - labelResource.getLockedResource + resource.getLockedResource - ) - labelResourceService.setLabelResource( - label, - labelResource, - labelContainer.getCombinedUserCreatorEngineTypeLabel.getStringValue - ) - logger.info(s"ResourceChanged:${label.getStringValue} --> ${labelResource}") - resourceCheck(label, labelResource) - if ( - label.getClass.isAssignableFrom( - labelContainer.getCombinedUserCreatorEngineTypeLabel.getClass - ) - ) { - resourceLogService.recordUserResourceAction( - labelContainer, - tickedId, - ChangeType.ENGINE_REQUEST, - resource.getLockedResource - ) - } - } - case _ => + } { + case exception: RMWarnException => return NotEnoughResource(exception.getMessage) + case exception: Exception => + throw exception } - // record engine locked resource - val emNode = new AMEMNode - emNode.setServiceInstance(labelContainer.getEMInstanceLabel.getServiceInstance) - val engineNode = new AMEngineNode - engineNode.setEMNode(emNode) - engineNode.setServiceInstance(ServiceInstance(labelContainer.getEngineServiceName, tickedId)) - engineNode.setNodeResource(resource) - nodeManagerPersistence.addEngineNode(engineNode) - - val engineInstanceLabel = - LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(classOf[EngineInstanceLabel]) - engineInstanceLabel.setServiceName(labelContainer.getEngineServiceName) - engineInstanceLabel.setInstance(tickedId) - - nodeLabelService.addLabelToNode(engineNode.getServiceInstance, engineInstanceLabel) - - labelResourceService.setEngineConnLabelResource( - engineInstanceLabel, - resource, - labelContainer.getCombinedUserCreatorEngineTypeLabel.getStringValue + // lock ecmLabel + persistenceLocks.append( + tryLockOneLabel(emInstanceLabel, wait, labelContainer.getUserCreatorLabel.getUser) ) - - val persistenceLabel = - labelFactory.convertLabel(engineInstanceLabel, classOf[PersistenceLabel]) - val persistenceEngineLabel = labelManagerPersistence.getLabelByKeyValue( - persistenceLabel.getLabelKey, - persistenceLabel.getStringValue - ) - - // fire timeout check scheduled job - if (RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue > 0) { - Utils.defaultScheduler.schedule( - new UnlockTimeoutResourceRunnable(labels, persistenceEngineLabel, tickedId), - RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue, - TimeUnit.MILLISECONDS - ) + resourceLabels.foreach { label => + labelContainer.setCurrentLabel(label) + val labelResource = labelResourceService.getLabelResource(label) + if (labelResource != null) { + labelResource.setLeftResource(labelResource.getLeftResource - resource.getLockedResource) + labelResource.setLockedResource( + labelResource.getLockedResource + resource.getLockedResource + ) + labelResourceService.setLabelResource( + label, + labelResource, + labelContainer.getCombinedUserCreatorEngineTypeLabel.getStringValue + ) + logger.info(s"ResourceChanged:${label.getStringValue} --> ${labelResource}") + resourceCheck(label, labelResource) + } } - AvailableResource(tickedId) } { - // 5.Release lock(释放锁) - resourceLockService.unLock(labelContainer) - logger.debug( - s"finished processing requestResource of labels ${labels} and resource ${resource}" + persistenceLocks.foreach(resourceLockService.unLock) + } + // record engine locked resource + val tickedId = UUID.randomUUID().toString + resourceLogService.recordUserResourceAction( + labelContainer, + tickedId, + ChangeType.ENGINE_REQUEST, + resource.getLockedResource + ) + val emNode = new AMEMNode + emNode.setServiceInstance(labelContainer.getEMInstanceLabel.getServiceInstance) + val engineNode = new AMEngineNode + engineNode.setEMNode(emNode) + engineNode.setServiceInstance(ServiceInstance(labelContainer.getEngineServiceName, tickedId)) + engineNode.setNodeResource(resource) + nodeManagerPersistence.addEngineNode(engineNode) + + val engineInstanceLabel = + LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(classOf[EngineInstanceLabel]) + engineInstanceLabel.setServiceName(labelContainer.getEngineServiceName) + engineInstanceLabel.setInstance(tickedId) + + nodeLabelService.addLabelToNode(engineNode.getServiceInstance, engineInstanceLabel) + + labelResourceService.setEngineConnLabelResource( + engineInstanceLabel, + resource, + labelContainer.getCombinedUserCreatorEngineTypeLabel.getStringValue + ) + + val persistenceLabel = labelFactory.convertLabel(engineInstanceLabel, classOf[PersistenceLabel]) + val persistenceEngineLabel = labelManagerPersistence.getLabelByKeyValue( + persistenceLabel.getLabelKey, + persistenceLabel.getStringValue + ) + + // fire timeout check scheduled job + if (RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue > 0) { + Utils.defaultScheduler.schedule( + new UnlockTimeoutResourceRunnable(labels, persistenceEngineLabel, tickedId), + RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue, + TimeUnit.MILLISECONDS ) } + AvailableResource(tickedId) } - def resourceCheck(label: Label[_], labelResource: NodeResource): Unit = { - if (labelResource != null && label != null) { - val resourceInit = Resource.initResource(labelResource.getResourceType) - if ( - labelResource.getLockedResource < resourceInit || - labelResource.getUsedResource < resourceInit || - labelResource.getLeftResource < resourceInit - ) { - logger.error( - s"found error resource! resource label:${label.getStringValue}, resource:${labelResource}, please check!" - ) - } - } + def getRequestResourceService(resourceType: ResourceType): RequestResourceService = { + val requestResourceService = requestResourceServices.find(_.resourceType == resourceType) + requestResourceService.getOrElse( + requestResourceServices.find(_.resourceType == ResourceType.Default).get + ) } /** @@ -453,49 +415,39 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ logger.info( s"resourceUsed ready:${labelContainer.getEMInstanceLabel.getServiceInstance}, used resource ${lockedResource.getLockedResource}" ) - Utils.tryFinally { - // lock labels - tryLock(labelContainer) - // check again after lock resource - lockedResource = labelResourceService.getLabelResource(labelContainer.getEngineInstanceLabel) - if ( - lockedResource == null || lockedResource.getLockedResource <= Resource.initResource( - lockedResource.getResourceType - ) - ) { - throw new RMErrorException( - RMErrorCode.LABEL_RESOURCE_NOT_FOUND.getCode, - s"No locked resource found by engine ${labelContainer.getEngineInstanceLabel}, current label resource ${lockedResource}" - ) - } - val addedResource = - Resource.initResource(lockedResource.getResourceType) + lockedResource.getLockedResource - labelContainer.getResourceLabels.asScala.foreach { - case engineInstanceLabel: EngineInstanceLabel => - Utils.tryCatch { - lockedResource.setUsedResource(lockedResource.getLockedResource) - updateYarnApplicationID(usedResource, lockedResource) - lockedResource.setLockedResource( - Resource.getZeroResource(lockedResource.getLockedResource) - ) - labelResourceService.setLabelResource( - engineInstanceLabel, - lockedResource, - labelContainer.getCombinedUserCreatorEngineTypeLabel.getStringValue - ) - resourceLogService.success( - ChangeType.ENGINE_INIT, - lockedResource.getLockedResource, - engineInstanceLabel - ) - } { case exception: Exception => - logger.error( - s"${engineInstanceLabel.getStringValue} used resource failed!, resource: ${lockedResource}", - exception - ) - } - case label: Label[_] => - Utils.tryCatch { + val addedResource = + Resource.initResource(lockedResource.getResourceType) + lockedResource.getLockedResource + + val engineInstanceLabel: EngineInstanceLabel = labelContainer.getEngineInstanceLabel + Utils.tryCatch { + lockedResource.setUsedResource(lockedResource.getLockedResource) + updateYarnApplicationID(usedResource, lockedResource) + lockedResource.setLockedResource(Resource.getZeroResource(lockedResource.getLockedResource)) + labelResourceService.setLabelResource( + engineInstanceLabel, + lockedResource, + labelContainer.getCombinedUserCreatorEngineTypeLabel.getStringValue + ) + resourceLogService.success( + ChangeType.ENGINE_INIT, + lockedResource.getLockedResource, + engineInstanceLabel + ) + } { case exception: Exception => + logger.error( + s"${engineInstanceLabel.getStringValue} used resource failed!, resource: ${lockedResource}", + exception + ) + } + val labelResourceSet = new mutable.HashSet[LabelResourceMapping]() + Utils.tryCatch { + labelContainer.getResourceLabels.asScala + .filter(!_.isInstanceOf[EngineInstanceLabel]) + .foreach { label => + val persistenceLock = + tryLockOneLabel(label, -1, labelContainer.getUserCreatorLabel.getUser) + Utils.tryFinally { + labelContainer.setCurrentLabel(label) val labelResource = labelResourceService.getLabelResource(label) if (labelResource != null) { labelResource.setLockedResource(labelResource.getLockedResource - addedResource) @@ -508,41 +460,49 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ labelResource, labelContainer.getCombinedUserCreatorEngineTypeLabel.getStringValue ) - label match { - case emLabel: EMInstanceLabel => - resourceLogService.success( - ChangeType.ECM_RESOURCE_ADD, - lockedResource.getUsedResource, - null, - emLabel - ) - case _ => - } - if ( - label.getClass.isAssignableFrom( - labelContainer.getCombinedUserCreatorEngineTypeLabel.getClass - ) - ) { - resourceLogService.recordUserResourceAction( - labelContainer, - persistenceResource.getTicketId, - ChangeType.ENGINE_INIT, - addedResource, - NodeStatus.Running - ) - } + labelResourceSet.add( + new LabelResourceMapping(label, addedResource, ResourceOperationType.USED) + ) resourceCheck(label, labelResource) } - } { case exception: Exception => - logger.error( - s"${label.getStringValue} used resource failed!, resource: ${lockedResource}", - exception + } { + resourceLockService.unLock(persistenceLock) + } + if ( + label.getClass.isAssignableFrom( + labelContainer.getCombinedUserCreatorEngineTypeLabel.getClass + ) + ) { + resourceLogService.recordUserResourceAction( + labelContainer, + persistenceResource.getTicketId, + ChangeType.ENGINE_INIT, + addedResource, + NodeStatus.Running ) } - case _ => + } + } { case exception: Exception => + resourceRollback(labelResourceSet, labelContainer.getUserCreatorLabel.getUser) + logger.error( + s"${labelContainer.getEngineInstanceLabel.getStringValue} used resource failed!, resource: ${lockedResource}", + exception + ) + } + } + + def resourceCheck(label: Label[_], labelResource: NodeResource): Unit = { + if (labelResource != null && label != null) { + val resourceInit = Resource.initResource(labelResource.getResourceType) + if ( + labelResource.getLockedResource < resourceInit || + labelResource.getUsedResource < resourceInit || + labelResource.getLeftResource < resourceInit + ) { + logger.info( + s"found error resource! resource label:${label.getStringValue}, resource:${labelResource}, please check!" + ) } - } { - resourceLockService.unLock(labelContainer) } } @@ -591,16 +551,60 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ } } - def timeCheck(labelResource: NodeResource, usedResource: NodeResource): Unit = { - if (labelResource.getCreateTime != null && usedResource.getCreateTime != null) { - if (labelResource.getCreateTime.getTime > usedResource.getCreateTime.getTime) { - throw new RMErrorException( - RESOURCE_LATER_CREATED.getErrorCode, - s"no need to clear this labelResource, labelResource:${labelResource} created time is after than usedResource:${usedResource}" + - s"无需清理该标签的资源,该标签资源的创建时间晚于已用资源的创建时间" + private def resourceRollback( + labelResourceSet: mutable.Set[LabelResourceMapping], + user: String + ): Unit = { + labelResourceSet.foreach { labelResourceMapping => + val persistenceLock = tryLockOneLabel(labelResourceMapping.getLabel(), -1, user) + Utils.tryCatch { + val resource = labelResourceService.getLabelResource(labelResourceMapping.getLabel()) + labelResourceMapping.getResourceOperationType match { + case LOCK => + resource.setLeftResource(resource.getLeftResource + labelResourceMapping.getResource()) + resource.setLockedResource( + resource.getLockedResource - labelResourceMapping.getResource() + ) + case USED => + resource.setLockedResource( + resource.getLeftResource + labelResourceMapping.getResource() + ) + resource.setUsedResource( + resource.getLockedResource - labelResourceMapping.getResource() + ) + case _ => + } + labelResourceService.setLabelResource( + labelResourceMapping.getLabel(), + resource, + labelResourceMapping.getResourceOperationType.toString ) + } { case e: Exception => + logger.error(s"Failed to roll back resource " + labelResourceSet.mkString("\n"), e) } + resourceLockService.unLock(persistenceLock) + } + } + + private def tryLockOneLabel( + label: Label[_], + timeOut: Long = -1, + user: String + ): PersistenceLock = { + val persistenceLock = new PersistenceLock + persistenceLock.setLockObject(label.getStringValue) + persistenceLock.setCreateTime(new Date) + persistenceLock.setCreator(user) + persistenceLock.setUpdateTime(new Date) + persistenceLock.setUpdator(user) + val locked = resourceLockService.tryLock(persistenceLock, timeOut) + if (!locked) { + throw new RMLockFailedRetryException( + RMErrorCode.LOCK_LABEL_FAILED.getCode, + s"${RMErrorCode.LOCK_LABEL_FAILED.getMessage} + ${label.getStringValue} over $timeOut ms, please wait a moment and try again!" + ) } + persistenceLock } /** @@ -610,47 +614,42 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ */ override def resourceReleased(labels: util.List[Label[_]]): Unit = { val labelContainer = labelResourceService.enrichLabels(labels) - val persistenceResource: PersistenceResource = - labelResourceService.getPersistenceResource(labelContainer.getEngineInstanceLabel) - val usedResource = ResourceUtils.fromPersistenceResource(persistenceResource) - if (usedResource == null) { + if (null == labelContainer.getEngineInstanceLabel) { throw new RMErrorException( RMErrorCode.LABEL_RESOURCE_NOT_FOUND.getCode, - s"No used resource found by engine ${labelContainer.getEngineInstanceLabel}" + "engine instance label is null" ) } - logger.info( - s"resourceRelease ready:${labelContainer.getEngineInstanceLabel.getServiceInstance},current node resource${usedResource}" + val instanceLock = tryLockOneLabel( + labelContainer.getEngineInstanceLabel, + RMUtils.RM_RESOURCE_LOCK_WAIT_TIME.getValue, + labelContainer.getUserCreatorLabel.getUser ) Utils.tryFinally { - // lock labels - tryLock(labelContainer, RMUtils.RM_RESOURCE_LOCK_WAIT_TIME.getValue) - - // To avoid concurrent problem, check resource again after lock label - val usedResource = - labelResourceService.getLabelResource(labelContainer.getEngineInstanceLabel) + val persistenceResource: PersistenceResource = + labelResourceService.getPersistenceResource(labelContainer.getEngineInstanceLabel) + val usedResource = ResourceUtils.fromPersistenceResource(persistenceResource) if (usedResource == null) { throw new RMErrorException( RMErrorCode.LABEL_RESOURCE_NOT_FOUND.getCode, s"No used resource found by engine ${labelContainer.getEngineInstanceLabel}" ) } - val node = new AMEngineNode() - node.setServiceInstance(labelContainer.getEngineInstanceLabel.getServiceInstance) - val metrics = nodeMetricManagerPersistence.getNodeMetrics(node) - val status = if (null != metrics) { - NodeStatus.values()(metrics.getStatus) - } else { - logger.warn( - "EC {} status unknown", - labelContainer.getEngineInstanceLabel.getServiceInstance - ) - NodeStatus.Failed - } - labelContainer.getResourceLabels.asScala.foreach { - case label: Label[_] => + logger.info( + s"resourceRelease ready:${labelContainer.getEngineInstanceLabel.getServiceInstance},current node resource${usedResource}" + ) + val status = getNodeStatus(labelContainer.getEngineInstanceLabel) + + labelContainer.getResourceLabels.asScala + .filter(!_.isInstanceOf[EngineInstanceLabel]) + .foreach { label => Utils.tryCatch { - if (!label.isInstanceOf[EngineInstanceLabel]) { + val persistenceLock = tryLockOneLabel( + label, + RMUtils.RM_RESOURCE_LOCK_WAIT_TIME.getValue, + labelContainer.getUserCreatorLabel.getUser + ) + Utils.tryFinally { val labelResource = labelResourceService.getLabelResource(label) if (labelResource != null) { if (label.isInstanceOf[EMInstanceLabel]) timeCheck(labelResource, usedResource) @@ -681,75 +680,94 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ labelResource, labelContainer.getCombinedUserCreatorEngineTypeLabel.getStringValue ) - if ( - label.getClass.isAssignableFrom( - labelContainer.getCombinedUserCreatorEngineTypeLabel.getClass - ) - ) { - if (usedResource.getUsedResource != null) { - resourceLogService.recordUserResourceAction( - labelContainer, - persistenceResource.getTicketId, - ChangeType.ENGINE_CLEAR, - usedResource.getUsedResource, - status - ) - } else if (usedResource.getLockedResource != null) { - resourceLogService.recordUserResourceAction( - labelContainer, - persistenceResource.getTicketId, - ChangeType.ENGINE_CLEAR, - usedResource.getLockedResource, - status - ) - } - } - label match { - case emLabel: EMInstanceLabel => - resourceLogService.success( - ChangeType.ECM_Resource_MINUS, - labelResource.getUsedResource, - null, - emLabel - ) - case _ => - } resourceCheck(label, labelResource) } + } { + resourceLockService.unLock(persistenceLock) + } + + val releasedResource = if (usedResource.getUsedResource != null) { + usedResource.getUsedResource + } else { + usedResource.getLockedResource + } + if ( + label.getClass.isAssignableFrom( + labelContainer.getCombinedUserCreatorEngineTypeLabel.getClass + ) + ) { + resourceLogService.recordUserResourceAction( + labelContainer, + persistenceResource.getTicketId, + ChangeType.ENGINE_CLEAR, + releasedResource, + status + ) } } { case exception: Exception => - logger.warn(s"Failed to release resource label ${label.getStringValue}", exception) + logger.error( + s"Failed to release resource label ${labelContainer.getEngineInstanceLabel.getStringValue}", + exception + ) } - case _ => - } - val tmpLabel = - labelContainer.getLabels.asScala.find(_.isInstanceOf[EngineInstanceLabel]).orNull - if (tmpLabel != null) { - val engineInstanceLabel = tmpLabel.asInstanceOf[EngineInstanceLabel] - Utils.tryCatch { - labelResourceService.removeResourceByLabel(engineInstanceLabel) - resourceLogService.success( + } + val engineInstanceLabel = labelContainer.getEngineInstanceLabel + Utils.tryCatch { + labelResourceService.removeResourceByLabel(engineInstanceLabel) + resourceLogService.success( + ChangeType.ENGINE_CLEAR, + usedResource.getUsedResource, + engineInstanceLabel, + null + ) + } { + case exception: Exception => + resourceLogService.failed( ChangeType.ENGINE_CLEAR, usedResource.getUsedResource, engineInstanceLabel, - null + null, + exception ) - } { - case exception: Exception => - resourceLogService.failed( - ChangeType.ENGINE_CLEAR, - usedResource.getUsedResource, - engineInstanceLabel, - null, - exception - ) - throw exception - case _ => - } + throw exception + case _ => } } { - resourceLockService.unLock(labelContainer) + logger.info( + s"Finished release instance ${labelContainer.getEngineInstanceLabel.getServiceInstance} resource" + ) + resourceLockService.unLock(instanceLock) + } + } + + def timeCheck(labelResource: NodeResource, usedResource: NodeResource): Unit = { + if (labelResource.getCreateTime != null && usedResource.getCreateTime != null) { + if (labelResource.getCreateTime.getTime > usedResource.getCreateTime.getTime) { + throw new RMErrorException( + ManagerCommonErrorCodeSummary.RESOURCE_LATER_CREATED.getErrorCode, + s"no need to clear this labelResource, labelResource:${labelResource} created time is after than usedResource:${usedResource}" + + s"无需清理该标签的资源,该标签资源的创建时间晚于已用资源的创建时间" + ) + } + } + } + + private def getNodeStatus(engineInstanceLabel: EngineInstanceLabel): NodeStatus = { + val node = new AMEngineNode() + node.setServiceInstance(engineInstanceLabel.getServiceInstance) + val metrics = nodeMetricManagerPersistence.getNodeMetrics(node) + val status = if (null != metrics) { + val timeStatus = NodeStatus.values()(metrics.getStatus) + if (!NodeStatus.isCompleted(timeStatus)) { + NodeStatus.Failed + } else { + timeStatus + } + } else { + logger.warn("EC {} status unknown", engineInstanceLabel.getServiceInstance) + NodeStatus.Failed } + status } /** @@ -762,7 +780,7 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ override def getResourceInfo(serviceInstances: Array[ServiceInstance]): ResourceInfo = { val resourceInfo = new ResourceInfo(Lists.newArrayList()) - serviceInstances.map { serviceInstance => + serviceInstances.foreach({ serviceInstance => val rmNode = new InfoRMNode var aggregatedResource: NodeResource = null serviceInstance.getApplicationName match { @@ -783,16 +801,11 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ rmNode.setServiceInstance(serviceInstance) rmNode.setNodeResource(aggregatedResource) resourceInfo.resourceInfo.add(rmNode) - } + }) resourceInfo } - def getRequestResourceService(resourceType: ResourceType): RequestResourceService = { - val requestResourceService = requestResourceServices.find(_.resourceType == resourceType) - requestResourceService.getOrElse( - requestResourceServices.find(_.resourceType == ResourceType.Default).get - ) - } + override def resourceReport(labels: util.List[Label[_]], reportResource: NodeResource): Unit = {} class UnlockTimeoutResourceRunnable( labels: util.List[Label[_]], @@ -842,24 +855,4 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ } - override def resourceReport(labels: util.List[Label[_]], reportResource: NodeResource): Unit = { - // TODO - - } - - private def tryLock(labelContainer: RMLabelContainer, timeOut: Long = -1): Unit = { - labelContainer.getResourceLabels.asScala.foreach { - case label: Label[_] => - labelContainer.setCurrentLabel(label) - val locked = resourceLockService.tryLock(labelContainer, timeOut) - if (!locked) { - throw new LinkisRetryException( - 110022, - s"try to lock resource label ${labelContainer.getCurrentLabel} over $timeOut ms, please wait a moment and try again!" - ) - } - case _ => - } - } - } diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/utils/ResourceUtils.scala b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/utils/ResourceUtils.scala index dd09ec8cda..fcf5a7a18e 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/utils/ResourceUtils.scala +++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/utils/ResourceUtils.scala @@ -37,18 +37,27 @@ object ResourceUtils { def toPersistenceResource(nodeResource: NodeResource): PersistenceResource = { val persistenceResource = new PersistenceResource - if (nodeResource.getMaxResource != null) + if (nodeResource.getMaxResource != null) { persistenceResource.setMaxResource(serializeResource(nodeResource.getMaxResource)) - if (nodeResource.getMinResource != null) + } + if (nodeResource.getMinResource != null) { persistenceResource.setMinResource(serializeResource(nodeResource.getMinResource)) - if (nodeResource.getLockedResource != null) + } + if (nodeResource.getLockedResource != null) { persistenceResource.setLockedResource(serializeResource(nodeResource.getLockedResource)) - if (nodeResource.getExpectedResource != null) + } + if (nodeResource.getExpectedResource != null) { persistenceResource.setExpectedResource(serializeResource(nodeResource.getExpectedResource)) - if (nodeResource.getLeftResource != null) + } + if (nodeResource.getLeftResource != null) { persistenceResource.setLeftResource(serializeResource(nodeResource.getLeftResource)) - if (nodeResource.getUsedResource != null) + } + if (nodeResource.getUsedResource != null) { persistenceResource.setUsedResource(serializeResource(nodeResource.getUsedResource)) + } + if (nodeResource.getId != null && nodeResource.getId > 0) { + persistenceResource.setId(nodeResource.getId) + } persistenceResource.setResourceType(nodeResource.getResourceType.toString()) persistenceResource } @@ -56,23 +65,31 @@ object ResourceUtils { def fromPersistenceResource(persistenceResource: PersistenceResource): CommonNodeResource = { if (persistenceResource == null) return null val nodeResource = new CommonNodeResource - if (persistenceResource.getId != null) nodeResource.setId(persistenceResource.getId) - if (persistenceResource.getMaxResource != null) + if (persistenceResource.getId > 0) nodeResource.setId(persistenceResource.getId) + if (persistenceResource.getMaxResource != null) { nodeResource.setMaxResource(deserializeResource(persistenceResource.getMaxResource)) - if (persistenceResource.getMinResource != null) + } + if (persistenceResource.getMinResource != null) { nodeResource.setMinResource(deserializeResource(persistenceResource.getMinResource)) - if (persistenceResource.getLockedResource != null) + } + if (persistenceResource.getLockedResource != null) { nodeResource.setLockedResource(deserializeResource(persistenceResource.getLockedResource)) - if (persistenceResource.getExpectedResource != null) + } + if (persistenceResource.getExpectedResource != null) { nodeResource.setExpectedResource(deserializeResource(persistenceResource.getExpectedResource)) - if (persistenceResource.getLeftResource != null) + } + if (persistenceResource.getLeftResource != null) { nodeResource.setLeftResource(deserializeResource(persistenceResource.getLeftResource)) - if (persistenceResource.getUsedResource != null) + } + if (persistenceResource.getUsedResource != null) { nodeResource.setUsedResource(deserializeResource(persistenceResource.getUsedResource)) - if (persistenceResource.getCreateTime != null) + } + if (persistenceResource.getCreateTime != null) { nodeResource.setCreateTime(persistenceResource.getCreateTime) - if (persistenceResource.getUpdateTime != null) + } + if (persistenceResource.getUpdateTime != null) { nodeResource.setUpdateTime(persistenceResource.getUpdateTime) + } nodeResource.setResourceType(ResourceType.valueOf(persistenceResource.getResourceType)) nodeResource } @@ -80,23 +97,31 @@ object ResourceUtils { def fromPersistenceResourceAndUser(persistenceResource: PersistenceResource): UserResource = { if (persistenceResource == null) return null val nodeResource = new UserResource - if (persistenceResource.getId != null) nodeResource.setId(persistenceResource.getId) - if (persistenceResource.getMaxResource != null) + if (persistenceResource.getId > 0) nodeResource.setId(persistenceResource.getId) + if (persistenceResource.getMaxResource != null) { nodeResource.setMaxResource(deserializeResource(persistenceResource.getMaxResource)) - if (persistenceResource.getMinResource != null) + } + if (persistenceResource.getMinResource != null) { nodeResource.setMinResource(deserializeResource(persistenceResource.getMinResource)) - if (persistenceResource.getLockedResource != null) + } + if (persistenceResource.getLockedResource != null) { nodeResource.setLockedResource(deserializeResource(persistenceResource.getLockedResource)) - if (persistenceResource.getExpectedResource != null) + } + if (persistenceResource.getExpectedResource != null) { nodeResource.setExpectedResource(deserializeResource(persistenceResource.getExpectedResource)) - if (persistenceResource.getLeftResource != null) + } + if (persistenceResource.getLeftResource != null) { nodeResource.setLeftResource(deserializeResource(persistenceResource.getLeftResource)) - if (persistenceResource.getUsedResource != null) + } + if (persistenceResource.getUsedResource != null) { nodeResource.setUsedResource(deserializeResource(persistenceResource.getUsedResource)) - if (persistenceResource.getCreateTime != null) + } + if (persistenceResource.getCreateTime != null) { nodeResource.setCreateTime(persistenceResource.getCreateTime) - if (persistenceResource.getUpdateTime != null) + } + if (persistenceResource.getUpdateTime != null) { nodeResource.setUpdateTime(persistenceResource.getUpdateTime) + } nodeResource.setResourceType(ResourceType.valueOf(persistenceResource.getResourceType)) nodeResource } @@ -117,30 +142,35 @@ object ResourceUtils { if (resourceType.equals(ResourceType.LoadInstance)) { if (nodeResource.getResourceType.equals(ResourceType.DriverAndYarn)) { nodeResource.setResourceType(resourceType) - if (nodeResource.getMaxResource != null) + if (nodeResource.getMaxResource != null) { nodeResource.setMaxResource( nodeResource.getMaxResource.asInstanceOf[DriverAndYarnResource].loadInstanceResource ) - if (nodeResource.getMinResource != null) + } + if (nodeResource.getMinResource != null) { nodeResource.setMinResource( nodeResource.getMinResource.asInstanceOf[DriverAndYarnResource].loadInstanceResource ) - if (nodeResource.getUsedResource != null) + } + if (nodeResource.getUsedResource != null) { nodeResource.setUsedResource( nodeResource.getUsedResource.asInstanceOf[DriverAndYarnResource].loadInstanceResource ) - if (nodeResource.getLockedResource != null) + } + if (nodeResource.getLockedResource != null) { nodeResource.setLockedResource( nodeResource.getLockedResource .asInstanceOf[DriverAndYarnResource] .loadInstanceResource ) - if (nodeResource.getExpectedResource != null) + } + if (nodeResource.getExpectedResource != null) { nodeResource.setExpectedResource( nodeResource.getExpectedResource .asInstanceOf[DriverAndYarnResource] .loadInstanceResource ) + } if ( nodeResource.getLeftResource != null && nodeResource.getLeftResource .isInstanceOf[DriverAndYarnResource] diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/LockManagerMapper.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/LockManagerMapper.java index 023822f4e5..4d115d5a20 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/LockManagerMapper.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/LockManagerMapper.java @@ -21,16 +21,25 @@ import org.apache.ibatis.annotations.*; +import org.springframework.transaction.annotation.Transactional; + +import java.util.Date; import java.util.List; @Mapper public interface LockManagerMapper { - void lock(@Param("lockObject") String lockObject, @Param("timeOut") Long timeOut); + @Transactional(rollbackFor = Exception.class) + int lock(PersistenceLock persistenceLock); + @Transactional(rollbackFor = Exception.class) void unlock(@Param("id") Integer id); - List getLockersByLockObject(@Param("lockObject") String lockObject); + Integer getMinimumOrder(@Param("lockObject") String lockObject, @Param("id") Integer id); + + List getLockersByLockObject(@Param("lock_object") String lock_object); List getAll(); + + List getTimeOutLocks(@Param("endDate") Date endDate); } diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/impl/ECResourceRecordMapper.xml b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/impl/ECResourceRecordMapper.xml index 90d3165dc7..0ffdd9f4d1 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/impl/ECResourceRecordMapper.xml +++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/impl/ECResourceRecordMapper.xml @@ -25,7 +25,7 @@ diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/impl/LockManagerMapper.xml b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/impl/LockManagerMapper.xml index 7814ee77ad..8c0339ea2d 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/impl/LockManagerMapper.xml +++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/impl/LockManagerMapper.xml @@ -15,75 +15,54 @@ ~ See the License for the specific language governing permissions and ~ limitations under the License. --> - + - - - - - - - - - - - - - - - - - - - - - - - - - - + + lock_object, time_out, update_time, create_time + - - - - - - - - - - - - - - - - + + + + + + + + + - - - - - - - insert into linkis_cg_manager_lock (lock_object, time_out, update_time, create_time) - values(#{lockObject}, #{timeOut}, now(), now()) + + INSERT INTO linkis_cg_manager_lock() + VALUES ( + #{lockObject}, + #{timeOut}, + #{updateTime}, + #{createTime} + ) - delete from linkis_cg_manager_lock where id = #{id} + delete from linkis_cg_manager_lock where id = #{id} - + select * from linkis_cg_manager_lock where lock_object = #{lock_object} + + + - select * from linkis_cg_manager_lock + + \ No newline at end of file diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/LockManagerPersistence.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/LockManagerPersistence.java index 1d83f416d4..f6e34ca29c 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/LockManagerPersistence.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/LockManagerPersistence.java @@ -19,6 +19,7 @@ import org.apache.linkis.manager.common.entity.persistence.PersistenceLock; +import java.util.Date; import java.util.List; public interface LockManagerPersistence { @@ -27,4 +28,6 @@ public interface LockManagerPersistence { void unlock(PersistenceLock persistenceLock); List getAll(); + + List getTimeOutLocks(Date endDate); } diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultLockManagerPersistence.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultLockManagerPersistence.java index 0236cd61a3..55281cbe7e 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultLockManagerPersistence.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultLockManagerPersistence.java @@ -20,9 +20,11 @@ import org.apache.linkis.manager.common.entity.persistence.PersistenceLock; import org.apache.linkis.manager.dao.LockManagerMapper; import org.apache.linkis.manager.persistence.LockManagerPersistence; +import org.apache.linkis.manager.util.PersistenceManagerConf; -import org.springframework.dao.DataAccessException; +import org.apache.commons.lang3.StringUtils; +import java.util.Date; import java.util.List; import org.slf4j.Logger; @@ -44,45 +46,73 @@ public void setLockManagerMapper(LockManagerMapper lockManagerMapper) { @Override public Boolean lock(PersistenceLock persistenceLock, Long timeOut) { + try { + return tryQueueLock(persistenceLock, timeOut); + } catch (Exception e) { + logger.error("Failed to get queue lock", e); + if (persistenceLock.getId() > 0) { + unlock(persistenceLock); + } + } + return false; + } + + private boolean tryQueueLock(PersistenceLock persistenceLock, Long timeOut) { long startTime = System.currentTimeMillis(); - Boolean isLocked = tryLock(persistenceLock, timeOut); + if (StringUtils.isBlank(persistenceLock.getLockObject())) { + return true; + } + persistenceLock.setTimeOut(timeOut); + String syncLocker = persistenceLock.getLockObject().intern(); + synchronized (syncLocker) { + // insert lock The order is determined by the id auto-incrementing number + do { + lockManagerMapper.lock(persistenceLock); + } while (persistenceLock.getId() < 0); + } + boolean isLocked = isAcquireLock(persistenceLock); while (!isLocked && System.currentTimeMillis() - startTime < timeOut) { try { - Thread.sleep(1000); // TODO - isLocked = tryLock(persistenceLock, timeOut); - } catch (InterruptedException e) { - logger.warn("lock waiting interrupted", e); + if (PersistenceManagerConf.Distributed_lock_request_sync_enabled) { + synchronized (syncLocker) { + syncLocker.wait(PersistenceManagerConf.Distributed_lock_request_interval); + isLocked = isAcquireLock(persistenceLock); + if (isLocked) { + syncLocker.notifyAll(); + } + } + } else { + Thread.sleep(PersistenceManagerConf.Distributed_lock_request_interval); + isLocked = isAcquireLock(persistenceLock); + } + } catch (Exception e) { + logger.info("lock waiting failed", e); } } + if (!isLocked) { + logger.error( + "Failed to get lock by time out {} s", (System.currentTimeMillis() - startTime) / 1000); + unlock(persistenceLock); + } return isLocked; } - private boolean tryLock(PersistenceLock persistenceLock, Long timeOut) { - try { - List lockers = - lockManagerMapper.getLockersByLockObject(persistenceLock.getLockObject()); - if (lockers == null || lockers.isEmpty()) { - lockManagerMapper.lock(persistenceLock.getLockObject(), timeOut); - return true; - } else { - logger.info( - "Failed to obtain lock {} ,Because locker is exists", persistenceLock.getLockObject()); - return false; - } - } catch (DataAccessException e) { - logger.warn("Failed to obtain lock:" + persistenceLock.getLockObject()); + private boolean isAcquireLock(PersistenceLock persistenceLock) { + Integer minimumOrder = + lockManagerMapper.getMinimumOrder(persistenceLock.getLockObject(), persistenceLock.getId()); + if (null == minimumOrder || minimumOrder >= persistenceLock.getId()) { + return true; + } else { return false; } } @Override public void unlock(PersistenceLock persistenceLock) { - List lockers = - lockManagerMapper.getLockersByLockObject(persistenceLock.getLockObject()); - if (lockers != null && !lockers.isEmpty()) { - for (PersistenceLock lock : lockers) { - lockManagerMapper.unlock(lock.getId()); - } + if (persistenceLock.getId() > 0) { + lockManagerMapper.unlock(persistenceLock.getId()); + } else { + logger.error("Unlock{} id cannot be null", persistenceLock.getLockObject()); } } @@ -90,4 +120,9 @@ public void unlock(PersistenceLock persistenceLock) { public List getAll() { return lockManagerMapper.getAll(); } + + @Override + public List getTimeOutLocks(Date endDate) { + return lockManagerMapper.getTimeOutLocks(endDate); + } } diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/util/PersistenceManagerConf.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/util/PersistenceManagerConf.java new file mode 100644 index 0000000000..b31d7cf4db --- /dev/null +++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/util/PersistenceManagerConf.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.manager.util; + +import org.apache.linkis.common.conf.CommonVars$; + +public class PersistenceManagerConf { + + public static final Integer Distributed_lock_request_interval = + CommonVars$.MODULE$.apply("linkis.master.distributed.lock.request.interval", 200).getValue(); + + public static final Boolean Distributed_lock_request_sync_enabled = + CommonVars$.MODULE$ + .apply("linkis.master.distributed.lock.request.sync.enabled", true) + .getValue(); +} diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/test/java/org/apache/linkis/manager/dao/LockManagerMapperTest.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/test/java/org/apache/linkis/manager/dao/LockManagerMapperTest.java index df7f8591ba..aac734b18a 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/test/java/org/apache/linkis/manager/dao/LockManagerMapperTest.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/test/java/org/apache/linkis/manager/dao/LockManagerMapperTest.java @@ -33,7 +33,11 @@ class LockManagerMapperTest extends BaseDaoTest { @Test void lock() { - lockManagerMapper.lock("testjson", 1L); + PersistenceLock persistenceLock = new PersistenceLock(); + persistenceLock.setTimeOut(1L); + persistenceLock.setLockObject("testjson"); + persistenceLock.setCreator("hadoop"); + lockManagerMapper.lock(persistenceLock); List list = lockManagerMapper.getLockersByLockObject("testjson"); assertTrue(list.size() >= 1); }