diff --git a/symmetric/src/changes/changes.xml b/symmetric/src/changes/changes.xml index 624b3f1336..6933dd4de5 100644 --- a/symmetric/src/changes/changes.xml +++ b/symmetric/src/changes/changes.xml @@ -29,7 +29,10 @@ Remove extra comma in mssql.xml trigger text. - + + + Multi-row updates don't work for the Sql Server Dialect. + Bug in selectDataIdSql. data_id was not qualified. diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/db/AbstractDbDialect.java b/symmetric/src/main/java/org/jumpmind/symmetric/db/AbstractDbDialect.java index ac04150bdd..41a805f8c4 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/db/AbstractDbDialect.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/db/AbstractDbDialect.java @@ -842,6 +842,10 @@ public boolean isBlobSyncSupported() { public boolean isClobSyncSupported() { return true; } + + public boolean isTransactionIdOverrideSupported() { + return true; + } public void setSqlTemplate(SqlTemplate sqlTemplate) { this.sqlTemplate = sqlTemplate; diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/db/IDbDialect.java b/symmetric/src/main/java/org/jumpmind/symmetric/db/IDbDialect.java index bf6b8c5333..6d16a91716 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/db/IDbDialect.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/db/IDbDialect.java @@ -151,6 +151,12 @@ public void initTrigger(DataEventType dml, Trigger config, public boolean isClobSyncSupported(); + /** + * An indicator as to whether the ability to override the default transaction id provided by the + * dialect can be overridden in the trigger configuration. + */ + public boolean isTransactionIdOverrideSupported(); + public void createTables(String xml); public String getSelectLastInsertIdSql(String sequenceName); diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/db/SqlTemplate.java b/symmetric/src/main/java/org/jumpmind/symmetric/db/SqlTemplate.java index 1647cf789c..46a512ad52 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/db/SqlTemplate.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/db/SqlTemplate.java @@ -176,8 +176,11 @@ public String replaceTemplateVariables(IDbDialect dialect, DataEventType dml, Tr ddl = replace("targetGroupId", trigger.getTargetGroupId(), ddl); ddl = replace("channelName", trigger.getChannelId(), ddl); ddl = replace("triggerHistoryId", Integer.toString(history.getTriggerHistoryId()), ddl); - ddl = replace("txIdExpression", trigger.getTxIdExpression() == null ? dialect.getTransactionTriggerExpression() - : trigger.getTxIdExpression(), ddl); + String triggerExpression = dialect.getTransactionTriggerExpression(); + if (dialect.isTransactionIdOverrideSupported() && trigger.getTxIdExpression() != null) { + triggerExpression = trigger.getTxIdExpression(); + } + ddl = replace("txIdExpression", triggerExpression, ddl); ddl = replace("nodeSelectWhere", trigger.getNodeSelect(), ddl); ddl = replace("nodeSelectWhereEscaped", replace("'", "''", trigger.getNodeSelect()), ddl); ddl = replace("syncOnInsertCondition", trigger.getSyncOnInsertCondition(), ddl); diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlDbDialect.java b/symmetric/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlDbDialect.java index 71e2bd6fe0..835a4c5990 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlDbDialect.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlDbDialect.java @@ -143,6 +143,10 @@ public boolean isCharSpacePadded() { public boolean isCharSpaceTrimmed() { return false; } + + public boolean isTransactionIdOverrideSupported() { + return false; + } /** * SQL Server pads an empty string with spaces. diff --git a/symmetric/src/main/resources/dialects/mssql.xml b/symmetric/src/main/resources/dialects/mssql.xml index 53568ba298..667d82e5bc 100644 --- a/symmetric/src/main/resources/dialects/mssql.xml +++ b/symmetric/src/main/resources/dialects/mssql.xml @@ -78,22 +78,31 @@ begin declare @TransactionId varchar(1000) declare @SyncEnabled varbinary(128) + declare @DataRow varchar(8000) if (@@TRANCOUNT > 0) begin execute sp_getbindtoken @TransactionId output; end $(syncOnIncomingBatchCondition) if (@SyncEnabled <> 0x1) begin + declare DataCursor cursor local for $(if:containsBlobClobColumns) - insert into $(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, row_data, create_time) - (select '$(targetTableName)','I', $(triggerHistoryId), $(columns), current_timestamp from inserted inner join $(schemaName)$(tableName) $(origTableAlias) on $(tableNewPrimaryKeyJoin) where $(syncOnInsertCondition)); + select $(columns) from inserted inner join $(schemaName)$(tableName) $(origTableAlias) on $(tableNewPrimaryKeyJoin) where $(syncOnInsertCondition) $(else:containsBlobClobColumns) - insert into $(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, row_data, create_time) - (select '$(targetTableName)','I', $(triggerHistoryId), $(columns), current_timestamp from inserted where $(syncOnInsertCondition)); - $(end:containsBlobClobColumns) - if (@@ROWCOUNT > 0) begin - insert into $(defaultSchema)$(prefixName)_data_event (node_id, data_id, channel_id, transaction_id) (select node_id, @@IDENTITY, '$(channelName)', $(txIdExpression) from $(defaultSchema)$(prefixName)_node c where - c.node_group_id='$(targetGroupId)' and c.sync_enabled=1 $(nodeSelectWhere)); - end + select $(columns) from inserted where $(syncOnInsertCondition) + $(end:containsBlobClobColumns) + open DataCursor + fetch next from DataCursor into @DataRow + while @@FETCH_STATUS = 0 begin + insert into $(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, row_data, create_time) + values('$(targetTableName)','I', $(triggerHistoryId), @DataRow, current_timestamp) + if (@@ROWCOUNT > 0) begin + insert into $(defaultSchema)$(prefixName)_data_event (node_id, data_id, channel_id, transaction_id) (select node_id, @@IDENTITY, '$(channelName)', $(txIdExpression) from $(defaultSchema)$(prefixName)_node c where + c.node_group_id='$(targetGroupId)' and c.sync_enabled=1 $(nodeSelectWhere)) + end + fetch next from DataCursor into @DataRow + end + close DataCursor + deallocate DataCursor end end ]]> @@ -106,29 +115,34 @@ begin declare @TransactionId varchar(1000) declare @SyncEnabled varbinary(128) + declare @DataRow varchar(8000) + declare @OldPk varchar(2000) if (@@TRANCOUNT > 0) begin execute sp_getbindtoken @TransactionId output; end $(syncOnIncomingBatchCondition) if (@SyncEnabled <> 0x1) begin + declare DataCursor cursor local for $(if:containsBlobClobColumns) - insert into $(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, row_data, pk_data, create_time) - (select '$(targetTableName)','U', $(triggerHistoryId), $(columns), $(oldKeys), current_timestamp from inserted inner join $(schemaName)$(tableName) $(origTableAlias) on $(tableNewPrimaryKeyJoin) inner join deleted on $(oldNewPrimaryKeyJoin) where $(syncOnInsertCondition)); + select $(columns), $(oldKeys) from inserted inner join $(schemaName)$(tableName) $(origTableAlias) on $(tableNewPrimaryKeyJoin) inner join deleted on $(oldNewPrimaryKeyJoin) where $(syncOnInsertCondition) $(else:containsBlobClobColumns) - insert into $(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, row_data, pk_data, create_time) - (select '$(targetTableName)','U', $(triggerHistoryId), $(columns), $(oldKeys), current_timestamp from inserted inner join deleted on $(oldNewPrimaryKeyJoin) where $(syncOnInsertCondition)); + select $(columns), $(oldKeys) from inserted inner join deleted on $(oldNewPrimaryKeyJoin) where $(syncOnInsertCondition) $(end:containsBlobClobColumns) - if (@@ROWCOUNT > 0) begin - if (@@ROWCOUNT = 1) begin - declare @OldKeys varchar(1000) - select @OldKeys=$(oldKeys) from deleted; - update $(defaultSchema)$(prefixName)_data set pk_data=@OldKeys where data_id=@@IDENTITY; - end - insert into $(defaultSchema)$(prefixName)_data_event (node_id, data_id, channel_id, transaction_id) (select node_id, @@IDENTITY, '$(channelName)', $(txIdExpression) from $(defaultSchema)$(prefixName)_node c where - c.node_group_id='$(targetGroupId)' and c.sync_enabled=1 $(nodeSelectWhere)); + open DataCursor + fetch next from DataCursor into @DataRow, @OldPk + while @@FETCH_STATUS = 0 begin + insert into $(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, row_data, pk_data, create_time) + values('$(targetTableName)','U', $(triggerHistoryId), @DataRow, @OldPk, current_timestamp) + if (@@ROWCOUNT > 0) begin + insert into $(defaultSchema)$(prefixName)_data_event (node_id, data_id, channel_id, transaction_id) (select node_id, @@IDENTITY, '$(channelName)', $(txIdExpression) from $(defaultSchema)$(prefixName)_node c where + c.node_group_id='$(targetGroupId)' and c.sync_enabled=1 $(nodeSelectWhere)) + end + fetch next from DataCursor into @DataRow, @OldPk + end + close DataCursor + deallocate DataCursor end - end - end + end ]]> @@ -139,17 +153,27 @@ begin declare @TransactionId varchar(1000) declare @SyncEnabled varbinary(128) + declare @OldPk varchar(2000) if (@@TRANCOUNT > 0) begin execute sp_getbindtoken @TransactionId output; end $(syncOnIncomingBatchCondition) if (@SyncEnabled <> 0x1) begin - insert into $(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, pk_data, create_time) - (select '$(targetTableName)','D', $(triggerHistoryId), $(oldKeys), current_timestamp from deleted where $(syncOnDeleteCondition)); - if (@@ROWCOUNT > 0) begin - insert into $(defaultSchema)$(prefixName)_data_event (node_id, data_id, channel_id, transaction_id) (select node_id, @@IDENTITY, '$(channelName)', $(txIdExpression) from $(defaultSchema)$(prefixName)_node c where - c.node_group_id='$(targetGroupId)' and c.sync_enabled=1 $(nodeSelectWhere)); - end + declare DataCursor cursor local for + select $(oldKeys) from deleted where $(syncOnDeleteCondition) + open DataCursor + fetch next from DataCursor into @OldPk + while @@FETCH_STATUS = 0 begin + insert into $(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, pk_data, create_time) + values('$(targetTableName)','D', $(triggerHistoryId), @OldPk, current_timestamp) + if (@@ROWCOUNT > 0) begin + insert into $(defaultSchema)$(prefixName)_data_event (node_id, data_id, channel_id, transaction_id) (select node_id, @@IDENTITY, '$(channelName)', $(txIdExpression) from $(defaultSchema)$(prefixName)_node c where + c.node_group_id='$(targetGroupId)' and c.sync_enabled=1 $(nodeSelectWhere)) + end + fetch next from DataCursor into @OldPk + end + close DataCursor + deallocate DataCursor end end ]]> diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/IntegrationTest.java b/symmetric/src/test/java/org/jumpmind/symmetric/IntegrationTest.java index 1223709b73..c84445159f 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/IntegrationTest.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/IntegrationTest.java @@ -11,8 +11,10 @@ import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.time.DateUtils; +import org.apache.ddlutils.model.Table; import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.TestConstants; +import org.jumpmind.symmetric.db.IDbDialect; import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.service.INodeService; import org.jumpmind.symmetric.service.IOutgoingBatchService; @@ -60,7 +62,8 @@ public String getTestName() { } } - @Test(testName = "Integration Test", groups = "continuous", timeOut = 120000) + //@Test(testName = "Integration Test", groups = "continuous", timeOut = 120000) + @Test(testName = "Integration Test", groups = "continuous") public void testLifecycle() { try { init(); @@ -102,11 +105,12 @@ protected void register() { statMgr.flush(); } - protected void initialLoad() { + protected void initialLoad() { + IDbDialect rootDialect = getRootDbDialect(); rootJdbcTemplate.update(insertCustomerSql, new Object[] { 301, "Linus", "1", "42 Blanket Street", - "Santa Claus", "IN", 90009, new Date(), "This is a test", BINARY_DATA }); - rootJdbcTemplate.update(insertTestTriggerTableSql, new Object[] {1, "wow", "mom"}); - rootJdbcTemplate.update(insertTestTriggerTableSql, new Object[] {2, "mom", "wow"}); + "Santa Claus", "IN", 90009, new Date(), "This is a test", BINARY_DATA }); + insertIntoTestTriggerTable(rootDialect, new Object[] {1, "wow", "mom"}); + insertIntoTestTriggerTable(rootDialect, new Object[] {2, "mom", "wow"}); INodeService nodeService = (INodeService) getRootEngine().getApplicationContext().getBean( Constants.NODE_SERVICE); String nodeId = nodeService.findNodeByExternalId(TestConstants.TEST_CLIENT_NODE_GROUP, @@ -121,6 +125,16 @@ protected void initialLoad() { Assert.assertEquals(clientJdbcTemplate.queryForInt("select count(*) from sym_node_security where initial_load_enabled=1"), 0, "Initial load was not successful according to the client"); Assert.assertEquals(rootJdbcTemplate.queryForInt("select count(*) from sym_node_security where initial_load_enabled=1"), 0, "Initial load was not successful accordign to the root"); } + + private void insertIntoTestTriggerTable(IDbDialect dialect, Object[] values) { + Table testTriggerTable = dialect.getMetaDataFor(null, null, "test_triggers_table", true); + try { + dialect.prepareTableForDataLoad(testTriggerTable); + dialect.getJdbcTemplate().update(insertTestTriggerTableSql, values); + } finally { + dialect.cleanupAfterDataLoad(testTriggerTable); + } + } protected void testSyncToClient() { // test pulling no data @@ -151,13 +165,15 @@ protected void testSyncToClient() { protected void testSyncToRootAutoGeneratedPrimaryKey() { final String NEW_VALUE = "unique new value one value"; - clientJdbcTemplate.update(insertTestTriggerTableSql, new Object[] { 3, "value one", "value \" two" }); + IDbDialect clientDialect = getClientDbDialect(); + insertIntoTestTriggerTable(clientDialect, new Object[] { 3, "value one", "value \" two"}); getClientEngine().push(); clientJdbcTemplate.update(updateTestTriggerTableSql, new Object[] { NEW_VALUE }); getClientEngine().push(); - Assert.assertEquals(rootJdbcTemplate.queryForInt( - "select count(*) from test_triggers_table where string_one_value=?", new Object[] { NEW_VALUE }), 3, - "The update on test_triggers_table did not work."); + int syncCount = rootJdbcTemplate.queryForInt( + "select count(*) from test_triggers_table where string_one_value=?", new Object[] { NEW_VALUE }); + Assert.assertEquals(syncCount, 3, + syncCount + " of the rows were updated"); } protected void testSyncToRoot() throws ParseException { @@ -261,11 +277,26 @@ protected void testHeartbeat() throws Exception { "The client node was not sync'd to the root as expected."); } - protected void testVirtualTransactionId() { - rootJdbcTemplate.update("insert into test_very_long_table_name_1234 values('42')"); - Assert.assertEquals(rootJdbcTemplate.queryForObject("select transaction_id from sym_data_event where data_id in (select max(data_id) from sym_data)", String.class), "42", "The hardcoded transaction id was not found."); - Assert.assertEquals(rootJdbcTemplate.update("delete from test_very_long_table_name_1234 where id='42'"), 1); - Assert.assertEquals(rootJdbcTemplate.queryForObject("select transaction_id from sym_data_event where data_id in (select max(data_id) from sym_data)", String.class), "42", "The hardcoded transaction id was not found."); + protected void testVirtualTransactionId() { + rootJdbcTemplate.update("insert into test_very_long_table_name_1234 values('42')"); + if (getRootDbDialect().isTransactionIdOverrideSupported()) { + Assert + .assertEquals( + rootJdbcTemplate + .queryForObject( + "select transaction_id from sym_data_event where data_id in (select max(data_id) from sym_data)", + String.class), "42", + "The hardcoded transaction id was not found."); + Assert.assertEquals(rootJdbcTemplate + .update("delete from test_very_long_table_name_1234 where id='42'"), 1); + Assert + .assertEquals( + rootJdbcTemplate + .queryForObject( + "select transaction_id from sym_data_event where data_id in (select max(data_id) from sym_data)", + String.class), "42", + "The hardcoded transaction id was not found."); + } } protected void testCaseSensitiveTableNames() {