Skip to content

Commit

Permalink
Started adding cluster support to the different services.
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Dec 17, 2007
1 parent edff081 commit 8f677ab
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 26 deletions.
Expand Up @@ -45,6 +45,7 @@
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IUpgradeService;
import org.jumpmind.symmetric.service.LockAction;
import org.jumpmind.symmetric.transport.ITransportManager;
import org.jumpmind.symmetric.util.RandomTimeSlot;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -60,7 +61,7 @@ public class BootstrapService extends AbstractService implements IBootstrapServi
private IParameterService parameterService;

private IConfigurationService configurationService;

private IClusterService clusterService;

private INodeService nodeService;
Expand All @@ -80,7 +81,7 @@ public class BootstrapService extends AbstractService implements IBootstrapServi
private boolean autoUpgrade = true;

private String triggerPrefix;

private boolean initialized = false;

public void init() {
Expand All @@ -89,7 +90,7 @@ public void init() {
if (autoConfigureDatabase) {
logger.info("Initializing symmetric database.");
dbDialect.initConfigDb(tablePrefix);
populateDefautGlobalParametersIfNeeded();
populateDefautGlobalParametersIfNeeded();
logger.info("Done initializing symmetric database.");
} else {
logger.info("Symmetric is not configured to auto create the database.");
Expand All @@ -105,7 +106,7 @@ public void init() {
}
initialized = true;
}

// lets do this every time init is called.
clusterService.initLockTable();
}
Expand All @@ -116,11 +117,17 @@ public void init() {
* should we auto-resync data?
*/
public void syncTriggers() {
logger.info("Synchronizing triggers.");
configurationService.initTriggerRowsForConfigChannel();
removeInactiveTriggers();
updateOrCreateTriggers();
logger.info("Done synchronizing triggers.");
if (clusterService.lock(LockAction.SYNCTRIGGERS)) {
try {
logger.info("Synchronizing triggers.");
configurationService.initTriggerRowsForConfigChannel();
removeInactiveTriggers();
updateOrCreateTriggers();
} finally {
clusterService.unlock(LockAction.SYNCTRIGGERS);
logger.info("Done synchronizing triggers.");
}
}
}

private void removeInactiveTriggers() {
Expand Down
Expand Up @@ -31,7 +31,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jumpmind.symmetric.db.IDbDialect;
import org.jumpmind.symmetric.service.IClusterService;
import org.jumpmind.symmetric.service.IPurgeService;
import org.jumpmind.symmetric.service.LockAction;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.support.JdbcUtils;
Expand Down Expand Up @@ -67,23 +69,33 @@ public class PurgeService extends AbstractService implements IPurgeService {

private TransactionTemplate transactionTemplate;

private IClusterService clusterService;

@SuppressWarnings("unchecked")
public void purge() {
logger.info("The purge process is about to run.");
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, -retentionInMinutes);

purgeBatchesOlderThan(calendar);
purgeDataRows();

for (String sql : otherPurgeSql) {
int count = jdbcTemplate.update(sql, new Object[] { calendar.getTime() });
if (count > 0) {
logger.info("Purged " + count + " rows after running: " + cleanSql(sql));
if (clusterService.lock(LockAction.PURGE)) {
try {
logger.info("The purge process is about to run.");
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, -retentionInMinutes);

purgeBatchesOlderThan(calendar);
purgeDataRows();

for (String sql : otherPurgeSql) {
int count = jdbcTemplate.update(sql, new Object[] { calendar.getTime() });
if (count > 0) {
logger.info("Purged " + count + " rows after running: " + cleanSql(sql));
}
}
} finally {
clusterService.unlock(LockAction.PURGE);
logger.info("The purge process has completed.");
}

} else {
logger.info("Could not get a lock to run a purge.");
}

logger.info("The purge process has completed.");
}

private void purgeDataRows() {
Expand Down Expand Up @@ -218,4 +230,8 @@ public void setSelectDataIdToPurgeSql(String selectDataIdToDeleteSql) {
this.selectDataIdToPurgeSql = selectDataIdToDeleteSql;
}

public void setClusterService(IClusterService clusterService) {
this.clusterService = clusterService;
}

}
1 change: 1 addition & 0 deletions symmetric/src/main/resources/symmetric-services.xml
Expand Up @@ -618,6 +618,7 @@
<bean id="purgeService" class="org.jumpmind.symmetric.service.impl.PurgeService">
<property name="jdbcTemplate" ref="jdbcTemplate" />
<property name="runtimeConfiguration" ref="runtimeConfiguration" />
<property name="clusterService" ref="clusterService" />
<property name="dbDialect" ref="dbDialect" />
<property name="transactionTemplate" ref="transactionTemplate" />
<property name="retentionInMinutes" value="${symmetric.runtime.purge.retention.minutes}" />
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

@Test(sequential=true)
public class IntegrationTest extends AbstractIntegrationTest implements ITest {

private JdbcTemplate rootJdbcTemplate;
Expand Down Expand Up @@ -62,7 +63,7 @@ public void init() {

}

@Test(groups="continuous")
@Test(groups="continuous", timeOut=60000)
public void testRegistration() {
getRootEngine().openRegistration(TestConstants.TEST_CLIENT_NODE_GROUP, TestConstants.TEST_CLIENT_EXTERNAL_ID);
getClientEngine().start();
Expand Down Expand Up @@ -183,7 +184,7 @@ public void testPurge() throws Exception {

}

@Test(groups="continuous")
@Test(groups="continuous", dependsOnMethods="testRegistration")
public void testHeartbeat() throws Exception {
long ts = System.currentTimeMillis();
Thread.sleep(1000);
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jumpmind.symmetric.common.TestConstants;
import org.jumpmind.symmetric.db.DbTriggerTest;
import org.jumpmind.symmetric.load.DataLoaderTest;
import org.testng.annotations.Factory;

Expand Down Expand Up @@ -116,9 +117,14 @@ protected void addAbstractDatabaseTests(final File rootFile, List<AbstractDataba
File getSymmetricFile() {
return rootFile;
}

});


tests2Run.add(new DbTriggerTest() {
@Override
File getSymmetricFile() {
return rootFile;
}
});
}

protected static File writeTempPropertiesFileFor(String databaseType, DatabaseRole databaseRole) {
Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.testng.ITest;
import org.testng.annotations.Test;

@Test(sequential=true)
public class DbTriggerTest extends AbstractDatabaseTest implements ITest {

private static final String TEST_TRIGGERS_TABLE = "test_triggers_table";
Expand Down

0 comments on commit 8f677ab

Please sign in to comment.