From 0dc914d65d1c14a9dcb82297968a7107b5296a3a Mon Sep 17 00:00:00 2001 From: mmichalek Date: Wed, 29 Nov 2017 23:31:09 -0500 Subject: [PATCH] 0003325: Support cluster lock keep-alive for routing --- .../symmetric/common/ParameterConstants.java | 1 + .../symmetric/service/IClusterService.java | 2 + .../service/impl/ClusterService.java | 46 +++++++++++++++++-- .../symmetric/service/impl/RouterService.java | 9 +++- .../resources/symmetric-default.properties | 8 +++- 5 files changed, 61 insertions(+), 5 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index 8c28e4657d..7da56236c4 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -280,6 +280,7 @@ private ParameterConstants() { public final static String CLUSTER_SERVER_ID = "cluster.server.id"; public final static String CLUSTER_LOCKING_ENABLED = "cluster.lock.enabled"; public final static String CLUSTER_LOCK_TIMEOUT_MS = "cluster.lock.timeout.ms"; + public final static String CLUSTER_LOCK_REFRESH_MS = "cluster.lock.refresh.ms"; public final static String LOCK_TIMEOUT_MS = "lock.timeout.ms"; public final static String LOCK_WAIT_RETRY_MILLIS = "lock.wait.retry.ms"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IClusterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IClusterService.java index 273773988b..cecd4a6b97 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IClusterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IClusterService.java @@ -35,6 +35,8 @@ public interface IClusterService { public boolean lock(String action); + public boolean refreshLock(String action); + public boolean lock(String action, String lockType); public boolean lock(String action, String lockType, long waitMillis); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterService.java index 8534602030..60729a557f 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterService.java @@ -27,6 +27,7 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.DateUtils; @@ -55,7 +56,7 @@ public class ClusterService extends AbstractService implements IClusterService { private String serverId = null; - private Map lockCache; + private Map lockCache = new ConcurrentHashMap(); public ClusterService(IParameterService parameterService, ISymmetricDialect dialect) { super(parameterService, dialect); @@ -113,7 +114,7 @@ protected void initLockTable(final String action, final String lockType) { } protected void initCache() { - lockCache = new HashMap(); + lockCache .clear(); for (String action : actions) { Lock lock = new Lock(); lock.setLockAction(action); @@ -166,8 +167,13 @@ protected boolean lockCluster(String action, Date timeToBreakLock, Date timeLock String serverId) { if (isClusteringEnabled()) { try { - return sqlTemplate.update(getSql("acquireClusterLockSql"), new Object[] { serverId, + + boolean lockAcquired = sqlTemplate.update(getSql("acquireClusterLockSql"), new Object[] { serverId, timeLockAcquired, action, TYPE_CLUSTER, timeToBreakLock, serverId }) == 1; + if (lockAcquired) { + updateCacheLockTime(action, timeLockAcquired); + } + return lockAcquired; } catch (ConcurrencySqlException ex) { log.debug("Ignoring concurrency error and reporting that we failed to get the cluster lock: {}", ex.getMessage()); } @@ -187,6 +193,15 @@ protected boolean lockCluster(String action, Date timeToBreakLock, Date timeLock return false; } + protected void updateCacheLockTime(String action, Date timeLockAcquired) { + Lock lock = lockCache.get(action); + if (lock != null) { + synchronized (lock) { + lock.setLockTime(timeLockAcquired); + } + } + } + protected boolean lockShared(final String action) { final Date timeout = DateUtils.addMilliseconds(new Date(), (int) -parameterService.getLong(ParameterConstants.LOCK_TIMEOUT_MS)); @@ -348,6 +363,7 @@ public void unlock(final String action) { protected boolean unlockCluster(String action, String serverId) { if (isClusteringEnabled()) { + updateCacheLockTime(action, null); return sqlTemplate.update(getSql("releaseClusterLockSql"), new Object[] { action, TYPE_CLUSTER, serverId }) > 0; } else { @@ -462,4 +478,28 @@ public void clearInfiniteLock(String action) { } } + @Override + public boolean refreshLock(String action) { + if (isLockRefreshNeeded(action)) { + return lock(action); + } + return true; + } + + protected boolean isLockRefreshNeeded(String action) { + if (isClusteringEnabled()) { + Lock lock = lockCache.get(action); + long clusterLockRefreshMs = this.parameterService.getLong(ParameterConstants.CLUSTER_LOCK_REFRESH_MS); + long refreshTime = new Date().getTime() - clusterLockRefreshMs; + + if (lock != null && lock.getLockTime() != null && lock.getLockTime().getTime() < refreshTime) { + return true; + } else { + return false; + } + } else { + return false; + } + } + } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java index 7888f9a0eb..b83dce767e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java @@ -205,9 +205,13 @@ synchronized public long routeData(boolean force) { } } insertInitialLoadEvents(); + engine.getClusterService().refreshLock(ClusterConstants.ROUTE); long ts = System.currentTimeMillis(); gapDetector.beforeRouting(); + + engine.getClusterService().refreshLock(ClusterConstants.ROUTE); + dataCount = routeDataForEachChannel(); ts = System.currentTimeMillis() - ts; if (dataCount > 0 || ts > Constants.LONG_OPERATION_THRESHOLD) { @@ -474,6 +478,7 @@ protected int routeDataForEachChannel() { readyChannels = getReadyChannels(); } for (NodeChannel nodeChannel : channels) { + engine.getClusterService().refreshLock(ClusterConstants.ROUTE); if (nodeChannel.isEnabled() && (readyChannels == null || readyChannels.contains(nodeChannel.getChannelId()))) { processInfo.setCurrentChannelId(nodeChannel.getChannelId()); dataCount += routeDataForChannel(processInfo, nodeChannel, sourceNode); @@ -753,6 +758,7 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod } finally { long totalTime = System.currentTimeMillis() - ts; if (context != null) { + engine.getClusterService().refreshLock(ClusterConstants.ROUTE); context.incrementStat(totalTime, ChannelRouterContext.STAT_ROUTE_TOTAL_TIME); context.logStats(log, totalTime); context.cleanup(); @@ -919,7 +925,8 @@ protected int selectDataAndRoute(ProcessInfo processInfo, ChannelRouterContext c } long routeTs = System.currentTimeMillis() - ts; - if (routeTs > LOG_PROCESS_SUMMARY_THRESHOLD && context != null) { + if (routeTs > LOG_PROCESS_SUMMARY_THRESHOLD) { + engine.getClusterService().refreshLock(ClusterConstants.ROUTE); log.info( "Routing for channel '{}' has been processing for {} seconds. The following stats have been gathered: " + "totalDataRoutedCount={}, totalDataEventCount={}, startDataId={}, endDataId={}, dataReadCount={}, peekAheadFillCount={}, transactions={}, dataGaps={}", diff --git a/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric-core/src/main/resources/symmetric-default.properties index 58b0f36945..3755fc52c4 100644 --- a/symmetric-core/src/main/resources/symmetric-default.properties +++ b/symmetric-core/src/main/resources/symmetric-default.properties @@ -702,7 +702,7 @@ datareload.batch.insert.transactional=true # Enables clustering of jobs. # -# DatabaseOverridable: false +# DatabaseOverridable: true # Tags: jobs # Type: boolean cluster.lock.enabled=false @@ -719,6 +719,12 @@ cluster.server.id= # Tags: jobs cluster.lock.timeout.ms=7200000 +# Period of time that certain locks will get refreshed while long processing happens. This value should be a small fraction of cluster.lock.timeout.ms +# +# DatabaseOverridable: true +# Tags: jobs +cluster.lock.refresh.ms=1200000 + # The amount of time a thread can hold a shared or exclusive lock before another thread can break the lock. # The timeout is a safeguard in case an unexpected exception causes a lock to be abandoned. # Restarting the service will clear all locks.