Skip to content

Commit

Permalink
0001609: File sync mutex limits scaling multiple clients
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Feb 28, 2014
1 parent f9b3c1c commit 54dc905
Show file tree
Hide file tree
Showing 11 changed files with 401 additions and 37 deletions.
@@ -0,0 +1,25 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.service.impl;

public class JdbcClusterServiceTest extends AbstractClusterServiceTest {

}
Expand Up @@ -198,6 +198,8 @@ private ParameterConstants() {
public final static String CLUSTER_SERVER_ID = "cluster.server.id";
public final static String CLUSTER_LOCKING_ENABLED = "cluster.lock.enabled";
public final static String CLUSTER_LOCK_TIMEOUT_MS = "cluster.lock.timeout.ms";
public final static String LOCK_TIMEOUT_MS = "lock.timeout.ms";
public final static String LOCK_WAIT_RETRY_MILLIS = "lock.wait.retry.ms";

public final static String PURGE_RETENTION_MINUTES = "purge.retention.minutes";
public final static String PURGE_EXTRACT_REQUESTS_RETENTION_MINUTES = "purge.extract.request.retention.minutes";
Expand Down
Expand Up @@ -30,8 +30,11 @@ public class Lock implements Serializable {
private static final long serialVersionUID = 1L;

private String lockAction;
private String lockType;
private String lockingServerId;
private Date lockTime;
private int sharedCount;
private boolean sharedEnable;
private Date lastLockTime;
private String lastLockingServerId;

Expand Down Expand Up @@ -83,4 +86,28 @@ public void setLastLockingServerId(String lastLockingServerId) {
this.lastLockingServerId = lastLockingServerId;
}

public String getLockType() {
return lockType;
}

public void setLockType(String lockType) {
this.lockType = lockType;
}

public int getSharedCount() {
return sharedCount;
}

public void setSharedCount(int sharedCount) {
this.sharedCount = sharedCount;
}

public boolean isSharedEnable() {
return sharedEnable;
}

public void setSharedEnable(boolean sharedEnable) {
this.sharedEnable = sharedEnable;
}

}
Expand Up @@ -42,5 +42,9 @@ public class ClusterConstants {
public static final String FILE_SYNC_TRACKER = "FILE_SYNC_TRACKER";
public static final String FILE_SYNC_PULL = "FILE_SYNC_PULL";
public static final String FILE_SYNC_PUSH = "FILE_SYNC_PUSH";


public static final String TYPE_CLUSTER = "CLUSTER";
public static final String TYPE_EXCLUSIVE = "EXCLUSIVE";
public static final String TYPE_SHARED = "SHARED";

}
Expand Up @@ -21,7 +21,7 @@
package org.jumpmind.symmetric.service;

import java.util.Map;


import org.jumpmind.symmetric.model.Lock;


Expand All @@ -37,8 +37,14 @@ public interface IClusterService {

public boolean lock(String action);

public boolean lock(String action, String lockType);

public boolean lock(String action, String lockType, long waitMillis);

public void unlock(String action);

public void unlock(String action, String lockType);

public void clearAllLocks();

public String getServerId();
Expand Down
Expand Up @@ -20,7 +20,11 @@
*/
package org.jumpmind.symmetric.service.impl;

import static org.jumpmind.symmetric.service.ClusterConstants.FILE_SYNC_PULL;
import static org.jumpmind.symmetric.service.ClusterConstants.FILE_SYNC_PUSH;
import static org.jumpmind.symmetric.service.ClusterConstants.FILE_SYNC_TRACKER;
import static org.jumpmind.symmetric.service.ClusterConstants.HEARTBEAT;
import static org.jumpmind.symmetric.service.ClusterConstants.INITIAL_LOAD_EXTRACT;
import static org.jumpmind.symmetric.service.ClusterConstants.PULL;
import static org.jumpmind.symmetric.service.ClusterConstants.PURGE_DATA_GAPS;
import static org.jumpmind.symmetric.service.ClusterConstants.PURGE_INCOMING;
Expand All @@ -31,11 +35,10 @@
import static org.jumpmind.symmetric.service.ClusterConstants.STAGE_MANAGEMENT;
import static org.jumpmind.symmetric.service.ClusterConstants.STATISTICS;
import static org.jumpmind.symmetric.service.ClusterConstants.SYNCTRIGGERS;
import static org.jumpmind.symmetric.service.ClusterConstants.TYPE_CLUSTER;
import static org.jumpmind.symmetric.service.ClusterConstants.TYPE_EXCLUSIVE;
import static org.jumpmind.symmetric.service.ClusterConstants.TYPE_SHARED;
import static org.jumpmind.symmetric.service.ClusterConstants.WATCHDOG;
import static org.jumpmind.symmetric.service.ClusterConstants.FILE_SYNC_PULL;
import static org.jumpmind.symmetric.service.ClusterConstants.FILE_SYNC_PUSH;
import static org.jumpmind.symmetric.service.ClusterConstants.FILE_SYNC_TRACKER;
import static org.jumpmind.symmetric.service.ClusterConstants.INITIAL_LOAD_EXTRACT;

import java.util.Calendar;
import java.util.Date;
Expand Down Expand Up @@ -70,6 +73,7 @@ public ClusterService(IParameterService parameterService, ISymmetricDialect dial
}

public void init() {
sqlTemplate.update(getSql("initLockSql"), new Object[] { getServerId() });
initLockTable(ROUTE);
initLockTable(PULL);
initLockTable(PUSH);
Expand All @@ -85,54 +89,116 @@ public void init() {
initLockTable(FILE_SYNC_PULL);
initLockTable(FILE_SYNC_PUSH);
initLockTable(FILE_SYNC_TRACKER);
initLockTable(FILE_SYNC_TRACKER, TYPE_SHARED);
initLockTable(INITIAL_LOAD_EXTRACT);
}

public void initLockTable(final String action) {
initLockTable(action, TYPE_CLUSTER);
}

public void initLockTable(final String action, final String lockType) {
try {
sqlTemplate.update(getSql("insertLockSql"), new Object[] { action });
log.debug("Inserted into the NODE_LOCK table for {}", action);
sqlTemplate.update(getSql("insertLockSql"), new Object[] { action, lockType });
log.debug("Inserted into the LOCK table for {}, {}", action, lockType);
} catch (UniqueKeyException ex) {
log.debug(
"Failed to insert to the NODE_LOCK table for {}. Must be initialized already.",
action);
log.debug("Failed to insert to the LOCK table for {}, {}. Must be initialized already.",
action, lockType);
}
}

public void clearAllLocks() {
sqlTemplate.update(getSql("clearAllLocksSql"));
sqlTemplate.update(getSql("initLockSql"), new Object[] { getServerId() });
}

public boolean lock(final String action, final String lockType) {
if (lockType.equals(TYPE_CLUSTER)) {
return lock(action);
} else if (lockType.equals(TYPE_SHARED)) {
return lockShared(action);
} else if (lockType.equals(TYPE_EXCLUSIVE)) {
return lockExclusive(action);
} else {
throw new UnsupportedOperationException("Lock type of " + lockType + " is not supported");
}
}

public boolean lock(final String action, final String lockType, long waitMillis) {
if (lockType.equals(TYPE_SHARED) || lockType.equals(TYPE_EXCLUSIVE)) {
return lockWait(action, lockType, waitMillis);
} else {
throw new UnsupportedOperationException("Lock type of " + lockType + " is not supported");
}
}

public boolean lock(final String action) {
if (isClusteringEnabled()) {
final Date timeout = DateUtils.add(new Date(), Calendar.MILLISECOND,
(int) -parameterService.getLong(ParameterConstants.CLUSTER_LOCK_TIMEOUT_MS));
return lock(action, timeout, new Date(), getServerId());
return lockCluster(action, timeout, new Date(), getServerId());
} else {
return true;
}
}

protected boolean lock(String action, Date timeToBreakLock, Date timeLockAquired,
protected boolean lockCluster(String action, Date timeToBreakLock, Date timeLockAcquired,
String serverId) {
try {
return sqlTemplate.update(getSql("aquireLockSql"), new Object[] { serverId,
timeLockAquired, action, timeToBreakLock, serverId }) == 1;
return sqlTemplate.update(getSql("acquireClusterLockSql"), new Object[] { serverId,
timeLockAcquired, action, TYPE_CLUSTER, timeToBreakLock, serverId }) == 1;
} catch (ConcurrencySqlException ex) {
log.debug("Ignoring concurrency error and reporting that we failed to get the cluster lock: {}", ex.getMessage());
return false;
}
}

protected boolean lockShared(final String action) {
final Date timeout = DateUtils.add(new Date(), Calendar.MILLISECOND,
(int) -parameterService.getLong(ParameterConstants.LOCK_TIMEOUT_MS));
return sqlTemplate.update(getSql("acquireSharedLockSql"), new Object[] {
TYPE_SHARED, getServerId(), new Date(), action, TYPE_SHARED, timeout }) == 1;
}

protected boolean lockExclusive(final String action) {
final Date timeout = DateUtils.add(new Date(), Calendar.MILLISECOND,
(int) -parameterService.getLong(ParameterConstants.LOCK_TIMEOUT_MS));
return sqlTemplate.update(getSql("acquireExclusiveLockSql"), new Object[] {
TYPE_EXCLUSIVE, getServerId(), new Date(), action, TYPE_SHARED, timeout }) == 1;
}

protected boolean lockWait(final String action, final String lockType, long waitMillis) {
boolean isLocked = false;
long endTime = System.currentTimeMillis() + waitMillis;
long sleepMillis = parameterService.getLong(ParameterConstants.LOCK_WAIT_RETRY_MILLIS);
do {
if (lockType.equals(TYPE_SHARED)) {
isLocked = lockShared(action);
} else if (lockType.equals(TYPE_EXCLUSIVE)) {
isLocked = lockExclusive(action);
if (!isLocked) {
sqlTemplate.update(getSql("disableSharedLockSql"), new Object[] { action, TYPE_SHARED });
}
}
if (isLocked) {
break;
}
AppUtils.sleep(sleepMillis);
} while (waitMillis == 0 || System.currentTimeMillis() < endTime);
return isLocked;
}

public Map<String, Lock> findLocks() {
final Map<String, Lock> locks = new HashMap<String, Lock>();
if (isClusteringEnabled()) {
sqlTemplate.query(getSql("findLocksSql"), new ISqlRowMapper<Lock>() {
public Lock mapRow(Row rs) {
Lock lock = new Lock();
lock.setLockAction(rs.getString("lock_action"));
lock.setLockType(rs.getString("lock_type"));
lock.setLockingServerId(rs.getString("locking_server_id"));
lock.setLockTime(rs.getDateTime("lock_time"));
lock.setSharedCount(rs.getInt("shared_count"));
lock.setSharedEnable(rs.getBoolean("shared_enable"));
lock.setLastLockingServerId(rs.getString("last_locking_server_id"));
lock.setLastLockTime(rs.getDateTime("last_lock_time"));
locks.put(lock.getLockAction(), lock);
Expand Down Expand Up @@ -179,18 +245,40 @@ public String getServerId() {
return serverId;
}

public void unlock(final String action, final String lockType) {
if (lockType.equals(TYPE_CLUSTER)) {
unlock(action);
} else if (lockType.equals(TYPE_SHARED)) {
unlockShared(action);
} else if (lockType.equals(TYPE_EXCLUSIVE)) {
unlockExclusive(action);
} else {
throw new UnsupportedOperationException("Lock type of " + lockType + " is not supported");
}
}

public void unlock(final String action) {
if (isClusteringEnabled()) {
if (!unlock(action, getServerId())) {
if (!unlockCluster(action, getServerId())) {
log.warn("Failed to release lock for action:{} server:{}", action, getServerId());
}
}
}

protected boolean unlock(String action, String serverId) {
protected boolean unlockCluster(String action, String serverId) {
String lastLockingServerId = serverId.equals(Lock.STOPPED) ? null : serverId;
return sqlTemplate.update(getSql("releaseLockSql"), new Object[] { new Date(), lastLockingServerId, action,
serverId }) > 0;
return sqlTemplate.update(getSql("releaseClusterLockSql"), new Object[] { new Date(), lastLockingServerId, action,
TYPE_CLUSTER, serverId }) > 0;
}

protected boolean unlockShared(final String action) {
return sqlTemplate.update(getSql("releaseSharedLockSql"), new Object[] {
new Date(), getServerId(), action, TYPE_SHARED }) == 1;
}

protected boolean unlockExclusive(final String action) {
return sqlTemplate.update(getSql("releaseExclusiveLockSql"), new Object[] {
new Date(), getServerId(), action, TYPE_EXCLUSIVE }) == 1;
}

public boolean isClusteringEnabled() {
Expand All @@ -213,7 +301,7 @@ public void aquireInfiniteLock(String action) {
int tries = 600;
Date futureTime = DateUtils.add(new Date(), Calendar.YEAR, 100);
while (tries > 0) {
if (!lock(action, new Date(), futureTime, Lock.STOPPED)) {
if (!lockCluster(action, new Date(), futureTime, Lock.STOPPED)) {
AppUtils.sleep(50);
tries--;
} else {
Expand All @@ -227,7 +315,7 @@ public void clearInfiniteLock(String action) {
Map<String, Lock> all = findLocks();
Lock lock = all.get(action);
if (lock != null && Lock.STOPPED.equals(lock.getLockingServerId())) {
unlock(action, Lock.STOPPED);
unlockCluster(action, Lock.STOPPED);
}
}

Expand Down

0 comments on commit 54dc905

Please sign in to comment.