diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index 2923600564..cf659337f4 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -32,8 +32,15 @@ public class ParameterConstants { public final static String START_RUNTIME_REGISTRATION_URL = "symmetric.runtime.registration.url"; public final static String START_RUNTIME_MY_URL = "symmetric.runtime.my.url"; public final static String START_RUNTIME_ENGINE_NAME = "symmetric.runtime.engine.name"; + public final static String AUTO_REGISTER_ENABLED = "symmetric.auto.registration"; + public final static String AUTO_RELOAD_ENABLED = "symmetric.auto.reload"; + public final static String CONCURRENT_WORKERS = "symmetric.http.concurrent.workers.max"; + public final static String OUTGOING_BATCH_PEEK_AHEAD_WINDOW = "symmetric.runtime.outgoing.batches.peek.ahead.window.after.max.size"; + public final static String INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED = "symmetric.runtime.incoming.batches.skip.duplicates"; + public final static String DATA_LOADER_NUM_OF_ACK_RETRIES = "symmetric.runtime.num.of.ack.retries"; + public final static String DATA_LOADER_TIME_BETWEEN_ACK_RETRIES = "symmetric.runtime.time.between.ack.retries.ms"; public final static String DBPOOL_URL = "db.url"; public final static String DBPOOL_DRIVER ="db.driver"; diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/IParameterService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/IParameterService.java index 03a8d2d368..edec522c6f 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/IParameterService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/IParameterService.java @@ -32,6 +32,8 @@ public interface IParameterService { public static final String ALL = "ALL"; public BigDecimal getDecimal(String key); + + public boolean is(String key); public int getInt(String key); diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java index 1a341928cc..b3e71f00a5 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java @@ -23,11 +23,14 @@ import java.util.Map; import org.jumpmind.symmetric.config.IRuntimeConfig; +import org.jumpmind.symmetric.service.IParameterService; import org.springframework.jdbc.core.JdbcTemplate; abstract class AbstractService { protected IRuntimeConfig runtimeConfiguration; + + protected IParameterService parameterService; protected JdbcTemplate jdbcTemplate; @@ -48,5 +51,9 @@ public void setSql(Map sql) { public String getSql(String key) { return sql.get(key); } + + public void setParameterService(IParameterService parameterService) { + this.parameterService = parameterService; + } } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/BootstrapService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/BootstrapService.java index 88a5a183b4..1577d4e480 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/BootstrapService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/BootstrapService.java @@ -43,7 +43,6 @@ import org.jumpmind.symmetric.service.IDataLoaderService; import org.jumpmind.symmetric.service.IDataService; import org.jumpmind.symmetric.service.INodeService; -import org.jumpmind.symmetric.service.IParameterService; import org.jumpmind.symmetric.service.IUpgradeService; import org.jumpmind.symmetric.service.LockAction; import org.jumpmind.symmetric.transport.ITransportManager; @@ -59,9 +58,6 @@ public class BootstrapService extends AbstractService implements IBootstrapServi private String tablePrefix; - @SuppressWarnings("unused") - private IParameterService parameterService; - private IConfigurationService configurationService; private IClusterService clusterService; @@ -342,10 +338,6 @@ private TriggerHistory rebuildTriggerIfNecessary(boolean forceRebuild, Trigger t return audit; } - public void setParameterService(IParameterService parameterService) { - this.parameterService = parameterService; - } - public void setConfigurationService(IConfigurationService configurationService) { this.configurationService = configurationService; } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index a8ec7aa63c..eff61b9be0 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -38,6 +38,7 @@ import org.apache.commons.logging.LogFactory; import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ErrorConstants; +import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.db.IDbDialect; import org.jumpmind.symmetric.load.IColumnFilter; import org.jumpmind.symmetric.load.IDataLoader; @@ -85,10 +86,6 @@ public class DataLoaderService extends AbstractService implements IDataLoaderSer protected Map columnFilters = new HashMap(); - int numberOfStatusSendRetries = 5; - - long timeBetweenStatusSendRetriesMs = 5000; - /** * Connect to the remote node and pull data. The acknowledgment of commit/error status is sent separately * after the data is processed. @@ -118,6 +115,7 @@ public boolean loadData(Node remote, Node local) throws IOException { private void sendAck(Node remote, Node local, List list) throws IOException { Exception error = null; boolean sendAck = false; + int numberOfStatusSendRetries = parameterService.getInt(ParameterConstants.DATA_LOADER_NUM_OF_ACK_RETRIES); for (int i = 0; i < numberOfStatusSendRetries && !sendAck; i++) { try { sendAck = transportManager.sendAcknowledgement(remote, list, local); @@ -142,7 +140,7 @@ private void sendAck(Node remote, Node local, List list) t private final void sleepBetweenFailedAcks() { try { - Thread.sleep(timeBetweenStatusSendRetriesMs); + Thread.sleep(parameterService.getLong(ParameterConstants.DATA_LOADER_TIME_BETWEEN_ACK_RETRIES)); } catch (InterruptedException e) { } } @@ -349,14 +347,6 @@ public void addColumnFilter(String tableName, IColumnFilter filter) { this.columnFilters.put(tableName, filter); } - public void setNumberOfStatusSendRetries(int numberOfStatusSendRetries) { - this.numberOfStatusSendRetries = numberOfStatusSendRetries; - } - - public void setTimeBetweenStatusSendRetriesMs(long timeBetweenStatusSendRetriesMs) { - this.timeBetweenStatusSendRetriesMs = timeBetweenStatusSendRetriesMs; - } - public void setStatisticManager(IStatisticManager statisticManager) { this.statisticManager = statisticManager; } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java index 1c1a563fbc..1139ac6cec 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java @@ -29,6 +29,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.db.IDbDialect; import org.jumpmind.symmetric.model.IncomingBatch; import org.jumpmind.symmetric.model.IncomingBatchHistory; @@ -42,8 +43,6 @@ public class IncomingBatchService extends AbstractService implements IIncomingBa private static final Log logger = LogFactory.getLog(IncomingBatchService.class); - private boolean skipDuplicateBatches = true; - private IDbDialect dbDialect; public IncomingBatch findIncomingBatch(String batchId, String nodeId) { @@ -76,7 +75,7 @@ public boolean acquireIncomingBatch(final IncomingBatch status) { } catch (DataIntegrityViolationException e) { dbDialect.rollbackToSavepoint(savepoint); status.setRetry(true); - okayToProcess = updateIncomingBatch(status) > 0 || (!skipDuplicateBatches); + okayToProcess = updateIncomingBatch(status) > 0 || (!parameterService.is(ParameterConstants.INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED)); if (okayToProcess) { logger.warn("Retrying batch " + status.getNodeBatchId()); } else { @@ -149,8 +148,4 @@ public void setDbDialect(IDbDialect dbDialect) { this.dbDialect = dbDialect; } - public void setSkipDuplicateBatches(boolean skipDuplicateBatches) { - this.skipDuplicateBatches = skipDuplicateBatches; - } - } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index ec32b93813..dad97ebd1a 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -34,6 +34,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.db.IDbDialect; import org.jumpmind.symmetric.model.BatchType; import org.jumpmind.symmetric.model.NodeChannel; @@ -57,8 +58,6 @@ public class OutgoingBatchService extends AbstractService implements IOutgoingBa private INodeService nodeService; - private int batchSizePeekAhead = 100; - private IDbDialect dbDialect; /** @@ -78,6 +77,9 @@ public void buildOutgoingBatches(final String nodeId, final List ch @Transactional public void buildOutgoingBatches(final String nodeId, final NodeChannel channel) { + + final int batchSizePeekAhead = parameterService.getInt(ParameterConstants.OUTGOING_BATCH_PEEK_AHEAD_WINDOW); + jdbcTemplate.execute(new ConnectionCallback() { public Object doInConnection(Connection conn) throws SQLException, DataAccessException { @@ -324,10 +326,6 @@ public void setDbDialect(IDbDialect dbDialect) { this.dbDialect = dbDialect; } - public void setBatchSizePeekAhead(int batchSizePeekAhead) { - this.batchSizePeekAhead = batchSizePeekAhead; - } - public void setNodeService(INodeService nodeService) { this.nodeService = nodeService; } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/ParameterService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/ParameterService.java index 02654c9999..14ce6033d8 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/ParameterService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/ParameterService.java @@ -62,6 +62,19 @@ public BigDecimal getDecimal(String key) { return BigDecimal.ZERO; } + public boolean is(String key) { + String val = getString(key); + if (val != null) { + if (val.equals("1")) { + return true; + } else { + return Boolean.parseBoolean(val); + } + } else { + return false; + } + } + public int getInt(String key) { String val = getString(key); if (val != null) { @@ -81,10 +94,11 @@ public long getLong(String key) { public String getString(String key) { return getParameters().get(key); } - + public void saveParameter(String key, Object paramValue) { - this.saveParameter(runtimeConfiguration.getExternalId(), runtimeConfiguration.getNodeGroupId(), key, paramValue); - } + this.saveParameter(runtimeConfiguration.getExternalId(), runtimeConfiguration.getNodeGroupId(), key, + paramValue); + } public void saveParameter(String externalId, String nodeGroupId, String key, Object paramValue) { int count = jdbcTemplate.update(getSql("updateParameterSql"), new Object[] { paramValue, externalId, @@ -94,6 +108,8 @@ public void saveParameter(String externalId, String nodeGroupId, String key, Obj jdbcTemplate.update(getSql("insertParameterSql"), new Object[] { externalId, nodeGroupId, key, paramValue }); } + + rereadParameters(); } public void saveParameters(String externalId, String nodeGroupId, Map parameters) { @@ -152,7 +168,7 @@ private Map getParameters() { || (cacheTimeoutInMs > 0 && lastTimeParameterWereCached.getTime() < (System .currentTimeMillis() - cacheTimeoutInMs))) { lastTimeParameterWereCached = new Date(); - parameters = buildSystemParameters(); + parameters = buildSystemParameters(); } return parameters; } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java index 9072f05a3e..7fe78c7c01 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java @@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.commons.math.random.RandomDataImpl; import org.jumpmind.symmetric.common.Constants; +import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.NodeSecurity; import org.jumpmind.symmetric.model.OutgoingBatch; @@ -63,10 +64,6 @@ public class RegistrationService extends AbstractService implements private IDataService dataService; - private boolean autoRegistration; - - private boolean autoReload; - /** * Register a node for the given domain name and domain ID if the * registration is open. @@ -80,7 +77,7 @@ public boolean registerNode(Node node, OutputStream out) throws IOException { } } String nodeId = findNodeToRegister(node.getNodeGroupId(), node.getExternalId()); - if (nodeId == null && autoRegistration) { + if (nodeId == null && parameterService.is(ParameterConstants.AUTO_REGISTER_ENABLED)) { openRegistration(node.getNodeGroupId(), node.getExternalId()); nodeId = findNodeToRegister(node.getNodeGroupId(), node.getExternalId()); } @@ -95,7 +92,7 @@ public boolean registerNode(Node node, OutputStream out) throws IOException { node.getSymmetricVersion(), node.getNodeId() }, new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR }); boolean success = writeConfiguration(node, out); - if (success && autoReload) { + if (success && parameterService.is(ParameterConstants.AUTO_RELOAD_ENABLED)) { // only send automatic initial load once NodeSecurity security = nodeService.findNodeSecurity(node.getNodeId()); if (security != null && security.getInitialLoadTime() == null) { @@ -220,20 +217,12 @@ public void setClusterService(IClusterService clusterService) { this.clusterService = clusterService; } - public void setAutoRegistration(boolean autoRegistration) { - this.autoRegistration = autoRegistration; - } - - public void setAutoReload(boolean autoReload) { - this.autoReload = autoReload; - } - public void setDataService(IDataService dataService) { this.dataService = dataService; } public boolean isAutoRegistration() { - return autoRegistration; + return parameterService.is(ParameterConstants.AUTO_REGISTER_ENABLED); } } diff --git a/symmetric/src/main/resources/symmetric-default.properties b/symmetric/src/main/resources/symmetric-default.properties index 3f8150c2c8..f9ade37c44 100644 --- a/symmetric/src/main/resources/symmetric-default.properties +++ b/symmetric/src/main/resources/symmetric-default.properties @@ -45,9 +45,13 @@ symmetric.auto.config.database=true symmetric.auto.upgrade=true # If this is true, registration is opened automatically for nodes requesting it +# +# This property is override-able in the database. symmetric.auto.registration=false # If this is true, a reload is automatically sent to nodes when they register +# +# This property is override-able in the database. symmetric.auto.reload=false # This is the download rate for the HTTP symmetric transport. -1 means full throttle. @@ -55,6 +59,7 @@ symmetric.http.download.rate.kb=-1 # This is the number of HTTP concurrent push/pull requests symmetric will accept. This is controlled # by the NodeConcurrencyFilter. The number is per servlet the filter is applied to. +# # This property is override-able in the database. symmetric.http.concurrent.workers.max=20 @@ -70,21 +75,29 @@ symmetric.runtime.https.verified.server.names= # This is the maximum number of events that will be peeked at to look for additional transaction rows after # the max batch size is reached. The more concurrency in your db and the longer the transaction takes the -# bigger this value might have to be. +# bigger this value might have to be. +# +# This property is override-able in the database. symmetric.runtime.outgoing.batches.peek.ahead.window.after.max.size=100 # This instructs symmetric to attempt to skip duplicate batches that are received. Symmetric might # be more efficient when recovering from error conditions if this is set to true, but you run the # risk of missing data if the batch ids get reset (on one node, but not another) somehow (which is unlikely in production, but # fairly likely in lab or development setups). +# +# This property is override-able in the database. symmetric.runtime.incoming.batches.skip.duplicates=true # This is the number of times we will attempt to send an ACK back to the remote node # when pulling and loading data. +# +# This property is override-able in the database. symmetric.runtime.num.of.ack.retries=5 # This is the amount of time to wait between trying to send an ACK back to the remote node # when pulling and loading data. +# +# This property is override-able in the database. symmetric.runtime.time.between.ack.retries.ms=5000 diff --git a/symmetric/src/main/resources/symmetric-services.xml b/symmetric/src/main/resources/symmetric-services.xml index 5a8a843a23..1fc95385c2 100644 --- a/symmetric/src/main/resources/symmetric-services.xml +++ b/symmetric/src/main/resources/symmetric-services.xml @@ -20,6 +20,7 @@ + @@ -31,7 +32,7 @@ - + @@ -69,6 +70,7 @@ scope="singleton"> + @@ -90,8 +92,8 @@ scope="singleton"> - + @@ -101,6 +103,7 @@ + @@ -117,6 +120,7 @@ + @@ -127,6 +131,7 @@ + @@ -134,18 +139,17 @@ + - - + @@ -158,17 +162,17 @@ - + + + - - + @@ -301,6 +305,7 @@ + @@ -308,6 +313,7 @@ + @@ -324,6 +330,7 @@ + diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/ParameterServiceTest.java b/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/ParameterServiceTest.java index 1ac31f9b3a..b9b9d4b7e8 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/ParameterServiceTest.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/ParameterServiceTest.java @@ -19,26 +19,25 @@ public void testParameterGetFromDefaults() { public void testParameterGetFromDatabase() { Assert.assertEquals(getParameterService().getInt(ParameterConstants.CONCURRENT_WORKERS), 2); getParameterService().saveParameter(TestConstants.TEST_CLIENT_EXTERNAL_ID, TestConstants.TEST_CLIENT_NODE_GROUP, ParameterConstants.CONCURRENT_WORKERS, 10); - getParameterService().rereadParameters(); // make sure we are not picking up someone else's parameter Assert.assertEquals(getParameterService().getInt(ParameterConstants.CONCURRENT_WORKERS), 2); getParameterService().saveParameter(IParameterService.ALL, TestConstants.TEST_ROOT_NODE_GROUP, ParameterConstants.CONCURRENT_WORKERS, 5); - // make sure the parameters are cached - Assert.assertEquals(getParameterService().getInt(ParameterConstants.CONCURRENT_WORKERS), 2); - - // make sure we pick up the new parameter for us - getParameterService().rereadParameters(); Assert.assertEquals(getParameterService().getInt(ParameterConstants.CONCURRENT_WORKERS), 5); getParameterService().saveParameter(ParameterConstants.CONCURRENT_WORKERS, 10); - // make sure we pick up the new parameter for us - getParameterService().rereadParameters(); Assert.assertEquals(getParameterService().getInt(ParameterConstants.CONCURRENT_WORKERS), 10); } + + @Test(groups="continuous") + public void testBooleanParameter() { + Assert.assertEquals(getParameterService().is("boolean.test"), false); + getParameterService().saveParameter("boolean.test", true); + Assert.assertEquals(getParameterService().is("boolean.test"), true); + } } diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RegistrationServiceTest.java b/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RegistrationServiceTest.java index 7eff3f40cb..f248a2ef81 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RegistrationServiceTest.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RegistrationServiceTest.java @@ -25,6 +25,7 @@ import org.jumpmind.symmetric.AbstractDatabaseTest; import org.jumpmind.symmetric.common.Constants; +import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.common.TestConstants; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.NodeSecurity; @@ -102,11 +103,12 @@ public void testRegisterNode() throws Exception { @Test(groups = "continuous") public void testRegisterNodeAutomatic() throws Exception { try { - ((RegistrationService) registrationService).setAutoRegistration(true); + + getParameterService().saveParameter(ParameterConstants.AUTO_REGISTER_ENABLED, true); doTestRegisterNodeAutomatic(); } finally { - ((RegistrationService) registrationService).setAutoRegistration(false); + getParameterService().saveParameter(ParameterConstants.AUTO_REGISTER_ENABLED, false); } }