Skip to content

Commit

Permalink
1979581 - fixed the HTTP transport so that node concurrency pushes ar…
Browse files Browse the repository at this point in the history
…e more efficient. Also effectively got rid of IRuntimeConfig
  • Loading branch information
chenson42 committed Jun 8, 2008
1 parent c996080 commit 087d437
Show file tree
Hide file tree
Showing 62 changed files with 743 additions and 1,009 deletions.
1 change: 0 additions & 1 deletion symmetric/pom.xml
Expand Up @@ -451,7 +451,6 @@
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>6.1.5</version>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
Expand Down
58 changes: 24 additions & 34 deletions symmetric/src/main/java/org/jumpmind/symmetric/SymmetricEngine.java
Expand Up @@ -34,7 +34,6 @@
import org.apache.commons.logging.LogFactory;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.config.IRuntimeConfig;
import org.jumpmind.symmetric.db.IDbDialect;
import org.jumpmind.symmetric.job.PullJob;
import org.jumpmind.symmetric.job.PurgeJob;
Expand All @@ -43,6 +42,7 @@
import org.jumpmind.symmetric.service.IBootstrapService;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IPullService;
import org.jumpmind.symmetric.service.IPurgeService;
import org.jumpmind.symmetric.service.IPushService;
Expand Down Expand Up @@ -70,7 +70,7 @@ public class SymmetricEngine {

private IBootstrapService bootstrapService;

private IRuntimeConfig runtimeConfig;
private IParameterService parameterService;

private INodeService nodeService;

Expand All @@ -88,8 +88,6 @@ public class SymmetricEngine {

private IDbDialect dbDialect;

private Properties properties;

private static Map<String, SymmetricEngine> registeredEnginesByUrl = new HashMap<String, SymmetricEngine>();

private static Map<String, SymmetricEngine> registeredEnginesByName = new HashMap<String, SymmetricEngine>();
Expand Down Expand Up @@ -126,7 +124,7 @@ protected SymmetricEngine(ApplicationContext ctx) {
}

public void stop() {
logger.info("Closing SymmetricDS externalId=" + runtimeConfig.getExternalId() + " version="
logger.info("Closing SymmetricDS externalId=" + parameterService.getExternalId() + " version="
+ Version.version() + " database=" + dbDialect.getName());
removeMeFromMap(registeredEnginesByName);
removeMeFromMap(registeredEnginesByUrl);
Expand All @@ -139,9 +137,8 @@ public void stop() {
}
}
applicationContext = null;
properties = null;
bootstrapService = null;
runtimeConfig = null;
parameterService = null;
nodeService = null;
registrationService = null;
purgeService = null;
Expand All @@ -166,35 +163,22 @@ private ApplicationContext createContext() {

private void init(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
properties = (Properties) applicationContext.getBean(Constants.PROPERTIES);
bootstrapService = (IBootstrapService) applicationContext.getBean(Constants.BOOTSTRAP_SERVICE);
runtimeConfig = (IRuntimeConfig) applicationContext.getBean(Constants.RUNTIME_CONFIG);
parameterService = (IParameterService) applicationContext.getBean(Constants.PARAMETER_SERVICE);
nodeService = (INodeService) applicationContext.getBean(Constants.NODE_SERVICE);
registrationService = (IRegistrationService) applicationContext
.getBean(Constants.REGISTRATION_SERVICE);
purgeService = (IPurgeService) applicationContext.getBean(Constants.PURGE_SERVICE);
dataService = (IDataService) applicationContext.getBean(Constants.DATA_SERVICE);
dbDialect = (IDbDialect) applicationContext.getBean(Constants.DB_DIALECT);
registerEngine();
startLegacyJMX();
logger.info("Initialized SymmetricDS externalId=" + runtimeConfig.getExternalId() + " version="
+ Version.version() + " database=" + dbDialect.getName());
}

private void startLegacyJMX() {
try {
getApplicationContext().getBean(Constants.LEGACY_JMX);
} catch (Exception ex) {
logger.warn("Unable to register legacy JMX bean: " + ex.getMessage());
}
}

/**
* Register this instance of the engine so it can be found by other processes in the JVM.
* @see #findEngineByUrl(String)
*/
private void registerEngine() {
registeredEnginesByUrl.put(runtimeConfig.getMyUrl(), this);
registeredEnginesByUrl.put(parameterService.getMyUrl(), this);
registeredEnginesByName.put(getEngineName(), this);
}

Expand All @@ -203,31 +187,31 @@ private void registerEngine() {
*/
private void startJobs() {
if (Boolean.TRUE.toString().equalsIgnoreCase(
properties.getProperty(ParameterConstants.START_PUSH_JOB))) {
parameterService.getString(ParameterConstants.START_PUSH_JOB))) {
applicationContext.getBean(Constants.PUSH_JOB_TIMER);
}
if (Boolean.TRUE.toString().equalsIgnoreCase(
properties.getProperty(ParameterConstants.START_PULL_JOB))) {
parameterService.getString(ParameterConstants.START_PULL_JOB))) {
applicationContext.getBean(Constants.PULL_JOB_TIMER);
}

if (Boolean.TRUE.toString().equalsIgnoreCase(
properties.getProperty(ParameterConstants.START_PURGE_JOB))) {
parameterService.getString(ParameterConstants.START_PURGE_JOB))) {
applicationContext.getBean(Constants.PURGE_JOB_TIMER);
}

if (Boolean.TRUE.toString().equalsIgnoreCase(
properties.getProperty(ParameterConstants.START_HEARTBEAT_JOB))) {
parameterService.getString(ParameterConstants.START_HEARTBEAT_JOB))) {
applicationContext.getBean(Constants.HEARTBEAT_JOB_TIMER);
}

if (Boolean.TRUE.toString().equalsIgnoreCase(
properties.getProperty(ParameterConstants.START_SYNCTRIGGERS_JOB))) {
parameterService.getString(ParameterConstants.START_SYNCTRIGGERS_JOB))) {
applicationContext.getBean(Constants.SYNC_TRIGGERS_JOB_TIMER);
}

if (Boolean.TRUE.toString().equalsIgnoreCase(
properties.getProperty(ParameterConstants.START_STATISTIC_FLUSH_JOB))) {
parameterService.getString(ParameterConstants.START_STATISTIC_FLUSH_JOB))) {
applicationContext.getBean(Constants.STATISTIC_FLUSH_JOB_TIMER);
}

