Skip to content

Commit

Permalink
fix(jans-auth-server): different fixes for cluster node management
Browse files Browse the repository at this point in the history
#8562
Signed-off-by: YuriyZ <yzabrovarniy@gmail.com>
  • Loading branch information
yuriyz committed Jun 20, 2024
1 parent 3e7e322 commit c763ed9
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package io.jans.as.server.service.cluster;

import com.google.common.base.Preconditions;
import io.jans.as.server.service.cdi.event.TokenPoolUpdateEvent;
import io.jans.model.cluster.ClusterNode;
import io.jans.service.cdi.async.Asynchronous;
Expand Down Expand Up @@ -41,16 +42,19 @@ public class ClusterNodeManager {

private AtomicBoolean isActive;

private ClusterNode clusterNode;
private ClusterNode node;

@PostConstruct
public void init() {
log.info("Initializing Cluster Manager ...");
log.info("Initializing Cluster Node Manager ...");
this.isActive = new AtomicBoolean(false);

this.clusterNode = clusterNodeService.allocate();

log.info("Assigned cluster node id '{}' for this instance", clusterNode.getId());
this.node = clusterNodeService.allocate();
if (node != null) {
log.info("Assigned cluster node id '{}' for this instance", node.getId());
} else {
log.error("Failed to initialize Cluster Node Manager.");
}
}

public void initTimer() {
Expand All @@ -74,26 +78,30 @@ public void reloadNodesTimerEvent(@Observes @Scheduled TokenPoolUpdateEvent toke
}

try {
updateClusterNode();
updateNode();
} catch (Throwable ex) {
log.error("Exception happened while reloading nodes", ex);
} finally {
this.isActive.set(false);
}
}

private void updateClusterNode() {
clusterNodeService.refresh(clusterNode);
private void updateNode() {
checkNodeNotNull();
clusterNodeService.refresh(node);
}


public void destroy(@Observes @BeforeDestroyed(ApplicationScoped.class) ServletContext init) {
log.info("Stopping cluster manager...");
clusterNodeService.release(clusterNode);
log.info("Stopped cluster manager");
}

public Integer getClusterNodeId() {
return clusterNode.getId();
checkNodeNotNull();
return node.getId();
}

private void checkNodeNotNull() {
Preconditions.checkNotNull(node, "Failed to allocate cluster node.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,14 @@
import io.jans.model.cluster.ClusterNode;
import io.jans.orm.PersistenceEntryManager;
import io.jans.orm.exception.EntryPersistenceException;
import io.jans.orm.model.PagedResult;
import io.jans.orm.model.SortOrder;
import io.jans.orm.search.filter.Filter;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.tika.utils.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.*;

/**
* @author Yuriy Movchan
Expand All @@ -38,7 +33,7 @@ public class ClusterNodeService {
public static final String CLUSTER_TYPE_JANS_AUTH = "jans-auth";
public static final String JANS_TYPE_ATTR_NAME = "jansType";

public static final String lockKey = UUID.randomUUID().toString();
public static final String LOCK_KEY = UUID.randomUUID().toString();

@Inject
private Logger log;
Expand Down Expand Up @@ -99,12 +94,18 @@ public List<String> getClusterNodesDns(List<Integer> nodeIds) {
public ClusterNode getClusterNodeLast() {
String clusterNodesBaseDn = staticConfiguration.getBaseDn().getNode();

PagedResult<ClusterNode> pagedResult = entryManager.findPagedEntries(clusterNodesBaseDn, ClusterNode.class, getTypeFilter(), null, "jansNum", SortOrder.DESCENDING, 0, 1, 1);
if (pagedResult.getEntriesCount() >= 1) {
return pagedResult.getEntries().get(0);
}

return null;
final List<ClusterNode> allNodes = entryManager.findEntries(clusterNodesBaseDn, ClusterNode.class, getTypeFilter(), new String[0]);
final ClusterNode max = Collections.max(allNodes, Comparator.comparing(ClusterNode::getId));
log.debug("Last node: {}", max);
return max;

// todo - we need to use paged version when it is fixed in entry manager
// PagedResult<ClusterNode> pagedResult = entryManager.findPagedEntries(clusterNodesBaseDn, ClusterNode.class, getTypeFilter(), null, "jansNum", SortOrder.DESCENDING, 0, 1, 1);
// if (pagedResult.getEntriesCount() >= 1) {
// return pagedResult.getEntries().get(0);
// }
//
// return null;
}

/**
Expand All @@ -118,10 +119,10 @@ public List<ClusterNode> getClusterNodesExpired() {
throw new ConfigurationException("ou=node is not configured in static configuration of AS (jansConfStatic).");
}

Date expirationDate = new Date(System.currentTimeMillis() + DELAY_AFTER_EXPIRATION);
Date expirationDate = new Date(System.currentTimeMillis() - DELAY_AFTER_EXPIRATION);

Filter filter = Filter.createANDFilter(getTypeFilter(),
Filter.createGreaterOrEqualFilter("jansLastUpd", entryManager.encodeTime(clusterNodesBaseDn, expirationDate)));
Filter.createLessOrEqualFilter("jansLastUpd", entryManager.encodeTime(clusterNodesBaseDn, expirationDate)));

return entryManager.findEntries(clusterNodesBaseDn, ClusterNode.class, filter);
}
Expand All @@ -139,12 +140,14 @@ public void update(ClusterNode clusterNode) {
entryManager.merge(clusterNode);
}

// all logs must be INFO here, because allocate method is called during initialization before
// LoggerService set loggingLevel from config.
public ClusterNode allocate() {
log.debug("Allocation, lockKey {}... ", lockKey);
log.info("Allocation, LOCK_KEY {}... ", LOCK_KEY);

// Try to use existing expired entry (node is expired if not used for 3 minutes)
List<ClusterNode> expiredNodes = getClusterNodesExpired();
log.debug("Allocation - found {} expired nodes.", expiredNodes.size());
log.info("Allocation - found {} expired nodes.", expiredNodes.size());

for (ClusterNode expiredNode : expiredNodes) {
// Do lock operation in try/catch for safety and do not throw error to upper levels
Expand All @@ -153,20 +156,20 @@ public ClusterNode allocate() {

expiredNode.setCreationDate(currentTime);
expiredNode.setLastUpdate(currentTime);
expiredNode.setLockKey(lockKey);
expiredNode.setLockKey(LOCK_KEY);

update(expiredNode);

// Load node after update
ClusterNode lockedNode = getClusterNodeByDn(expiredNode.getDn());

// If lock is ours reset entry and return it
if (lockKey.equals(lockedNode.getLockKey())) {
log.debug("Re-using existing node {}, lockKey {}", lockedNode.getId(), lockKey);
if (LOCK_KEY.equals(lockedNode.getLockKey())) {
log.info("Re-using existing node {}, LOCK_KEY {}", lockedNode.getId(), LOCK_KEY);
return lockedNode;
}

log.debug("Failed to lock node {}, lockKey {}", lockedNode.getId(), lockKey);
log.info("Failed to lock node {}, LOCK_KEY {}", lockedNode.getId(), LOCK_KEY);
} catch (EntryPersistenceException ex) {
log.debug("Unexpected error happened during entry lock", ex);
}
Expand All @@ -175,10 +178,10 @@ public ClusterNode allocate() {
// There are no free entries. server need to add new one with next index
int attempt = 1;
do {
log.debug("Attempting to persist new node. Attempt {} out of {} ...", attempt, ATTEMPT_LIMIT);
log.info("Attempting to persist new node. Attempt {} out of {} ...", attempt, ATTEMPT_LIMIT);

ClusterNode lastClusterNode = getClusterNodeLast();
log.debug("lastClusterNode - {}, lockKey {}", lastClusterNode != null ? lastClusterNode.getId() : -1, lockKey);
log.info("lastClusterNode - {}, LOCK_KEY {}", lastClusterNode != null ? lastClusterNode.getId() : -1, LOCK_KEY);

Integer lastClusterNodeIndex = lastClusterNode == null ? 0 : lastClusterNode.getId() + 1;

Expand All @@ -190,7 +193,7 @@ public ClusterNode allocate() {
node.setCreationDate(currentTime);
node.setLastUpdate(currentTime);
node.setType(CLUSTER_TYPE_JANS_AUTH);
node.setLockKey(lockKey);
node.setLockKey(LOCK_KEY);

// Do persist operation in try/catch for safety and do not throw error to upper levels
try {
Expand All @@ -200,43 +203,38 @@ public ClusterNode allocate() {
ClusterNode lockedNode = getClusterNodeByDn(node.getDn());

// if lock is ours return it
if (lockKey.equals(lockedNode.getLockKey())) {
log.debug("Successfully create new cluster node {}, lockKey {}", node.getId(), lockKey);
return node;
}
if (LOCK_KEY.equals(lockedNode.getLockKey())) {
log.info("Successfully created new cluster node {}", node);
return lockedNode;
} else {
log.info("Locked key does not match. nodeLockKey {} of node {}", lockedNode.getLockKey(), lockedNode.getId());
}
} catch (EntryPersistenceException ex) {
log.debug("Unexpected error happened during entry lock, lockKey " + lockKey, ex);
log.debug("Unexpected error happened during entry lock, LOCK_KEY " + LOCK_KEY, ex);
}

attempt++;
} while (attempt <= ATTEMPT_LIMIT);

// This should not happen
throw new EntryPersistenceException("Failed to allocate ClusterNode!!! lockKey: " + lockKey);
}

public void release(ClusterNode clusterNode) {
clusterNode.setLastUpdate(null);
clusterNode.setCreationDate(null);
clusterNode.setLockKey(null);

update(clusterNode);
return null;
}

public void refresh(ClusterNode clusterNode) {
clusterNode.setLastUpdate(new Date());
public void refresh(ClusterNode node) {
node.setLastUpdate(new Date());

update(clusterNode);
log.trace("Refreshing node: {}", node);
update(node);
}

public ClusterNode reset(ClusterNode clusterNode) {
public ClusterNode reset(ClusterNode node) {
Date currentTime = new Date();
clusterNode.setCreationDate(currentTime);
clusterNode.setLastUpdate(currentTime);
node.setCreationDate(currentTime);
node.setLastUpdate(currentTime);

update(clusterNode);
log.trace("Reseting node: {}", node);
update(node);

return clusterNode;
return node;
}

public String getDnForClusterNode(Integer id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public StatusTokenPool updateWithLock(StatusTokenPool tokenPool) {

if (loadedTokenPool.getLockKey() == null) {
// No lock found
// Attempt to set random value in lockKey
// Attempt to set random value in LOCK_KEY
String lockKey = UUID.randomUUID().toString();
tokenPool.setLockKey(lockKey);

Expand Down Expand Up @@ -224,7 +224,7 @@ public StatusTokenPool allocate(Integer nodeId) {
List<StatusTokenPool> tokenPools = getTokenPoolsExpired();

for (StatusTokenPool tokenPool : tokenPools) {
// Attempt to set random value in lockKey
// Attempt to set random value in LOCK_KEY
String lockKey = UUID.randomUUID().toString();
tokenPool.setLockKey(lockKey);

Expand Down Expand Up @@ -257,7 +257,7 @@ public StatusTokenPool allocate(Integer nodeId) {
tokenPool.setNodeId(nodeId);
tokenPool.setLastUpdate(new Date());

// Attempt to set random value in lockKey
// Attempt to set random value in LOCK_KEY
String lockKey = UUID.randomUUID().toString();
tokenPool.setLockKey(lockKey);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package io.jans.model.cluster;

import java.util.Date;

import io.jans.orm.annotation.AttributeName;
import io.jans.orm.annotation.DataEntry;
import io.jans.orm.annotation.ObjectClass;
import io.jans.orm.model.base.BaseEntry;

import java.util.Date;

/**
* @author Yuriy Movchan
* @version 1.0, 06/03/2024
Expand Down Expand Up @@ -40,6 +40,18 @@ public void setId(Integer id) {
this.id = id;
}

// workaround: io.jans.orm.exception.PropertyNotFoundException: Could not find a getter for jansNum in class io.jans.model.cluster.ClusterNode
// at io.jans.orm.reflect.property.BasicPropertyAccessor.createGetter(BasicPropertyAccessor.java:214)
public Integer getJansNum() {
return getId();
}

// workaround: io.jans.orm.exception.PropertyNotFoundException: Could not find a getter for jansNum in class io.jans.model.cluster.ClusterNode
// at io.jans.orm.reflect.property.BasicPropertyAccessor.createGetter(BasicPropertyAccessor.java:214)
public void setJansNum(Integer id) {
setId(id);
}

public String getType() {
return type;
}
Expand Down Expand Up @@ -72,4 +84,15 @@ public void setLockKey(String lockKey) {
this.lockKey = lockKey;
}

@Override
public String toString() {
return "ClusterNode{" +
"id=" + id +
", type='" + type + '\'' +
", creationDate=" + creationDate +
", lastUpdate=" + lastUpdate +
", lockKey='" + lockKey + '\'' +
", dn='" + getDn() + '\'' +
"} ";
}
}

0 comments on commit c763ed9

Please sign in to comment.