
Commit

1858083 - fixed dataload for sql server. also fixed another problem in the data loader for sql server.
chenson42 committed Dec 25, 2007
1 parent 7682c6d commit 7c2d507
Showing 15 changed files with 160 additions and 163 deletions.
@@ -34,6 +34,7 @@
import org.jumpmind.symmetric.job.PushJob;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.IBootstrapService;
+import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IPullService;
import org.jumpmind.symmetric.service.IPurgeService;
@@ -70,6 +71,8 @@ public class SymmetricEngine {
private IRegistrationService registrationService;

private IPurgeService purgeService;

+    private IDataService dataService;

private boolean started = false;

@@ -130,7 +133,8 @@ private void init(ApplicationContext applicationContext) {
registrationService = (IRegistrationService) applicationContext
.getBean(Constants.REGISTRATION_SERVICE);
purgeService = (IPurgeService) applicationContext
                .getBean(Constants.PURGE_SERVICE);
+        dataService = (IDataService)applicationContext.getBean(Constants.DATA_SERVICE);
dbDialect = (IDbDialect)applicationContext.getBean(Constants.DB_DIALECT);
registerEngine();
logger.info("Initialized SymmetricDS externalId=" + runtimeConfig.getExternalId() + " version=" + Version.VERSION + " database="+dbDialect.getName());
@@ -213,6 +217,13 @@ public synchronized void start() {
}
}

+    /**
+     * Queue up an initial load or a reload to a node.
+     */
+    public void reloadNode(String nodeId) {
+        dataService.reloadNode(nodeId);
+    }

/**
* This can be called if the push job has not been enabled. It will perform a push
* the same way the {@link PushJob} would have.
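The new reloadNode() hook exposes IDataService.reloadNode() to embedded callers, so a full reload can be queued programmatically. A minimal usage sketch, assuming an embedded engine and an illustrative node id (neither is from this commit):

    // Queue a reload of all configured tables for node "00010".
    // reloadNode() only inserts the reload events; the push/pull jobs
    // (or an explicit push()) deliver them on the next synchronization.
    SymmetricEngine engine = new SymmetricEngine();
    engine.start();
    engine.reloadNode("00010");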
@@ -114,10 +114,10 @@ public IColumnFilter getDatabaseColumnFilter() {
return null;
}

-    public void prepareTableForInserts(Table table) {
+    public void prepareTableForDataLoad(Table table) {
}

-    public void cleanupAfterInserts(Table table) {
+    public void cleanupAfterDataLoad(Table table) {
}

protected boolean allowsNullForIdentityColumn() {
16 changes: 12 additions & 4 deletions symmetric/src/main/java/org/jumpmind/symmetric/db/IDbDialect.java
@@ -45,10 +45,18 @@ public void initTrigger(DataEventType dml, Trigger config,
public String getEngineName();

public void removeTrigger(String schemaName, String triggerName, String tableName);

-    public void prepareTableForInserts(Table table);

-    public void cleanupAfterInserts(Table table);

+    /**
+     * This is called by the data loader each time the table context changes, giving the dialect an
+     * opportunity to do any pre-load work. Only one table is active at any one point.
+     */
+    public void prepareTableForDataLoad(Table table);

+    /**
+     * This is called by the data loader each time the table context changes away from a table or
+     * when the data loader is closed, giving the dialect an opportunity to do any post-load work
+     * for the given table.
+     */
+    public void cleanupAfterDataLoad(Table table);

public void initConfigDb(String tablePrefix);

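Read together, the two hooks give a dialect a bracketed, one-table-at-a-time lifecycle. A sketch of the sequence a dialect implementation can rely on, where loadRowsFor() is a hypothetical stand-in for the loader's row processing:

    // At most one table sits between prepare and cleanup at any moment.
    dialect.prepareTableForDataLoad(tableA);   // e.g. enable identity inserts
    loadRowsFor(tableA);
    dialect.cleanupAfterDataLoad(tableA);      // fired on the switch to tableB

    dialect.prepareTableForDataLoad(tableB);
    loadRowsFor(tableB);
    dialect.cleanupAfterDataLoad(tableB);      // fired when the loader closes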
18 changes: 11 additions & 7 deletions symmetric/src/main/java/org/jumpmind/symmetric/db/SqlTemplate.java
@@ -35,6 +35,8 @@

public class SqlTemplate {

+    private static final String ORIG_TABLE_ALIAS = "orig";

static final String INSERT_TRIGGER_TEMPLATE = "insertTriggerTemplate";

static final String UPDATE_TRIGGER_TEMPLATE = "updateTriggerTemplate";
@@ -78,7 +80,7 @@ public String createInitalLoadSql(Node node, IDbDialect dialect, Trigger trig, T
sql = replace("externalId", node.getExternalId(), sql);

Column[] columns = trig.orderColumnsForTable(metaData);
String columnsText = buildColumnString("t", columns);
String columnsText = buildColumnString("t", "t", columns);
sql = replace("columns", columnsText, sql);
return sql;
}
@@ -99,7 +101,7 @@ public String createCsvDataSql(Trigger trig, Table metaData, String whereClause)
sql = replace("whereClause", whereClause, sql);

Column[] columns = trig.orderColumnsForTable(metaData);
String columnsText = buildColumnString("t", columns);
String columnsText = buildColumnString("t", "t", columns);
sql = replace("columns", columnsText, sql);
return sql;
}
@@ -111,7 +113,7 @@ public String createCsvPrimaryKeySql(Trigger trig, Table metaData, String whereC
sql = replace("whereClause", whereClause, sql);

Column[] columns = metaData.getPrimaryKeyColumns();
String columnsText = buildColumnString("t", columns);
String columnsText = buildColumnString("t", "t", columns);
sql = replace("columns", columnsText, sql);
return sql;
}
@@ -177,17 +179,18 @@ private String replaceTemplateVariables(IDbDialect dialect, DataEventType dml, T
ddl = replace("syncOnDeleteCondition", trigger.getSyncOnDeleteCondition(), ddl);
ddl = replace("syncOnIncomingBatchCondition", trigger.isSyncOnIncomingBatch() ? "1=1" : dialect
.getSyncTriggersExpression(), ddl);
ddl = replace("origTableAlias", ORIG_TABLE_ALIAS, ddl);

Column[] columns = trigger.orderColumnsForTable(metaData);
-        String columnsText = buildColumnString(newTriggerValue, columns);
+        String columnsText = buildColumnString(ORIG_TABLE_ALIAS, newTriggerValue, columns);
ddl = replace("columns", columnsText, ddl);
ddl = eval(containsBlobClobColumns(columns), "containsBlobClobColumns", ddl);

columns = metaData.getPrimaryKeyColumns();
-        columnsText = buildColumnString(oldTriggerValue, columns);
+        columnsText = buildColumnString(ORIG_TABLE_ALIAS, oldTriggerValue, columns);
ddl = replace("oldKeys", columnsText, ddl);
ddl = replace("oldNewPrimaryKeyJoin", aliasedPrimaryKeyJoin(oldTriggerValue, newTriggerValue, columns), ddl);
ddl = replace("tableNewPrimaryKeyJoin", aliasedPrimaryKeyJoin("orig", newTriggerValue, columns), ddl);
ddl = replace("tableNewPrimaryKeyJoin", aliasedPrimaryKeyJoin(ORIG_TABLE_ALIAS, newTriggerValue, columns), ddl);

// replace $(newTriggerValue) and $(oldTriggerValue)
ddl = replace("newTriggerValue", newTriggerValue, ddl);
@@ -256,7 +259,7 @@ private String aliasedPrimaryKeyJoin(String aliasOne, String aliasTwo, Column[]
return b.toString();
}

-    private String buildColumnString(String tableAlias, Column[] columns) {
+    private String buildColumnString(String origTableAlias, String tableAlias, Column[] columns) {
String columnsText = "";
for (Column column : columns) {
String templateToUse = null;
@@ -319,6 +322,7 @@ private String buildColumnString(String tableAlias, Column[] columns) {
columnsText = columnsText.substring(0, columnsText.length() - LAST_COMMAN_TOKEN.length());
}

columnsText = replace("origTableAlias", origTableAlias, columnsText);
return replace("tableAlias", tableAlias, columnsText);

}
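Threading a separate original-table alias through buildColumnString() lets a column template reference the base table, via the new $(origTableAlias) variable, instead of the trigger's row image; $(tableNewPrimaryKeyJoin) already joins the base table back to the new row by primary key. A plausible motivation, stated as an assumption rather than fact: SQL Server's inserted/deleted pseudo-tables cannot expose text and image columns, so a LOB column template has to read them from the original table. Illustratively (the column name is invented; only the last line appears in this diff):

    // A LOB column template can now emit $(origTableAlias)."blob_col",
    // which resolves to orig."blob_col" rather than to the trigger row image.
    String columnsText = buildColumnString(ORIG_TABLE_ALIAS, newTriggerValue, columns);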
@@ -85,14 +85,14 @@ public Object[] filterColumnsValues(DmlType dml, Table table, Object[] columnVal
}

@Override
-    public void prepareTableForInserts(Table table) {
+    public void prepareTableForDataLoad(Table table) {
if (table != null && table.getAutoIncrementColumns().length > 0) {
jdbcTemplate.execute("SET IDENTITY_INSERT " + table.getName() + " ON");
}
}

@Override
-    public void cleanupAfterInserts(Table table) {
+    public void cleanupAfterDataLoad(Table table) {
if (table != null && table.getAutoIncrementColumns().length > 0) {
jdbcTemplate.execute("SET IDENTITY_INSERT " + table.getName() + " OFF");
}
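The pairing matters on SQL Server because IDENTITY_INSERT is a per-session setting that may be ON for only one table at a time. Previously cleanupAfterInserts() ran only when the loader closed, so switching tables tried to enable a second table while the first was still ON. The failure, sketched with illustrative table names:

    // Broken ordering: the second SET raises SQL Server error 8107
    // ("IDENTITY_INSERT is already ON for table ... Cannot perform SET
    // operation") because table_a was never switched OFF.
    jdbcTemplate.execute("SET IDENTITY_INSERT table_a ON");
    jdbcTemplate.execute("SET IDENTITY_INSERT table_b ON");   // fails

    // Ordering guaranteed by the per-table prepare/cleanup pairing:
    jdbcTemplate.execute("SET IDENTITY_INSERT table_a ON");
    // ... load table_a rows ...
    jdbcTemplate.execute("SET IDENTITY_INSERT table_a OFF");
    jdbcTemplate.execute("SET IDENTITY_INSERT table_b ON");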
@@ -30,8 +30,10 @@ public class DataExtractorContext implements Cloneable {
private List<String> auditRecordsWritten = new ArrayList<String>();
private String lastTableName;
private OutgoingBatch batch;
+    private IDataExtractor dataExtractor;

-    public DataExtractorContext copy() {
+    public DataExtractorContext copy(IDataExtractor extractor) {
+        this.dataExtractor = extractor;
DataExtractorContext newVersion;
try {
newVersion = (DataExtractorContext)super.clone();
@@ -62,4 +64,8 @@ public void setBatch(OutgoingBatch batch) {
this.batch = batch;
}

+    public IDataExtractor getDataExtractor() {
+        return dataExtractor;
+    }

}
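Carrying the extractor inside the context means anything that resumes work on an open batch can reuse the extractor already chosen for the target node's SymmetricDS version instead of resolving a new one mid-stream. Simplified from the DataExtractorService changes below:

    IDataExtractor extractor = getDataExtractor(node.getSymmetricVersion());
    DataExtractorContext ctxCopy = context.copy(extractor);
    // ... later, while the same batch is still streaming ...
    IDataExtractor same = ctxCopy.getDataExtractor();   // same instance, same version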
@@ -44,7 +44,7 @@ class StreamReloadDataCommand extends AbstractStreamDataCommand {
public void execute(BufferedWriter out, Data data, DataExtractorContext context) throws IOException {
Trigger trigger = configurationService.getTriggerById(data.getAudit().getTriggerId());
Node node = nodeService.findNode(context.getBatch().getNodeId());
-        dataExtractorService.extractInitialLoadWithinBatchFor(node, trigger, new InternalOutgoingTransport(out));
+        dataExtractorService.extractInitialLoadWithinBatchFor(node, trigger, new InternalOutgoingTransport(out), context);
out.flush();
}

@@ -169,12 +169,23 @@ protected boolean isMetaTokenParsed(String[] tokens) {
}

protected void setTable(String tableName, boolean useCache) {

+        cleanupAfterDataLoad();

context.setTableName(tableName);

if (!useCache || context.getTableTemplate() == null) {
context.setTableTemplate(new TableTemplate(jdbcTemplate, dbDialect, tableName,
                    this.columnFilters != null ? this.columnFilters.get(tableName) : null));
}

+        dbDialect.prepareTableForDataLoad(context.getTableTemplate().getTable());
+    }

+    protected void cleanupAfterDataLoad() {
+        if (context.getTableName() != null) {
+            dbDialect.cleanupAfterDataLoad(context.getTableTemplate().getTable());
+        }
-        dbDialect.prepareTableForInserts(context.getTableTemplate().getTable());
}

protected int insert(String[] tokens, BinaryEncoding encoding) {
@@ -322,15 +333,13 @@ public IDataLoader clone() {
}

public void close() {

+        cleanupAfterDataLoad();

if (csvReader != null) {
csvReader.close();
}
-        if (context != null) {
-            Table[] tables = context.getAllTablesProcessed();
-            for (Table table : tables) {
-                dbDialect.cleanupAfterInserts(table);
-            }
-        }

}

public IDataLoaderContext getContext() {
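This is the loader-side half of the fix: setTable() now closes out the previous table before preparing the next one, and close() cleans up the last table, so the dialect hooks fire as strictly alternating pairs rather than as the old bulk cleanupAfterInserts() sweep over getAllTablesProcessed(). A compact sketch of the resulting state machine, using stand-in fields rather than the actual loader internals:

    String currentTable = null;                          // mirrors context.getTableName()

    void setTable(String name) {
        if (currentTable != null) {
            dialect.cleanupAfterDataLoad(currentTable);  // e.g. IDENTITY_INSERT OFF
        }
        currentTable = name;
        dialect.prepareTableForDataLoad(currentTable);   // e.g. IDENTITY_INSERT ON
    }

    void close() {
        if (currentTable != null) {
            dialect.cleanupAfterDataLoad(currentTable);  // final cleanup
        }
    }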
@@ -21,6 +21,7 @@

package org.jumpmind.symmetric.service;

+import org.jumpmind.symmetric.extract.DataExtractorContext;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.Trigger;
@@ -33,7 +34,7 @@ public interface IDataExtractorService {
public OutgoingBatch extractInitialLoadFor(Node node, Trigger config, IOutgoingTransport transport);

public void extractInitialLoadWithinBatchFor(Node node, final Trigger trigger,
-            final IOutgoingTransport transport);
+            final IOutgoingTransport transport, DataExtractorContext ctx);

/**
* @return true if work was done or false if there was no work to do.
@@ -80,9 +80,9 @@ public void extractNodeIdentityFor(Node node, IOutgoingTransport transport) {
outgoingBatchService.insertOutgoingBatch(batch);

try {
            BufferedWriter writer = transport.open();
-           DataExtractorContext ctxCopy = context.copy();
            IDataExtractor dataExtractor = getDataExtractor(node.getSymmetricVersion());
+           DataExtractorContext ctxCopy = context.copy(dataExtractor);
dataExtractor.init(writer, ctxCopy);
dataExtractor.begin(batch, writer);
TriggerHistory audit = new TriggerHistory(tableName, "node_id", "node_id");
@@ -110,22 +110,21 @@ public OutgoingBatch extractInitialLoadFor(Node node, final Trigger trigger, fin

OutgoingBatch batch = new OutgoingBatch(node, trigger.getChannelId(), BatchType.INITIAL_LOAD);
outgoingBatchService.insertOutgoingBatch(batch);
-        writeInitialLoad(node, trigger, transport, batch);
+        writeInitialLoad(node, trigger, transport, batch, null);
outgoingBatchService.markOutgoingBatchSent(batch);
return batch;
}

-    public void extractInitialLoadWithinBatchFor(Node node, final Trigger trigger, final IOutgoingTransport transport) {
-
-        writeInitialLoad(node, trigger, transport, null);
+    public void extractInitialLoadWithinBatchFor(Node node, final Trigger trigger, final IOutgoingTransport transport, DataExtractorContext ctx) {
+        writeInitialLoad(node, trigger, transport, null, ctx);
}

protected void writeInitialLoad(Node node, final Trigger trigger, final IOutgoingTransport transport,
-            final OutgoingBatch batch) {
+            final OutgoingBatch batch, final DataExtractorContext ctx) {

final String sql = dbDialect.createInitalLoadSqlFor(node, trigger);
final TriggerHistory audit = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
-        final IDataExtractor dataExtractor = getDataExtractor(node.getSymmetricVersion());
+        final IDataExtractor dataExtractor = ctx != null ? ctx.getDataExtractor() : getDataExtractor(node.getSymmetricVersion());

jdbcTemplate.execute(new ConnectionCallback() {
public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
@@ -134,8 +133,8 @@ public Object doInConnection(Connection conn) throws SQLException, DataAccessExc
java.sql.ResultSet.CONCUR_READ_ONLY);
st.setFetchSize(dbDialect.getStreamingResultsFetchSize());
ResultSet rs = st.executeQuery();
                final BufferedWriter writer = transport.open();
-               final DataExtractorContext ctxCopy = context.copy();
+               final DataExtractorContext ctxCopy = ctx == null ? context.copy(dataExtractor) : ctx;
if (batch != null) {
dataExtractor.init(writer, ctxCopy);
dataExtractor.begin(batch, writer);
@@ -322,7 +321,7 @@ public void endBatch(OutgoingBatch batch) throws Exception {

public void init() throws Exception {
this.writer = transport.open();
-        this.context = DataExtractorService.this.context.copy();
+        this.context = DataExtractorService.this.context.copy(dataExtractor);
dataExtractor.init(writer, context);
}

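The net effect of the context threading: when a reload event is encountered inside an already-open batch, writeInitialLoad() used to open a second context and extractor over the same transport; it now reuses the live one. The whole decision reduces to the two expressions from the diff above, shown side by side:

    // Nested inside an open batch (ctx != null): reuse the caller's context
    // and extractor. Top-level initial load (ctx == null): build a fresh copy.
    final IDataExtractor dataExtractor =
            ctx != null ? ctx.getDataExtractor() : getDataExtractor(node.getSymmetricVersion());
    final DataExtractorContext ctxCopy =
            ctx == null ? context.copy(dataExtractor) : ctx;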
@@ -39,10 +39,13 @@
import org.jumpmind.symmetric.model.BatchInfo;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.IncomingBatchHistory;
+import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.model.IncomingBatchHistory.Status;
import org.jumpmind.symmetric.service.IAcknowledgeService;
import org.jumpmind.symmetric.service.IDataExtractorService;
import org.jumpmind.symmetric.service.IDataLoaderService;
+import org.jumpmind.symmetric.service.IDataService;
+import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IRegistrationService;
import org.jumpmind.symmetric.transport.AbstractTransportManager;
import org.jumpmind.symmetric.transport.IIncomingTransport;
@@ -74,6 +77,12 @@ public IIncomingTransport getPullTransport(final Node remote, final Node local)
runAtClient(remote.getSyncURL(), null, respOs, new IClientRunnable() {
public void run(BeanFactory factory, InputStream is, OutputStream os)
throws Exception {
+                // TODO this is duplicated from the Pull Servlet. It should be consolidated somehow!
+                INodeService nodeService = (INodeService)factory.getBean(Constants.NODE_SERVICE);
+                NodeSecurity security = nodeService.findNodeSecurity(local.getNodeId());
+                if (security.isInitialLoadEnabled()) {
+                    ((IDataService)factory.getBean(Constants.DATA_SERVICE)).insertReloadEvent(local);
+                }
IDataExtractorService extractor = (IDataExtractorService) factory
.getBean(Constants.DATAEXTRACTOR_SERVICE);
IOutgoingTransport transport = new InternalOutgoingTransport(
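The TODO is worth taking seriously: the pending-initial-load check now lives both here and in the Pull Servlet. One hedged shape the consolidation could take, as a shared helper that does not exist in this commit (the null check on security is an added precaution):

    // Hypothetical shared helper for both pull entry points.
    public static void insertReloadEventIfPending(INodeService nodeService,
            IDataService dataService, Node node) {
        NodeSecurity security = nodeService.findNodeSecurity(node.getNodeId());
        if (security != null && security.isInitialLoadEnabled()) {
            dataService.insertReloadEvent(node);
        }
    }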
@@ -36,10 +36,7 @@
import org.jumpmind.symmetric.service.IRegistrationService;
import org.jumpmind.symmetric.transport.IOutgoingTransport;

-/**
- * @author awilcox
- *
- */

public class PullServlet extends AbstractServlet {

private static final Log logger = LogFactory.getLog(PullServlet.class);