Expand All @@ -237,7 +221,9 @@ private void startJobs() {
* Get a list of configured properties for Symmetric. Read-only.
*/
public Properties getProperties() {
return new Properties(properties);
Properties p = new Properties();
p.putAll(parameterService.getAllParameters());
return p;
}

public String getEngineName() {
Expand All @@ -261,18 +247,22 @@ public synchronized void start() {
if (!starting) {
starting = true;
setup();
registerEngine();
Node node = nodeService.findIdentity();
if (node != null) {
logger.info("Starting registered node [group=" + node.getNodeGroupId() + ", id="
+ node.getNodeId() + ", externalId=" + node.getExternalId() + "]");
} else {
logger.info("Starting unregistered node [group=" + runtimeConfig.getNodeGroupId()
+ ", externalId=" + runtimeConfig.getExternalId() + "]");
logger.info("Starting unregistered node [group=" + parameterService.getNodeGroupId()
+ ", externalId=" + parameterService.getExternalId() + "]");
}
bootstrapService.register();
bootstrapService.syncTriggers();
startJobs();
started = true;
logger.info("Started SymmetricDS externalId=" + parameterService.getExternalId() + " version="
+ Version.version() + " database=" + dbDialect.getName());

}
}

Expand All @@ -290,7 +280,7 @@ public void reloadNode(String nodeId) {
*/
public void push() {
if (!Boolean.TRUE.toString().equalsIgnoreCase(
properties.getProperty(ParameterConstants.START_PUSH_JOB))) {
parameterService.getString(ParameterConstants.START_PUSH_JOB))) {
((IPushService) applicationContext.getBean(Constants.PUSH_SERVICE)).pushData();
} else {
throw new UnsupportedOperationException("Cannot actuate a push if it is already scheduled.");
Expand All @@ -312,7 +302,7 @@ public void syncTriggers() {
*/
public void pull() {
if (!Boolean.TRUE.toString().equalsIgnoreCase(
properties.getProperty(ParameterConstants.START_PULL_JOB))) {
parameterService.getString(ParameterConstants.START_PULL_JOB))) {
((IPullService) applicationContext.getBean(Constants.PULL_SERVICE)).pullData();
} else {
throw new UnsupportedOperationException("Cannot actuate a push if it is already scheduled.");
Expand All @@ -325,7 +315,7 @@ public void pull() {
*/
public void purge() {
if (!Boolean.TRUE.toString().equalsIgnoreCase(
properties.getProperty(ParameterConstants.START_PURGE_JOB))) {
parameterService.getString(ParameterConstants.START_PURGE_JOB))) {
purgeService.purge();
} else {
throw new UnsupportedOperationException("Cannot actuate a purge if it is already scheduled.");
Expand Down
Expand Up @@ -93,7 +93,7 @@ public class Constants {

public static final String NODE_CONCURRENCY_FILTER = "nodeConcurrencyFilter";

public static final String RUNTIME_CONFIG = "runtimeConfiguration";
public static final String CONCURRENT_CONNECTION_MANGER = "concurrentConnectionManager";

public static final String DB_DIALECT = "dbDialect";

Expand Down
Expand Up @@ -32,6 +32,13 @@ public class ParameterConstants {
public final static String START_RUNTIME_REGISTRATION_URL = "symmetric.runtime.registration.url";
public final static String START_RUNTIME_MY_URL = "symmetric.runtime.my.url";
public final static String START_RUNTIME_ENGINE_NAME = "symmetric.runtime.engine.name";
public final static String START_RUNTIME_GROUP_ID = "symmetric.runtime.group.id";
public final static String START_RUNTIME_EXTERNAL_ID = "symmetric.runtime.external.id";
public final static String START_RUNTIME_SCHEMA_VERSION = "symmetric.runtime.schema.version";

@Deprecated
public final static String RUNTIME_CONFIGURATION_CLASS = "symmetric.runtime.configuration.class";

public final static String AUTO_REGISTER_ENABLED = "symmetric.auto.registration";
public final static String AUTO_RELOAD_ENABLED = "symmetric.auto.reload";
public final static String AUTO_DELETE_BEFORE_RELOAD = "symmetric.runtime.initial.load.delete.first";
Expand All @@ -40,6 +47,7 @@ public class ParameterConstants {
public final static String PARAMETER_REFRESH_PERIOD_IN_MS = "symmetric.runtime.parameter.reload.timeout.ms";

public final static String CONCURRENT_WORKERS = "symmetric.http.concurrent.workers.max";
public final static String CONCURRENT_RESERVATION_TIMEOUT = "symmetric.http.concurrent.reservation.timeout.ms";

public final static String OUTGOING_BATCH_PEEK_AHEAD_WINDOW = "symmetric.runtime.outgoing.batches.peek.ahead.window.after.max.size";
public final static String INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED = "symmetric.runtime.incoming.batches.skip.duplicates";
Expand Down
@@ -0,0 +1,11 @@
package org.jumpmind.symmetric.config;

public interface IParameterFilter {

/**
* @param key
* @param value
* @return the new value
*/
public String filterParameter(String key, String value);
}
Expand Up @@ -23,14 +23,15 @@

/**
* This interface is used to pull the runtime configuration for this
* symmetric installation.
* SymmetricDSinstallation.
*
* If the registrationURL is null, then this server will not register with another
* server (it is likely that it is the host itself).
*
* This interface is meant to be 'pluggable.' It might be that different installations
* might want to pull this information from different places.
*/
@Deprecated
public interface IRuntimeConfig {

/**
Expand All @@ -44,15 +45,16 @@ public interface IRuntimeConfig {
public String getExternalId();

/**
* Provide the url used to register at to get initial configuration information
* Provide the URL used to register at to get initial configuration information
*/
public String getRegistrationUrl();

/**
* Provide information about the url used to contact this symmetric instance
* Provide the URL of this specific instance of SymmetricDS
*/
public String getMyUrl();


/**
* Provide information about the version of the schema being sync'd.
*/
Expand Down

This file was deleted.

0 comments on commit 087d437

Please sign in to comment.