diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/extract/csv/AbstractStreamDataCommand.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/extract/csv/AbstractStreamDataCommand.java index 8ffd1cff10..aec43446cd 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/extract/csv/AbstractStreamDataCommand.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/extract/csv/AbstractStreamDataCommand.java @@ -40,7 +40,7 @@ import org.jumpmind.symmetric.model.TriggerHistory; import org.jumpmind.symmetric.model.TriggerRouter; import org.jumpmind.symmetric.service.ITriggerRouterService; -import org.jumpmind.symmetric.util.AppUtils; +import org.jumpmind.util.CollectionUtils; import org.springframework.dao.IncorrectResultSizeDataAccessException; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.RowMapper; @@ -74,7 +74,7 @@ protected void selectAndEnhanceWithLobsIfEnabled(Data data, DataExtractorContext .orderColumns(columnNames, table); Object[] objectValues = dbDialect.getPlatform().getObjectValues( dbDialect.getBinaryEncoding(), rowData, orderedColumns); - Map columnDataMap = AppUtils.toMap(columnNames, + Map columnDataMap = CollectionUtils.toMap(columnNames, objectValues); Column[] pkColumns = table.getPrimaryKeyColumns(); String sql = buildSelect(table, lobColumns, pkColumns); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index 18902660e3..460284bf31 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -45,7 +45,7 @@ import org.jumpmind.symmetric.io.data.DataContext; import org.jumpmind.symmetric.io.data.DataProcessor; import org.jumpmind.symmetric.io.data.IDataProcessorListener; -import org.jumpmind.symmetric.io.data.reader.CsvDataReader; +import org.jumpmind.symmetric.io.data.reader.TextualCsvDataReader; import org.jumpmind.symmetric.io.data.transform.TransformPoint; import org.jumpmind.symmetric.io.data.transform.TransformTable; import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings; @@ -217,7 +217,7 @@ protected List loadDataAndReturnBatches(String sourceNodeId, totalNetworkMillis = System.currentTimeMillis() - totalNetworkMillis; } - CsvDataReader reader = new CsvDataReader(transport.open()); + TextualCsvDataReader reader = new TextualCsvDataReader(transport.open()); DatabaseWriterSettings settings = buildDatabaseWriterSettings(); TransformTable[] transforms = null; @@ -232,7 +232,7 @@ protected List loadDataAndReturnBatches(String sourceNodeId, TransformDatabaseWriter writer = new TransformDatabaseWriter(dbDialect.getPlatform(), settings, null, transforms, filters.toArray(new IDatabaseWriterFilter[filters .size()])); - DataProcessor processor = new DataProcessor( + DataProcessor processor = new DataProcessor( reader, writer, listener); processor.process(); @@ -388,18 +388,18 @@ public void setConfigurationService(IConfigurationService configurationService) } class ManageIncomingBatchListener implements - IDataProcessorListener { + IDataProcessorListener { private List batchesProcessed = new ArrayList(); private IncomingBatch currentBatch; - public void beforeBatchEnd(DataContext context) { + public void beforeBatchEnd(DataContext context) { enableSyncTriggers(context); } public boolean beforeBatchStarted( - DataContext context) { + DataContext context) { this.currentBatch = null; Batch batch = context.getBatch(); if (parameterService.is(ParameterConstants.DATA_LOADER_ENABLED) @@ -415,13 +415,13 @@ public boolean beforeBatchStarted( return false; } - public void afterBatchStarted(DataContext context) { + public void afterBatchStarted(DataContext context) { Batch batch = context.getBatch(); dbDialect.disableSyncTriggers(context.getWriter().getDatabaseWriter().getTransaction(), batch.getSourceNodeId()); } - public void batchSuccessful(DataContext context) { + public void batchSuccessful(DataContext context) { Batch batch = context.getBatch(); this.currentBatch.setValues(context.getReader().getStatistics().get(batch), context .getWriter().getStatistics().get(batch), true); @@ -436,7 +436,7 @@ public void batchSuccessful(DataContext } protected void enableSyncTriggers( - DataContext context) { + DataContext context) { try { ISqlTransaction transaction = context.getWriter().getDatabaseWriter() .getTransaction(); @@ -448,7 +448,7 @@ protected void enableSyncTriggers( } } - public void batchInError(DataContext context, + public void batchInError(DataContext context, Exception ex) { Batch batch = context.getBatch(); this.currentBatch.setValues(context.getReader().getStatistics().get(batch), context diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/util/AppUtils.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/util/AppUtils.java index 342b38c145..1155e25d57 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/util/AppUtils.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/util/AppUtils.java @@ -26,7 +26,6 @@ import java.net.InetAddress; import java.net.ServerSocket; import java.util.Date; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.TimeZone; @@ -239,18 +238,6 @@ public static void runBsh(Map variables, String script) { } } - public static Map toMap(String[] keyNames, T[] values) { - if (values != null && keyNames != null && values.length >= keyNames.length) { - Map map = new HashMap(keyNames.length); - for (int i = 0; i < keyNames.length; i++) { - map.put(keyNames[i], values[i]); - } - return map; - } else { - return new HashMap(0); - } - } - /** * Checks to see if a specific port is available. * diff --git a/symmetric/symmetric-db/src/main/java/org/jumpmind/db/AbstractDatabasePlatform.java b/symmetric/symmetric-db/src/main/java/org/jumpmind/db/AbstractDatabasePlatform.java index 3039d86fa9..266bf21a9e 100644 --- a/symmetric/symmetric-db/src/main/java/org/jumpmind/db/AbstractDatabasePlatform.java +++ b/symmetric/symmetric-db/src/main/java/org/jumpmind/db/AbstractDatabasePlatform.java @@ -395,6 +395,17 @@ public boolean isBlob(int type) { return type == Types.BLOB || type == Types.BINARY || type == Types.VARBINARY || type == Types.LONGVARBINARY || type == -10; } + + public List getLobColumns(Table table) { + List lobColumns = new ArrayList(1); + Column[] allColumns = table.getColumns(); + for (Column column : allColumns) { + if (isLob(column.getTypeCode())) { + lobColumns.add(column); + } + } + return lobColumns; + } public boolean isLob(int type) { return type == Types.CLOB || type == Types.BLOB || type == Types.BINARY diff --git a/symmetric/symmetric-db/src/main/java/org/jumpmind/db/IDatabasePlatform.java b/symmetric/symmetric-db/src/main/java/org/jumpmind/db/IDatabasePlatform.java index 417aeaff16..091676500c 100644 --- a/symmetric/symmetric-db/src/main/java/org/jumpmind/db/IDatabasePlatform.java +++ b/symmetric/symmetric-db/src/main/java/org/jumpmind/db/IDatabasePlatform.java @@ -19,6 +19,8 @@ * under the License. */ +import java.util.List; + import org.jumpmind.db.model.Column; import org.jumpmind.db.model.Database; import org.jumpmind.db.model.Table; @@ -197,6 +199,8 @@ public Object[] getObjectValues(BinaryEncoding encoding, Table table, String[] c public boolean isClob(int type); public boolean isBlob(int type); + + public List getLobColumns(Table table); public boolean isPrimaryKeyViolation(Exception ex); diff --git a/symmetric/symmetric-db/src/main/java/org/jumpmind/db/sql/ISqlTemplate.java b/symmetric/symmetric-db/src/main/java/org/jumpmind/db/sql/ISqlTemplate.java index 86f2eff554..27208a19cc 100644 --- a/symmetric/symmetric-db/src/main/java/org/jumpmind/db/sql/ISqlTemplate.java +++ b/symmetric/symmetric-db/src/main/java/org/jumpmind/db/sql/ISqlTemplate.java @@ -5,19 +5,22 @@ import org.jumpmind.db.model.Table; - /** * This interface insulates the application from the data connection technology. */ public interface ISqlTemplate { + public byte[] queryForBlob(String sql, Object... args); + + public String queryForClob(String sql, Object... args); + public T queryForObject(String sql, Class clazz, Object... params); - + public Map queryForMap(String sql, Object... params); public int queryForInt(String sql); - - public ISqlReadCursor queryForCursor(Query query, ISqlRowMapper mapper) ; + + public ISqlReadCursor queryForCursor(Query query, ISqlRowMapper mapper); public ISqlReadCursor queryForCursor(String sql, ISqlRowMapper mapper, Object[] params, int[] types); @@ -29,14 +32,15 @@ public ISqlReadCursor queryForCursor(String sql, ISqlRowMapper mapper, public List query(String sql, Object[] params, int[] types); public List query(String sql, ISqlRowMapper mapper); - + public List query(String sql, ISqlRowMapper mapper, Object... params); public List query(String sql, ISqlRowMapper mapper, Object[] params, int[] types); - + public List query(Query query, ISqlRowMapper mapper); - - public Map query(String sql, String keyCol, String valueCol, Object[] params, int[] types); + + public Map query(String sql, String keyCol, String valueCol, Object[] params, + int[] types); public int update(String sql); @@ -45,27 +49,27 @@ public ISqlReadCursor queryForCursor(String sql, ISqlRowMapper mapper, public int update(boolean autoCommit, boolean failOnError, int commitRate, String... sql); public int update(String sql, Object[] values, int[] types); - + public int update(String sql, Object[] values); - - public int update(Table table, Map params); - - public int insert(Table table, Map params); - - public int delete(Table table, Map params); - - public void save(Table table, Map params); - + + public int update(Table table, Map params); + + public int insert(Table table, Map params); + + public int delete(Table table, Map params); + + public void save(Table table, Map params); + public void testConnection(); public SqlException translate(Exception ex); public ISqlTransaction startSqlTransaction(); - + public int getDatabaseMajorVersion(); - + public int getDatabaseMinorVersion(); - - public String getDatabaseProductName(); + + public String getDatabaseProductName(); } diff --git a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/CsvData.java b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/CsvData.java index 3c7a9fa5ea..7a7fadaac6 100644 --- a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/CsvData.java +++ b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/CsvData.java @@ -108,6 +108,7 @@ public void removeData(String key) { } public void putCsvData(String key, String data) { + removeData(key); if (csvData == null) { csvData = new HashMap(2); } @@ -150,6 +151,7 @@ public boolean[] getChangedDataIndicators() { } public void putParsedData(String key, String[] data) { + removeData(key); if (parsedCsvData == null) { parsedCsvData = new HashMap(2); } diff --git a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/BatchCsvDataReader.java b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/DatabaseCapturedCsvDataReader.java similarity index 51% rename from symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/BatchCsvDataReader.java rename to symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/DatabaseCapturedCsvDataReader.java index cc470201a3..72a7f480f8 100644 --- a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/BatchCsvDataReader.java +++ b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/DatabaseCapturedCsvDataReader.java @@ -6,10 +6,16 @@ import java.util.List; import java.util.Map; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.lang.ArrayUtils; +import org.jumpmind.db.BinaryEncoding; import org.jumpmind.db.IDatabasePlatform; +import org.jumpmind.db.model.Column; import org.jumpmind.db.model.Table; import org.jumpmind.db.sql.ISqlReadCursor; import org.jumpmind.db.sql.ISqlRowMapper; +import org.jumpmind.db.sql.ISqlTemplate; import org.jumpmind.db.sql.Row; import org.jumpmind.symmetric.io.data.Batch; import org.jumpmind.symmetric.io.data.CsvData; @@ -17,9 +23,10 @@ import org.jumpmind.symmetric.io.data.DataEventType; import org.jumpmind.symmetric.io.data.IDataReader; import org.jumpmind.symmetric.io.data.IDataWriter; +import org.jumpmind.util.CollectionUtils; import org.jumpmind.util.Statistics; -public class BatchCsvDataReader implements IDataReader { +public class DatabaseCapturedCsvDataReader implements IDataReader { protected String selectSql; @@ -31,22 +38,23 @@ public class BatchCsvDataReader implements IDataReader { protected List batchesToSend; - protected Map csvDataSettings; + protected Map settings; protected Batch batch; - protected CsvDataSettings csvDataSetting; + protected DatabaseCaptureSettings setting; protected CsvData data; protected boolean extractOldData = true; - public BatchCsvDataReader(IDatabasePlatform platform, String sql, - Map csvDataSettings, boolean extractOldData, Batch... batches) { + public DatabaseCapturedCsvDataReader(IDatabasePlatform platform, String sql, + Map csvDataSettings, boolean extractOldData, + Batch... batches) { this.selectSql = sql; this.extractOldData = extractOldData; this.platform = platform; - this.csvDataSettings = csvDataSettings; + this.settings = csvDataSettings; this.batchesToSend = new ArrayList(batches.length); for (Batch batch : batches) { this.batchesToSend.add(batch); @@ -80,17 +88,17 @@ protected void closeDataCursor() { } public Table nextTable() { - csvDataSetting = null; + setting = null; data = this.dataCursor.next(); if (data != null) { - csvDataSetting = csvDataSettings.get(data.getAttribute(CsvData.ATTRIBUTE_TABLE_ID)); - if (csvDataSetting == null) { + setting = settings.get(data.getAttribute(CsvData.ATTRIBUTE_TABLE_ID)); + if (setting == null) { throw new RuntimeException(String.format( "Table mapping for id of %d was not found", data.getAttribute(CsvData.ATTRIBUTE_TABLE_ID))); } } - return csvDataSetting != null ? csvDataSetting.getTableMetaData() : null; + return setting != null ? setting.getTableMetaData() : null; } public CsvData nextData() { @@ -100,12 +108,10 @@ public CsvData nextData() { CsvData returnData = null; if (data != null) { - CsvDataSettings newCsvDataSetting = csvDataSettings.get(data + DatabaseCaptureSettings newCsvDataSetting = settings.get(data .getAttribute(CsvData.ATTRIBUTE_TABLE_ID)); - if (newCsvDataSetting != null - && csvDataSetting != null - && newCsvDataSetting.getTableMetaData().equals( - csvDataSetting.getTableMetaData())) { + if (newCsvDataSetting != null && setting != null + && newCsvDataSetting.getTableMetaData().equals(setting.getTableMetaData())) { returnData = data; data = null; } @@ -121,23 +127,87 @@ public Map getStatistics() { return statistics; } - protected String enhanceWithLobsFromTargetIfNeeded(String rowData) { - // TODO - return rowData; + protected void enhanceWithLobsFromTargetIfNeeded(CsvData data) { + Table table = setting.getTableMetaData(); + table = platform.getTableFromCache(table.getCatalog(), table.getSchema(), table.getName(), + false); + if (table != null) { + List lobColumns = platform.getLobColumns(table); + if (lobColumns.size() > 0) { + String[] columnNames = table.getColumnNames(); + String[] rowData = data.getParsedData(CsvData.ROW_DATA); + Column[] orderedColumns = table.getColumns(); + Object[] objectValues = platform.getObjectValues(batch.getBinaryEncoding(), + rowData, orderedColumns); + Map columnDataMap = CollectionUtils + .toMap(columnNames, objectValues); + Column[] pkColumns = table.getPrimaryKeyColumns(); + ISqlTemplate sqlTemplate = platform.getSqlTemplate(); + Object[] args = new Object[pkColumns.length]; + for (int i = 0; i < pkColumns.length; i++) { + args[i] = columnDataMap.get(pkColumns[i].getName()); + } + + for (Column lobColumn : lobColumns) { + String sql = buildSelect(table, lobColumn, pkColumns); + String valueForCsv = null; + if (platform.isBlob(lobColumn.getTypeCode())) { + byte[] binaryData = sqlTemplate.queryForBlob(sql, args); + if (batch.getBinaryEncoding() == BinaryEncoding.BASE64) { + valueForCsv = new String(Base64.encodeBase64(binaryData)); + } else if (batch.getBinaryEncoding() == BinaryEncoding.HEX) { + valueForCsv = new String(Hex.encodeHex(binaryData)); + } else { + valueForCsv = new String(binaryData); + } + } else { + valueForCsv = sqlTemplate.queryForClob(sql, args); + } + + int index = ArrayUtils.indexOf(columnNames, lobColumn.getName()); + rowData[index] = valueForCsv; + + } + + data.putParsedData(CsvData.ROW_DATA, rowData); + } + } + } + + protected String buildSelect(Table table, Column lobColumn, Column[] pkColumns) { + StringBuilder sql = new StringBuilder("select "); + String quote = platform.getPlatformInfo().getIdentifierQuoteString(); + sql.append(quote); + sql.append(lobColumn.getName()); + sql.append(quote); + sql.append(","); + sql.delete(sql.length() - 1, sql.length()); + sql.append(" from "); + sql.append(table.getFullyQualifiedTableName()); + sql.append(" where "); + for (Column col : pkColumns) { + sql.append(quote); + sql.append(col.getName()); + sql.append(quote); + sql.append("=? and "); + } + sql.delete(sql.length() - 5, sql.length()); + return sql.toString(); } class CsvDataRowMapper implements ISqlRowMapper { public CsvData mapRow(Row row) { CsvData data = new CsvData(); - String rowData = row.getString("ROW_DATA"); - if (rowData != null && csvDataSetting.isSelectLobsFromTarget()) { - rowData = enhanceWithLobsFromTargetIfNeeded(rowData); - } - data.putCsvData(CsvData.ROW_DATA, rowData); + data.putCsvData(CsvData.ROW_DATA, row.getString("ROW_DATA")); data.putCsvData(CsvData.PK_DATA, row.getString("PK_DATA")); if (extractOldData) { data.putCsvData(CsvData.OLD_DATA, row.getString("OLD_DATA")); } + + if (setting.isSelectLobsFromTarget()) { + enhanceWithLobsFromTargetIfNeeded(data); + } + data.putAttribute(CsvData.ATTRIBUTE_CHANNEL_ID, row.getString("CHANNEL_ID")); data.putAttribute(CsvData.ATTRIBUTE_TX_ID, row.getString("TRANSACTION_ID")); data.setDataEventType(DataEventType.getEventType(row.getString("EVENT_TYPE"))); @@ -150,11 +220,13 @@ public CsvData mapRow(Row row) { } } - public static class CsvDataSettings { + public static class DatabaseCaptureSettings { + protected boolean selectLobsFromTarget = false; + protected Table tableMetaData; - public CsvDataSettings(boolean useStreamLobs, Table tableMetaData) { + public DatabaseCaptureSettings(boolean useStreamLobs, Table tableMetaData) { this.selectLobsFromTarget = useStreamLobs; this.tableMetaData = tableMetaData; } diff --git a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/CsvDataReader.java b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/TextualCsvDataReader.java similarity index 97% rename from symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/CsvDataReader.java rename to symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/TextualCsvDataReader.java index a2d76a5392..6239f0b907 100644 --- a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/CsvDataReader.java +++ b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/TextualCsvDataReader.java @@ -32,7 +32,7 @@ import org.jumpmind.symmetric.io.data.IDataWriter; import org.jumpmind.util.Statistics; -public class CsvDataReader implements IDataReader { +public class TextualCsvDataReader implements IDataReader { protected Log log = LogFactory.getLog(getClass()); @@ -48,11 +48,11 @@ public class CsvDataReader implements IDataReader { protected String sourceNodeId; protected BinaryEncoding binaryEncoding; - public CsvDataReader(StringBuilder input) { + public TextualCsvDataReader(StringBuilder input) { this(new BufferedReader(new StringReader(input.toString()))); } - public CsvDataReader(InputStream is) { + public TextualCsvDataReader(InputStream is) { this(toReader(is)); } @@ -64,15 +64,15 @@ protected static Reader toReader(InputStream is) { } } - public CsvDataReader(String input) { + public TextualCsvDataReader(String input) { this(new BufferedReader(new StringReader(input))); } - public CsvDataReader(Reader reader) { + public TextualCsvDataReader(Reader reader) { this.reader = reader; } - public CsvDataReader(File file) { + public TextualCsvDataReader(File file) { try { FileInputStream fis = new FileInputStream(file); InputStreamReader in = new InputStreamReader(fis, "UTF-8"); diff --git a/symmetric/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/reader/CsvDataReaderTest.java b/symmetric/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/reader/CsvDataReaderTest.java index ee93319b0c..2df93a9aae 100644 --- a/symmetric/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/reader/CsvDataReaderTest.java +++ b/symmetric/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/reader/CsvDataReaderTest.java @@ -25,8 +25,8 @@ public void testSimpleRead() { putInsert(builder, 4); endCsv(builder); - CsvDataReader reader = new CsvDataReader(builder); - DataContext ctx = new DataContext(reader, null); + TextualCsvDataReader reader = new TextualCsvDataReader(builder); + DataContext ctx = new DataContext(reader, null); reader.open(ctx); Batch batch = reader.nextBatch(); @@ -94,8 +94,8 @@ public void testTableContextSwitch() { putInsert(builder, 2); endCsv(builder); - CsvDataReader reader = new CsvDataReader(builder); - DataContext ctx = new DataContext(reader, null); + TextualCsvDataReader reader = new TextualCsvDataReader(builder); + DataContext ctx = new DataContext(reader, null); reader.open(ctx); Batch batch = reader.nextBatch(); diff --git a/symmetric/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/FileCsvDataWriterTest.java b/symmetric/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/FileCsvDataWriterTest.java index e016b72c20..12769b15ad 100644 --- a/symmetric/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/FileCsvDataWriterTest.java +++ b/symmetric/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/FileCsvDataWriterTest.java @@ -16,7 +16,7 @@ import org.jumpmind.symmetric.io.MemoryIoResource; import org.jumpmind.symmetric.io.data.Batch; import org.jumpmind.symmetric.io.data.DataProcessor; -import org.jumpmind.symmetric.io.data.reader.CsvDataReader; +import org.jumpmind.symmetric.io.data.reader.TextualCsvDataReader; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -54,9 +54,9 @@ public void readThenWrite(long threshold) throws Exception { String origCsv = IOUtils.toString(is); is.close(); - CsvDataReader reader = new CsvDataReader(origCsv); + TextualCsvDataReader reader = new TextualCsvDataReader(origCsv); FileCsvDataWriter writer = new FileCsvDataWriter(DIR, threshold, new BatchListener()); - DataProcessor processor = new DataProcessor( + DataProcessor processor = new DataProcessor( reader, writer); processor.process(); diff --git a/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/sql/jdbc/JdbcSqlTemplate.java b/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/sql/jdbc/JdbcSqlTemplate.java index 640881d587..b3a8e48580 100644 --- a/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/sql/jdbc/JdbcSqlTemplate.java +++ b/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/sql/jdbc/JdbcSqlTemplate.java @@ -23,6 +23,7 @@ import org.jumpmind.log.LogFactory; import org.jumpmind.log.LogLevel; import org.jumpmind.util.LinkedCaseInsensitiveMap; +import org.springframework.jdbc.support.lob.DefaultLobHandler; import org.springframework.jdbc.support.lob.LobHandler; public class JdbcSqlTemplate extends AbstractSqlTemplate implements ISqlTemplate { @@ -40,7 +41,7 @@ public class JdbcSqlTemplate extends AbstractSqlTemplate implements ISqlTemplate public JdbcSqlTemplate(DataSource dataSource, DatabasePlatformSettings settings, LobHandler lobHandler) { this.dataSource = dataSource; this.settings = settings; - this.lobHandler = lobHandler; + this.lobHandler = lobHandler == null ? new DefaultLobHandler() : lobHandler; } public DataSource getDataSource() { @@ -96,6 +97,52 @@ public T execute(Connection con) throws SQLException { } }); } + + public byte[] queryForBlob(final String sql, final Object... args) { + return execute(new IConnectionCallback() { + public byte[] execute(Connection con) throws SQLException { + byte[] result = null; + PreparedStatement ps = null; + ResultSet rs = null; + try { + ps = con.prepareStatement(sql); + ps.setQueryTimeout(settings.getQueryTimeout()); + JdbcUtils.setValues(ps, args); + rs = ps.executeQuery(); + if (rs.next()) { + result = lobHandler.getBlobAsBytes(rs, 1); + } + } finally { + close(rs); + close(ps); + } + return result; + } + }); + } + + public String queryForClob(final String sql, final Object... args) { + return execute(new IConnectionCallback() { + public String execute(Connection con) throws SQLException { + String result = null; + PreparedStatement ps = null; + ResultSet rs = null; + try { + ps = con.prepareStatement(sql); + ps.setQueryTimeout(settings.getQueryTimeout()); + JdbcUtils.setValues(ps, args); + rs = ps.executeQuery(); + if (rs.next()) { + result = lobHandler.getClobAsString(rs, 1); + } + } finally { + close(rs); + close(ps); + } + return result; + } + }); + } public Map queryForMap(final String sql, final Object... args) { return execute(new IConnectionCallback>() { diff --git a/symmetric/symmetric-util/src/main/java/org/jumpmind/util/CollectionUtils.java b/symmetric/symmetric-util/src/main/java/org/jumpmind/util/CollectionUtils.java new file mode 100644 index 0000000000..e73acb9657 --- /dev/null +++ b/symmetric/symmetric-util/src/main/java/org/jumpmind/util/CollectionUtils.java @@ -0,0 +1,20 @@ +package org.jumpmind.util; + +import java.util.HashMap; +import java.util.Map; + +public class CollectionUtils { + + public static Map toMap(String[] keyNames, T[] values) { + if (values != null && keyNames != null && values.length >= keyNames.length) { + Map map = new HashMap(keyNames.length); + for (int i = 0; i < keyNames.length; i++) { + map.put(keyNames[i], values[i]); + } + return map; + } else { + return new HashMap(0); + } + } + +}