diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlSymmetricDialect.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlSymmetricDialect.java index 33edd0870e..43e865902b 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlSymmetricDialect.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlSymmetricDialect.java @@ -413,7 +413,8 @@ public boolean needsToSelectLobData() { @Override protected String getDbSpecificDataHasChangedCondition(Trigger trigger) { - return "@OldDataRow is null or @DataRow != @OldDataRow"; + /* gets filled/replaced by trigger template as it will compare by each column */ + return "$(anyNonBlobColumnChanged)"; } } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlTriggerTemplate.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlTriggerTemplate.java index 80df396ff5..7c4f1d0f5a 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlTriggerTemplate.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlTriggerTemplate.java @@ -25,6 +25,7 @@ import java.util.HashMap; +import org.apache.commons.lang.StringUtils; import org.jumpmind.db.model.Column; import org.jumpmind.db.model.Table; import org.jumpmind.symmetric.common.ParameterConstants; @@ -78,29 +79,21 @@ public MsSqlTriggerTemplate(ISymmetricDialect symmetricDialect) { " set @NCT = @@OPTIONS & 512 \n" + " set nocount on \n" + " declare @TransactionId varchar(1000) \n" + -" declare @DataRow "+(castToNVARCHAR ? "n" : "")+"varchar(max) \n" + -" declare @ChannelId varchar(128) \n" + -" $(declareNewKeyVariables) \n" + " if (@@TRANCOUNT > 0) begin \n" + " select @TransactionId = convert(VARCHAR(1000),transaction_id) from sys.dm_exec_requests where session_id=@@SPID and open_transaction_count > 0 \n" + " end \n" + " $(custom_before_insert_text) \n" + " if ($(syncOnIncomingBatchCondition)) begin \n" + -" declare DataCursor cursor local for \n" + +" insert into " + defaultCatalog + "$(defaultSchema)$(prefixName)_data \n" + +" (table_name, event_type, trigger_hist_id, row_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n" + +" select '$(targetTableName)','I', $(triggerHistoryId), $(columns), \n" + +" $(channelExpression), $(txIdExpression), " + defaultCatalog + "dbo.$(prefixName)_node_disabled(), $(externalSelect), current_timestamp \n" + " $(if:containsBlobClobColumns) \n" + -" select $(columns) $(newKeyNames), $(channelExpression) from inserted inner join $(schemaName)$(tableName) $(origTableAlias) on $(tableNewPrimaryKeyJoin) where $(syncOnInsertCondition)\n" + +" from inserted inner join $(schemaName)$(tableName) $(origTableAlias) on $(tableNewPrimaryKeyJoin) \n" + " $(else:containsBlobClobColumns) \n" + -" select $(columns) $(newKeyNames), $(channelExpression) from inserted where $(syncOnInsertCondition) \n" + +" from inserted \n" + " $(end:containsBlobClobColumns) \n" + -" open DataCursor \n" + -" fetch next from DataCursor into @DataRow $(newKeyVariables), @ChannelId \n" + -" while @@FETCH_STATUS = 0 begin \n" + -" insert into " + defaultCatalog + "$(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, row_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n" + -" values('$(targetTableName)','I', $(triggerHistoryId), @DataRow, @ChannelId, $(txIdExpression), " + defaultCatalog + "dbo.$(prefixName)_node_disabled(), $(externalSelect), current_timestamp) \n" + -" fetch next from DataCursor into @DataRow $(newKeyVariables), @ChannelId \n" + -" end \n" + -" close DataCursor \n" + -" deallocate DataCursor \n" + +" where $(syncOnInsertCondition) \n" + " end \n" + " $(custom_on_insert_text) \n" + " if (@NCT = 0) set nocount off \n" + @@ -114,34 +107,20 @@ public MsSqlTriggerTemplate(ISymmetricDialect symmetricDialect) { " set @NCT = @@OPTIONS & 512 \n" + " set nocount on \n" + " declare @TransactionId varchar(1000) \n" + -" declare @DataRow "+(castToNVARCHAR ? "n" : "")+"varchar(max) \n" + -" declare @OldPk "+(castToNVARCHAR ? "n" : "")+"varchar(2000) \n" + -" declare @OldDataRow "+(castToNVARCHAR ? "n" : "")+"varchar(max) \n" + -" declare @ChannelId varchar(128) \n" + -" $(declareOldKeyVariables) \n" + -" $(declareNewKeyVariables) \n" + " if (@@TRANCOUNT > 0) begin \n" + " select @TransactionId = convert(VARCHAR(1000),transaction_id) from sys.dm_exec_requests where session_id=@@SPID and open_transaction_count > 0 \n" + " end \n" + " $(custom_before_update_text) \n" + " if ($(syncOnIncomingBatchCondition)) begin \n" + -" declare DataCursor cursor local for \n" + +" insert into " + defaultCatalog + "$(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, row_data, pk_data, old_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n" + +" select '$(targetTableName)','U', $(triggerHistoryId), $(columns), $(oldKeys), $(oldColumns), $(channelExpression), "+ +" $(txIdExpression), " + defaultCatalog + "dbo.$(prefixName)_node_disabled(), $(externalSelect), current_timestamp\n" + " $(if:containsBlobClobColumns) \n" + -" select $(columns), $(oldKeys), $(oldColumns) $(oldKeyNames) $(newKeyNames), $(channelExpression) from inserted inner join $(schemaName)$(tableName) $(origTableAlias) on $(tableNewPrimaryKeyJoin) inner join deleted on $(oldNewPrimaryKeyJoin) where $(syncOnUpdateCondition)\n" + +" from inserted inner join $(schemaName)$(tableName) $(origTableAlias) on $(tableNewPrimaryKeyJoin) inner join deleted on $(oldNewPrimaryKeyJoin) \n" + " $(else:containsBlobClobColumns) \n" + -" select $(columns), $(oldKeys), $(oldColumns) $(oldKeyNames) $(newKeyNames), $(channelExpression) from inserted inner join deleted on $(oldNewPrimaryKeyJoin) where $(syncOnUpdateCondition) \n" + +" from inserted inner join deleted on $(oldNewPrimaryKeyJoin) \n" + " $(end:containsBlobClobColumns) \n" + -" open DataCursor \n" + -" fetch next from DataCursor into @DataRow, @OldPk, @OldDataRow $(oldKeyVariables) $(newKeyVariables), @ChannelId \n" + -" while @@FETCH_STATUS = 0 begin \n" + -" if ($(dataHasChangedCondition)) begin \n" + -" insert into " + defaultCatalog + "$(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, row_data, pk_data, old_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n" + -" values('$(targetTableName)','U', $(triggerHistoryId), @DataRow, @OldPk, @OldDataRow, @ChannelId, $(txIdExpression), " + defaultCatalog + "dbo.$(prefixName)_node_disabled(), $(externalSelect), current_timestamp)\n" + -" end \n" + -" fetch next from DataCursor into @DataRow, @OldPk, @OldDataRow $(oldKeyVariables) $(newKeyVariables), @ChannelId \n" + -" end \n" + -" close DataCursor \n" + -" deallocate DataCursor \n" + +" where $(syncOnUpdateCondition) and ($(dataHasChangedCondition)) \n" + " end \n" + " $(custom_on_update_text) \n" + " if (@NCT = 0) set nocount off \n" + @@ -155,42 +134,21 @@ public MsSqlTriggerTemplate(ISymmetricDialect symmetricDialect) { " set @NCT = @@OPTIONS & 512 \n" + " set nocount on \n" + " declare @TransactionId varchar(1000) \n" + -" declare @OldPk "+(castToNVARCHAR ? "n" : "")+"varchar(2000) \n" + -" declare @OldDataRow "+(castToNVARCHAR ? "n" : "")+"varchar(max) \n" + -" declare @DataRow "+(castToNVARCHAR ? "n" : "")+"varchar(max) \n" + -" declare @ChannelId varchar(128) \n" + -" $(declareOldKeyVariables) \n" + -" $(declareNewKeyVariables) \n" + " \n" + " if (@@TRANCOUNT > 0) begin \n" + " select @TransactionId = convert(VARCHAR(1000),transaction_id) from sys.dm_exec_requests where session_id=@@SPID and open_transaction_count > 0 \n" + " end \n" + " $(custom_before_update_text) \n" + " if ($(syncOnIncomingBatchCondition)) begin \n" + -" declare DeleteCursor cursor local for \n" + -" select $(oldKeys), $(oldColumns) $(oldKeyNames) from deleted where $(syncOnDeleteCondition) \n" + -" declare InsertCursor cursor local for \n" + -" $(if:containsBlobClobColumns) \n" + -" select $(columns) $(newKeyNames), $(channelExpression) from inserted inner join $(schemaName)$(tableName) $(origTableAlias) on $(tableNewPrimaryKeyJoin) where $(syncOnInsertCondition)\n" + -" $(else:containsBlobClobColumns) \n" + -" select $(columns) $(newKeyNames), $(channelExpression) from inserted where $(syncOnInsertCondition) \n" + -" $(end:containsBlobClobColumns) \n" + -" open DeleteCursor \n" + -" open InsertCursor \n" + -" fetch next from DeleteCursor into @OldPk, @OldDataRow $(oldKeyVariables) \n" + -" fetch next from InsertCursor into @DataRow $(newKeyVariables), @ChannelId \n" + -" while @@FETCH_STATUS = 0 begin \n" + -" if ($(dataHasChangedCondition)) begin \n" + -" insert into " + defaultCatalog + "$(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, row_data, pk_data, old_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n" + -" values('$(targetTableName)','U', $(triggerHistoryId), @DataRow, @OldPk, @OldDataRow, @ChannelId, $(txIdExpression), " + defaultCatalog + "dbo.$(prefixName)_node_disabled(), $(externalSelect), current_timestamp)\n" + -" end \n" + -" fetch next from DeleteCursor into @OldPk, @OldDataRow $(oldKeyVariables) \n" + -" fetch next from InsertCursor into @DataRow $(newKeyVariables), @ChannelId \n" + -" end \n" + -" close DeleteCursor \n" + -" close InsertCursor \n" + -" deallocate DeleteCursor \n" + -" deallocate InsertCursor \n" + +" insert into " + defaultCatalog + "$(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, row_data, pk_data, old_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n" + +" select '$(targetTableName)','U', $(triggerHistoryId), $(columns), $(oldKeys), $(oldColumns), $(channelExpression), "+ +" $(txIdExpression), " + defaultCatalog + "dbo.$(prefixName)_node_disabled(), $(externalSelect), current_timestamp\n" + +" $(if:containsBlobClobColumns) \n" + +" from (select $(nonBlobColumns), row_number() over (order by (select 1)) as __row_num from inserted) inserted inner join $(schemaName)$(tableName) $(origTableAlias) on $(tableNewPrimaryKeyJoin) inner join (select $(nonBlobColumns), row_number() over (order by (select 1)) as __row_num from deleted)deleted on (inserted.__row_num = deleted.__row_num)\n" + +" $(else:containsBlobClobColumns) \n" + +" from (select *, row_number() over (order by (select 1)) as __row_num from inserted) inserted inner join (select *, row_number() over (order by (select 1)) as __row_num from deleted) deleted on (inserted.__row_num = deleted.__row_num) \n" + +" $(end:containsBlobClobColumns) \n" + +" where $(syncOnUpdateCondition) and ($(dataHasChangedCondition)) \n" + " end \n" + " $(custom_on_update_text) \n" + " if (@NCT = 0) set nocount off \n" + @@ -204,26 +162,15 @@ public MsSqlTriggerTemplate(ISymmetricDialect symmetricDialect) { " set @NCT = @@OPTIONS & 512 \n" + " set nocount on \n" + " declare @TransactionId varchar(1000) \n" + -" declare @OldPk "+(castToNVARCHAR ? "n" : "")+"varchar(2000) \n" + -" declare @OldDataRow "+(castToNVARCHAR ? "n" : "")+"varchar(max) \n" + -" declare @ChannelId varchar(128) \n" + -" $(declareOldKeyVariables) \n" + " if (@@TRANCOUNT > 0) begin \n" + " select @TransactionId = convert(VARCHAR(1000),transaction_id) from sys.dm_exec_requests where session_id=@@SPID and open_transaction_count > 0 \n" + " end \n" + " $(custom_before_delete_text) \n" + " if ($(syncOnIncomingBatchCondition)) begin \n" + -" declare DataCursor cursor local for \n" + -" select $(oldKeys), $(oldColumns) $(oldKeyNames), $(channelExpression) from deleted where $(syncOnDeleteCondition) \n" + -" open DataCursor \n" + -" fetch next from DataCursor into @OldPk, @OldDataRow $(oldKeyVariables), @ChannelId \n" + -" while @@FETCH_STATUS = 0 begin \n" + -" insert into " + defaultCatalog + "$(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, pk_data, old_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n" + -" values('$(targetTableName)','D', $(triggerHistoryId), @OldPk, @OldDataRow, @ChannelId, $(txIdExpression), " + defaultCatalog + "dbo.$(prefixName)_node_disabled(), $(externalSelect), current_timestamp)\n" + -" fetch next from DataCursor into @OldPk,@OldDataRow $(oldKeyVariables), @ChannelId \n" + -" end \n" + -" close DataCursor \n" + -" deallocate DataCursor \n" + +" insert into " + defaultCatalog + "$(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, pk_data, old_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n" + +" select '$(targetTableName)','D', $(triggerHistoryId), $(oldKeys), $(oldColumns), $(channelExpression), \n" + +" $(txIdExpression), " + defaultCatalog + "dbo.$(prefixName)_node_disabled(), $(externalSelect), current_timestamp\n" + +" from deleted where $(syncOnDeleteCondition) \n" + " end \n" + " $(custom_on_delete_text) \n" + " if (@NCT = 0) set nocount off \n" + @@ -246,9 +193,61 @@ protected String replaceTemplateVariables(DataEventType dml, Trigger trigger, buildKeyVariablesDeclare(columns, "old"), ddl); ddl = FormatUtils.replace("declareNewKeyVariables", buildKeyVariablesDeclare(columns, "new"), ddl); + + ddl = FormatUtils.replace("anyNonBlobColumnChanged", + buildNonLobColumnsAreNotEqualString(table, newTriggerValue, oldTriggerValue), ddl); + + ddl = FormatUtils.replace("nonBlobColumns", buildNonLobColumnsString(table), ddl); return ddl; } + + private boolean isNotComparable(Column column) { + String columnType = column.getJdbcTypeName(); + return StringUtils.equalsIgnoreCase(columnType, "IMAGE") + || StringUtils.equalsIgnoreCase(columnType, "TEXT") + || StringUtils.equalsIgnoreCase(columnType, "NTEXT"); + } + + private String buildNonLobColumnsAreNotEqualString(Table table, String table1Name, String table2Name){ + StringBuilder builder = new StringBuilder(); + + for(Column column : table.getColumns()){ + boolean isLob = isNotComparable(column); + if(isLob || column.isPrimaryKey()){ + continue; + } + if(builder.length() > 0){ + builder.append(" or "); + } + + builder.append(String.format("((%1$s.\"%2$s\" IS NOT NULL AND %3$s.\"%2$s\" IS NOT NULL AND %1$s.\"%2$s\"<>%3$s.\"%2$s\") or (%1$s.\"%2$s\" IS NULL AND %3$s.\"%2$s\" IS NOT NULL) or (%1$s.\"%2$s\" IS NOT NULL AND %3$s.\"%2$s\" IS NULL))", + table1Name, column.getName(), table2Name)); + + } + + return "(" + builder.toString() + ")"; + } + + private String buildNonLobColumnsString(Table table){ + StringBuilder builder = new StringBuilder(); + + for(Column column : table.getColumns()){ + boolean isLob = symmetricDialect.getPlatform().isLob(column.getMappedTypeCode()); + if(isLob){ + continue; + } + if(builder.length() > 0){ + builder.append(","); + } + builder.append('"'); + builder.append(column.getName()); + builder.append('"'); + } + + return builder.toString(); + } + protected String getSourceTablePrefix(TriggerHistory triggerHistory) { String prefix = (isNotBlank(triggerHistory.getSourceSchemaName()) ? SymmetricUtils.quote( symmetricDialect, triggerHistory.getSourceSchemaName()) + symmetricDialect.getPlatform().getDatabaseInfo().getSchemaSeparator() : ""); @@ -261,9 +260,9 @@ protected String getSourceTablePrefix(TriggerHistory triggerHistory) { + "." : ""); if (symmetricDialect.getParameterService().is(ParameterConstants.MSSQL_INCLUDE_CATALOG_IN_TRIGGERS, true)) { - prefix = (isNotBlank(symmetricDialect.getPlatform().getDefaultCatalog()) ? SymmetricUtils - .quote(symmetricDialect, symmetricDialect.getPlatform().getDefaultCatalog()) - + "." : "") + prefix; + prefix = (isNotBlank(symmetricDialect.getPlatform().getDefaultCatalog()) ? SymmetricUtils + .quote(symmetricDialect, symmetricDialect.getPlatform().getDefaultCatalog()) + + "." : "") + prefix; } } return prefix; @@ -281,4 +280,4 @@ protected String getColumnSize(Column column) { protected boolean requiresEmptyLobTemplateForDeletes() { return true; } -} \ No newline at end of file +}