Skip to content

Commit

Permalink
[ 1945266 ] column level updates (sql-server)
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Nov 5, 2008
1 parent e62c837 commit 101563c
Show file tree
Hide file tree
Showing 19 changed files with 281 additions and 37 deletions.
11 changes: 11 additions & 0 deletions symmetric/src/changes/changes.xml
Expand Up @@ -5,11 +5,22 @@
<author email="chenson42@users.sourceforge.net">Chris Henson</author>
</properties>
<body>
<release version="1.5.0" date="2008-12-01" description="">
<action dev="erilong" type="add" issue="aid=1945266&amp;atid=997727">
Column level synchronization to merge rows that do not conflict.
</action>
</release>
<release version="1.4.2" date="2008-11-xx" description="Patch Release">
<action dev="chenson42" type="fix">
Use JDOM in XmlPublisherFilter to format the XML document. If you are upgrading and are currently escaping characters for
XML, you'll probably want to remove that logic.
</action>
<action dev="erilong" type="fix" issue="aid=2216936&amp;atid=997724">
The SQL-Server dialect was broken for the sync_on_incoming_batch feature.
</action>
<action dev="erilong" type="fix">
The sample database was missing a trigger, which prevented the demo initial load.
</action>
</release>
<release version="1.4.1" date="2008-10-21" description="Patch Release">
<action dev="chenson42" type="fix">
Expand Down
15 changes: 15 additions & 0 deletions symmetric/src/docbook/user-guide/ch03/sec01-data-model-config.xml
Expand Up @@ -648,6 +648,21 @@
this on, because an update loop is possible.
</entry>
</row>
<row>
<entry>sync_column_level</entry>
<entry>booleanint</entry>
<entry>N</entry>
<entry></entry>
<entry>0</entry>
<entry>
Perform column level synchronization for updates, so only the
fields that changed are updated on the target database.
When updates come from multiple sources at the same time,
this enables merging of changes that do not conflict.
The entire changed row is still sent, which preserves the ability
to fall back to an insert if the row does not exist.
</entry>
</row>
<row>
<entry>name_for_update_trigger</entry>
<entry>varchar(30)</entry>
Expand Down
Expand Up @@ -41,6 +41,8 @@ public class CsvConstants {

public static final String UPDATE = "update";

public static final String OLD = "old";

public static final String DELETE = "delete";

public static final String COMMIT = "commit";
Expand Down
Expand Up @@ -219,6 +219,12 @@ public String replaceTemplateVariables(IDbDialect dialect, DataEventType dml, Tr
Column[] columns = trigger.orderColumnsForTable(metaData);
String columnsText = buildColumnString(ORIG_TABLE_ALIAS, newTriggerValue, columns);
ddl = replace("columns", columnsText, ddl);
if (trigger.isSyncColumnLevel()) {
columnsText = buildColumnString(ORIG_TABLE_ALIAS, oldTriggerValue, columns);
} else {
columnsText = "null";
}
ddl = replace("oldColumns", columnsText, ddl);
ddl = eval(containsBlobClobColumns(columns), "containsBlobClobColumns", ddl);

// some column templates need tableName and schemaName
Expand Down Expand Up @@ -507,4 +513,5 @@ public String getFunctionInstalledSql(String functionName) {
return null;
}
}

}
Expand Up @@ -31,6 +31,7 @@
import org.jumpmind.symmetric.extract.DataExtractorContext;
import org.jumpmind.symmetric.extract.IDataExtractor;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataEventType;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.service.INodeService;
Expand Down Expand Up @@ -98,6 +99,10 @@ public void preprocessTable(Data data, BufferedWriter out, DataExtractorContext
Util.write(out, CsvConstants.TABLE, ", ", data.getTableName());
out.newLine();
}
if (data.getEventType() == DataEventType.UPDATE && data.getOldData() != null) {
Util.write(out, CsvConstants.OLD, ", ", data.getOldData());
out.newLine();
}

