diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/SymmetricEngine.java b/symmetric/src/main/java/org/jumpmind/symmetric/SymmetricEngine.java index 80624a2f03..1732b1165e 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/SymmetricEngine.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/SymmetricEngine.java @@ -34,6 +34,7 @@ import org.jumpmind.symmetric.job.PushJob; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.service.IBootstrapService; +import org.jumpmind.symmetric.service.IDataService; import org.jumpmind.symmetric.service.INodeService; import org.jumpmind.symmetric.service.IPullService; import org.jumpmind.symmetric.service.IPurgeService; @@ -70,6 +71,8 @@ public class SymmetricEngine { private IRegistrationService registrationService; private IPurgeService purgeService; + + private IDataService dataService; private boolean started = false; @@ -130,7 +133,8 @@ private void init(ApplicationContext applicationContext) { registrationService = (IRegistrationService) applicationContext .getBean(Constants.REGISTRATION_SERVICE); purgeService = (IPurgeService) applicationContext - .getBean(Constants.PURGE_SERVICE); + .getBean(Constants.PURGE_SERVICE); + dataService = (IDataService)applicationContext.getBean(Constants.DATA_SERVICE); dbDialect = (IDbDialect)applicationContext.getBean(Constants.DB_DIALECT); registerEngine(); logger.info("Initialized SymmetricDS externalId=" + runtimeConfig.getExternalId() + " version=" + Version.VERSION + " database="+dbDialect.getName()); @@ -213,6 +217,13 @@ public synchronized void start() { } } + /** + * Queue up an initial load or a reload to a node. + */ + public void reloadNode(String nodeId) { + dataService.reloadNode(nodeId); + } + /** * This can be called if the push job has not been enabled. It will perform a push * the same way the {@link PushJob} would have. diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/db/AbstractDbDialect.java b/symmetric/src/main/java/org/jumpmind/symmetric/db/AbstractDbDialect.java index 36bd8ef1fc..7756cf69d0 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/db/AbstractDbDialect.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/db/AbstractDbDialect.java @@ -114,10 +114,10 @@ public IColumnFilter getDatabaseColumnFilter() { return null; } - public void prepareTableForInserts(Table table) { + public void prepareTableForDataLoad(Table table) { } - public void cleanupAfterInserts(Table table) { + public void cleanupAfterDataLoad(Table table) { } protected boolean allowsNullForIdentityColumn() { diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/db/IDbDialect.java b/symmetric/src/main/java/org/jumpmind/symmetric/db/IDbDialect.java index 350214b420..9125572dbd 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/db/IDbDialect.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/db/IDbDialect.java @@ -45,10 +45,18 @@ public void initTrigger(DataEventType dml, Trigger config, public String getEngineName(); public void removeTrigger(String schemaName, String triggerName, String tableName); - - public void prepareTableForInserts(Table table); - - public void cleanupAfterInserts(Table table); + + /** + * This is called by the data loader each time the table context changes, giving the dialect an opportunity to do any pre loading work. Only one + * table is active at any one point. + */ + public void prepareTableForDataLoad(Table table); + + /** + * This is called by the data loader each time the table context changes away from a table or when the the data loader is closed, giving the dialect + * an opportunity to do any post loading work for the given table. + */ + public void cleanupAfterDataLoad(Table table); public void initConfigDb(String tablePrefix); diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/db/SqlTemplate.java b/symmetric/src/main/java/org/jumpmind/symmetric/db/SqlTemplate.java index 8d72c95569..734533bad7 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/db/SqlTemplate.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/db/SqlTemplate.java @@ -35,6 +35,8 @@ public class SqlTemplate { + private static final String ORIG_TABLE_ALIAS = "orig"; + static final String INSERT_TRIGGER_TEMPLATE = "insertTriggerTemplate"; static final String UPDATE_TRIGGER_TEMPLATE = "updateTriggerTemplate"; @@ -78,7 +80,7 @@ public String createInitalLoadSql(Node node, IDbDialect dialect, Trigger trig, T sql = replace("externalId", node.getExternalId(), sql); Column[] columns = trig.orderColumnsForTable(metaData); - String columnsText = buildColumnString("t", columns); + String columnsText = buildColumnString("t", "t", columns); sql = replace("columns", columnsText, sql); return sql; } @@ -99,7 +101,7 @@ public String createCsvDataSql(Trigger trig, Table metaData, String whereClause) sql = replace("whereClause", whereClause, sql); Column[] columns = trig.orderColumnsForTable(metaData); - String columnsText = buildColumnString("t", columns); + String columnsText = buildColumnString("t", "t", columns); sql = replace("columns", columnsText, sql); return sql; } @@ -111,7 +113,7 @@ public String createCsvPrimaryKeySql(Trigger trig, Table metaData, String whereC sql = replace("whereClause", whereClause, sql); Column[] columns = metaData.getPrimaryKeyColumns(); - String columnsText = buildColumnString("t", columns); + String columnsText = buildColumnString("t", "t", columns); sql = replace("columns", columnsText, sql); return sql; } @@ -177,17 +179,18 @@ private String replaceTemplateVariables(IDbDialect dialect, DataEventType dml, T ddl = replace("syncOnDeleteCondition", trigger.getSyncOnDeleteCondition(), ddl); ddl = replace("syncOnIncomingBatchCondition", trigger.isSyncOnIncomingBatch() ? "1=1" : dialect .getSyncTriggersExpression(), ddl); + ddl = replace("origTableAlias", ORIG_TABLE_ALIAS, ddl); Column[] columns = trigger.orderColumnsForTable(metaData); - String columnsText = buildColumnString(newTriggerValue, columns); + String columnsText = buildColumnString(ORIG_TABLE_ALIAS, newTriggerValue, columns); ddl = replace("columns", columnsText, ddl); ddl = eval(containsBlobClobColumns(columns), "containsBlobClobColumns", ddl); columns = metaData.getPrimaryKeyColumns(); - columnsText = buildColumnString(oldTriggerValue, columns); + columnsText = buildColumnString(ORIG_TABLE_ALIAS, oldTriggerValue, columns); ddl = replace("oldKeys", columnsText, ddl); ddl = replace("oldNewPrimaryKeyJoin", aliasedPrimaryKeyJoin(oldTriggerValue, newTriggerValue, columns), ddl); - ddl = replace("tableNewPrimaryKeyJoin", aliasedPrimaryKeyJoin("orig", newTriggerValue, columns), ddl); + ddl = replace("tableNewPrimaryKeyJoin", aliasedPrimaryKeyJoin(ORIG_TABLE_ALIAS, newTriggerValue, columns), ddl); // replace $(newTriggerValue) and $(oldTriggerValue) ddl = replace("newTriggerValue", newTriggerValue, ddl); @@ -256,7 +259,7 @@ private String aliasedPrimaryKeyJoin(String aliasOne, String aliasTwo, Column[] return b.toString(); } - private String buildColumnString(String tableAlias, Column[] columns) { + private String buildColumnString(String origTableAlias, String tableAlias, Column[] columns) { String columnsText = ""; for (Column column : columns) { String templateToUse = null; @@ -319,6 +322,7 @@ private String buildColumnString(String tableAlias, Column[] columns) { columnsText = columnsText.substring(0, columnsText.length() - LAST_COMMAN_TOKEN.length()); } + columnsText = replace("origTableAlias", origTableAlias, columnsText); return replace("tableAlias", tableAlias, columnsText); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlDbDialect.java b/symmetric/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlDbDialect.java index e9102f69ce..71e2bd6fe0 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlDbDialect.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlDbDialect.java @@ -85,14 +85,14 @@ public Object[] filterColumnsValues(DmlType dml, Table table, Object[] columnVal } @Override - public void prepareTableForInserts(Table table) { + public void prepareTableForDataLoad(Table table) { if (table != null && table.getAutoIncrementColumns().length > 0) { jdbcTemplate.execute("SET IDENTITY_INSERT " + table.getName() + " ON"); } } @Override - public void cleanupAfterInserts(Table table) { + public void cleanupAfterDataLoad(Table table) { if (table != null && table.getAutoIncrementColumns().length > 0) { jdbcTemplate.execute("SET IDENTITY_INSERT " + table.getName() + " OFF"); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/extract/DataExtractorContext.java b/symmetric/src/main/java/org/jumpmind/symmetric/extract/DataExtractorContext.java index f5478923e9..837c0ef941 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/extract/DataExtractorContext.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/extract/DataExtractorContext.java @@ -30,8 +30,10 @@ public class DataExtractorContext implements Cloneable { private List auditRecordsWritten = new ArrayList(); private String lastTableName; private OutgoingBatch batch; + private IDataExtractor dataExtractor; - public DataExtractorContext copy() { + public DataExtractorContext copy(IDataExtractor extractor) { + this.dataExtractor = extractor; DataExtractorContext newVersion; try { newVersion = (DataExtractorContext)super.clone(); @@ -62,4 +64,8 @@ public void setBatch(OutgoingBatch batch) { this.batch = batch; } + public IDataExtractor getDataExtractor() { + return dataExtractor; + } + } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamReloadDataCommand.java b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamReloadDataCommand.java index abc1b8161b..34732b81ed 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamReloadDataCommand.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamReloadDataCommand.java @@ -44,7 +44,7 @@ class StreamReloadDataCommand extends AbstractStreamDataCommand { public void execute(BufferedWriter out, Data data, DataExtractorContext context) throws IOException { Trigger trigger = configurationService.getTriggerById(data.getAudit().getTriggerId()); Node node = nodeService.findNode(context.getBatch().getNodeId()); - dataExtractorService.extractInitialLoadWithinBatchFor(node, trigger, new InternalOutgoingTransport(out)); + dataExtractorService.extractInitialLoadWithinBatchFor(node, trigger, new InternalOutgoingTransport(out), context); out.flush(); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/load/csv/CsvLoader.java b/symmetric/src/main/java/org/jumpmind/symmetric/load/csv/CsvLoader.java index 373ac37578..025228ab4b 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/load/csv/CsvLoader.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/load/csv/CsvLoader.java @@ -169,12 +169,23 @@ protected boolean isMetaTokenParsed(String[] tokens) { } protected void setTable(String tableName, boolean useCache) { + + cleanupAfterDataLoad(); + context.setTableName(tableName); + if (!useCache || context.getTableTemplate() == null) { context.setTableTemplate(new TableTemplate(jdbcTemplate, dbDialect, tableName, - this.columnFilters != null ? this.columnFilters.get(tableName) : null)); + this.columnFilters != null ? this.columnFilters.get(tableName) : null)); + } + + dbDialect.prepareTableForDataLoad(context.getTableTemplate().getTable()); + } + + protected void cleanupAfterDataLoad() { + if (context.getTableName() != null) { + dbDialect.cleanupAfterDataLoad(context.getTableTemplate().getTable()); } - dbDialect.prepareTableForInserts(context.getTableTemplate().getTable()); } protected int insert(String[] tokens, BinaryEncoding encoding) { @@ -322,15 +333,13 @@ public IDataLoader clone() { } public void close() { + + cleanupAfterDataLoad(); + if (csvReader != null) { csvReader.close(); } - if (context != null) { - Table[] tables = context.getAllTablesProcessed(); - for (Table table : tables) { - dbDialect.cleanupAfterInserts(table); - } - } + } public IDataLoaderContext getContext() { diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java index d5665ea37b..e7502ef4fc 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java @@ -21,6 +21,7 @@ package org.jumpmind.symmetric.service; +import org.jumpmind.symmetric.extract.DataExtractorContext; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.Trigger; @@ -33,7 +34,7 @@ public interface IDataExtractorService { public OutgoingBatch extractInitialLoadFor(Node node, Trigger config, IOutgoingTransport transport); public void extractInitialLoadWithinBatchFor(Node node, final Trigger trigger, - final IOutgoingTransport transport); + final IOutgoingTransport transport, DataExtractorContext ctx); /** * @return true if work was done or false if there was no work to do. diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 8bd84adb98..b9886b5392 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -80,9 +80,9 @@ public void extractNodeIdentityFor(Node node, IOutgoingTransport transport) { outgoingBatchService.insertOutgoingBatch(batch); try { - BufferedWriter writer = transport.open(); - DataExtractorContext ctxCopy = context.copy(); + BufferedWriter writer = transport.open(); IDataExtractor dataExtractor = getDataExtractor(node.getSymmetricVersion()); + DataExtractorContext ctxCopy = context.copy(dataExtractor); dataExtractor.init(writer, ctxCopy); dataExtractor.begin(batch, writer); TriggerHistory audit = new TriggerHistory(tableName, "node_id", "node_id"); @@ -110,22 +110,21 @@ public OutgoingBatch extractInitialLoadFor(Node node, final Trigger trigger, fin OutgoingBatch batch = new OutgoingBatch(node, trigger.getChannelId(), BatchType.INITIAL_LOAD); outgoingBatchService.insertOutgoingBatch(batch); - writeInitialLoad(node, trigger, transport, batch); + writeInitialLoad(node, trigger, transport, batch, null); outgoingBatchService.markOutgoingBatchSent(batch); return batch; } - public void extractInitialLoadWithinBatchFor(Node node, final Trigger trigger, final IOutgoingTransport transport) { - - writeInitialLoad(node, trigger, transport, null); + public void extractInitialLoadWithinBatchFor(Node node, final Trigger trigger, final IOutgoingTransport transport, DataExtractorContext ctx) { + writeInitialLoad(node, trigger, transport, null, ctx); } protected void writeInitialLoad(Node node, final Trigger trigger, final IOutgoingTransport transport, - final OutgoingBatch batch) { + final OutgoingBatch batch, final DataExtractorContext ctx) { final String sql = dbDialect.createInitalLoadSqlFor(node, trigger); final TriggerHistory audit = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId()); - final IDataExtractor dataExtractor = getDataExtractor(node.getSymmetricVersion()); + final IDataExtractor dataExtractor = ctx != null ? ctx.getDataExtractor() : getDataExtractor(node.getSymmetricVersion()); jdbcTemplate.execute(new ConnectionCallback() { public Object doInConnection(Connection conn) throws SQLException, DataAccessException { @@ -134,8 +133,8 @@ public Object doInConnection(Connection conn) throws SQLException, DataAccessExc java.sql.ResultSet.CONCUR_READ_ONLY); st.setFetchSize(dbDialect.getStreamingResultsFetchSize()); ResultSet rs = st.executeQuery(); - final BufferedWriter writer = transport.open(); - final DataExtractorContext ctxCopy = context.copy(); + final BufferedWriter writer = transport.open(); + final DataExtractorContext ctxCopy = ctx == null ? context.copy(dataExtractor) : ctx; if (batch != null) { dataExtractor.init(writer, ctxCopy); dataExtractor.begin(batch, writer); @@ -322,7 +321,7 @@ public void endBatch(OutgoingBatch batch) throws Exception { public void init() throws Exception { this.writer = transport.open(); - this.context = DataExtractorService.this.context.copy(); + this.context = DataExtractorService.this.context.copy(dataExtractor); dataExtractor.init(writer, context); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java b/symmetric/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java index 4d07a0c4b1..3793ccd6eb 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java @@ -39,10 +39,13 @@ import org.jumpmind.symmetric.model.BatchInfo; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.IncomingBatchHistory; +import org.jumpmind.symmetric.model.NodeSecurity; import org.jumpmind.symmetric.model.IncomingBatchHistory.Status; import org.jumpmind.symmetric.service.IAcknowledgeService; import org.jumpmind.symmetric.service.IDataExtractorService; import org.jumpmind.symmetric.service.IDataLoaderService; +import org.jumpmind.symmetric.service.IDataService; +import org.jumpmind.symmetric.service.INodeService; import org.jumpmind.symmetric.service.IRegistrationService; import org.jumpmind.symmetric.transport.AbstractTransportManager; import org.jumpmind.symmetric.transport.IIncomingTransport; @@ -74,6 +77,12 @@ public IIncomingTransport getPullTransport(final Node remote, final Node local) runAtClient(remote.getSyncURL(), null, respOs, new IClientRunnable() { public void run(BeanFactory factory, InputStream is, OutputStream os) throws Exception { + // TODO this is duplicated from the Pull Servlet. It should be consolidated somehow! + INodeService nodeService = (INodeService)factory.getBean(Constants.NODE_SERVICE); + NodeSecurity security = nodeService.findNodeSecurity(local.getNodeId()); + if (security.isInitialLoadEnabled()) { + ((IDataService)factory.getBean(Constants.DATA_SERVICE)).insertReloadEvent(local); + } IDataExtractorService extractor = (IDataExtractorService) factory .getBean(Constants.DATAEXTRACTOR_SERVICE); IOutgoingTransport transport = new InternalOutgoingTransport( diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/web/PullServlet.java b/symmetric/src/main/java/org/jumpmind/symmetric/web/PullServlet.java index 2df4b17f93..27dbf6f53d 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/web/PullServlet.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/web/PullServlet.java @@ -36,10 +36,7 @@ import org.jumpmind.symmetric.service.IRegistrationService; import org.jumpmind.symmetric.transport.IOutgoingTransport; -/** - * @author awilcox - * - */ + public class PullServlet extends AbstractServlet { private static final Log logger = LogFactory.getLog(PullServlet.class); diff --git a/symmetric/src/main/resources/dialects/mssql.xml b/symmetric/src/main/resources/dialects/mssql.xml index a03ddec620..971b756b31 100644 --- a/symmetric/src/main/resources/dialects/mssql.xml +++ b/symmetric/src/main/resources/dialects/mssql.xml @@ -44,15 +44,15 @@ - + - + - + @@ -85,7 +85,7 @@ if (@SyncEnabled <> 0x1) begin $(if:containsBlobClobColumns) insert into $(defaultSchema)$(prefixName)_data (table_name, channel_id, event_type, trigger_hist_id, transaction_id, row_data, create_time) - (select '$(targetTableName)','$(channelName)','I', $(triggerHistoryId), $(txIdExpression), $(columns), current_timestamp from inserted inner join $(schemaName)$(tableName) orig on $(tableNewPrimaryKeyJoin) where $(syncOnInsertCondition)); + (select '$(targetTableName)','$(channelName)','I', $(triggerHistoryId), $(txIdExpression), $(columns), current_timestamp from inserted inner join $(schemaName)$(tableName) $(origTableAlias) on $(tableNewPrimaryKeyJoin) where $(syncOnInsertCondition)); $(else:containsBlobClobColumns) insert into $(defaultSchema)$(prefixName)_data (table_name, channel_id, event_type, trigger_hist_id, transaction_id, row_data, create_time) (select '$(targetTableName)','$(channelName)','I', $(triggerHistoryId), $(txIdExpression), $(columns), current_timestamp from inserted where $(syncOnInsertCondition)); @@ -113,7 +113,7 @@ if (@SyncEnabled <> 0x1) begin $(if:containsBlobClobColumns) insert into $(defaultSchema)$(prefixName)_data (table_name, channel_id, event_type, trigger_hist_id, transaction_id, row_data, pk_data, create_time) - (select '$(targetTableName)','$(channelName)','U', $(triggerHistoryId), $(txIdExpression), $(columns), $(oldKeys), current_timestamp from inserted inner join $(schemaName)$(tableName) orig on $(tableNewPrimaryKeyJoin) inner join deleted on $(oldNewPrimaryKeyJoin) where $(syncOnInsertCondition)); + (select '$(targetTableName)','$(channelName)','U', $(triggerHistoryId), $(txIdExpression), $(columns), $(oldKeys), current_timestamp from inserted inner join $(schemaName)$(tableName) $(origTableAlias) on $(tableNewPrimaryKeyJoin) inner join deleted on $(oldNewPrimaryKeyJoin) where $(syncOnInsertCondition)); $(else:containsBlobClobColumns) insert into $(defaultSchema)$(prefixName)_data (table_name, channel_id, event_type, trigger_hist_id, transaction_id, row_data, pk_data, create_time) (select '$(targetTableName)','$(channelName)','U', $(triggerHistoryId), $(txIdExpression), $(columns), $(oldKeys), current_timestamp from inserted inner join deleted on $(oldNewPrimaryKeyJoin) where $(syncOnInsertCondition)); diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/IntegrationTest.java b/symmetric/src/test/java/org/jumpmind/symmetric/IntegrationTest.java index d2e09f2a67..097233cf8f 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/IntegrationTest.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/IntegrationTest.java @@ -46,19 +46,19 @@ public class IntegrationTest extends AbstractIntegrationTest implements ITest { public String getTestName() { try { - return "Test from " + getRootDatabaseName() + " to " - + getClientDatabaseName(); + return "Test from " + getRootDatabaseName() + " to " + getClientDatabaseName(); } catch (RuntimeException ex) { logger.error(ex, ex); throw ex; } } - @Test(testName = "Integration Test", groups = "continuous", timeOut = 60000) + @Test(testName = "Integration Test", groups = "continuous", timeOut = 120000) public void testLifecycle() { try { init(); register(); + initialLoad(); testSyncToClient(); testSyncToRootAutoGeneratedPrimaryKey(); testSyncToRoot(); @@ -76,21 +76,27 @@ public void testLifecycle() { protected void init() { BeanFactory rootBeanFactory = getRootEngine().getApplicationContext(); - rootJdbcTemplate = new JdbcTemplate((DataSource) rootBeanFactory - .getBean(Constants.DATA_SOURCE)); + rootJdbcTemplate = new JdbcTemplate((DataSource) rootBeanFactory.getBean(Constants.DATA_SOURCE)); - BeanFactory clientBeanFactory = getClientEngine() - .getApplicationContext(); - clientJdbcTemplate = new JdbcTemplate((DataSource) clientBeanFactory - .getBean(Constants.DATA_SOURCE)); + BeanFactory clientBeanFactory = getClientEngine().getApplicationContext(); + clientJdbcTemplate = new JdbcTemplate((DataSource) clientBeanFactory.getBean(Constants.DATA_SOURCE)); } protected void register() { - getRootEngine().openRegistration(TestConstants.TEST_CLIENT_NODE_GROUP, - TestConstants.TEST_CLIENT_EXTERNAL_ID); + getRootEngine().openRegistration(TestConstants.TEST_CLIENT_NODE_GROUP, TestConstants.TEST_CLIENT_EXTERNAL_ID); getClientEngine().start(); - Assert.assertTrue(getClientEngine().isRegistered(), - "The client did not register."); + Assert.assertTrue(getClientEngine().isRegistered(), "The client did not register."); + } + + protected void initialLoad() { + INodeService nodeService = (INodeService) getRootEngine().getApplicationContext().getBean( + Constants.NODE_SERVICE); + getRootEngine().reloadNode( + nodeService.findNodeByExternalId(TestConstants.TEST_CLIENT_NODE_GROUP, + TestConstants.TEST_CLIENT_EXTERNAL_ID).getNodeId()); + getClientEngine().pull(); + + // TODO - need to add validation here } protected void testSyncToClient() { @@ -98,145 +104,98 @@ protected void testSyncToClient() { getClientEngine().pull(); // now change some data that should be sync'd - rootJdbcTemplate.update(insertCustomerSql, new Object[] { 101, - "Charlie Brown", "1", "300 Grub Street", "New Yorl", "NY", - 90009, new Date(), "This is a test", BINARY_DATA }); + rootJdbcTemplate.update(insertCustomerSql, new Object[] { 101, "Charlie Brown", "1", "300 Grub Street", + "New Yorl", "NY", 90009, new Date(), "This is a test", BINARY_DATA }); getClientEngine().pull(); - Assert - .assertEquals( - clientJdbcTemplate - .queryForInt("select count(*) from test_customer where customer_id=101"), - 1, "The customer was not sync'd to the client."); + Assert.assertEquals(clientJdbcTemplate.queryForInt("select count(*) from test_customer where customer_id=101"), + 1, "The customer was not sync'd to the client."); if (getRootDbDialect().isClobSyncSupported()) { - Assert - .assertEquals( - clientJdbcTemplate - .queryForObject( - "select notes from test_customer where customer_id=101", - String.class), "This is a test", - "The CLOB notes field on customer was not sync'd to the client."); + Assert.assertEquals(clientJdbcTemplate.queryForObject( + "select notes from test_customer where customer_id=101", String.class), "This is a test", + "The CLOB notes field on customer was not sync'd to the client."); } if (getRootDbDialect().isBlobSyncSupported()) { - Assert - .assertTrue( - ArrayUtils - .isEquals( - clientJdbcTemplate - .queryForObject( - "select icon from test_customer where customer_id=101", - byte[].class), - BINARY_DATA), - "The BLOB icon field on customer was not sync'd to the client."); + Assert.assertTrue(ArrayUtils.isEquals(clientJdbcTemplate.queryForObject( + "select icon from test_customer where customer_id=101", byte[].class), BINARY_DATA), + "The BLOB icon field on customer was not sync'd to the client."); } } protected void testSyncToRootAutoGeneratedPrimaryKey() { final String NEW_VALUE = "unique new value one value"; - clientJdbcTemplate.update(insertTestTriggerTableSql, new Object[] { - "value one", "value \" two" }); + clientJdbcTemplate.update(insertTestTriggerTableSql, new Object[] { "value one", "value \" two" }); getClientEngine().push(); - clientJdbcTemplate.update(updateTestTriggerTableSql, - new Object[] { NEW_VALUE }); + clientJdbcTemplate.update(updateTestTriggerTableSql, new Object[] { NEW_VALUE }); getClientEngine().push(); - Assert - .assertEquals( - rootJdbcTemplate - .queryForInt( - "select count(*) from test_triggers_table where string_one_value=?", - new Object[] { NEW_VALUE }), 1, - "The update on test_triggers_table did not work."); + Assert.assertEquals(rootJdbcTemplate.queryForInt( + "select count(*) from test_triggers_table where string_one_value=?", new Object[] { NEW_VALUE }), 1, + "The update on test_triggers_table did not work."); } protected void testSyncToRoot() throws ParseException { - Date date = DateUtils.parseDate("2007-01-03", - new String[] { "yyyy-MM-dd" }); - clientJdbcTemplate.update(insertOrderHeaderSql, new Object[] { "10", - 100, null, date }, new int[] { Types.VARCHAR, Types.INTEGER, - Types.CHAR, Types.DATE }); - clientJdbcTemplate.update(insertOrderDetailSql, new Object[] { "10", 1, - "STK", "110000065", 3, 3.33 }); + Date date = DateUtils.parseDate("2007-01-03", new String[] { "yyyy-MM-dd" }); + clientJdbcTemplate.update(insertOrderHeaderSql, new Object[] { "10", 100, null, date }, new int[] { + Types.VARCHAR, Types.INTEGER, Types.CHAR, Types.DATE }); + clientJdbcTemplate.update(insertOrderDetailSql, new Object[] { "10", 1, "STK", "110000065", 3, 3.33 }); getClientEngine().push(); } protected void testSyncInsertCondition() throws ParseException { // Should not sync when status = null - Date date = DateUtils.parseDate("2007-01-02", - new String[] { "yyyy-MM-dd" }); - rootJdbcTemplate.update(insertOrderHeaderSql, new Object[] { "11", 100, - null, date }, new int[] { Types.VARCHAR, Types.INTEGER, - Types.CHAR, Types.DATE }); + Date date = DateUtils.parseDate("2007-01-02", new String[] { "yyyy-MM-dd" }); + rootJdbcTemplate.update(insertOrderHeaderSql, new Object[] { "11", 100, null, date }, new int[] { + Types.VARCHAR, Types.INTEGER, Types.CHAR, Types.DATE }); getClientEngine().pull(); - IOutgoingBatchService outgoingBatchService = (IOutgoingBatchService) getRootEngine() - .getApplicationContext().getBean( - Constants.OUTGOING_BATCH_SERVICE); - List batches = outgoingBatchService - .getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID); - Assert.assertEquals(batches.size(), 0, - "There should be no outgoing batches, yet I found some."); + IOutgoingBatchService outgoingBatchService = (IOutgoingBatchService) getRootEngine().getApplicationContext() + .getBean(Constants.OUTGOING_BATCH_SERVICE); + List batches = outgoingBatchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID); + Assert.assertEquals(batches.size(), 0, "There should be no outgoing batches, yet I found some."); - Assert.assertEquals(clientJdbcTemplate.queryForList( - selectOrderHeaderSql, new Object[] { "11" }).size(), 0, + Assert.assertEquals(clientJdbcTemplate.queryForList(selectOrderHeaderSql, new Object[] { "11" }).size(), 0, "The order record was sync'd when it should not have been."); // Should sync when status = C - rootJdbcTemplate.update(insertOrderHeaderSql, new Object[] { "12", 100, - "C", date }, new int[] { Types.VARCHAR, Types.INTEGER, - Types.CHAR, Types.DATE }); + rootJdbcTemplate.update(insertOrderHeaderSql, new Object[] { "12", 100, "C", date }, new int[] { Types.VARCHAR, + Types.INTEGER, Types.CHAR, Types.DATE }); getClientEngine().pull(); - Assert.assertEquals(clientJdbcTemplate.queryForList( - selectOrderHeaderSql, new Object[] { "12" }).size(), 1, + Assert.assertEquals(clientJdbcTemplate.queryForList(selectOrderHeaderSql, new Object[] { "12" }).size(), 1, "The order record was not sync'd when it should have been."); // TODO: make sure event did not fire } @SuppressWarnings("unchecked") protected void testSyncUpdateCondition() { - rootJdbcTemplate.update(updateOrderHeaderStatusSql, new Object[] { - null, "1" }); + rootJdbcTemplate.update(updateOrderHeaderStatusSql, new Object[] { null, "1" }); getClientEngine().pull(); - Assert.assertEquals(clientJdbcTemplate.queryForList( - selectOrderHeaderSql, new Object[] { "1" }).size(), 0, + Assert.assertEquals(clientJdbcTemplate.queryForList(selectOrderHeaderSql, new Object[] { "1" }).size(), 0, "The order record was sync'd when it should not have been."); - rootJdbcTemplate.update(updateOrderHeaderStatusSql, new Object[] { "C", - "1" }); + rootJdbcTemplate.update(updateOrderHeaderStatusSql, new Object[] { "C", "1" }); getClientEngine().pull(); - List list = clientJdbcTemplate.queryForList(selectOrderHeaderSql, - new Object[] { "1" }); + List list = clientJdbcTemplate.queryForList(selectOrderHeaderSql, new Object[] { "1" }); Assert.assertEquals(list.size(), 1, "The order record should exist."); Map map = (Map) list.get(0); - Assert - .assertEquals(map.get("status"), "C", - "Status should be complete"); + Assert.assertEquals(map.get("status"), "C", "Status should be complete"); // TODO: make sure event did not fire } @SuppressWarnings("unchecked") protected void testIgnoreNodeChannel() { - INodeService nodeService = (INodeService) getRootEngine() - .getApplicationContext().getBean("nodeService"); - nodeService.ignoreNodeChannelForExternalId(true, - TestConstants.TEST_CHANNEL_ID, - TestConstants.TEST_ROOT_NODE_GROUP, - TestConstants.TEST_ROOT_EXTERNAL_ID); - rootJdbcTemplate.update(insertCustomerSql, new Object[] { 201, - "Charlie Dude", "1", "300 Grub Street", "New Yorl", "NY", - 90009, new Date(), "This is a test", BINARY_DATA }); + INodeService nodeService = (INodeService) getRootEngine().getApplicationContext().getBean("nodeService"); + nodeService.ignoreNodeChannelForExternalId(true, TestConstants.TEST_CHANNEL_ID, + TestConstants.TEST_ROOT_NODE_GROUP, TestConstants.TEST_ROOT_EXTERNAL_ID); + rootJdbcTemplate.update(insertCustomerSql, new Object[] { 201, "Charlie Dude", "1", "300 Grub Street", + "New Yorl", "NY", 90009, new Date(), "This is a test", BINARY_DATA }); getClientEngine().pull(); - Assert - .assertEquals( - clientJdbcTemplate - .queryForInt("select count(*) from test_customer where customer_id=201"), - 0, "The customer was sync'd to the client."); - nodeService.ignoreNodeChannelForExternalId(false, - TestConstants.TEST_CHANNEL_ID, - TestConstants.TEST_ROOT_NODE_GROUP, - TestConstants.TEST_ROOT_EXTERNAL_ID); + Assert.assertEquals(clientJdbcTemplate.queryForInt("select count(*) from test_customer where customer_id=201"), + 0, "The customer was sync'd to the client."); + nodeService.ignoreNodeChannelForExternalId(false, TestConstants.TEST_CHANNEL_ID, + TestConstants.TEST_ROOT_NODE_GROUP, TestConstants.TEST_ROOT_EXTERNAL_ID); } @@ -244,13 +203,10 @@ protected void testPurge() throws Exception { Thread.sleep(1000); getRootEngine().purge(); getClientEngine().purge(); - Assert.assertEquals(rootJdbcTemplate - .queryForInt("select count(*) from " - + TestConstants.TEST_PREFIX + "data"), 0, - "Expected all data rows to have been purged."); + Assert.assertEquals(rootJdbcTemplate.queryForInt("select count(*) from " + TestConstants.TEST_PREFIX + "data"), + 0, "Expected all data rows to have been purged."); Assert.assertEquals(clientJdbcTemplate - .queryForInt("select count(*) from " - + TestConstants.TEST_PREFIX + "data"), 0, + .queryForInt("select count(*) from " + TestConstants.TEST_PREFIX + "data"), 0, "Expected all data rows to have been purged."); } @@ -260,11 +216,8 @@ protected void testHeartbeat() throws Exception { Thread.sleep(1000); getClientEngine().heartbeat(); getClientEngine().push(); - Date time = (Date) rootJdbcTemplate.queryForObject( - "select heartbeat_time from " + TestConstants.TEST_PREFIX - + "node where external_id='" - + TestConstants.TEST_CLIENT_EXTERNAL_ID + "'", - Timestamp.class); + Date time = (Date) rootJdbcTemplate.queryForObject("select heartbeat_time from " + TestConstants.TEST_PREFIX + + "node where external_id='" + TestConstants.TEST_CLIENT_EXTERNAL_ID + "'", Timestamp.class); Assert.assertTrue(time != null && time.getTime() > ts, "The client node was not sync'd to the root as expected."); } diff --git a/symmetric/src/test/resources/test-integration-root-setup.sql b/symmetric/src/test/resources/test-integration-root-setup.sql index c2035a432f..7a40bb716b 100644 --- a/symmetric/src/test/resources/test-integration-root-setup.sql +++ b/symmetric/src/test/resources/test-integration-root-setup.sql @@ -26,7 +26,7 @@ values('test_order_header','test-node-group','test-root-group','testchannel', 1, insert into sym_trigger (source_table_name,source_node_group_id,target_node_group_id,channel_id,sync_on_update,sync_on_insert,sync_on_delete,sync_on_update_condition,sync_on_insert_condition,sync_on_delete_condition,initial_load_select,node_select,tx_id_expression,initial_load_order,last_updated_by,last_updated_time,name_for_insert_trigger,create_time) -values('test_order_header','test-root-group','test-node-group','testchannel', 1, 1, 1,'$(newTriggerValue).status = ''C''', '$(newTriggerValue).status = ''C''', null, null, null, null, 1, 'erilong', current_timestamp,null,current_timestamp); +values('test_order_header','test-root-group','test-node-group','testchannel', 1, 1, 1,'$(newTriggerValue).status = ''C''', '$(newTriggerValue).status = ''C''', null, 't.status = ''C''', null, null, 1, 'erilong', current_timestamp,null,current_timestamp); insert into sym_trigger (source_table_name,source_node_group_id,target_node_group_id,channel_id,sync_on_update,sync_on_insert,sync_on_delete,sync_on_update_condition,sync_on_insert_condition,sync_on_delete_condition,initial_load_select,node_select,tx_id_expression,initial_load_order,last_updated_by,last_updated_time,name_for_insert_trigger,create_time)