Skip to content

Commit

Permalink
1955378 - added symmetric.runtime.dont.include.keys.in.update.statement
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed May 29, 2008
1 parent bc41ec4 commit 7f8309c
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 99 deletions.
5 changes: 5 additions & 0 deletions symmetric/src/changes/changes.xml
Expand Up @@ -17,6 +17,11 @@
<action dev="chenson42" type="add" issue="aid=1961240&amp;atid=997727">
Pass the IDataLoaderContext into the IColumnFilter interface.
</action>
<action dev="chenson42" type="add" issue="aid=1955378&amp;atid=997727">
Don't include the keys in data loader update statement if they haven't changed. This feature
is turned off by default and can be enabled by the new symmetric.runtime.dont.include.keys.in.update.statement
parameter.
</action>
<action dev="chenson42" type="add" issue="aid=1955383&amp;atid=997727">
Add a stop() method to the SymmetricEngine API.
</action>
Expand Down
Expand Up @@ -43,6 +43,7 @@ public class ParameterConstants {
public final static String INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED = "symmetric.runtime.incoming.batches.skip.duplicates";
public final static String DATA_LOADER_NUM_OF_ACK_RETRIES = "symmetric.runtime.num.of.ack.retries";
public final static String DATA_LOADER_TIME_BETWEEN_ACK_RETRIES = "symmetric.runtime.time.between.ack.retries.ms";
public final static String DATA_LOADER_NO_KEYS_IN_UPDATE = "symmetric.runtime.dont.include.keys.in.update.statement";

public final static String TRANSPORT_HTTP_TIMEOUT = "symmetric.runtime.http.timeout.ms";
public final static String TRANSPORT_HTTP_USE_COMPRESSION_CLIENT = "symmetric.runtime.http.compression";
Expand Down
Expand Up @@ -23,14 +23,17 @@

import java.sql.Types;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.NotImplementedException;
import org.apache.ddlutils.model.Column;

public class StatementBuilder {
public enum DmlType {
INSERT, UPDATE, DELETE
INSERT, UPDATE, DELETE, UPDATE_NO_KEYS
};

protected DmlType dmlType;
Expand All @@ -39,26 +42,17 @@ public enum DmlType {

protected int[] types;

public StatementBuilder(DmlType type, String tableName, String[] keyNames, String[] columnNames) {
if (type == DmlType.INSERT) {
sql = buildInsertSql(tableName, columnNames);
} else if (type == DmlType.UPDATE) {
sql = buildUpdateSql(tableName, keyNames, columnNames);
} else if (type == DmlType.DELETE) {
sql = buildDeleteSql(tableName, keyNames);
} else {
throw new NotImplementedException("Unimplemented SQL type: " + type);
}
dmlType = type;
}

public StatementBuilder(DmlType type, String tableName, Column[] keys, Column[] columns, boolean isBlobOverrideToBinary) {
if (type == DmlType.INSERT) {
sql = buildInsertSql(tableName, columns);
types = buildTypes(columns, isBlobOverrideToBinary);
} else if (type == DmlType.UPDATE) {
sql = buildUpdateSql(tableName, keys, columns);
types = buildTypes(keys, columns, isBlobOverrideToBinary);
} else if (type == DmlType.UPDATE_NO_KEYS) {
columns = removeKeysFromColumns(keys, columns);
sql = buildUpdateSql(tableName, keys, columns);
types = buildTypes(keys, columns, isBlobOverrideToBinary);
} else if (type == DmlType.DELETE) {
sql = buildDeleteSql(tableName, keys);
types = buildTypes(keys, isBlobOverrideToBinary);
Expand All @@ -67,6 +61,20 @@ public StatementBuilder(DmlType type, String tableName, Column[] keys, Column[]
}
dmlType = type;
}

protected Column[] removeKeysFromColumns(Column[] keys, Column[] columns) {
Column[] columnsWithoutKeys = new Column[columns.length-keys.length];
Set<Column> keySet = new HashSet<Column>();
CollectionUtils.addAll(keySet, keys);
int n = 0;
for (int i = 0; i < columns.length; i++) {
Column column = columns[i];
if (!keySet.contains(column)) {
columnsWithoutKeys[n++] = column;
}
}
return columnsWithoutKeys;
}

protected int[] buildTypes(Column[] keys, Column[] columns, boolean isBlobOverrideToBinary) {
int[] columnTypes = buildTypes(columns, isBlobOverrideToBinary);
Expand Down
111 changes: 95 additions & 16 deletions symmetric/src/main/java/org/jumpmind/symmetric/load/TableTemplate.java
Expand Up @@ -62,33 +62,41 @@ public class TableTemplate {
private IDbDialect dbDialect;

private Table table;

private String tableName;

private String[] keyNames;

private String[] columnNames;

private Map<String, Column> allMetaData;

private Column[] keyMetaData;

private Column[] columnMetaData;

private Column[] columnKeyMetaData;

private Column[] noKeyColumnPlusKeyMetaData;

private HashMap<DmlType, StatementBuilder> statementMap;

private int[] keyIndexesToRemoveOnUpdate;

private List<IColumnFilter> columnFilters = new ArrayList<IColumnFilter>();

public TableTemplate(JdbcTemplate jdbcTemplate, IDbDialect dbDialect, String tableName, IColumnFilter columnFilter) {
private boolean dontIncludeKeysInUpdateStatement = false;

public TableTemplate(JdbcTemplate jdbcTemplate, IDbDialect dbDialect, String tableName, IColumnFilter columnFilter,
boolean dontIncludeKeysInUpdateStatement) {
this.jdbcTemplate = jdbcTemplate;
this.dbDialect = dbDialect;
this.setupColumnFilters(columnFilter, dbDialect);
this.tableName = tableName;
this.dontIncludeKeysInUpdateStatement = dontIncludeKeysInUpdateStatement;
resetMetaData();
}

public void resetMetaData() {
table = dbDialect.getMetaDataFor(null, null, tableName, true);
allMetaData = new HashMap<String, Column>();
Expand All @@ -103,14 +111,14 @@ public void resetMetaData() {
}
}
}

private void setupColumnFilters(IColumnFilter pluginFilter, IDbDialect dbDialect) {
if (pluginFilter != null) {
this.columnFilters.add(pluginFilter);
}
if (dbDialect.getDatabaseColumnFilter() != null) {
this.columnFilters.add(dbDialect.getDatabaseColumnFilter());
}
}
}

public String getTableName() {
Expand All @@ -135,9 +143,79 @@ public int update(IDataLoaderContext ctx, String[] columnValues, String[] keyVal
}

public int update(IDataLoaderContext ctx, String[] columnValues, String[] keyValues, BinaryEncoding encoding) {
StatementBuilder st = getStatementBuilder(ctx, DmlType.UPDATE, encoding);
StatementBuilder st = null;
Column[] metaData = null;
if (dontIncludeKeysInUpdateStatement) {
String[] values = removeKeysFromColumnValuesIfSame(ctx, keyValues, columnValues);
if (values != null) {
columnValues = values;
st = getStatementBuilder(ctx, DmlType.UPDATE_NO_KEYS, encoding);
metaData = noKeyColumnPlusKeyMetaData;
}
}

if (st == null) {
st = getStatementBuilder(ctx, DmlType.UPDATE, encoding);
metaData = columnKeyMetaData;
}
String[] values = (String[]) ArrayUtils.addAll(columnValues, keyValues);
return execute(ctx, st, values, columnKeyMetaData, encoding);
return execute(ctx, st, values, metaData, encoding);
}

/**
* This is in support of allowing update statements that don't use the keys in the set portion of the
* update statement.
*/
private String[] removeKeysFromColumnValuesIfSame(IDataLoaderContext ctx, String[] keyValues, String[] columnValues) {
if (keyIndexesToRemoveOnUpdate == null) {
String[] colNames = ctx.getColumnNames();
String[] keyNames = ctx.getKeyNames();
String[] noKeyColNames = new String[colNames.length-keyNames.length];
keyIndexesToRemoveOnUpdate = new int[keyNames.length];
int indexToRemoveIndex = 0;
int indexOfNoKeyColNames = 0;
for (int index = 0; index < colNames.length; index++) {
if (ArrayUtils.contains(keyNames, colNames[index])) {
keyIndexesToRemoveOnUpdate[indexToRemoveIndex++] = index;
} else {
noKeyColNames[indexOfNoKeyColNames++] = colNames[index];
}
}

noKeyColumnPlusKeyMetaData = getColumnMetaData((String[])ArrayUtils.addAll(noKeyColNames, keyNames));
}

if (noKeyColumnPlusKeyMetaData == null) {
String[] noKeys = new String[columnValues.length - keyValues.length];
int noKeysIndex = 0;
for (int index = 0; index < columnValues.length; index++) {
if (!ArrayUtils.contains(keyIndexesToRemoveOnUpdate, index)) {
noKeys[noKeysIndex++] = columnValues[index];
}
}
return noKeys;
}

boolean keyChanged = false;
for (int index = 0; index < keyIndexesToRemoveOnUpdate.length; index++) {
if (!StringUtils.equals(keyValues[index], columnValues[index])) {
keyChanged = true;
}
}

if (!keyChanged) {
String[] noKeys = new String[columnValues.length - keyValues.length];
int noKeysIndex = 0;
for (int index = 0; index < columnValues.length; index++) {
if (!ArrayUtils.contains(keyIndexesToRemoveOnUpdate, index)) {
noKeys[noKeysIndex++] = columnValues[index];
}
}
return noKeys;

} else {
return null;
}
}

public int delete(IDataLoaderContext ctx, String[] keyValues) {
Expand All @@ -152,7 +230,7 @@ private StatementBuilder getStatementBuilder(IDataLoaderContext ctx, DmlType typ
if (columnFilters != null) {
for (IColumnFilter columnFilter : columnFilters) {
filteredColumnNames = columnFilter.filterColumnsNames(ctx, type, getTable(), columnNames);
}
}
}
if (keyMetaData == null) {
keyMetaData = getColumnMetaData(keyNames);
Expand All @@ -173,14 +251,15 @@ private StatementBuilder getStatementBuilder(IDataLoaderContext ctx, DmlType typ
&& !table.getCatalog().equals(dbDialect.getDefaultCatalog())) {
tableName = table.getCatalog() + "." + tableName;
}
st = new StatementBuilder(type, tableName, keyMetaData,
getColumnMetaData(filteredColumnNames), dbDialect.isBlobOverrideToBinary());
st = new StatementBuilder(type, tableName, keyMetaData, getColumnMetaData(filteredColumnNames), dbDialect
.isBlobOverrideToBinary());
statementMap.put(type, st);
}
return st;
}

private int execute(IDataLoaderContext ctx, StatementBuilder st, String[] values, Column[] metaData, BinaryEncoding encoding) {
private int execute(IDataLoaderContext ctx, StatementBuilder st, String[] values, Column[] metaData,
BinaryEncoding encoding) {
List<Object> list = new ArrayList<Object>(values.length);

for (int i = 0; i < values.length; i++) {
Expand Down Expand Up @@ -221,8 +300,8 @@ private int execute(IDataLoaderContext ctx, StatementBuilder st, String[] values
Object[] objectValues = list.toArray();
if (columnFilters != null) {
for (IColumnFilter columnFilter : columnFilters) {
objectValues = columnFilter.filterColumnsValues(ctx, st.getDmlType(), getTable(), objectValues);
}
objectValues = columnFilter.filterColumnsValues(ctx, st.getDmlType(), getTable(), objectValues);
}
}
return jdbcTemplate.update(st.getSql(), objectValues, st.getTypes());
}
Expand Down Expand Up @@ -251,7 +330,7 @@ private void clear() {
columnMetaData = null;
columnKeyMetaData = null;
}

private Column[] getColumnMetaData(String[] names) {
Column[] columns = new Column[names.length];
for (int i = 0; i < names.length; i++) {
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.csv.CsvConstants;
import org.jumpmind.symmetric.db.BinaryEncoding;
import org.jumpmind.symmetric.db.IDbDialect;
Expand All @@ -40,6 +41,7 @@
import org.jumpmind.symmetric.load.IDataLoaderFilter;
import org.jumpmind.symmetric.load.IDataLoaderStatistics;
import org.jumpmind.symmetric.load.TableTemplate;
import org.jumpmind.symmetric.service.IParameterService;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.jdbc.core.JdbcTemplate;

Expand All @@ -53,6 +55,8 @@ public class CsvLoader implements IDataLoader {

protected IDbDialect dbDialect;

protected IParameterService parameterService;

protected CsvReader csvReader;

protected DataLoaderContext context;
Expand Down Expand Up @@ -175,7 +179,8 @@ protected void setTable(String tableName) {

if (context.getTableTemplate() == null) {
context.setTableTemplate(new TableTemplate(jdbcTemplate, dbDialect, tableName,
this.columnFilters != null ? this.columnFilters.get(tableName) : null));
this.columnFilters != null ? this.columnFilters.get(tableName) : null, parameterService
.is(ParameterConstants.DATA_LOADER_NO_KEYS_IN_UPDATE)));
}

dbDialect.prepareTableForDataLoad(context.getTableTemplate().getTable());
Expand Down Expand Up @@ -411,4 +416,8 @@ public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
public void setDbDialect(IDbDialect dbDialect) {
this.dbDialect = dbDialect;
}

public void setParameterService(IParameterService parameterService) {
this.parameterService = parameterService;
}
}
6 changes: 6 additions & 0 deletions symmetric/src/main/resources/symmetric-default.properties
Expand Up @@ -126,6 +126,12 @@ symmetric.runtime.outgoing.batches.peek.ahead.window.after.max.size=100
# This property is override-able in the database.
symmetric.runtime.incoming.batches.skip.duplicates=true

# If this property is set to true, then keys will not be included in set portion of
# the update statements generated by the data loader.
#
# This property is override-able in the database.
symmetric.runtime.dont.include.keys.in.update.statement=false

# This is the number of times we will attempt to send an ACK back to the remote node
# when pulling and loading data.
#
Expand Down
1 change: 1 addition & 0 deletions symmetric/src/main/resources/symmetric-services.xml
Expand Up @@ -156,6 +156,7 @@
<property name="jdbcTemplate" ref="dataLoaderTemplate" />
<property name="enableFallbackInsert" value="true" />
<property name="enableFallbackUpdate" value="true" />
<property name="parameterService" ref="parameterService" />
<property name="allowMissingDelete" value="true" />
</bean>

Expand Down

0 comments on commit 7f8309c

Please sign in to comment.