Skip to content

Commit

Permalink
Added IColumnFilter
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Oct 11, 2007
1 parent aa6558c commit 25c3b18
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 6 deletions.
@@ -0,0 +1,9 @@
package org.jumpmind.symmetric.load;

import org.jumpmind.symmetric.load.StatementBuilder.DmlType;

public interface IColumnFilter {

public String[] filterColumnsNames(DmlType dml, String[] columnNames);
public Object[] filterColumnsValues(DmlType dml, Object[] columnValues);
}
Expand Up @@ -24,14 +24,15 @@
import java.io.BufferedReader;
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.springframework.transaction.annotation.Transactional;

public interface IDataLoader extends Cloneable {

public void open(BufferedReader in) throws IOException;

public void open(BufferedReader in, List<IDataLoaderFilter> filters) throws IOException;
public void open(BufferedReader in, List<IDataLoaderFilter> filters, Map<String,IColumnFilter> columnFilters) throws IOException;

public boolean hasNext() throws IOException;

Expand Down
Expand Up @@ -77,11 +77,14 @@ public class TableTemplate {
private Column[] columnMetaData;

private HashMap<DmlType, StatementBuilder> statementMap;

private IColumnFilter columnFilter;

public TableTemplate(JdbcTemplate jdbcTemplate, IDbDialect dbDialect,
String tableName) {
String tableName, IColumnFilter columnFilter) {
this.jdbcTemplate = jdbcTemplate;
this.dbDialect = dbDialect;
this.columnFilter = columnFilter;
// TODO should we be passing the schema in the csv?
table = dbDialect.getMetaDataFor(null, tableName, true);
allMetaData = new HashMap<String, Column>();
Expand All @@ -105,27 +108,40 @@ public boolean isIgnoreThisTable() {
public int insert(String[] columnValues) {
StatementBuilder st = getStatementBuilder(DmlType.INSERT);
Object[] values = filterValues(columnMetaData, columnValues);
if (this.columnFilter != null) {
values = this.columnFilter.filterColumnsValues(DmlType.INSERT, values);
}
return jdbcTemplate.update(st.getSql(), values);
}

public int update(String[] columnValues, String[] keyValues) {
StatementBuilder st = getStatementBuilder(DmlType.UPDATE);
Object[] values = ArrayUtils.addAll(filterValues(columnMetaData,
columnValues), filterValues(keyMetaData, keyValues));
if (this.columnFilter != null) {
values = this.columnFilter.filterColumnsValues(DmlType.UPDATE, values);
}
return jdbcTemplate.update(st.getSql(), values);
}

public int delete(String[] keyValues) {
StatementBuilder st = getStatementBuilder(DmlType.DELETE);
Object[] values = filterValues(keyMetaData, keyValues);
if (this.columnFilter != null) {
values = this.columnFilter.filterColumnsValues(DmlType.DELETE, values);
}
return jdbcTemplate.update(st.getSql(), values);
}

private StatementBuilder getStatementBuilder(DmlType type) {
StatementBuilder st = statementMap.get(type);
String[] statementColumns = existColumnNames;
if (this.columnFilter != null) {
statementColumns = this.columnFilter.filterColumnsNames(type, statementColumns);
}
if (st == null) {
st = new StatementBuilder(type, table.getName(), existKeyNames,
existColumnNames);
statementColumns);
statementMap.put(type, st);
}
return st;
Expand Down
Expand Up @@ -24,13 +24,15 @@
import java.io.BufferedReader;
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jumpmind.symmetric.common.csv.CsvConstants;
import org.jumpmind.symmetric.db.IDbDialect;
import org.jumpmind.symmetric.load.DataLoaderStatistics;
import org.jumpmind.symmetric.load.IColumnFilter;
import org.jumpmind.symmetric.load.IDataLoader;
import org.jumpmind.symmetric.load.IDataLoaderContext;
import org.jumpmind.symmetric.load.IDataLoaderFilter;
Expand Down Expand Up @@ -62,6 +64,8 @@ public class CsvLoader implements IDataLoader {
protected boolean allowMissingDelete;

protected List<IDataLoaderFilter> filters;

protected Map<String, IColumnFilter> columnFilters;

public void open(BufferedReader reader) throws IOException {
csvReader = new CsvReader(reader);
Expand All @@ -70,9 +74,10 @@ public void open(BufferedReader reader) throws IOException {
stats = new DataLoaderStatistics();
}

public void open(BufferedReader reader, List<IDataLoaderFilter> filters) throws IOException {
public void open(BufferedReader reader, List<IDataLoaderFilter> filters, Map<String, IColumnFilter> columnFilters) throws IOException {
open(reader);
this.filters = filters;
this.columnFilters = columnFilters;
}

public boolean hasNext() throws IOException {
Expand Down Expand Up @@ -147,7 +152,7 @@ protected boolean isMetaTokenParsed(String[] tokens) {
protected void setTable(String tableName) {
context.setTableName(tableName);
if (context.getTableTemplate() == null) {
context.setTableTemplate(new TableTemplate(jdbcTemplate, dbDialect, tableName));
context.setTableTemplate(new TableTemplate(jdbcTemplate, dbDialect, tableName, this.columnFilters != null ? this.columnFilters.get(tableName) : null));
}
}

Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.io.OutputStream;
import java.util.List;

import org.jumpmind.symmetric.load.IColumnFilter;
import org.jumpmind.symmetric.load.IDataLoaderFilter;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.transport.IIncomingTransport;
Expand All @@ -50,4 +51,6 @@ public interface IDataLoaderService {
public void removeDataLoaderFilter(IDataLoaderFilter filter);

public void setTransportManager(ITransportManager transportManager);

public void addColumnFilter(String tableName, IColumnFilter filter);
}
Expand Up @@ -26,13 +26,16 @@
import java.io.OutputStream;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ErrorConstants;
import org.jumpmind.symmetric.db.IDbDialect;
import org.jumpmind.symmetric.load.IColumnFilter;
import org.jumpmind.symmetric.load.IDataLoader;
import org.jumpmind.symmetric.load.IDataLoaderFilter;
import org.jumpmind.symmetric.model.IncomingBatch;
Expand Down Expand Up @@ -69,6 +72,8 @@ public class DataLoaderService extends AbstractService implements
protected BeanFactory beanFactory;

protected List<IDataLoaderFilter> filters;

protected Map<String, IColumnFilter> columnFilters = new HashMap<String, IColumnFilter>();

/**
* Connect to the remote node and pull data. The acknowledgment of
Expand Down Expand Up @@ -97,7 +102,7 @@ protected List<IncomingBatchHistory> loadDataAndReturnBatches(
IncomingBatch status = null;
IncomingBatchHistory history = null;
try {
dataLoader.open(transport.open(), filters);
dataLoader.open(transport.open(), filters, columnFilters);
while (dataLoader.hasNext()) {
status = new IncomingBatch(dataLoader.getContext());
history = new IncomingBatchHistory(dataLoader.getContext());
Expand Down Expand Up @@ -237,4 +242,8 @@ public void setDbDialect(IDbDialect dbDialect) {
this.dbDialect = dbDialect;
}

public void addColumnFilter(String tableName, IColumnFilter filter) {
this.columnFilters.put(tableName, filter);
}

}

0 comments on commit 25c3b18

Please sign in to comment.