Skip to content

Commit

Permalink
Made the purge of incoming and outgoing batches exclusive. Also made …
Browse files Browse the repository at this point in the history
…sure if one fails, the other will still be able to run.
  • Loading branch information
chenson42 committed Feb 10, 2008
1 parent e570677 commit f948305
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 35 deletions.
4 changes: 2 additions & 2 deletions symmetric/src/changes/changes.xml
Expand Up @@ -50,8 +50,8 @@
we get natural randomness in the push, pull cycles.
</action>
<action dev="chenson42" type="fix">
Transaction ids can be interleaved by different processes. The batching process needs to take this into
account. Also, cap the max number of batches that can be send per channel in one sweep.
Transaction ids can be interleaved by different processes. The batching algorithm needs to take this into
account. Also, cap the max number of batches that can be sent per channel in one sweep.
</action>
</release>
<release version="1.2.0" date="2008-01-23" description="New dialects and bug fix release">
Expand Down
Expand Up @@ -27,5 +27,5 @@ public interface IPurgeService
public void purge();

@Transactional
public void purgeAllIncomingEventForNode(String nodeId);
public void purgeAllIncomingEventsForNode(String nodeId);
}
Expand Up @@ -20,5 +20,5 @@
package org.jumpmind.symmetric.service;

public enum LockAction {
PUSH, PULL, PURGE, HEARTBEAT, SYNCTRIGGERS, OTHER
PUSH, PULL, PURGE_OUTGOING, PURGE_INCOMING, HEARTBEAT, SYNCTRIGGERS, OTHER
}
Expand Up @@ -67,7 +67,8 @@ public class ClusterService extends AbstractService implements IClusterService {
public void initLockTable() {
initLockTableForNodes(nodeService.findNodesToPull());
initLockTableForNodes(nodeService.findNodesToPushTo());
initLockTable(LockAction.PURGE, COMMON_LOCK_ID);
initLockTable(LockAction.PURGE_INCOMING, COMMON_LOCK_ID);
initLockTable(LockAction.PURGE_OUTGOING, COMMON_LOCK_ID);
initLockTable(LockAction.SYNCTRIGGERS, COMMON_LOCK_ID);
}

Expand Down Expand Up @@ -143,7 +144,8 @@ private boolean isClusteringEnabled(final LockAction action) {
return lockDuringPull;
case PUSH:
return lockDuringPush;
case PURGE:
case PURGE_INCOMING:
case PURGE_OUTGOING:
return lockDuringPurge;
case HEARTBEAT:
return lockDuringHeartbeat;
Expand Down
Expand Up @@ -188,7 +188,7 @@ public void insertReloadEvent(Node targetNode) {
insertNodeSecurityUpdate(targetNode);

// remove all incoming events from the node we are starting a reload for.
purgeService.purgeAllIncomingEventForNode(targetNode.getNodeId());
purgeService.purgeAllIncomingEventsForNode(targetNode.getNodeId());
}

private void insertNodeSecurityUpdate(Node node) {
Expand Down
Expand Up @@ -51,7 +51,7 @@ public class PurgeService extends AbstractService implements IPurgeService {

private IDbDialect dbDialect;

private String[] otherPurgeSql;
private String[] incomingPurgeSql;

private String[] deleteIncomingBatchesByNodeIdSql;

Expand All @@ -77,32 +77,62 @@ public class PurgeService extends AbstractService implements IPurgeService {

@SuppressWarnings("unchecked")
public void purge() {
if (clusterService.lock(LockAction.PURGE)) {
try {
logger.info("The purge process is about to run.");
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, -retentionInMinutes);

purgeBatchesOlderThan(calendar);
purgeDataRows();

for (String sql : otherPurgeSql) {
int count = jdbcTemplate.update(sql, new Object[] { calendar.getTime() });
if (count > 0) {
logger.info("Purged " + count + " rows after running: " + cleanSql(sql));
}
Calendar retentionCutoff = Calendar.getInstance();
retentionCutoff.add(Calendar.MINUTE, -retentionInMinutes);

purgeIncoming(retentionCutoff);
purgeOutgoing(retentionCutoff);

}

private void purgeOutgoing(Calendar retentionCutoff) {
try {
if (clusterService.lock(LockAction.PURGE_OUTGOING)) {
try {
logger.info("The outgoing purge process is about to run.");

purgeBatchesOlderThan(retentionCutoff);
purgeDataRows();

} finally {
clusterService.unlock(LockAction.PURGE_OUTGOING);
logger.info("The outgoing purge process has completed.");
}
} finally {
clusterService.unlock(LockAction.PURGE);
logger.info("The purge process has completed.");

} else {
logger.info("Could not get a lock to run an outgoing purge.");
}
} catch (Exception ex) {
logger.error(ex, ex);
}
}

private void purgeIncoming(Calendar retentionCutoff) {
try {
if (clusterService.lock(LockAction.PURGE_INCOMING)) {
try {
logger.info("The incoming purge process is about to run.");

for (String sql : incomingPurgeSql) {
int count = jdbcTemplate.update(sql, new Object[] { retentionCutoff.getTime() });
if (count > 0) {
logger.info("Purged " + count + " rows after running: " + cleanSql(sql));
}
}
} finally {
clusterService.unlock(LockAction.PURGE_INCOMING);
logger.info("The incoming purge process has completed.");
}

} else {
logger.info("Could not get a lock to run a purge.");
} else {
logger.info("Could not get a lock to run an incoming purge.");
}
} catch (Exception ex) {
logger.error(ex, ex);
}
}

public void purgeAllIncomingEventForNode(String nodeId) {
public void purgeAllIncomingEventsForNode(String nodeId) {
if (deleteIncomingBatchesByNodeIdSql != null)
for (String sql : deleteIncomingBatchesByNodeIdSql) {
int count = jdbcTemplate.update(sql, new Object[] { nodeId });
Expand Down Expand Up @@ -214,8 +244,8 @@ private String cleanSql(String sql) {
return StringUtils.replace(StringUtils.replace(StringUtils.replace(sql, "\r", " "), "\n", " "), " ", "");
}

public void setOtherPurgeSql(String[] purgeSql) {
this.otherPurgeSql = purgeSql;
public void setIncomingPurgeSql(String[] purgeSql) {
this.incomingPurgeSql = purgeSql;
}

public void setRetentionInMinutes(int retentionInMinutes) {
Expand Down
2 changes: 1 addition & 1 deletion symmetric/src/main/resources/symmetric-services.xml
Expand Up @@ -764,7 +764,7 @@
<value>delete from ${sync.table.prefix}_incoming_batch where node_id=?</value>
</list>
</property>
<property name="otherPurgeSql">
<property name="incomingPurgeSql">
<list>
<value>
delete from ${sync.table.prefix}_incoming_batch_hist where batch_id in (select
Expand Down
Expand Up @@ -32,10 +32,10 @@ public class ClusterServiceTest extends AbstractDatabaseTest {
@Test(groups = "continuous")
public void testLock() throws Exception {
final IClusterService service = (IClusterService) getBeanFactory().getBean(Constants.CLUSTER_SERVICE);
Assert.assertTrue(service.lock(LockAction.PURGE), "Could not lock for PURGE");
Assert.assertTrue(service.lock(LockAction.PURGE_INCOMING), "Could not lock for PURGE");
Assert.assertEquals(countActivePurgeLocks(), 1, "Could not find the lock in the database.");
Assert.assertFalse(service.lock(LockAction.PURGE), "Should not have been able to lock for PURGE");
service.unlock(LockAction.PURGE);
Assert.assertFalse(service.lock(LockAction.PURGE_INCOMING), "Should not have been able to lock for PURGE");
service.unlock(LockAction.PURGE_INCOMING);
Assert.assertEquals(countActivePurgeLocks(), 0, "Could not find the lock in the database.");
}

Expand Down Expand Up @@ -63,6 +63,6 @@ public void testOtherNodeLock() throws Exception {
private int countActivePurgeLocks() {
return getJdbcTemplate().queryForInt(
"select count(*) from sym_lock where lock_id=? and lock_action=? and lock_time is not null",
new Object[] { ClusterService.COMMON_LOCK_ID, LockAction.PURGE.name() });
new Object[] { ClusterService.COMMON_LOCK_ID, LockAction.PURGE_INCOMING.name() });
}
}

0 comments on commit f948305

Please sign in to comment.