context.setLastTableName(data.getTableName());
}
Expand Down
Expand Up @@ -128,6 +128,10 @@ public void setColumnNames(String[] columnNames) {
tableTemplate.setColumnNames(columnNames);
}

/**
 * Passes the "old" (pre-update) row values through to the underlying table
 * template, which compares them against the new values to perform
 * column-level updates.
 *
 * @param oldData the old column values from an update event, in column order
 */
public void setOldData(String[] oldData) {
tableTemplate.setOldData(oldData);
}

/**
 * Returns the key column names tracked by the underlying table template.
 *
 * @return the key column names, as held by the table template
 */
public String[] getKeyNames() {
return tableTemplate.getKeyNames();
}
Expand Down
Expand Up @@ -75,6 +75,8 @@ public class TableTemplate {
private String[] keyNames;

private String[] columnNames;

private String[] oldData;

private Map<String, Column> allMetaData;

Expand Down Expand Up @@ -165,7 +167,26 @@ public int update(IDataLoaderContext ctx, String[] columnValues, String[] keyVal
public int update(IDataLoaderContext ctx, String[] columnValues, String[] keyValues, BinaryEncoding encoding) {
StatementBuilder st = null;
Column[] metaData = null;
if (dontIncludeKeysInUpdateStatement) {
if (oldData != null) {
ArrayList<String> changedColumnNameList = new ArrayList<String>();
ArrayList<String> changedColumnValueList = new ArrayList<String>();
ArrayList<Column> changedColumnMetaList = new ArrayList<Column>();
for (int i = 0; i < columnValues.length; i++) {
if (! StringUtils.equals(columnValues[i], oldData[i])) {
changedColumnNameList.add(columnNames[i]);
changedColumnMetaList.add(allMetaData.get(columnNames[i].trim().toUpperCase()));
changedColumnValueList.add(columnValues[i]);
}
}
if (changedColumnNameList.size() > 0) {
String[] changedColumnNames = changedColumnNameList.toArray(new String[0]);
st = createStatementBuilder(ctx, DmlType.UPDATE, changedColumnNames, encoding);
columnValues = (String[]) changedColumnValueList.toArray(new String[0]);
Column[] changedColumnMetaData = (Column[]) changedColumnMetaList.toArray(new Column[0]);
metaData = (Column[]) ArrayUtils.addAll(changedColumnMetaData, keyMetaData);
}
oldData = null;
} else if (dontIncludeKeysInUpdateStatement) {
String[] values = removeKeysFromColumnValuesIfSame(ctx, keyValues, columnValues);
if (values != null) {
columnValues = values;
Expand Down Expand Up @@ -257,39 +278,43 @@ public int delete(IDataLoaderContext ctx, String[] keyValues) {
/**
 * Returns a cached {@link StatementBuilder} for the given DML type, creating
 * and caching one on first use. Construction (column filtering, lazy metadata
 * resolution, table name qualification) is delegated to
 * {@link #createStatementBuilder}.
 * <p>
 * NOTE(review): the cache key is only the {@link DmlType}, so the column set
 * computed on the first call for a type is reused for all later calls of that
 * type — confirm column filters are stable for the life of this template.
 * <p>
 * As extracted from the diff view, this block contained both the pre-refactor
 * inline construction and the new delegating call, leaving a dead first
 * assignment to {@code st}; only the delegating path is kept here.
 *
 * @param ctx the current data loader context, passed to column filters
 * @param type the DML statement type to build (insert/update/delete)
 * @param encoding the binary encoding in effect (unused here; kept for
 *            signature symmetry with {@code createStatementBuilder})
 * @return a statement builder for the requested DML type, never null
 */
private StatementBuilder getStatementBuilder(IDataLoaderContext ctx, DmlType type, BinaryEncoding encoding) {
    StatementBuilder st = statementMap.get(type);
    if (st == null) {
        st = createStatementBuilder(ctx, type, columnNames, encoding);
        statementMap.put(type, st);
    }
    return st;
}

/**
 * Builds a new {@link StatementBuilder} for the given DML type and column
 * set. Registered column filters are applied first, metadata arrays are
 * resolved lazily and cached on this template, and the table name is
 * qualified with its schema/catalog when they differ from the dialect's
 * defaults.
 *
 * @param ctx the current data loader context, passed to each column filter
 * @param type the DML statement type to build
 * @param filteredColumnNames the column names to build the statement for;
 *            may be narrowed further by column filters
 * @param encoding the binary encoding in effect (not used directly here)
 * @return a freshly constructed statement builder, never null
 */
private StatementBuilder createStatementBuilder(IDataLoaderContext ctx, DmlType type,
        String[] filteredColumnNames, BinaryEncoding encoding) {
    // Let every registered filter adjust the column list in turn.
    if (columnFilters != null) {
        for (IColumnFilter filter : columnFilters) {
            filteredColumnNames = filter.filterColumnsNames(ctx, type, getTable(), filteredColumnNames);
        }
    }

    // Resolve metadata arrays on first use and cache them on the template.
    if (keyMetaData == null) {
        keyMetaData = getColumnMetaData(keyNames);
    }
    if (columnMetaData == null) {
        columnMetaData = getColumnMetaData(columnNames);
    }
    if (columnKeyMetaData == null) {
        columnKeyMetaData = (Column[]) ArrayUtils.addAll(columnMetaData, keyMetaData);
    }

    // Qualify the table name only when its schema/catalog differ from the
    // dialect defaults; catalog (when present) ends up as the outermost prefix.
    String qualifiedTableName = table.getName();
    String schema = table.getSchema();
    if (schema != null && dbDialect.getDefaultSchema() != null
            && !schema.equals(dbDialect.getDefaultSchema())) {
        qualifiedTableName = schema + "." + qualifiedTableName;
    }
    String catalog = table.getCatalog();
    if (catalog != null && dbDialect.getDefaultCatalog() != null
            && !catalog.equals(dbDialect.getDefaultCatalog())) {
        qualifiedTableName = catalog + "." + qualifiedTableName;
    }

    return new StatementBuilder(type, qualifiedTableName, keyMetaData,
            getColumnMetaData(filteredColumnNames), dbDialect.isBlobOverrideToBinary(),
            dbDialect.isDateOverrideToTimestamp(), dbDialect.getIdentifierQuoteString());
}

private int execute(IDataLoaderContext ctx, StatementBuilder st, String[] values, Column[] metaData,
BinaryEncoding encoding) {
List<Object> list = new ArrayList<Object>(values.length);
Expand Down Expand Up @@ -373,11 +398,16 @@ public void setColumnNames(String[] columnNames) {
clear();
}

/**
 * Sets the old (pre-update) column values to be diffed against the incoming
 * new values on the next update, enabling column-level updates. Cleared
 * after use by {@code update} and by {@code clear()}.
 *
 * @param oldData old column values in column order, or null to disable
 *            column-level handling for the next update
 */
public void setOldData(String[] oldData) {
this.oldData = oldData;
}

/**
 * Resets all per-table cached state: prepared statement builders, the lazily
 * resolved metadata arrays (rebuilt on next use), and any pending old row
 * data.
 */
private void clear() {
    statementMap.clear();
    oldData = null;
    columnKeyMetaData = null;
    columnMetaData = null;
    keyMetaData = null;
}

private Column[] getColumnMetaData(String[] names) {
Expand Down
Expand Up @@ -137,6 +137,8 @@ public void load() throws IOException {
if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
delete(tokens);
}
} else if (tokens[0].equals(CsvConstants.OLD)) {
context.setOldData((String[]) ArrayUtils.subarray(tokens, 1, tokens.length));
} else if (isMetaTokenParsed(tokens)) {
continue;
} else if (tokens[0].equals(CsvConstants.COMMIT)) {
Expand Down
15 changes: 15 additions & 0 deletions symmetric/src/main/java/org/jumpmind/symmetric/model/Data.java
Expand Up @@ -41,6 +41,13 @@ public class Data {
* Comma-delimited row data.
*/
private String rowData;

/**
* To support column-level sync and conflict resolution.
* Comma delimited old row data from an update.
*/

private String oldData;

/**
* This is a reference to the audit row the trigger refered to when the data
Expand Down Expand Up @@ -134,4 +141,12 @@ public void setAudit(TriggerHistory audit) {
this.audit = audit;
}

/**
 * Returns the comma-delimited old row data captured for an update event.
 * May be null — presumably when column-level sync is disabled for the
 * trigger (the trigger template emits "null" in that case); confirm against
 * the dialect templates.
 */
public String getOldData() {
return oldData;
}

/**
 * Sets the comma-delimited old row data from an update, used for
 * column-level synchronization.
 *
 * @param oldData old row data as captured by the trigger; may be null
 */
public void setOldData(String oldData) {
this.oldData = oldData;
}

}
10 changes: 10 additions & 0 deletions symmetric/src/main/java/org/jumpmind/symmetric/model/Trigger.java
Expand Up @@ -70,6 +70,8 @@ public class Trigger {
private boolean syncOnDelete = true;

private boolean syncOnIncomingBatch = false;

private boolean syncColumnLevel = false;

private String nameForInsertTrigger;

Expand Down Expand Up @@ -453,4 +455,12 @@ public void setSourceCatalogName(String sourceCatalogName) {
this.sourceCatalogName = sourceCatalogName;
}

/**
 * Whether updates for this trigger are synchronized at the column level
 * (old row data is captured alongside the new values so only changed
 * columns are applied on the target). Defaults to false.
 */
public boolean isSyncColumnLevel() {
return syncColumnLevel;
}

/**
 * Enables or disables column-level synchronization for updates from this
 * trigger.
 *
 * @param syncColumnLevel true to capture old row data and sync only
 *            changed columns
 */
public void setSyncColumnLevel(boolean syncColumnLevel) {
this.syncColumnLevel = syncColumnLevel;
}

}
Expand Up @@ -372,6 +372,7 @@ public Object mapRow(java.sql.ResultSet rs, int arg1) throws java.sql.SQLExcepti
trig.setSyncOnUpdate(rs.getBoolean("sync_on_update"));
trig.setSyncOnDelete(rs.getBoolean("sync_on_delete"));
trig.setSyncOnIncomingBatch(rs.getBoolean("sync_on_incoming_batch"));
trig.setSyncColumnLevel(rs.getBoolean("sync_column_level"));
trig.setNameForDeleteTrigger(rs.getString("name_for_delete_trigger"));
trig.setNameForInsertTrigger(rs.getString("name_for_insert_trigger"));
trig.setNameForUpdateTrigger(rs.getString("name_for_update_trigger"));
Expand Down
Expand Up @@ -297,9 +297,12 @@ private Data next(ResultSet results) throws SQLException {
DataEventType eventType = DataEventType.getEventType(results.getString(3));
String rowData = results.getString(4);
String pk = results.getString(5);
String oldData = results.getString(6);
Date created = results.getDate(7);
TriggerHistory audit = configurationService.getHistoryRecordFor(results.getInt(8));
return new Data(dataId, pk, rowData, eventType, tableName, created, audit);
Data data = new Data(dataId, pk, rowData, eventType, tableName, created, audit);
data.setOldData(oldData);
return data;
}

public void setOutgoingBatchService(IOutgoingBatchService batchBuilderService) {
Expand Down
2 changes: 2 additions & 0 deletions symmetric/src/main/resources/ddl-config.xml
Expand Up @@ -19,6 +19,7 @@
<column name="event_type" type="CHAR" size="1" />
<column name="row_data" type="LONGVARCHAR" />
<column name="pk_data" type="LONGVARCHAR" />
<column name="old_data" type="LONGVARCHAR" />
<column name="trigger_hist_id" type="INTEGER" required="true" />
<column name="create_time" type="TIMESTAMP" />
<!-- This helps the purge query -->
Expand Down Expand Up @@ -155,6 +156,7 @@
<column name="sync_on_insert" type="BOOLEANINT" size="1" required="true" default="1" />
<column name="sync_on_delete" type="BOOLEANINT" size="1" required="true" default="1" />
<column name="sync_on_incoming_batch" type="BOOLEANINT" size="1" required="true" default="0" />
<column name="sync_column_level" type="BOOLEANINT" size="1" required="true" default="0" />
<column name="name_for_update_trigger" type="VARCHAR" size="50" />
<column name="name_for_insert_trigger" type="VARCHAR" size="50" />
<column name="name_for_delete_trigger" type="VARCHAR" size="50" />
Expand Down
13 changes: 7 additions & 6 deletions symmetric/src/main/resources/dialects/mssql.xml
Expand Up @@ -119,25 +119,26 @@
declare @SyncEnabled varbinary(128)
declare @DataRow varchar(max)
declare @OldPk varchar(2000)
declare @OldDataRow varchar(max)
if (@@TRANCOUNT > 0) begin
execute sp_getbindtoken @TransactionId output;
end
select @SyncEnabled = context_info from master.dbo.sysprocesses where spid=@@SPID
if ($(syncOnIncomingBatchCondition)) begin
declare DataCursor cursor local for
$(if:containsBlobClobColumns)
select $(columns), $(oldKeys) from inserted inner join $(schemaName)$(tableName) $(origTableAlias) on $(tableNewPrimaryKeyJoin) inner join deleted on $(oldNewPrimaryKeyJoin) where $(syncOnInsertCondition)
select $(columns), $(oldKeys), $(oldColumns) from inserted inner join $(schemaName)$(tableName) $(origTableAlias) on $(tableNewPrimaryKeyJoin) inner join deleted on $(oldNewPrimaryKeyJoin) where $(syncOnInsertCondition)
$(else:containsBlobClobColumns)
select $(columns), $(oldKeys) from inserted inner join deleted on $(oldNewPrimaryKeyJoin) where $(syncOnInsertCondition)
select $(columns), $(oldKeys), $(oldColumns) from inserted inner join deleted on $(oldNewPrimaryKeyJoin) where $(syncOnInsertCondition)
$(end:containsBlobClobColumns)
open DataCursor
fetch next from DataCursor into @DataRow, @OldPk
fetch next from DataCursor into @DataRow, @OldPk, @OldDataRow
while @@FETCH_STATUS = 0 begin
insert into $(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, row_data, pk_data, create_time)
values('$(targetTableName)','U', $(triggerHistoryId), @DataRow, @OldPk, current_timestamp)
insert into $(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, row_data, pk_data, old_data, create_time)
values('$(targetTableName)','U', $(triggerHistoryId), @DataRow, @OldPk, @OldDataRow, current_timestamp)
insert into $(defaultSchema)$(prefixName)_data_event (node_id, data_id, channel_id, transaction_id) (select node_id, @@IDENTITY, '$(channelName)', $(txIdExpression) from $(defaultSchema)$(prefixName)_node c where
c.node_group_id='$(targetGroupId)' and c.sync_enabled=1 $(nodeSelectWhere))
fetch next from DataCursor into @DataRow, @OldPk
fetch next from DataCursor into @DataRow, @OldPk, @OldDataRow
end
close DataCursor
deallocate DataCursor
Expand Down
Expand Up @@ -10,7 +10,7 @@
<util:map id="dataExtractorServiceSql">
<entry key="selectEventDataToExtractSql">
<value>
select d.data_id, d.table_name, d.event_type, d.row_data, d.pk_data, d.old_data,
d.create_time, d.trigger_hist_id from ${sync.table.prefix}_data d inner join
${sync.table.prefix}_data_event e on d.data_id = e.data_id where e.node_id = ? and
e.batch_id = ? order by d.data_id asc
Expand Down

0 comments on commit 101563c

Please sign in to comment.