Skip to content

Commit

Permalink
working on supporting streaming during the captured data extract
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Dec 30, 2011
1 parent d83aa4b commit 5cd8485
Show file tree
Hide file tree
Showing 13 changed files with 233 additions and 86 deletions.
Expand Up @@ -40,7 +40,7 @@
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.jumpmind.symmetric.service.ITriggerRouterService;
import org.jumpmind.symmetric.util.AppUtils;
import org.jumpmind.util.CollectionUtils;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
Expand Down Expand Up @@ -74,7 +74,7 @@ protected void selectAndEnhanceWithLobsIfEnabled(Data data, DataExtractorContext
.orderColumns(columnNames, table);
Object[] objectValues = dbDialect.getPlatform().getObjectValues(
dbDialect.getBinaryEncoding(), rowData, orderedColumns);
Map<String, Object> columnDataMap = AppUtils.toMap(columnNames,
Map<String, Object> columnDataMap = CollectionUtils.toMap(columnNames,
objectValues);
Column[] pkColumns = table.getPrimaryKeyColumns();
String sql = buildSelect(table, lobColumns, pkColumns);
Expand Down
Expand Up @@ -45,7 +45,7 @@
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.DataProcessor;
import org.jumpmind.symmetric.io.data.IDataProcessorListener;
import org.jumpmind.symmetric.io.data.reader.CsvDataReader;
import org.jumpmind.symmetric.io.data.reader.TextualCsvDataReader;
import org.jumpmind.symmetric.io.data.transform.TransformPoint;
import org.jumpmind.symmetric.io.data.transform.TransformTable;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
Expand Down Expand Up @@ -217,7 +217,7 @@ protected List<IncomingBatch> loadDataAndReturnBatches(String sourceNodeId,
totalNetworkMillis = System.currentTimeMillis() - totalNetworkMillis;
}

CsvDataReader reader = new CsvDataReader(transport.open());
TextualCsvDataReader reader = new TextualCsvDataReader(transport.open());
DatabaseWriterSettings settings = buildDatabaseWriterSettings();

TransformTable[] transforms = null;
Expand All @@ -232,7 +232,7 @@ protected List<IncomingBatch> loadDataAndReturnBatches(String sourceNodeId,
TransformDatabaseWriter writer = new TransformDatabaseWriter(dbDialect.getPlatform(),
settings, null, transforms, filters.toArray(new IDatabaseWriterFilter[filters
.size()]));
DataProcessor<CsvDataReader, TransformDatabaseWriter> processor = new DataProcessor<CsvDataReader, TransformDatabaseWriter>(
DataProcessor<TextualCsvDataReader, TransformDatabaseWriter> processor = new DataProcessor<TextualCsvDataReader, TransformDatabaseWriter>(
reader, writer, listener);
processor.process();

Expand Down Expand Up @@ -388,18 +388,18 @@ public void setConfigurationService(IConfigurationService configurationService)
}

class ManageIncomingBatchListener implements
IDataProcessorListener<CsvDataReader, TransformDatabaseWriter> {
IDataProcessorListener<TextualCsvDataReader, TransformDatabaseWriter> {

private List<IncomingBatch> batchesProcessed = new ArrayList<IncomingBatch>();

private IncomingBatch currentBatch;

public void beforeBatchEnd(DataContext<CsvDataReader, TransformDatabaseWriter> context) {
public void beforeBatchEnd(DataContext<TextualCsvDataReader, TransformDatabaseWriter> context) {
enableSyncTriggers(context);
}

public boolean beforeBatchStarted(
DataContext<CsvDataReader, TransformDatabaseWriter> context) {
DataContext<TextualCsvDataReader, TransformDatabaseWriter> context) {
this.currentBatch = null;
Batch batch = context.getBatch();
if (parameterService.is(ParameterConstants.DATA_LOADER_ENABLED)
Expand All @@ -415,13 +415,13 @@ public boolean beforeBatchStarted(
return false;
}

public void afterBatchStarted(DataContext<CsvDataReader, TransformDatabaseWriter> context) {
public void afterBatchStarted(DataContext<TextualCsvDataReader, TransformDatabaseWriter> context) {
Batch batch = context.getBatch();
dbDialect.disableSyncTriggers(context.getWriter().getDatabaseWriter().getTransaction(),
batch.getSourceNodeId());
}

public void batchSuccessful(DataContext<CsvDataReader, TransformDatabaseWriter> context) {
public void batchSuccessful(DataContext<TextualCsvDataReader, TransformDatabaseWriter> context) {
Batch batch = context.getBatch();
this.currentBatch.setValues(context.getReader().getStatistics().get(batch), context
.getWriter().getStatistics().get(batch), true);
Expand All @@ -436,7 +436,7 @@ public void batchSuccessful(DataContext<CsvDataReader, TransformDatabaseWriter>
}

protected void enableSyncTriggers(
DataContext<CsvDataReader, TransformDatabaseWriter> context) {
DataContext<TextualCsvDataReader, TransformDatabaseWriter> context) {
try {
ISqlTransaction transaction = context.getWriter().getDatabaseWriter()
.getTransaction();
Expand All @@ -448,7 +448,7 @@ protected void enableSyncTriggers(
}
}

public void batchInError(DataContext<CsvDataReader, TransformDatabaseWriter> context,
public void batchInError(DataContext<TextualCsvDataReader, TransformDatabaseWriter> context,
Exception ex) {
Batch batch = context.getBatch();
this.currentBatch.setValues(context.getReader().getStatistics().get(batch), context
Expand Down
Expand Up @@ -26,7 +26,6 @@
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TimeZone;
Expand Down Expand Up @@ -239,18 +238,6 @@ public static void runBsh(Map<String, Object> variables, String script) {
}
}

/**
 * Builds a map by pairing each key name with the value at the same index.
 * If either array is null, or there are fewer values than key names, an
 * empty map is returned instead (extra trailing values are ignored).
 *
 * @param keyNames the keys, in order
 * @param values   the values, positionally matched to the keys
 * @return a new mutable map of key name to value; never null
 */
public static <T> Map<String, T> toMap(String[] keyNames, T[] values) {
    // Guard clause: bail out with an empty map rather than risking an
    // ArrayIndexOutOfBoundsException when the inputs do not line up.
    if (keyNames == null || values == null || values.length < keyNames.length) {
        return new HashMap<String, T>(0);
    }
    Map<String, T> result = new HashMap<String, T>(keyNames.length);
    int index = 0;
    for (String keyName : keyNames) {
        result.put(keyName, values[index++]);
    }
    return result;
}

/**
* Checks to see if a specific port is available.
*
Expand Down
Expand Up @@ -395,6 +395,17 @@ public boolean isBlob(int type) {
return type == Types.BLOB || type == Types.BINARY || type == Types.VARBINARY
|| type == Types.LONGVARBINARY || type == -10;
}

/**
 * Returns the columns of the given table whose type code is classified as a
 * LOB by {@link #isLob(int)}.
 *
 * @param table the table whose columns are inspected
 * @return a new mutable list of LOB columns; empty when the table has none
 */
public List<Column> getLobColumns(Table table) {
    // Most tables have at most one LOB column, hence the small initial capacity.
    List<Column> result = new ArrayList<Column>(1);
    for (Column candidate : table.getColumns()) {
        if (isLob(candidate.getTypeCode())) {
            result.add(candidate);
        }
    }
    return result;
}

public boolean isLob(int type) {
return type == Types.CLOB || type == Types.BLOB || type == Types.BINARY
Expand Down
Expand Up @@ -19,6 +19,8 @@
* under the License.
*/

import java.util.List;

import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Database;
import org.jumpmind.db.model.Table;
Expand Down Expand Up @@ -197,6 +199,8 @@ public Object[] getObjectValues(BinaryEncoding encoding, Table table, String[] c
public boolean isClob(int type);

public boolean isBlob(int type);

public List<Column> getLobColumns(Table table);

public boolean isPrimaryKeyViolation(Exception ex);

Expand Down
Expand Up @@ -5,19 +5,22 @@

import org.jumpmind.db.model.Table;


/**
* This interface insulates the application from the data connection technology.
*/
public interface ISqlTemplate {

public byte[] queryForBlob(String sql, Object... args);

public String queryForClob(String sql, Object... args);

public <T> T queryForObject(String sql, Class<T> clazz, Object... params);

public Map<String, Object> queryForMap(String sql, Object... params);

public int queryForInt(String sql);
public <T> ISqlReadCursor<T> queryForCursor(Query query, ISqlRowMapper<T> mapper) ;

public <T> ISqlReadCursor<T> queryForCursor(Query query, ISqlRowMapper<T> mapper);

public <T> ISqlReadCursor<T> queryForCursor(String sql, ISqlRowMapper<T> mapper,
Object[] params, int[] types);
Expand All @@ -29,14 +32,15 @@ public <T> ISqlReadCursor<T> queryForCursor(String sql, ISqlRowMapper<T> mapper,
public List<Row> query(String sql, Object[] params, int[] types);

public <T> List<T> query(String sql, ISqlRowMapper<T> mapper);

public <T> List<T> query(String sql, ISqlRowMapper<T> mapper, Object... params);

public <T> List<T> query(String sql, ISqlRowMapper<T> mapper, Object[] params, int[] types);

public <T> List<T> query(Query query, ISqlRowMapper<T> mapper);

public <T,W> Map<T,W> query(String sql, String keyCol, String valueCol, Object[] params, int[] types);

public <T, W> Map<T, W> query(String sql, String keyCol, String valueCol, Object[] params,
int[] types);

public int update(String sql);

Expand All @@ -45,27 +49,27 @@ public <T> ISqlReadCursor<T> queryForCursor(String sql, ISqlRowMapper<T> mapper,
public int update(boolean autoCommit, boolean failOnError, int commitRate, String... sql);

public int update(String sql, Object[] values, int[] types);

public int update(String sql, Object[] values);
public int update(Table table, Map<String,Object> params);
public int insert(Table table, Map<String, Object> params);
public int delete(Table table, Map<String, Object> params);
public void save(Table table, Map<String, Object> params);

public int update(Table table, Map<String, Object> params);

public int insert(Table table, Map<String, Object> params);

public int delete(Table table, Map<String, Object> params);

public void save(Table table, Map<String, Object> params);

public void testConnection();

public SqlException translate(Exception ex);

public ISqlTransaction startSqlTransaction();

public int getDatabaseMajorVersion();

public int getDatabaseMinorVersion();
public String getDatabaseProductName();

public String getDatabaseProductName();

}
Expand Up @@ -108,6 +108,7 @@ public void removeData(String key) {
}

public void putCsvData(String key, String data) {
removeData(key);
if (csvData == null) {
csvData = new HashMap<String, String>(2);
}
Expand Down Expand Up @@ -150,6 +151,7 @@ public boolean[] getChangedDataIndicators() {
}

public void putParsedData(String key, String[] data) {
removeData(key);
if (parsedCsvData == null) {
parsedCsvData = new HashMap<String, String[]>(2);
}
Expand Down

0 comments on commit 5cd8485

Please sign in to comment.