Skip to content

Commit

Permalink
0003325: Support cluster lock keep-alive for routing
Browse files Browse the repository at this point in the history
  • Loading branch information
mmichalek committed Nov 30, 2017
1 parent b20758b commit 0dc914d
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 5 deletions.
Expand Up @@ -280,6 +280,7 @@ private ParameterConstants() {
public final static String CLUSTER_SERVER_ID = "cluster.server.id";
public final static String CLUSTER_LOCKING_ENABLED = "cluster.lock.enabled";
public final static String CLUSTER_LOCK_TIMEOUT_MS = "cluster.lock.timeout.ms";
/** Name of the parameter holding the cluster lock refresh period in milliseconds; ClusterService reads it to decide whether a cached cluster lock is old enough to be refreshed. */
public final static String CLUSTER_LOCK_REFRESH_MS = "cluster.lock.refresh.ms";
public final static String LOCK_TIMEOUT_MS = "lock.timeout.ms";
public final static String LOCK_WAIT_RETRY_MILLIS = "lock.wait.retry.ms";

Expand Down
Expand Up @@ -35,6 +35,8 @@ public interface IClusterService {

public boolean lock(String action);

/**
 * Keeps an already-held lock alive during long-running work.
 * In ClusterService this re-locks the action once its cached lock time is older than
 * the cluster.lock.refresh.ms parameter, and does nothing otherwise.
 *
 * @param action the lock action to refresh
 * @return the outcome of re-locking when a refresh was performed; true when no refresh was needed
 */
public boolean refreshLock(String action);

public boolean lock(String action, String lockType);

public boolean lock(String action, String lockType, long waitMillis);
Expand Down
Expand Up @@ -27,6 +27,7 @@
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils;
Expand Down Expand Up @@ -55,7 +56,7 @@ public class ClusterService extends AbstractService implements IClusterService {

private String serverId = null;

private Map<String, Lock> lockCache;
private Map<String, Lock> lockCache = new ConcurrentHashMap<String, Lock>();

public ClusterService(IParameterService parameterService, ISymmetricDialect dialect) {
super(parameterService, dialect);
Expand Down Expand Up @@ -113,7 +114,7 @@ protected void initLockTable(final String action, final String lockType) {
}

protected void initCache() {
lockCache = new HashMap<String, Lock>();
lockCache .clear();
for (String action : actions) {
Lock lock = new Lock();
lock.setLockAction(action);
Expand Down Expand Up @@ -166,8 +167,13 @@ protected boolean lockCluster(String action, Date timeToBreakLock, Date timeLock
String serverId) {
if (isClusteringEnabled()) {
try {
return sqlTemplate.update(getSql("acquireClusterLockSql"), new Object[] { serverId,

boolean lockAcquired = sqlTemplate.update(getSql("acquireClusterLockSql"), new Object[] { serverId,
timeLockAcquired, action, TYPE_CLUSTER, timeToBreakLock, serverId }) == 1;
if (lockAcquired) {
updateCacheLockTime(action, timeLockAcquired);
}
return lockAcquired;
} catch (ConcurrencySqlException ex) {
log.debug("Ignoring concurrency error and reporting that we failed to get the cluster lock: {}", ex.getMessage());
}
Expand All @@ -187,6 +193,15 @@ protected boolean lockCluster(String action, Date timeToBreakLock, Date timeLock
return false;
}

/**
 * Stores the given lock time on the cached lock entry for an action.
 * Does nothing when the cache holds no entry for that action.
 *
 * @param action the lock action whose cached entry should be updated
 * @param timeLockAcquired the lock time to record; callers pass null to clear it
 */
protected void updateCacheLockTime(String action, Date timeLockAcquired) {
    final Lock cachedLock = lockCache.get(action);
    if (cachedLock == null) {
        return;
    }
    // The write happens while holding the cached entry's own monitor.
    synchronized (cachedLock) {
        cachedLock.setLockTime(timeLockAcquired);
    }
}

protected boolean lockShared(final String action) {
final Date timeout = DateUtils.addMilliseconds(new Date(),
(int) -parameterService.getLong(ParameterConstants.LOCK_TIMEOUT_MS));
Expand Down Expand Up @@ -348,6 +363,7 @@ public void unlock(final String action) {

protected boolean unlockCluster(String action, String serverId) {
if (isClusteringEnabled()) {
updateCacheLockTime(action, null);
return sqlTemplate.update(getSql("releaseClusterLockSql"), new Object[] { action,
TYPE_CLUSTER, serverId }) > 0;
} else {
Expand Down Expand Up @@ -462,4 +478,28 @@ public void clearInfiniteLock(String action) {
}
}

/**
 * Re-acquires the lock for an action when isLockRefreshNeeded reports that a refresh is due.
 *
 * @param action the lock action to refresh
 * @return the result of lock(action) when a refresh was due; true when no refresh was needed
 */
@Override
public boolean refreshLock(String action) {
    // Short-circuit: lock(action) is only invoked when a refresh is actually needed.
    return !isLockRefreshNeeded(action) || lock(action);
}

/**
 * Decides whether the cached cluster lock for an action is old enough to be refreshed.
 * A refresh is needed only when clustering is enabled, the cache has an entry for the
 * action, that entry has a non-null lock time, and the lock time is more than
 * cluster.lock.refresh.ms milliseconds in the past.
 *
 * @param action the lock action to check
 * @return true when the cached lock time is older than the refresh period; false otherwise
 */
protected boolean isLockRefreshNeeded(String action) {
    if (!isClusteringEnabled()) {
        return false;
    }
    Lock lock = lockCache.get(action);
    if (lock == null) {
        return false;
    }
    // Read the lock time exactly once, under the same monitor updateCacheLockTime
    // writes under. Reading it twice (null check, then getTime()) could throw a
    // NullPointerException if another thread cleared it in between.
    final Date lockTime;
    synchronized (lock) {
        lockTime = lock.getLockTime();
    }
    if (lockTime == null) {
        return false;
    }
    long clusterLockRefreshMs = this.parameterService.getLong(ParameterConstants.CLUSTER_LOCK_REFRESH_MS);
    long refreshTime = System.currentTimeMillis() - clusterLockRefreshMs;
    return lockTime.getTime() < refreshTime;
}

}
Expand Up @@ -205,9 +205,13 @@ synchronized public long routeData(boolean force) {
}
}
insertInitialLoadEvents();
engine.getClusterService().refreshLock(ClusterConstants.ROUTE);

long ts = System.currentTimeMillis();
gapDetector.beforeRouting();

engine.getClusterService().refreshLock(ClusterConstants.ROUTE);

dataCount = routeDataForEachChannel();
ts = System.currentTimeMillis() - ts;
if (dataCount > 0 || ts > Constants.LONG_OPERATION_THRESHOLD) {
Expand Down Expand Up @@ -474,6 +478,7 @@ protected int routeDataForEachChannel() {
readyChannels = getReadyChannels();
}
for (NodeChannel nodeChannel : channels) {
engine.getClusterService().refreshLock(ClusterConstants.ROUTE);
if (nodeChannel.isEnabled() && (readyChannels == null || readyChannels.contains(nodeChannel.getChannelId()))) {
processInfo.setCurrentChannelId(nodeChannel.getChannelId());
dataCount += routeDataForChannel(processInfo, nodeChannel, sourceNode);
Expand Down Expand Up @@ -753,6 +758,7 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod
} finally {
long totalTime = System.currentTimeMillis() - ts;
if (context != null) {
engine.getClusterService().refreshLock(ClusterConstants.ROUTE);
context.incrementStat(totalTime, ChannelRouterContext.STAT_ROUTE_TOTAL_TIME);
context.logStats(log, totalTime);
context.cleanup();
Expand Down Expand Up @@ -919,7 +925,8 @@ protected int selectDataAndRoute(ProcessInfo processInfo, ChannelRouterContext c
}

long routeTs = System.currentTimeMillis() - ts;
if (routeTs > LOG_PROCESS_SUMMARY_THRESHOLD && context != null) {
if (routeTs > LOG_PROCESS_SUMMARY_THRESHOLD) {
engine.getClusterService().refreshLock(ClusterConstants.ROUTE);
log.info(
"Routing for channel '{}' has been processing for {} seconds. The following stats have been gathered: "
+ "totalDataRoutedCount={}, totalDataEventCount={}, startDataId={}, endDataId={}, dataReadCount={}, peekAheadFillCount={}, transactions={}, dataGaps={}",
Expand Down
Expand Up @@ -702,7 +702,7 @@ datareload.batch.insert.transactional=true

# Enables clustering of jobs.
#
# DatabaseOverridable: false
# DatabaseOverridable: true
# Tags: jobs
# Type: boolean
cluster.lock.enabled=false
Expand All @@ -719,6 +719,12 @@ cluster.server.id=
# Tags: jobs
cluster.lock.timeout.ms=7200000

# Interval, in milliseconds, after which certain cluster locks are refreshed while long-running processing is still in progress. This value should be a small fraction of cluster.lock.timeout.ms.
#
# DatabaseOverridable: true
# Tags: jobs
cluster.lock.refresh.ms=1200000

# The amount of time a thread can hold a shared or exclusive lock before another thread can break the lock.
# The timeout is a safeguard in case an unexpected exception causes a lock to be abandoned.
# Restarting the service will clear all locks.
Expand Down

0 comments on commit 0dc914d

Please sign in to comment.