From 54dc905c9a86eb7630cc0679a24d58e33a55440e Mon Sep 17 00:00:00 2001 From: erilong Date: Fri, 28 Feb 2014 01:25:52 +0000 Subject: [PATCH] 0001609: File sync mutex limits scaling multiple clients --- .../service/impl/JdbcClusterServiceTest.java | 25 +++ .../symmetric/common/ParameterConstants.java | 2 + .../org/jumpmind/symmetric/model/Lock.java | 27 +++ .../symmetric/service/ClusterConstants.java | 6 +- .../symmetric/service/IClusterService.java | 8 +- .../service/impl/ClusterService.java | 128 +++++++++++--- .../service/impl/ClusterServiceSqlMap.java | 56 ++++-- .../resources/symmetric-default.properties | 14 ++ .../src/main/resources/symmetric-schema.xml | 3 + .../impl/AbstractClusterServiceTest.java | 164 ++++++++++++++++++ .../service/impl/AbstractServiceTest.java | 5 + 11 files changed, 401 insertions(+), 37 deletions(-) create mode 100644 symmetric-client/src/test/java/org/jumpmind/symmetric/service/impl/JdbcClusterServiceTest.java create mode 100644 symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractClusterServiceTest.java diff --git a/symmetric-client/src/test/java/org/jumpmind/symmetric/service/impl/JdbcClusterServiceTest.java b/symmetric-client/src/test/java/org/jumpmind/symmetric/service/impl/JdbcClusterServiceTest.java new file mode 100644 index 0000000000..4221c94133 --- /dev/null +++ b/symmetric-client/src/test/java/org/jumpmind/symmetric/service/impl/JdbcClusterServiceTest.java @@ -0,0 +1,25 @@ +/** + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU General Public License, version 3.0 (GPLv3) + * (the "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU General Public License, + * version 3.0 (GPLv3) along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.service.impl; + +public class JdbcClusterServiceTest extends AbstractClusterServiceTest { + +} diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index 0e6cea4f8a..ca74bdf273 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -198,6 +198,8 @@ private ParameterConstants() { public final static String CLUSTER_SERVER_ID = "cluster.server.id"; public final static String CLUSTER_LOCKING_ENABLED = "cluster.lock.enabled"; public final static String CLUSTER_LOCK_TIMEOUT_MS = "cluster.lock.timeout.ms"; + public final static String LOCK_TIMEOUT_MS = "lock.timeout.ms"; + public final static String LOCK_WAIT_RETRY_MILLIS = "lock.wait.retry.ms"; public final static String PURGE_RETENTION_MINUTES = "purge.retention.minutes"; public final static String PURGE_EXTRACT_REQUESTS_RETENTION_MINUTES = "purge.extract.request.retention.minutes"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Lock.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Lock.java index c3c0a25fb7..765c8874b6 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Lock.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Lock.java @@ -30,8 +30,11 @@ public class Lock implements Serializable { private static final long serialVersionUID = 1L; private String lockAction; + private String lockType; private String lockingServerId; private Date lockTime; + private int sharedCount; + private boolean sharedEnable; private Date lastLockTime; private String lastLockingServerId; @@ -83,4 +86,28 @@ public void setLastLockingServerId(String lastLockingServerId) { this.lastLockingServerId = lastLockingServerId; } + public String getLockType() { + return lockType; + } + + public void setLockType(String lockType) { + this.lockType = lockType; + } + + public int getSharedCount() { + return sharedCount; + } + + public void setSharedCount(int sharedCount) { + this.sharedCount = sharedCount; + } + + public boolean isSharedEnable() { + return sharedEnable; + } + + public void setSharedEnable(boolean sharedEnable) { + this.sharedEnable = sharedEnable; + } + } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java index 04ef6a6512..f583a076f2 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java @@ -42,5 +42,9 @@ public class ClusterConstants { public static final String FILE_SYNC_TRACKER = "FILE_SYNC_TRACKER"; public static final String FILE_SYNC_PULL = "FILE_SYNC_PULL"; public static final String FILE_SYNC_PUSH = "FILE_SYNC_PUSH"; - + + public static final String TYPE_CLUSTER = "CLUSTER"; + public static final String TYPE_EXCLUSIVE = "EXCLUSIVE"; + public static final String TYPE_SHARED = "SHARED"; + } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IClusterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IClusterService.java index 03bfe3f79b..1fa9121401 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IClusterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IClusterService.java @@ -21,7 +21,7 @@ package org.jumpmind.symmetric.service; import java.util.Map; - + import org.jumpmind.symmetric.model.Lock; @@ -37,8 +37,14 @@ public interface IClusterService { public boolean lock(String action); + public boolean lock(String action, String lockType); + + public boolean lock(String action, String lockType, long waitMillis); + public void unlock(String action); + public void unlock(String action, String lockType); + public void clearAllLocks(); public String getServerId(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterService.java index 0a4e594b49..9df6917097 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterService.java @@ -20,7 +20,11 @@ */ package org.jumpmind.symmetric.service.impl; +import static org.jumpmind.symmetric.service.ClusterConstants.FILE_SYNC_PULL; +import static org.jumpmind.symmetric.service.ClusterConstants.FILE_SYNC_PUSH; +import static org.jumpmind.symmetric.service.ClusterConstants.FILE_SYNC_TRACKER; import static org.jumpmind.symmetric.service.ClusterConstants.HEARTBEAT; +import static org.jumpmind.symmetric.service.ClusterConstants.INITIAL_LOAD_EXTRACT; import static org.jumpmind.symmetric.service.ClusterConstants.PULL; import static org.jumpmind.symmetric.service.ClusterConstants.PURGE_DATA_GAPS; import static org.jumpmind.symmetric.service.ClusterConstants.PURGE_INCOMING; @@ -31,11 +35,10 @@ import static org.jumpmind.symmetric.service.ClusterConstants.STAGE_MANAGEMENT; import static org.jumpmind.symmetric.service.ClusterConstants.STATISTICS; import static org.jumpmind.symmetric.service.ClusterConstants.SYNCTRIGGERS; +import static org.jumpmind.symmetric.service.ClusterConstants.TYPE_CLUSTER; +import static org.jumpmind.symmetric.service.ClusterConstants.TYPE_EXCLUSIVE; +import static org.jumpmind.symmetric.service.ClusterConstants.TYPE_SHARED; import static org.jumpmind.symmetric.service.ClusterConstants.WATCHDOG; -import static org.jumpmind.symmetric.service.ClusterConstants.FILE_SYNC_PULL; -import static org.jumpmind.symmetric.service.ClusterConstants.FILE_SYNC_PUSH; -import static org.jumpmind.symmetric.service.ClusterConstants.FILE_SYNC_TRACKER; -import static org.jumpmind.symmetric.service.ClusterConstants.INITIAL_LOAD_EXTRACT; import java.util.Calendar; import java.util.Date; @@ -70,6 +73,7 @@ public ClusterService(IParameterService parameterService, ISymmetricDialect dial } public void init() { + sqlTemplate.update(getSql("initLockSql"), new Object[] { getServerId() }); initLockTable(ROUTE); initLockTable(PULL); initLockTable(PUSH); @@ -85,45 +89,104 @@ public void init() { initLockTable(FILE_SYNC_PULL); initLockTable(FILE_SYNC_PUSH); initLockTable(FILE_SYNC_TRACKER); + initLockTable(FILE_SYNC_TRACKER, TYPE_SHARED); initLockTable(INITIAL_LOAD_EXTRACT); } public void initLockTable(final String action) { + initLockTable(action, TYPE_CLUSTER); + } + + public void initLockTable(final String action, final String lockType) { try { - sqlTemplate.update(getSql("insertLockSql"), new Object[] { action }); - log.debug("Inserted into the NODE_LOCK table for {}", action); + sqlTemplate.update(getSql("insertLockSql"), new Object[] { action, lockType }); + log.debug("Inserted into the LOCK table for {}, {}", action, lockType); } catch (UniqueKeyException ex) { - log.debug( - "Failed to insert to the NODE_LOCK table for {}. Must be initialized already.", - action); + log.debug("Failed to insert to the LOCK table for {}, {}. Must be initialized already.", + action, lockType); } } public void clearAllLocks() { - sqlTemplate.update(getSql("clearAllLocksSql")); + sqlTemplate.update(getSql("initLockSql"), new Object[] { getServerId() }); + } + + public boolean lock(final String action, final String lockType) { + if (lockType.equals(TYPE_CLUSTER)) { + return lock(action); + } else if (lockType.equals(TYPE_SHARED)) { + return lockShared(action); + } else if (lockType.equals(TYPE_EXCLUSIVE)) { + return lockExclusive(action); + } else { + throw new UnsupportedOperationException("Lock type of " + lockType + " is not supported"); + } + } + + public boolean lock(final String action, final String lockType, long waitMillis) { + if (lockType.equals(TYPE_SHARED) || lockType.equals(TYPE_EXCLUSIVE)) { + return lockWait(action, lockType, waitMillis); + } else { + throw new UnsupportedOperationException("Lock type of " + lockType + " is not supported"); + } } public boolean lock(final String action) { if (isClusteringEnabled()) { final Date timeout = DateUtils.add(new Date(), Calendar.MILLISECOND, (int) -parameterService.getLong(ParameterConstants.CLUSTER_LOCK_TIMEOUT_MS)); - return lock(action, timeout, new Date(), getServerId()); + return lockCluster(action, timeout, new Date(), getServerId()); } else { return true; } } - protected boolean lock(String action, Date timeToBreakLock, Date timeLockAquired, + protected boolean lockCluster(String action, Date timeToBreakLock, Date timeLockAcquired, String serverId) { try { - return sqlTemplate.update(getSql("aquireLockSql"), new Object[] { serverId, - timeLockAquired, action, timeToBreakLock, serverId }) == 1; + return sqlTemplate.update(getSql("acquireClusterLockSql"), new Object[] { serverId, + timeLockAcquired, action, TYPE_CLUSTER, timeToBreakLock, serverId }) == 1; } catch (ConcurrencySqlException ex) { log.debug("Ignoring concurrency error and reporting that we failed to get the cluster lock: {}", ex.getMessage()); return false; } } + protected boolean lockShared(final String action) { + final Date timeout = DateUtils.add(new Date(), Calendar.MILLISECOND, + (int) -parameterService.getLong(ParameterConstants.LOCK_TIMEOUT_MS)); + return sqlTemplate.update(getSql("acquireSharedLockSql"), new Object[] { + TYPE_SHARED, getServerId(), new Date(), action, TYPE_SHARED, timeout }) == 1; + } + + protected boolean lockExclusive(final String action) { + final Date timeout = DateUtils.add(new Date(), Calendar.MILLISECOND, + (int) -parameterService.getLong(ParameterConstants.LOCK_TIMEOUT_MS)); + return sqlTemplate.update(getSql("acquireExclusiveLockSql"), new Object[] { + TYPE_EXCLUSIVE, getServerId(), new Date(), action, TYPE_SHARED, timeout }) == 1; + } + + protected boolean lockWait(final String action, final String lockType, long waitMillis) { + boolean isLocked = false; + long endTime = System.currentTimeMillis() + waitMillis; + long sleepMillis = parameterService.getLong(ParameterConstants.LOCK_WAIT_RETRY_MILLIS); + do { + if (lockType.equals(TYPE_SHARED)) { + isLocked = lockShared(action); + } else if (lockType.equals(TYPE_EXCLUSIVE)) { + isLocked = lockExclusive(action); + if (!isLocked) { + sqlTemplate.update(getSql("disableSharedLockSql"), new Object[] { action, TYPE_SHARED }); + } + } + if (isLocked) { + break; + } + AppUtils.sleep(sleepMillis); + } while (waitMillis == 0 || System.currentTimeMillis() < endTime); + return isLocked; + } + public Map findLocks() { final Map locks = new HashMap(); if (isClusteringEnabled()) { @@ -131,8 +194,11 @@ public Map findLocks() { public Lock mapRow(Row rs) { Lock lock = new Lock(); lock.setLockAction(rs.getString("lock_action")); + lock.setLockType(rs.getString("lock_type")); lock.setLockingServerId(rs.getString("locking_server_id")); lock.setLockTime(rs.getDateTime("lock_time")); + lock.setSharedCount(rs.getInt("shared_count")); + lock.setSharedEnable(rs.getBoolean("shared_enable")); lock.setLastLockingServerId(rs.getString("last_locking_server_id")); lock.setLastLockTime(rs.getDateTime("last_lock_time")); locks.put(lock.getLockAction(), lock); @@ -179,18 +245,40 @@ public String getServerId() { return serverId; } + public void unlock(final String action, final String lockType) { + if (lockType.equals(TYPE_CLUSTER)) { + unlock(action); + } else if (lockType.equals(TYPE_SHARED)) { + unlockShared(action); + } else if (lockType.equals(TYPE_EXCLUSIVE)) { + unlockExclusive(action); + } else { + throw new UnsupportedOperationException("Lock type of " + lockType + " is not supported"); + } + } + public void unlock(final String action) { if (isClusteringEnabled()) { - if (!unlock(action, getServerId())) { + if (!unlockCluster(action, getServerId())) { log.warn("Failed to release lock for action:{} server:{}", action, getServerId()); } } } - protected boolean unlock(String action, String serverId) { + protected boolean unlockCluster(String action, String serverId) { String lastLockingServerId = serverId.equals(Lock.STOPPED) ? null : serverId; - return sqlTemplate.update(getSql("releaseLockSql"), new Object[] { new Date(), lastLockingServerId, action, - serverId }) > 0; + return sqlTemplate.update(getSql("releaseClusterLockSql"), new Object[] { new Date(), lastLockingServerId, action, + TYPE_CLUSTER, serverId }) > 0; + } + + protected boolean unlockShared(final String action) { + return sqlTemplate.update(getSql("releaseSharedLockSql"), new Object[] { + new Date(), getServerId(), action, TYPE_SHARED }) == 1; + } + + protected boolean unlockExclusive(final String action) { + return sqlTemplate.update(getSql("releaseExclusiveLockSql"), new Object[] { + new Date(), getServerId(), action, TYPE_EXCLUSIVE }) == 1; } public boolean isClusteringEnabled() { @@ -213,7 +301,7 @@ public void aquireInfiniteLock(String action) { int tries = 600; Date futureTime = DateUtils.add(new Date(), Calendar.YEAR, 100); while (tries > 0) { - if (!lock(action, new Date(), futureTime, Lock.STOPPED)) { + if (!lockCluster(action, new Date(), futureTime, Lock.STOPPED)) { AppUtils.sleep(50); tries--; } else { @@ -227,7 +315,7 @@ public void clearInfiniteLock(String action) { Map all = findLocks(); Lock lock = all.get(action); if (lock != null && Lock.STOPPED.equals(lock.getLockingServerId())) { - unlock(action, Lock.STOPPED); + unlockCluster(action, Lock.STOPPED); } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterServiceSqlMap.java index 6c47489372..0af61b105a 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterServiceSqlMap.java @@ -29,22 +29,48 @@ public class ClusterServiceSqlMap extends AbstractSqlMap { public ClusterServiceSqlMap(IDatabasePlatform platform, Map replacementTokens) { super(platform, replacementTokens); - putSql("aquireLockSql", - "" - + "update $(lock) set locking_server_id=?, lock_time=? where " - + " lock_action=? and (lock_time is null or lock_time < ? or locking_server_id=?) "); - - putSql("releaseLockSql", - "" - + "update $(lock) set locking_server_id=null, lock_time=null, last_lock_time=?, last_locking_server_id=? " - + " where lock_action=? and locking_server_id=? "); - - putSql("insertLockSql", "" + "insert into $(lock) (lock_action) values(?) "); - + putSql("acquireClusterLockSql", + "update $(lock) set locking_server_id=?, lock_time=? " + + "where lock_action=? and lock_type=? and (lock_time is null or lock_time < ? or locking_server_id=?)"); + + putSql("acquireSharedLockSql", + "update $(lock) set lock_type=?, locking_server_id=?, lock_time=?, shared_count=shared_count+1, " + + "shared_enable=(case when shared_count = 0 then 1 else shared_enable end) " + + "where lock_action=? and (lock_type=? or lock_time is null or lock_time < ?) " + + "and (shared_enable = 1 or shared_count = 0)"); + + putSql("disableSharedLockSql", + "update $(lock) set shared_enable=0 where lock_action=? and lock_type=?"); + + putSql("acquireExclusiveLockSql", + "update $(lock) set lock_type=?, locking_server_id=?, lock_time=? " + + "where lock_action=? and ((lock_type=? and shared_count = 0) or lock_time is null or lock_time < ?)"); + + putSql("releaseClusterLockSql", + "update $(lock) set locking_server_id=null, lock_time=null, last_lock_time=?, last_locking_server_id=? " + + "where lock_action=? and lock_type=? and locking_server_id=?"); + + putSql("releaseSharedLockSql", + "update $(lock) set last_lock_time=?, last_locking_server_id=?, shared_count=shared_count-1, " + + "shared_enable=(case when shared_count = 1 then 0 else shared_enable end), " + + "locking_server_id = (case when shared_count = 1 then null else locking_server_id end), " + + "lock_time = (case when shared_count = 1 then null else lock_time end) " + + "where lock_action=? and lock_type=?"); + + putSql("releaseExclusiveLockSql", + "update $(lock) set locking_server_id=null, lock_time=null, last_lock_time=?, last_locking_server_id=? " + + "where lock_action=? and lock_type=?"); + + putSql("initLockSql", + "update $(lock) set locking_server_id=null, lock_time=null, shared_count=0, shared_enable=0 " + + "where locking_server_id=?"); + + putSql("insertLockSql", "insert into $(lock) (lock_action, lock_type) values(?,?)"); + putSql("findLocksSql", - "" - + "select lock_action, locking_server_id, lock_time, last_locking_server_id, last_lock_time " - + " from $(lock) "); + "select lock_action, lock_type, locking_server_id, lock_time, shared_count, shared_enable, " + + "last_locking_server_id, last_lock_time " + + "from $(lock)"); } diff --git a/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric-core/src/main/resources/symmetric-default.properties index dee1ea717f..1639163441 100644 --- a/symmetric-core/src/main/resources/symmetric-default.properties +++ b/symmetric-core/src/main/resources/symmetric-default.properties @@ -526,6 +526,20 @@ cluster.lock.timeout.ms=1800000 # Type: boolean cluster.lock.enabled=false +# The amount of time a thread can hold a shared or exclusive lock before another thread can break the lock. +# The timeout is a safeguard in case an unexpected exception causes a lock to be abandoned. +# Restarting the service will clear all locks. +# +# DatabaseOverridable: true +# Tags: jobs +lock.timeout.ms=1800000 + +# While waiting for a lock to be released, how often should we check the lock status +# in the sym_lock table in the database. +# +# DatabaseOverridable: true +# Tags: jobs +lock.wait.retry.ms=10000 # If jobs need to be synchronized so that only one job can run at a time, set this parameter to true # diff --git a/symmetric-core/src/main/resources/symmetric-schema.xml b/symmetric-core/src/main/resources/symmetric-schema.xml index 05e6f8d503..c64688a7e3 100644 --- a/symmetric-core/src/main/resources/symmetric-schema.xml +++ b/symmetric-core/src/main/resources/symmetric-schema.xml @@ -282,8 +282,11 @@ + + +
diff --git a/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractClusterServiceTest.java b/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractClusterServiceTest.java new file mode 100644 index 0000000000..9915ac3743 --- /dev/null +++ b/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractClusterServiceTest.java @@ -0,0 +1,164 @@ +/** + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU General Public License, version 3.0 (GPLv3) + * (the "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU General Public License, + * version 3.0 (GPLv3) along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.service.impl; + +import junit.framework.Assert; + +import org.jumpmind.symmetric.common.ParameterConstants; +import org.jumpmind.symmetric.model.Lock; +import org.jumpmind.symmetric.service.ClusterConstants; +import org.junit.Before; +import org.junit.Test; + +public class AbstractClusterServiceTest extends AbstractServiceTest { + + @Before + public void setupForTest() { + getClusterService().init(); + getParameterService().saveParameter(ParameterConstants.LOCK_WAIT_RETRY_MILLIS, "1", "test"); + } + + @Test + public void testLockCluster() { + lock(ClusterConstants.PULL, ClusterConstants.TYPE_CLUSTER, 0); + // Should allow multiple cluster locks when on same server + lock(ClusterConstants.PULL, ClusterConstants.TYPE_CLUSTER, 0); + unlock(ClusterConstants.PULL, ClusterConstants.TYPE_CLUSTER, 0); + } + + @Test + public void testLockShare() { + lock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_SHARED, 1); + + // Should allow multiple shared locks and increase shared count + lock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_SHARED, 2); + + // Should prevent an exclusive lock + Assert.assertFalse(getClusterService().lock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_EXCLUSIVE)); + + // Releasing shared lock should decrease shared count + unlock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_SHARED, 1); + + // Releasing final shared lock + unlock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_SHARED, 0); + } + + @Test + public void testLockShareAfterExclusive() { + lock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_EXCLUSIVE, 0); + unlock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_EXCLUSIVE, 0); + lock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_SHARED, 1); + unlock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_SHARED, 0); + } + + @Test + public void testLockShareAbandoned() { + lock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_SHARED, 1); + getClusterService().init(); + checkUnlock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_SHARED, 0, false); + } + + @Test + public void testLockExclusive() { + lock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_EXCLUSIVE, 0); + + // Should prevent a second exclusive lock + Assert.assertFalse(getClusterService().lock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_EXCLUSIVE)); + + // Should prevent a shared lock + Assert.assertFalse(getClusterService().lock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_SHARED)); + + unlock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_EXCLUSIVE, 0); + getClusterService().unlock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_EXCLUSIVE); + } + + @Test + public void testLockExclusiveAfterShare() { + lock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_SHARED, 1); + unlock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_SHARED, 0); + lock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_EXCLUSIVE, 0); + unlock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_EXCLUSIVE, 0); + } + + @Test + public void testLockExclusiveAbandoned() { + lock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_EXCLUSIVE, 0); + getClusterService().init(); + checkUnlock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_EXCLUSIVE, 0, false); + } + + @Test + public void testLockExclusiveWait() { + lock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_SHARED, 1); + Assert.assertFalse(getClusterService().lock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_EXCLUSIVE, 1)); + checkLock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_SHARED, 1, false); + Assert.assertFalse(getClusterService().lock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_SHARED)); + unlock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_SHARED, 0); + Assert.assertTrue(getClusterService().lock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_EXCLUSIVE, 1)); + unlock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_EXCLUSIVE, 0); + } + + @Test + public void testLockSharedWait() { + lock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_EXCLUSIVE, 0); + Assert.assertFalse(getClusterService().lock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_SHARED, 1)); + checkLock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_EXCLUSIVE, 0, false); + unlock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_EXCLUSIVE, 0); + lock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_SHARED, 1); + unlock(ClusterConstants.FILE_SYNC_TRACKER, ClusterConstants.TYPE_SHARED, 0); + } + + private void lock(String action, String lockType, int expectedSharedCount) { + Assert.assertTrue("Expected to obtain lock", getClusterService().lock(action, lockType)); + checkLock(action, lockType, expectedSharedCount, expectedSharedCount > 0); + } + + private void unlock(String action, String lockType, int expectedSharedCount) { + getClusterService().unlock(action, lockType); + checkUnlock(action, lockType, expectedSharedCount, expectedSharedCount > 0); + } + + private Lock checkLock(String action, String lockType, int expectedSharedCount, boolean expectedSharedEnable) { + Lock lock = getClusterService().findLocks().get(action); + Assert.assertEquals(lockType, lock.getLockType()); + Assert.assertNotNull(lock.getLockingServerId()); + Assert.assertNotNull(lock.getLockTime()); + Assert.assertEquals(expectedSharedCount, lock.getSharedCount()); + if (expectedSharedCount > 0) { + Assert.assertEquals(expectedSharedEnable, lock.isSharedEnable()); + } + return lock; + } + + private void checkUnlock(String action, String lockType, int expectedSharedCount, boolean expectedSharedEnable) { + Lock lock = getClusterService().findLocks().get(action); + Assert.assertEquals(lockType, lock.getLockType()); + Assert.assertNotNull(lock.getLastLockingServerId()); + Assert.assertNotNull(lock.getLastLockTime()); + if (lockType != ClusterConstants.TYPE_SHARED || lock.getSharedCount() == 0) { + Assert.assertNull(lock.getLockingServerId()); + Assert.assertNull(lock.getLockTime()); + Assert.assertFalse(lock.isSharedEnable()); + } + Assert.assertEquals(expectedSharedCount, lock.getSharedCount()); + } + +} diff --git a/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractServiceTest.java b/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractServiceTest.java index 16c19d79cd..d6f37fbe43 100644 --- a/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractServiceTest.java +++ b/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractServiceTest.java @@ -31,6 +31,7 @@ import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.io.stage.IStagingManager; +import org.jumpmind.symmetric.service.IClusterService; import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.service.IDataExtractorService; import org.jumpmind.symmetric.service.IDataService; @@ -146,6 +147,10 @@ protected IIncomingBatchService getIncomingBatchService() { return getSymmetricEngine().getIncomingBatchService(); } + protected IClusterService getClusterService() { + return getSymmetricEngine().getClusterService(); + } + protected ISqlTemplate getSqlTemplate() { return getSymmetricEngine().getSymmetricDialect().getPlatform().getSqlTemplate(); }