From 8f677abc8674530748c30cb82d2072cef3443bad Mon Sep 17 00:00:00 2001 From: chenson42 Date: Mon, 17 Dec 2007 13:33:52 +0000 Subject: [PATCH] Started adding cluster support to the different services. --- .../service/impl/BootstrapService.java | 25 +++++++---- .../symmetric/service/impl/PurgeService.java | 42 +++++++++++++------ .../src/main/resources/symmetric-services.xml | 1 + .../jumpmind/symmetric/IntegrationTest.java | 5 ++- .../symmetric/MultiDatabaseTestFactory.java | 10 ++++- .../jumpmind/symmetric/db/DbTriggerTest.java | 1 + 6 files changed, 58 insertions(+), 26 deletions(-) diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/BootstrapService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/BootstrapService.java index bef5701788..7f4d5f18e8 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/BootstrapService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/BootstrapService.java @@ -45,6 +45,7 @@ import org.jumpmind.symmetric.service.INodeService; import org.jumpmind.symmetric.service.IParameterService; import org.jumpmind.symmetric.service.IUpgradeService; +import org.jumpmind.symmetric.service.LockAction; import org.jumpmind.symmetric.transport.ITransportManager; import org.jumpmind.symmetric.util.RandomTimeSlot; import org.springframework.transaction.annotation.Transactional; @@ -60,7 +61,7 @@ public class BootstrapService extends AbstractService implements IBootstrapServi private IParameterService parameterService; private IConfigurationService configurationService; - + private IClusterService clusterService; private INodeService nodeService; @@ -80,7 +81,7 @@ public class BootstrapService extends AbstractService implements IBootstrapServi private boolean autoUpgrade = true; private String triggerPrefix; - + private boolean initialized = false; public void init() { @@ -89,7 +90,7 @@ public void init() { if (autoConfigureDatabase) { logger.info("Initializing symmetric database."); dbDialect.initConfigDb(tablePrefix); - populateDefautGlobalParametersIfNeeded(); + populateDefautGlobalParametersIfNeeded(); logger.info("Done initializing symmetric database."); } else { logger.info("Symmetric is not configured to auto create the database."); @@ -105,7 +106,7 @@ public void init() { } initialized = true; } - + // lets do this every time init is called. clusterService.initLockTable(); } @@ -116,11 +117,17 @@ public void init() { * should we auto-resync data? */ public void syncTriggers() { - logger.info("Synchronizing triggers."); - configurationService.initTriggerRowsForConfigChannel(); - removeInactiveTriggers(); - updateOrCreateTriggers(); - logger.info("Done synchronizing triggers."); + if (clusterService.lock(LockAction.SYNCTRIGGERS)) { + try { + logger.info("Synchronizing triggers."); + configurationService.initTriggerRowsForConfigChannel(); + removeInactiveTriggers(); + updateOrCreateTriggers(); + } finally { + clusterService.unlock(LockAction.SYNCTRIGGERS); + logger.info("Done synchronizing triggers."); + } + } } private void removeInactiveTriggers() { diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java index b90d7d9d31..028a43586d 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java @@ -31,7 +31,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jumpmind.symmetric.db.IDbDialect; +import org.jumpmind.symmetric.service.IClusterService; import org.jumpmind.symmetric.service.IPurgeService; +import org.jumpmind.symmetric.service.LockAction; import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.ConnectionCallback; import org.springframework.jdbc.support.JdbcUtils; @@ -67,23 +69,33 @@ public class PurgeService extends AbstractService implements IPurgeService { private TransactionTemplate transactionTemplate; + private IClusterService clusterService; + @SuppressWarnings("unchecked") public void purge() { - logger.info("The purge process is about to run."); - Calendar calendar = Calendar.getInstance(); - calendar.add(Calendar.MINUTE, -retentionInMinutes); - - purgeBatchesOlderThan(calendar); - purgeDataRows(); - - for (String sql : otherPurgeSql) { - int count = jdbcTemplate.update(sql, new Object[] { calendar.getTime() }); - if (count > 0) { - logger.info("Purged " + count + " rows after running: " + cleanSql(sql)); + if (clusterService.lock(LockAction.PURGE)) { + try { + logger.info("The purge process is about to run."); + Calendar calendar = Calendar.getInstance(); + calendar.add(Calendar.MINUTE, -retentionInMinutes); + + purgeBatchesOlderThan(calendar); + purgeDataRows(); + + for (String sql : otherPurgeSql) { + int count = jdbcTemplate.update(sql, new Object[] { calendar.getTime() }); + if (count > 0) { + logger.info("Purged " + count + " rows after running: " + cleanSql(sql)); + } + } + } finally { + clusterService.unlock(LockAction.PURGE); + logger.info("The purge process has completed."); } + + } else { + logger.info("Could not get a lock to run a purge."); } - - logger.info("The purge process has completed."); } private void purgeDataRows() { @@ -218,4 +230,8 @@ public void setSelectDataIdToPurgeSql(String selectDataIdToDeleteSql) { this.selectDataIdToPurgeSql = selectDataIdToDeleteSql; } + public void setClusterService(IClusterService clusterService) { + this.clusterService = clusterService; + } + } diff --git a/symmetric/src/main/resources/symmetric-services.xml b/symmetric/src/main/resources/symmetric-services.xml index 4b4443ffe1..5f6830f9a7 100644 --- a/symmetric/src/main/resources/symmetric-services.xml +++ b/symmetric/src/main/resources/symmetric-services.xml @@ -618,6 +618,7 @@ + diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/IntegrationTest.java b/symmetric/src/test/java/org/jumpmind/symmetric/IntegrationTest.java index 800e062248..ab816c2b60 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/IntegrationTest.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/IntegrationTest.java @@ -25,6 +25,7 @@ import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; +@Test(sequential=true) public class IntegrationTest extends AbstractIntegrationTest implements ITest { private JdbcTemplate rootJdbcTemplate; @@ -62,7 +63,7 @@ public void init() { } - @Test(groups="continuous") + @Test(groups="continuous", timeOut=60000) public void testRegistration() { getRootEngine().openRegistration(TestConstants.TEST_CLIENT_NODE_GROUP, TestConstants.TEST_CLIENT_EXTERNAL_ID); getClientEngine().start(); @@ -183,7 +184,7 @@ public void testPurge() throws Exception { } - @Test(groups="continuous") + @Test(groups="continuous", dependsOnMethods="testRegistration") public void testHeartbeat() throws Exception { long ts = System.currentTimeMillis(); Thread.sleep(1000); diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/MultiDatabaseTestFactory.java b/symmetric/src/test/java/org/jumpmind/symmetric/MultiDatabaseTestFactory.java index cd45088a15..c4a2b1cb07 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/MultiDatabaseTestFactory.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/MultiDatabaseTestFactory.java @@ -36,6 +36,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jumpmind.symmetric.common.TestConstants; +import org.jumpmind.symmetric.db.DbTriggerTest; import org.jumpmind.symmetric.load.DataLoaderTest; import org.testng.annotations.Factory; @@ -116,9 +117,14 @@ protected void addAbstractDatabaseTests(final File rootFile, List