Navigation Menu

Skip to content

Commit

Permalink
database enable parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed May 1, 2008
1 parent 295ec0d commit 9665990
Show file tree
Hide file tree
Showing 13 changed files with 90 additions and 73 deletions.
Expand Up @@ -32,8 +32,15 @@ public class ParameterConstants {
public final static String START_RUNTIME_REGISTRATION_URL = "symmetric.runtime.registration.url";
public final static String START_RUNTIME_MY_URL = "symmetric.runtime.my.url";
public final static String START_RUNTIME_ENGINE_NAME = "symmetric.runtime.engine.name";
public final static String AUTO_REGISTER_ENABLED = "symmetric.auto.registration";
public final static String AUTO_RELOAD_ENABLED = "symmetric.auto.reload";


public final static String CONCURRENT_WORKERS = "symmetric.http.concurrent.workers.max";
public final static String OUTGOING_BATCH_PEEK_AHEAD_WINDOW = "symmetric.runtime.outgoing.batches.peek.ahead.window.after.max.size";
public final static String INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED = "symmetric.runtime.incoming.batches.skip.duplicates";
public final static String DATA_LOADER_NUM_OF_ACK_RETRIES = "symmetric.runtime.num.of.ack.retries";
public final static String DATA_LOADER_TIME_BETWEEN_ACK_RETRIES = "symmetric.runtime.time.between.ack.retries.ms";

public final static String DBPOOL_URL = "db.url";
public final static String DBPOOL_DRIVER ="db.driver";
Expand Down
Expand Up @@ -32,6 +32,8 @@ public interface IParameterService {
public static final String ALL = "ALL";

public BigDecimal getDecimal(String key);

public boolean is(String key);

public int getInt(String key);

Expand Down
Expand Up @@ -23,11 +23,14 @@
import java.util.Map;

import org.jumpmind.symmetric.config.IRuntimeConfig;
import org.jumpmind.symmetric.service.IParameterService;
import org.springframework.jdbc.core.JdbcTemplate;

abstract class AbstractService {

protected IRuntimeConfig runtimeConfiguration;

protected IParameterService parameterService;

protected JdbcTemplate jdbcTemplate;

Expand All @@ -48,5 +51,9 @@ public void setSql(Map<String, String> sql) {
public String getSql(String key) {
return sql.get(key);
}

public void setParameterService(IParameterService parameterService) {
this.parameterService = parameterService;
}

}
Expand Up @@ -43,7 +43,6 @@
import org.jumpmind.symmetric.service.IDataLoaderService;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IUpgradeService;
import org.jumpmind.symmetric.service.LockAction;
import org.jumpmind.symmetric.transport.ITransportManager;
Expand All @@ -59,9 +58,6 @@ public class BootstrapService extends AbstractService implements IBootstrapServi

private String tablePrefix;

@SuppressWarnings("unused")
private IParameterService parameterService;

private IConfigurationService configurationService;

private IClusterService clusterService;
Expand Down Expand Up @@ -342,10 +338,6 @@ private TriggerHistory rebuildTriggerIfNecessary(boolean forceRebuild, Trigger t
return audit;
}

public void setParameterService(IParameterService parameterService) {
this.parameterService = parameterService;
}

public void setConfigurationService(IConfigurationService configurationService) {
this.configurationService = configurationService;
}
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.apache.commons.logging.LogFactory;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ErrorConstants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.IDbDialect;
import org.jumpmind.symmetric.load.IColumnFilter;
import org.jumpmind.symmetric.load.IDataLoader;
Expand Down Expand Up @@ -85,10 +86,6 @@ public class DataLoaderService extends AbstractService implements IDataLoaderSer

protected Map<String, IColumnFilter> columnFilters = new HashMap<String, IColumnFilter>();

int numberOfStatusSendRetries = 5;

long timeBetweenStatusSendRetriesMs = 5000;

/**
* Connect to the remote node and pull data. The acknowledgment of commit/error status is sent separately
* after the data is processed.
Expand Down Expand Up @@ -118,6 +115,7 @@ public boolean loadData(Node remote, Node local) throws IOException {
private void sendAck(Node remote, Node local, List<IncomingBatchHistory> list) throws IOException {
Exception error = null;
boolean sendAck = false;
int numberOfStatusSendRetries = parameterService.getInt(ParameterConstants.DATA_LOADER_NUM_OF_ACK_RETRIES);
for (int i = 0; i < numberOfStatusSendRetries && !sendAck; i++) {
try {
sendAck = transportManager.sendAcknowledgement(remote, list, local);
Expand All @@ -142,7 +140,7 @@ private void sendAck(Node remote, Node local, List<IncomingBatchHistory> list) t

private final void sleepBetweenFailedAcks() {
try {
Thread.sleep(timeBetweenStatusSendRetriesMs);
Thread.sleep(parameterService.getLong(ParameterConstants.DATA_LOADER_TIME_BETWEEN_ACK_RETRIES));
} catch (InterruptedException e) {
}
}
Expand Down Expand Up @@ -349,14 +347,6 @@ public void addColumnFilter(String tableName, IColumnFilter filter) {
this.columnFilters.put(tableName, filter);
}

public void setNumberOfStatusSendRetries(int numberOfStatusSendRetries) {
this.numberOfStatusSendRetries = numberOfStatusSendRetries;
}

public void setTimeBetweenStatusSendRetriesMs(long timeBetweenStatusSendRetriesMs) {
this.timeBetweenStatusSendRetriesMs = timeBetweenStatusSendRetriesMs;
}

public void setStatisticManager(IStatisticManager statisticManager) {
this.statisticManager = statisticManager;
}
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.IDbDialect;
import org.jumpmind.symmetric.model.IncomingBatch;
import org.jumpmind.symmetric.model.IncomingBatchHistory;
Expand All @@ -42,8 +43,6 @@ public class IncomingBatchService extends AbstractService implements IIncomingBa

private static final Log logger = LogFactory.getLog(IncomingBatchService.class);

private boolean skipDuplicateBatches = true;

private IDbDialect dbDialect;

public IncomingBatch findIncomingBatch(String batchId, String nodeId) {
Expand Down Expand Up @@ -76,7 +75,7 @@ public boolean acquireIncomingBatch(final IncomingBatch status) {
} catch (DataIntegrityViolationException e) {
dbDialect.rollbackToSavepoint(savepoint);
status.setRetry(true);
okayToProcess = updateIncomingBatch(status) > 0 || (!skipDuplicateBatches);
okayToProcess = updateIncomingBatch(status) > 0 || (!parameterService.is(ParameterConstants.INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED));
if (okayToProcess) {
logger.warn("Retrying batch " + status.getNodeBatchId());
} else {
Expand Down Expand Up @@ -149,8 +148,4 @@ public void setDbDialect(IDbDialect dbDialect) {
this.dbDialect = dbDialect;
}

public void setSkipDuplicateBatches(boolean skipDuplicateBatches) {
this.skipDuplicateBatches = skipDuplicateBatches;
}

}
Expand Up @@ -34,6 +34,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.IDbDialect;
import org.jumpmind.symmetric.model.BatchType;
import org.jumpmind.symmetric.model.NodeChannel;
Expand All @@ -57,8 +58,6 @@ public class OutgoingBatchService extends AbstractService implements IOutgoingBa

private INodeService nodeService;

private int batchSizePeekAhead = 100;

private IDbDialect dbDialect;

/**
Expand All @@ -78,6 +77,9 @@ public void buildOutgoingBatches(final String nodeId, final List<NodeChannel> ch

@Transactional
public void buildOutgoingBatches(final String nodeId, final NodeChannel channel) {

final int batchSizePeekAhead = parameterService.getInt(ParameterConstants.OUTGOING_BATCH_PEEK_AHEAD_WINDOW);

jdbcTemplate.execute(new ConnectionCallback() {
public Object doInConnection(Connection conn) throws SQLException, DataAccessException {

Expand Down Expand Up @@ -324,10 +326,6 @@ public void setDbDialect(IDbDialect dbDialect) {
this.dbDialect = dbDialect;
}

public void setBatchSizePeekAhead(int batchSizePeekAhead) {
this.batchSizePeekAhead = batchSizePeekAhead;
}

public void setNodeService(INodeService nodeService) {
this.nodeService = nodeService;
}
Expand Down
Expand Up @@ -62,6 +62,19 @@ public BigDecimal getDecimal(String key) {
return BigDecimal.ZERO;
}

public boolean is(String key) {
String val = getString(key);
if (val != null) {
if (val.equals("1")) {
return true;
} else {
return Boolean.parseBoolean(val);
}
} else {
return false;
}
}

public int getInt(String key) {
String val = getString(key);
if (val != null) {
Expand All @@ -81,10 +94,11 @@ public long getLong(String key) {
public String getString(String key) {
return getParameters().get(key);
}

public void saveParameter(String key, Object paramValue) {
this.saveParameter(runtimeConfiguration.getExternalId(), runtimeConfiguration.getNodeGroupId(), key, paramValue);
}
this.saveParameter(runtimeConfiguration.getExternalId(), runtimeConfiguration.getNodeGroupId(), key,
paramValue);
}

public void saveParameter(String externalId, String nodeGroupId, String key, Object paramValue) {
int count = jdbcTemplate.update(getSql("updateParameterSql"), new Object[] { paramValue, externalId,
Expand All @@ -94,6 +108,8 @@ public void saveParameter(String externalId, String nodeGroupId, String key, Obj
jdbcTemplate.update(getSql("insertParameterSql"), new Object[] { externalId, nodeGroupId, key,
paramValue });
}

rereadParameters();
}

public void saveParameters(String externalId, String nodeGroupId, Map<String, Object> parameters) {
Expand Down Expand Up @@ -152,7 +168,7 @@ private Map<String, String> getParameters() {
|| (cacheTimeoutInMs > 0 && lastTimeParameterWereCached.getTime() < (System
.currentTimeMillis() - cacheTimeoutInMs))) {
lastTimeParameterWereCached = new Date();
parameters = buildSystemParameters();
parameters = buildSystemParameters();
}
return parameters;
}
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math.random.RandomDataImpl;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.model.OutgoingBatch;
Expand Down Expand Up @@ -63,10 +64,6 @@ public class RegistrationService extends AbstractService implements

private IDataService dataService;

private boolean autoRegistration;

private boolean autoReload;

/**
* Register a node for the given domain name and domain ID if the
* registration is open.
Expand All @@ -80,7 +77,7 @@ public boolean registerNode(Node node, OutputStream out) throws IOException {
}
}
String nodeId = findNodeToRegister(node.getNodeGroupId(), node.getExternalId());
if (nodeId == null && autoRegistration) {
if (nodeId == null && parameterService.is(ParameterConstants.AUTO_REGISTER_ENABLED)) {
openRegistration(node.getNodeGroupId(), node.getExternalId());
nodeId = findNodeToRegister(node.getNodeGroupId(), node.getExternalId());
}
Expand All @@ -95,7 +92,7 @@ public boolean registerNode(Node node, OutputStream out) throws IOException {
node.getSymmetricVersion(), node.getNodeId() }, new int[] { Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR });
boolean success = writeConfiguration(node, out);
if (success && autoReload) {
if (success && parameterService.is(ParameterConstants.AUTO_RELOAD_ENABLED)) {
// only send automatic initial load once
NodeSecurity security = nodeService.findNodeSecurity(node.getNodeId());
if (security != null && security.getInitialLoadTime() == null) {
Expand Down Expand Up @@ -220,20 +217,12 @@ public void setClusterService(IClusterService clusterService) {
this.clusterService = clusterService;
}

public void setAutoRegistration(boolean autoRegistration) {
this.autoRegistration = autoRegistration;
}

public void setAutoReload(boolean autoReload) {
this.autoReload = autoReload;
}

public void setDataService(IDataService dataService) {
this.dataService = dataService;
}

public boolean isAutoRegistration() {
return autoRegistration;
return parameterService.is(ParameterConstants.AUTO_REGISTER_ENABLED);
}

}
15 changes: 14 additions & 1 deletion symmetric/src/main/resources/symmetric-default.properties
Expand Up @@ -45,16 +45,21 @@ symmetric.auto.config.database=true
symmetric.auto.upgrade=true

# If this is true, registration is opened automatically for nodes requesting it
#
# This property is override-able in the database.
symmetric.auto.registration=false

# If this is true, a reload is automatically sent to nodes when they register
#
# This property is override-able in the database.
symmetric.auto.reload=false

# This is the download rate for the HTTP symmetric transport. -1 means full throttle.
symmetric.http.download.rate.kb=-1

# This is the number of HTTP concurrent push/pull requests symmetric will accept. This is controlled
# by the NodeConcurrencyFilter. The number is per servlet the filter is applied to.
#
# This property is override-able in the database.
symmetric.http.concurrent.workers.max=20

Expand All @@ -70,21 +75,29 @@ symmetric.runtime.https.verified.server.names=

# This is the maximum number of events that will be peeked at to look for additional transaction rows after
# the max batch size is reached. The more concurrency in your db and the longer the transaction takes the
# bigger this value might have to be.
# bigger this value might have to be.
#
# This property is override-able in the database.
symmetric.runtime.outgoing.batches.peek.ahead.window.after.max.size=100

# This instructs symmetric to attempt to skip duplicate batches that are received. Symmetric might
# be more efficient when recovering from error conditions if this is set to true, but you run the
# risk of missing data if the batch ids get reset (on one node, but not another) somehow (which is unlikely in production, but
# fairly likely in lab or development setups).
#
# This property is override-able in the database.
symmetric.runtime.incoming.batches.skip.duplicates=true

# This is the number of times we will attempt to send an ACK back to the remote node
# when pulling and loading data.
#
# This property is override-able in the database.
symmetric.runtime.num.of.ack.retries=5

# This is the amount of time to wait between trying to send an ACK back to the remote node
# when pulling and loading data.
#
# This property is override-able in the database.
symmetric.runtime.time.between.ack.retries.ms=5000


Expand Down

0 comments on commit 9665990

Please sign in to comment.