diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/load/IColumnFilter.java b/symmetric/src/main/java/org/jumpmind/symmetric/load/IColumnFilter.java new file mode 100644 index 0000000000..04d3f7277e --- /dev/null +++ b/symmetric/src/main/java/org/jumpmind/symmetric/load/IColumnFilter.java @@ -0,0 +1,9 @@ +package org.jumpmind.symmetric.load; + +import org.jumpmind.symmetric.load.StatementBuilder.DmlType; + +public interface IColumnFilter { + + public String[] filterColumnsNames(DmlType dml, String[] columnNames); + public Object[] filterColumnsValues(DmlType dml, Object[] columnValues); +} diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/load/IDataLoader.java b/symmetric/src/main/java/org/jumpmind/symmetric/load/IDataLoader.java index d626bf81de..4b2056b093 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/load/IDataLoader.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/load/IDataLoader.java @@ -24,6 +24,7 @@ import java.io.BufferedReader; import java.io.IOException; import java.util.List; +import java.util.Map; import org.springframework.transaction.annotation.Transactional; @@ -31,7 +32,7 @@ public interface IDataLoader extends Cloneable { public void open(BufferedReader in) throws IOException; - public void open(BufferedReader in, List filters) throws IOException; + public void open(BufferedReader in, List filters, Map columnFilters) throws IOException; public boolean hasNext() throws IOException; diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/load/TableTemplate.java b/symmetric/src/main/java/org/jumpmind/symmetric/load/TableTemplate.java index 9b8384641c..9121fae3b5 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/load/TableTemplate.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/load/TableTemplate.java @@ -77,11 +77,14 @@ public class TableTemplate { private Column[] columnMetaData; private HashMap statementMap; + + private IColumnFilter columnFilter; public TableTemplate(JdbcTemplate jdbcTemplate, IDbDialect dbDialect, - String tableName) { + String tableName, IColumnFilter columnFilter) { this.jdbcTemplate = jdbcTemplate; this.dbDialect = dbDialect; + this.columnFilter = columnFilter; // TODO should we be passing the schema in the csv? table = dbDialect.getMetaDataFor(null, tableName, true); allMetaData = new HashMap(); @@ -105,6 +108,9 @@ public boolean isIgnoreThisTable() { public int insert(String[] columnValues) { StatementBuilder st = getStatementBuilder(DmlType.INSERT); Object[] values = filterValues(columnMetaData, columnValues); + if (this.columnFilter != null) { + values = this.columnFilter.filterColumnsValues(DmlType.INSERT, values); + } return jdbcTemplate.update(st.getSql(), values); } @@ -112,20 +118,30 @@ public int update(String[] columnValues, String[] keyValues) { StatementBuilder st = getStatementBuilder(DmlType.UPDATE); Object[] values = ArrayUtils.addAll(filterValues(columnMetaData, columnValues), filterValues(keyMetaData, keyValues)); + if (this.columnFilter != null) { + values = this.columnFilter.filterColumnsValues(DmlType.UPDATE, values); + } return jdbcTemplate.update(st.getSql(), values); } public int delete(String[] keyValues) { StatementBuilder st = getStatementBuilder(DmlType.DELETE); Object[] values = filterValues(keyMetaData, keyValues); + if (this.columnFilter != null) { + values = this.columnFilter.filterColumnsValues(DmlType.DELETE, values); + } return jdbcTemplate.update(st.getSql(), values); } private StatementBuilder getStatementBuilder(DmlType type) { StatementBuilder st = statementMap.get(type); + String[] statementColumns = existColumnNames; + if (this.columnFilter != null) { + statementColumns = this.columnFilter.filterColumnsNames(type, statementColumns); + } if (st == null) { st = new StatementBuilder(type, table.getName(), existKeyNames, - existColumnNames); + statementColumns); statementMap.put(type, st); } return st; diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/load/csv/CsvLoader.java b/symmetric/src/main/java/org/jumpmind/symmetric/load/csv/CsvLoader.java index 859b682071..753c053a47 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/load/csv/CsvLoader.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/load/csv/CsvLoader.java @@ -24,6 +24,7 @@ import java.io.BufferedReader; import java.io.IOException; import java.util.List; +import java.util.Map; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; @@ -31,6 +32,7 @@ import org.jumpmind.symmetric.common.csv.CsvConstants; import org.jumpmind.symmetric.db.IDbDialect; import org.jumpmind.symmetric.load.DataLoaderStatistics; +import org.jumpmind.symmetric.load.IColumnFilter; import org.jumpmind.symmetric.load.IDataLoader; import org.jumpmind.symmetric.load.IDataLoaderContext; import org.jumpmind.symmetric.load.IDataLoaderFilter; @@ -62,6 +64,8 @@ public class CsvLoader implements IDataLoader { protected boolean allowMissingDelete; protected List filters; + + protected Map columnFilters; public void open(BufferedReader reader) throws IOException { csvReader = new CsvReader(reader); @@ -70,9 +74,10 @@ public void open(BufferedReader reader) throws IOException { stats = new DataLoaderStatistics(); } - public void open(BufferedReader reader, List filters) throws IOException { + public void open(BufferedReader reader, List filters, Map columnFilters) throws IOException { open(reader); this.filters = filters; + this.columnFilters = columnFilters; } public boolean hasNext() throws IOException { @@ -147,7 +152,7 @@ protected boolean isMetaTokenParsed(String[] tokens) { protected void setTable(String tableName) { context.setTableName(tableName); if (context.getTableTemplate() == null) { - context.setTableTemplate(new TableTemplate(jdbcTemplate, dbDialect, tableName)); + context.setTableTemplate(new TableTemplate(jdbcTemplate, dbDialect, tableName, this.columnFilters != null ? this.columnFilters.get(tableName) : null)); } } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/IDataLoaderService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/IDataLoaderService.java index d055cfc607..202101a6b2 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/IDataLoaderService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/IDataLoaderService.java @@ -26,6 +26,7 @@ import java.io.OutputStream; import java.util.List; +import org.jumpmind.symmetric.load.IColumnFilter; import org.jumpmind.symmetric.load.IDataLoaderFilter; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.transport.IIncomingTransport; @@ -50,4 +51,6 @@ public interface IDataLoaderService { public void removeDataLoaderFilter(IDataLoaderFilter filter); public void setTransportManager(ITransportManager transportManager); + + public void addColumnFilter(String tableName, IColumnFilter filter); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index 2278d255eb..c284f83772 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -26,13 +26,16 @@ import java.io.OutputStream; import java.net.ConnectException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ErrorConstants; import org.jumpmind.symmetric.db.IDbDialect; +import org.jumpmind.symmetric.load.IColumnFilter; import org.jumpmind.symmetric.load.IDataLoader; import org.jumpmind.symmetric.load.IDataLoaderFilter; import org.jumpmind.symmetric.model.IncomingBatch; @@ -69,6 +72,8 @@ public class DataLoaderService extends AbstractService implements protected BeanFactory beanFactory; protected List filters; + + protected Map columnFilters = new HashMap(); /** * Connect to the remote node and pull data. The acknowledgment of @@ -97,7 +102,7 @@ protected List loadDataAndReturnBatches( IncomingBatch status = null; IncomingBatchHistory history = null; try { - dataLoader.open(transport.open(), filters); + dataLoader.open(transport.open(), filters, columnFilters); while (dataLoader.hasNext()) { status = new IncomingBatch(dataLoader.getContext()); history = new IncomingBatchHistory(dataLoader.getContext()); @@ -237,4 +242,8 @@ public void setDbDialect(IDbDialect dbDialect) { this.dbDialect = dbDialect; } + public void addColumnFilter(String tableName, IColumnFilter filter) { + this.columnFilters.put(tableName, filter); + } + }