Skip to content

Commit

Permalink
change outgoing_batch to use the new sequenceservice table
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed May 8, 2012
1 parent b5b613d commit 835f597
Show file tree
Hide file tree
Showing 17 changed files with 198 additions and 167 deletions.
Expand Up @@ -109,8 +109,6 @@ public String getSyncTriggersExpression() {
@Override
public String getSequenceName(SequenceIdentifier identifier) {
switch (identifier) {
case OUTGOING_BATCH:
return "SYM_OUTGOING_BATCH_BATCH_ID";
case DATA:
return "SYM_DATA_DATA_ID";
case TRIGGER_HIST:
Expand Down
Expand Up @@ -107,8 +107,6 @@ public boolean supportsTransactionId() {
@Override
public String getSequenceName(SequenceIdentifier identifier) {
switch (identifier) {
case OUTGOING_BATCH:
return "SEQ_SYM_OUTGOIN_BATCH_BATCH_ID";
case DATA:
return "SEQ_SYM_DATA_DATA_ID";
case TRIGGER_HIST:
Expand Down
@@ -1,5 +1,9 @@
package org.jumpmind.symmetric;

import java.io.File;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -8,9 +12,13 @@
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.io.DatabaseIO;
import org.jumpmind.db.model.Database;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.ISqlTemplate;
import org.jumpmind.db.sql.SqlScript;
import org.jumpmind.db.sql.UniqueKeyException;
import org.jumpmind.properties.TypedProperties;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
Expand All @@ -23,6 +31,7 @@
import org.jumpmind.symmetric.job.DefaultOfflineServerListener;
import org.jumpmind.symmetric.job.IJobManager;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.model.NodeStatus;
import org.jumpmind.symmetric.model.RemoteNodeStatuses;
import org.jumpmind.symmetric.service.IAcknowledgeService;
Expand All @@ -42,6 +51,7 @@
import org.jumpmind.symmetric.service.IRegistrationService;
import org.jumpmind.symmetric.service.IRouterService;
import org.jumpmind.symmetric.service.ISecurityService;
import org.jumpmind.symmetric.service.ISequenceService;
import org.jumpmind.symmetric.service.IStatisticService;
import org.jumpmind.symmetric.service.ITransformService;
import org.jumpmind.symmetric.service.ITriggerRouterService;
Expand All @@ -62,6 +72,7 @@
import org.jumpmind.symmetric.service.impl.RegistrationService;
import org.jumpmind.symmetric.service.impl.RouterService;
import org.jumpmind.symmetric.service.impl.SecurityService;
import org.jumpmind.symmetric.service.impl.SequenceService;
import org.jumpmind.symmetric.service.impl.StatisticService;
import org.jumpmind.symmetric.service.impl.TransformService;
import org.jumpmind.symmetric.service.impl.TriggerRouterService;
Expand Down Expand Up @@ -143,6 +154,8 @@ abstract public class AbstractSymmetricEngine implements ISymmetricEngine {
protected IPullService pullService;

protected IJobManager jobManager;

protected ISequenceService sequenceService;

protected IExtensionPointManager extensionPointManger;

Expand Down Expand Up @@ -192,7 +205,7 @@ protected void init() {
TypedProperties properties = this.propertiesFactory.reload();
this.platform = createDatabasePlatform(properties);
this.parameterService = new ParameterService(platform, propertiesFactory, properties.get(
ParameterConstants.RUNTIME_CONFIG_TABLE_PREFIX, "sym"));
ParameterConstants.RUNTIME_CONFIG_TABLE_PREFIX, "sym"));

MDC.put("engineName", this.parameterService.getEngineName());

Expand All @@ -203,6 +216,7 @@ protected void init() {

this.bandwidthService = new BandwidthService(parameterService);
this.symmetricDialect = createSymmetricDialect();
this.sequenceService = new SequenceService(parameterService, symmetricDialect);
this.stagingManager = createStagingManager();
this.nodeService = new NodeService(parameterService, symmetricDialect);
this.configurationService = new ConfigurationService(parameterService, symmetricDialect,
Expand All @@ -219,7 +233,7 @@ protected void init() {
this.triggerRouterService = new TriggerRouterService(parameterService, symmetricDialect,
clusterService, configurationService, statisticManager);
this.outgoingBatchService = new OutgoingBatchService(parameterService, symmetricDialect,
nodeService, configurationService);
nodeService, configurationService, sequenceService);
this.dataService = new DataService(parameterService, symmetricDialect, deploymentType,
triggerRouterService, nodeService, purgeService, configurationService,
outgoingBatchService, statisticManager);
Expand Down Expand Up @@ -301,10 +315,129 @@ public void setup() {
}

public void setupDatabase(boolean force) {
configurationService.autoConfigDatabase(force);
clusterService.initLockTable();
log.info("Initializing SymmetricDS database");
if (parameterService.is(ParameterConstants.AUTO_CONFIGURE_DATABASE) || force) {
symmetricDialect.initTablesAndFunctions();
} else {
log.info("SymmetricDS is not configured to auto-create the database");
}
configurationService.initDefaultChannels();
clusterService.init();
sequenceService.init();
autoConfigRegistrationServer();
parameterService.rereadParameters();
log.info("Done initializing SymmetricDS database");
}

protected void autoConfigRegistrationServer() {
Node node = nodeService.findIdentity();

if (node == null) {
buildTablesFromDdlUtilXmlIfProvided();
loadFromScriptIfProvided();
}

node = nodeService.findIdentity();

if (node == null && StringUtils.isBlank(parameterService.getRegistrationUrl())
&& parameterService.is(ParameterConstants.AUTO_INSERT_REG_SVR_IF_NOT_FOUND, false)) {
log.info("Inserting rows for node, security, identity and group for registration server");
String nodeGroupId = parameterService.getNodeGroupId();
String nodeId = parameterService.getExternalId();
try {
nodeService.insertNode(nodeId, nodeGroupId, nodeId, nodeId);
} catch (UniqueKeyException ex) {
log.warn("Not inserting node row for {} because it already exists", nodeId);
}
nodeService.insertNodeIdentity(nodeId);
node = nodeService.findIdentity();
node.setSyncUrl(parameterService.getSyncUrl());
node.setSyncEnabled(true);
node.setHeartbeatTime(new Date());
nodeService.updateNode(node);
nodeService.insertNodeGroup(node.getNodeGroupId(), null);
NodeSecurity nodeSecurity = nodeService.findNodeSecurity(nodeId, true);
nodeSecurity.setInitialLoadTime(new Date());
nodeSecurity.setRegistrationTime(new Date());
nodeSecurity.setInitialLoadEnabled(false);
nodeSecurity.setRegistrationEnabled(false);
nodeService.updateNodeSecurity(nodeSecurity);
}
}

protected boolean buildTablesFromDdlUtilXmlIfProvided() {
boolean loaded = false;
String xml = parameterService
.getString(ParameterConstants.AUTO_CONFIGURE_REG_SVR_DDLUTIL_XML);
if (!StringUtils.isBlank(xml)) {
File file = new File(xml);
URL fileUrl = null;
if (file.isFile()) {
try {
fileUrl = file.toURI().toURL();
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
} else {
fileUrl = getClass().getResource(xml);
}

if (fileUrl != null) {
try {
log.info("Building database schema from: {}", xml);
Database database = new DatabaseIO().read(new InputStreamReader(fileUrl
.openStream()));
IDatabasePlatform platform = symmetricDialect.getPlatform();
platform.createDatabase(database, true, true);
loaded = true;
} catch (Exception e) {
log.error(e.getMessage(),e);
}
}
}
return loaded;
}

/**
* Give the end user the option to provide a script that will load a
* registration server with an initial SymmetricDS setup.
*
* Look first on the file system, then in the classpath for the SQL file.
*
* @return true if the script was executed
*/
protected boolean loadFromScriptIfProvided() {
boolean loaded = false;
String sqlScript = parameterService
.getString(ParameterConstants.AUTO_CONFIGURE_REG_SVR_SQL_SCRIPT);
if (!StringUtils.isBlank(sqlScript)) {
File file = new File(sqlScript);
URL fileUrl = null;
if (file.isFile()) {
try {
fileUrl = file.toURI().toURL();
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
} else {
fileUrl = getClass().getResource(sqlScript);
if (fileUrl == null) {
fileUrl = Thread.currentThread().getContextClassLoader().getResource(sqlScript);
}
}

if (fileUrl != null) {
new SqlScript(fileUrl, symmetricDialect.getPlatform().getSqlTemplate(), true,
SqlScript.QUERY_ENDS, getSymmetricDialect().getPlatform()
.getSqlScriptReplacementTokens()).execute();
loaded = true;
}
}
return loaded;
}



public synchronized boolean start() {
return start(true);
}
Expand Down Expand Up @@ -626,6 +759,10 @@ public IExtensionPointManager getExtensionPointManager() {
public IStagingManager getStagingManager() {
return stagingManager;
}

public ISequenceService getSequenceService() {
return sequenceService;
}

private void removeMeFromMap(Map<String, ISymmetricEngine> map) {
Set<String> keys = new HashSet<String>(map.keySet());
Expand Down
Expand Up @@ -560,8 +560,6 @@ public long insertWithGeneratedKey(final String sql, final SequenceIdentifier id

public String getSequenceName(SequenceIdentifier identifier) {
switch (identifier) {
case OUTGOING_BATCH:
return "sym_outgoing_batch_batch_id";
case DATA:
return "sym_data_data_id";
case TRIGGER_HIST:
Expand All @@ -572,8 +570,6 @@ public String getSequenceName(SequenceIdentifier identifier) {

public String getSequenceKeyName(SequenceIdentifier identifier) {
switch (identifier) {
case OUTGOING_BATCH:
return "batch_id";
case DATA:
return "data_id";
case TRIGGER_HIST:
Expand Down
Expand Up @@ -20,5 +20,5 @@
package org.jumpmind.symmetric.db;

public enum SequenceIdentifier {
OUTGOING_BATCH, DATA, TRIGGER_HIST
DATA, TRIGGER_HIST
}
Expand Down
Expand Up @@ -13,6 +13,20 @@ public class Sequence {
private String lastUpdateBy;
private Date lastUpdateTime;
private boolean cycle;

public Sequence() {
}

public Sequence(String sequenceName, long currentValue, int incrementBy, long minValue,
long maxValue, String lastUpdateBy, boolean cycle) {
this.sequenceName = sequenceName;
this.currentValue = currentValue;
this.incrementBy = incrementBy;
this.minValue = minValue;
this.maxValue = maxValue;
this.lastUpdateBy = lastUpdateBy;
this.cycle = cycle;
}

public String getSequenceName() {
return sequenceName;
Expand Down
Expand Up @@ -30,7 +30,7 @@
*/
public interface IClusterService {

public void initLockTable();
public void init();

public void initLockTable(final String action);

Expand Down
Expand Up @@ -86,7 +86,7 @@ public interface IConfigurationService {

public void reloadChannels();

public void autoConfigDatabase(boolean force);
public void initDefaultChannels();

/**
* Returns two sets of channel names, one for suspended channels and one for
Expand Down
Expand Up @@ -12,4 +12,6 @@ public interface ISequenceService {

public Sequence get(String name);

public void init();

}
Expand Up @@ -59,7 +59,7 @@ public ClusterService(IParameterService parameterService, ISymmetricDialect dial
createSqlReplacementTokens()));
}

public void initLockTable() {
public void init() {
initLockTable(ROUTE);
initLockTable(PULL);
initLockTable(PUSH);
Expand Down

0 comments on commit 835f597

Please sign in to comment.