Skip to content

Commit

Permalink
RM resource operation optimization lock bit segment lock close apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
peacewong committed Oct 27, 2022
1 parent 885c9d3 commit d38bc0a
Show file tree
Hide file tree
Showing 15 changed files with 767 additions and 584 deletions.
Expand Up @@ -30,6 +30,8 @@
import org.apache.linkis.manager.label.utils.LabelUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -74,9 +76,19 @@ public List<Label<?>> getLabels() {

/**
 * Returns the labels that are {@link ResourceLabel} instances, ordered by label key in
 * descending order.
 *
 * <p>The labels are first sorted ascending by {@code getLabelKey()} and the resulting list is
 * then reversed, so labels sharing the same key end up in the reverse of their original
 * relative order.
 *
 * @return a new list of resource labels; an empty list when {@code labels} is {@code null}
 */
public List<Label<?>> getResourceLabels() {
  if (null == labels) {
    return new ArrayList<>();
  }
  Comparator<Label<?>> byLabelKey =
      (left, right) -> left.getLabelKey().compareTo(right.getLabelKey());
  List<Label<?>> resourceLabels =
      labels.stream()
          .filter(label -> label instanceof ResourceLabel)
          .sorted(byLabelKey)
          .collect(Collectors.toList());
  // Sort ascending, then flip: callers receive the labels in descending key order.
  Collections.reverse(resourceLabels);
  return resourceLabels;
}
Expand Down
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.manager.rm.exception;

import org.apache.linkis.common.exception.LinkisRetryException;

/**
 * A {@link LinkisRetryException} subtype declared in the resource manager's exception package.
 *
 * <p>NOTE(review): the name suggests it signals a failed lock acquisition that may be retried;
 * confirm against the throw sites.
 */
public class RMLockFailedRetryException extends LinkisRetryException {

  /**
   * Creates the exception by forwarding both arguments to {@link LinkisRetryException}.
   *
   * @param errCode error code passed to the superclass constructor
   * @param desc description passed to the superclass constructor
   */
  public RMLockFailedRetryException(final int errCode, final String desc) {
    super(errCode, desc);
  }
}
Expand Up @@ -46,6 +46,7 @@ import java.util
import java.util.concurrent.ExecutorService

import scala.collection.JavaConverters._
import scala.collection.mutable

@Component
class NodeHeartbeatMonitor extends ManagerMonitor with Logging {
Expand Down Expand Up @@ -140,14 +141,13 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging {
*/
private def dealECNodes(engineNodes: util.List[Node]): Unit = {
val existingEngineInstances = Sender.getInstances(ecName)
val clearECSet = new mutable.HashSet[ServiceInstance]()
engineNodes.asScala.foreach { engineNode =>
if (NodeStatus.isCompleted(engineNode.getNodeStatus)) {
logger.info(
s"${engineNode.getServiceInstance} is completed ${engineNode.getNodeStatus}, will be remove"
)
Utils.tryAndWarnMsg(clearEngineNode(engineNode.getServiceInstance))(
"clear engine node failed"
)
clearECSet.add(engineNode.getServiceInstance)
} else {
val engineIsStarted =
(System.currentTimeMillis() - engineNode.getStartTime.getTime) > maxCreateInterval
Expand All @@ -162,18 +162,23 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging {
logger.warn(
s"Failed to find instance ${engineNode.getServiceInstance} from eureka prepare to kill, engineIsStarted"
)
Utils.tryAndWarnMsg(clearEngineNode(engineNode.getServiceInstance))(
"engineIsStarted clear failed"
)
clearECSet.add(engineNode.getServiceInstance)
}
} else if (updateOverdue) {
logger.warn(s" ${engineNode.getServiceInstance} heartbeat updateOverdue")
Utils.tryAndWarnMsg(clearEngineNode(engineNode.getServiceInstance))(
"updateOverdue clear failed"
)
clearECSet.add(engineNode.getServiceInstance)
}
}
}
clearECSet.foreach(clearEngineNode)
}

/**
 * Refreshes the status and update time of `node` from its persisted metrics.
 *
 * Looks the metrics up through `nodeMetricManagerPersistence`; when none are found (the lookup
 * yields null) the node is left untouched.
 *
 * @param node the node whose status and update time are overwritten in place
 */
private def updateMetrics(node: Node): Unit =
  Option(nodeMetricManagerPersistence.getNodeMetrics(node)).foreach { nodeMetric =>
    // The persisted status is used as an index into the NodeStatus values array.
    node.setNodeStatus(NodeStatus.values()(nodeMetric.getStatus))
    node.setUpdateTime(nodeMetric.getUpdateTime)
  }

private def dealECMNotExistsInRegistry(ecmNodes: util.List[Node]): Unit = {
Expand All @@ -186,12 +191,21 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging {
}
val updateOverdue = (System.currentTimeMillis() - updateTime) > ecmHeartBeatTime
if (!existingECMInstances.contains(ecm.getServiceInstance) && updateOverdue) {
logger.warn(
s"Failed to find ecm instance ${ecm.getServiceInstance} from eureka Registry to kill"
)
Utils.tryAndWarnMsg(triggerEMSuicide(ecm.getServiceInstance))(
s"ecm ${ecm.getServiceInstance} clear failed"
)
Utils.tryAndWarn(updateMetrics(ecm))
val isUpdateOverdue = if (null == ecm.getUpdateTime) {
(System.currentTimeMillis() - ecm.getStartTime.getTime) > ecmHeartBeatTime
} else {
(System.currentTimeMillis() - ecm.getUpdateTime.getTime) > ecmHeartBeatTime
}
val isExistingECMInstances = Sender.getInstances(ecmName).contains(ecm.getServiceInstance)
if (!isExistingECMInstances && isUpdateOverdue) {
logger.warn(
s"Failed to find ecm instance ${ecm.getServiceInstance} from eureka Registry to kill"
)
Utils.tryAndWarnMsg(triggerEMSuicide(ecm.getServiceInstance))(
s"ecm ${ecm.getServiceInstance} clear failed"
)
}
}
}
}
Expand Down
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.manager.rm.entity

import org.apache.linkis.manager.common.entity.resource.Resource
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.rm.entity.ResourceOperationType.ResourceOperationType

/**
 * Pairs a label with a resource and the operation type recorded for that pair.
 *
 * Equality and hashing are both derived solely from the label's string value, so two mappings
 * for the same label string are treated as the same element by hash-based collections,
 * regardless of their resource or operation type.
 *
 * @param label the label this mapping belongs to
 * @param resource the resource associated with the label
 * @param resourceOperationType the operation type recorded for this mapping
 */
class LabelResourceMapping(
    label: Label[_],
    resource: Resource,
    resourceOperationType: ResourceOperationType
) {

  /** Two mappings are equal when their labels have the same string value (null-safe). */
  override def equals(obj: Any): Boolean = obj match {
    case other: LabelResourceMapping =>
      java.util.Objects.equals(other.getLabel().getStringValue, label.getStringValue)
    case _ => false
  }

  /**
   * Hashes the same key that `equals` compares, keeping the equals/hashCode contract: equal
   * mappings always produce the same hash code.
   */
  override def hashCode(): Int = java.util.Objects.hashCode(label.getStringValue)

  /** @return the label of this mapping */
  def getLabel(): Label[_] = label

  /** @return the resource of this mapping */
  def getResource(): Resource = resource

  /** @return the operation type of this mapping */
  def getResourceOperationType: ResourceOperationType = resourceOperationType

  override def toString: String = {
    s"Label ${label.getStringValue} mapping resource ${resource}"
  }

}
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.manager.rm.entity

/**
 * Enumeration of the operation kinds that can be attached to a resource record.
 *
 * Members are declared in a fixed order, so their ids run from 0 (`LOCK`) to 3
 * (`LOCKER_RELEASE`).
 * NOTE(review): the names suggest lock / use / release phases of a resource — confirm the
 * exact semantics against the code that consumes these values.
 */
object ResourceOperationType extends Enumeration {

  /** Alias so callers can refer to the value type as `ResourceOperationType`. */
  type ResourceOperationType = Value

  val LOCK: Value = Value
  val USED: Value = Value
  val USED_RELEASE: Value = Value
  val LOCKER_RELEASE: Value = Value

}
Expand Up @@ -17,11 +17,9 @@

package org.apache.linkis.manager.rm.service

import org.apache.linkis.common.utils.Logging
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.manager.common.entity.persistence.PersistenceLock
import org.apache.linkis.manager.label.entity.ResourceLabel
import org.apache.linkis.manager.persistence.LockManagerPersistence
import org.apache.linkis.manager.rm.domain.RMLabelContainer

import org.apache.commons.lang3.StringUtils

Expand All @@ -40,72 +38,42 @@ class ResourceLockService extends Logging {
@Autowired
var lockManagerPersistence: LockManagerPersistence = _

def tryLock(labelContainer: RMLabelContainer): Boolean = tryLock(labelContainer, Long.MaxValue)

def tryLock(labelContainer: RMLabelContainer, timeout: Long): Boolean = {
if (
StringUtils.isBlank(labelContainer.getCurrentLabel.getStringValue)
|| !labelContainer.getCurrentLabel.isInstanceOf[ResourceLabel]
|| labelContainer.getLockedLabels.contains(labelContainer.getCurrentLabel)
) {
def tryLock(persistenceLock: PersistenceLock, timeout: Long): Boolean = {
if (StringUtils.isBlank(persistenceLock.getLockObject)) {
return true
}
val lockedBy =
if (labelContainer.getUserCreatorLabel == null) DEFAULT_LOCKED_BY
else labelContainer.getUserCreatorLabel.getUser
val persistenceLock = new PersistenceLock
persistenceLock.setLockObject(labelContainer.getCurrentLabel.getStringValue)
persistenceLock.setCreateTime(new Date)
persistenceLock.setCreator(lockedBy)
persistenceLock.setUpdateTime(new Date)
persistenceLock.setUpdator(lockedBy)
try {
Utils.tryCatch {
val isLocked: Boolean = if (timeout > 0) {
lockManagerPersistence.lock(persistenceLock, timeout)
} else {
lockManagerPersistence.lock(persistenceLock, Long.MaxValue)
}
if (isLocked) {
logger.info(
labelContainer.getCurrentLabel + " successfully locked label" + persistenceLock.getLockObject
)
labelContainer.getLockedLabels.add(labelContainer.getCurrentLabel)
logger.info("successfully locked label" + persistenceLock.getLockObject)
}
isLocked
} catch {
case t: Throwable =>
logger.error(s"failed to lock label [${persistenceLock.getLockObject}]", t)
false
} { case t: Throwable =>
logger.error(s"failed to lock label [${persistenceLock.getLockObject}]", t)
false
}
}

def unLock(labelContainer: RMLabelContainer): Unit = {
val labelIterator = labelContainer.getLockedLabels.iterator
while (labelIterator.hasNext) {
val label = labelIterator.next
if (!StringUtils.isBlank(label.getStringValue)) {
val persistenceLock = new PersistenceLock
persistenceLock.setLockObject(label.getStringValue)
try {
lockManagerPersistence.unlock(persistenceLock)
logger.info("unlocked " + persistenceLock.getLockObject)
} catch {
case t: Throwable =>
logger.error(s"failed to unlock label [${persistenceLock.getLockObject}]", t)
throw t
}
labelIterator.remove
}
def unLock(persistenceLock: PersistenceLock): Unit = {
Utils.tryCatch {
lockManagerPersistence.unlock(persistenceLock)
logger.info("unlocked " + persistenceLock.getLockObject)
} { case t: Throwable =>
logger.error(s"failed to unlock label [${persistenceLock.getLockObject}]", t)
}
}

def clearTimeoutLock(timeout: Long): Unit = {
val currentTime = System.currentTimeMillis
lockManagerPersistence.getAll.asScala.foreach { lock =>
if (currentTime - lock.getCreateTime.getTime > timeout) {
lockManagerPersistence.unlock(lock)
logger.warn("timeout force unlock " + lock.getLockObject)
}
val endDate = new Date(System.currentTimeMillis() - timeout)
val locks = lockManagerPersistence.getTimeOutLocks(endDate)
if (null == locks) return
locks.asScala.foreach { lock =>
Utils.tryAndWarn(lockManagerPersistence.unlock(lock))
logger.warn("timeout force unlock " + lock.getLockObject)
}
}

Expand Down

0 comments on commit d38bc0a

Please sign in to comment.