Skip to content

Commit

Permalink
SYMMETRICDS-515 - use streaming api to get lobs out of database
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Sep 26, 2011
1 parent 2af9767 commit f65a716
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 31 deletions.
Expand Up @@ -97,6 +97,7 @@
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCallback;
import org.springframework.jdbc.support.JdbcUtils;
import org.springframework.jdbc.support.lob.DefaultLobHandler;
import org.springframework.jdbc.support.lob.LobHandler;

/**
Expand Down Expand Up @@ -179,17 +180,21 @@ protected AbstractDbDialect() {
_defaultSizes.put(new Integer(6), "15,0");
_defaultSizes.put(new Integer(8), "15,0");
_defaultSizes.put(new Integer(3), "15,15");
_defaultSizes.put(new Integer(2), "15,15");
_defaultSizes.put(new Integer(2), "15,15");
}

public String encodeForCsv(byte[] data) {
BinaryEncoding encoding = getBinaryEncoding();
if (BinaryEncoding.BASE64.equals(encoding)) {
return new String(Base64.encodeBase64(data));
} else if (BinaryEncoding.HEX.equals(encoding)) {
return new String(Hex.encodeHex(data));
if (data != null) {
BinaryEncoding encoding = getBinaryEncoding();
if (BinaryEncoding.BASE64.equals(encoding)) {
return new String(Base64.encodeBase64(data));
} else if (BinaryEncoding.HEX.equals(encoding)) {
return new String(Hex.encodeHex(data));
} else {
throw new NotImplementedException();
}
} else {
throw new NotImplementedException();
return null;
}
}

Expand Down Expand Up @@ -258,7 +263,8 @@ public Object doInConnection(Connection c) throws SQLException, DataAccessExcept
driverVersion = meta.getDriverVersion();
return null;
}
});
});
this.initLobHandler();
}

public int getQueryTimeoutInSeconds() {
Expand Down Expand Up @@ -1587,6 +1593,15 @@ public Map<String, String> getSqlScriptReplacementTokens() {

public boolean needsToSelectLobData() {
return false;
}

public boolean isClob(int type) {
return type == Types.CLOB || type == Types.LONGVARCHAR;
}

public boolean isBlob(int type) {
return type == Types.BLOB || type == Types.BINARY
|| type == Types.VARBINARY || type == Types.LONGVARBINARY || type == -10;
}

public boolean isLob(int type) {
Expand Down Expand Up @@ -1652,5 +1667,9 @@ public void cleanupTriggers() {

public void addDatabaseUpgradeListener(IDatabaseUpgradeListener listener) {
databaseUpgradeListeners.add(listener);
}

protected void initLobHandler() {
lobHandler = new DefaultLobHandler();
}
}
Expand Down
Expand Up @@ -19,6 +19,7 @@
* under the License. */
package org.jumpmind.symmetric.db;

import java.sql.Types;
import java.util.Date;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -278,7 +279,11 @@ public long insertWithGeneratedKey(JdbcTemplate jdbcTemplate, final String sql,
*/
public boolean needsToSelectLobData();

public boolean isLob(int type);
public boolean isLob(int type);

public boolean isClob(int type);

public boolean isBlob(int type);

/**
* This is a SQL clause that compares the old data to the new data in a trigger.
Expand Down
Expand Up @@ -68,7 +68,8 @@ public void init(Platform pf, int queryTimeout, JdbcTemplate jdbcTemplate) {
}
}
}


@Override
protected void initLobHandler() {
lobHandler = new OracleLobHandler();
try {
Expand Down
Expand Up @@ -20,10 +20,11 @@
*/
package org.jumpmind.symmetric.extract.csv;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang.ArrayUtils;
import org.jumpmind.symmetric.SymmetricException;
Expand All @@ -42,6 +43,8 @@
import org.jumpmind.symmetric.util.CsvUtils;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.support.lob.LobHandler;

abstract class AbstractStreamDataCommand implements IStreamDataCommand {

Expand All @@ -61,11 +64,11 @@ protected void selectAndEnhanceWithLobsIfEnabled(Data data, DataExtractorContext
Table table = dbDialect.getTable(trigger, true);
if (table != null) {
if (trigger.isUseStreamLobs()) {
List<Column> lobColumns = getLobColumns(table);
final List<Column> lobColumns = getLobColumns(table);
if (lobColumns.size() > 0) {
try {
String[] columnNames = triggerHistory.getParsedColumnNames();
String[] rowData = data.toParsedRowData();
final String[] columnNames = triggerHistory.getParsedColumnNames();
final String[] rowData = data.toParsedRowData();
Column[] orderedColumns = dbDialect
.orderColumns(columnNames, table);
Object[] objectValues = dbDialect.getObjectValues(
Expand All @@ -81,29 +84,36 @@ protected void selectAndEnhanceWithLobsIfEnabled(Data data, DataExtractorContext
args[i] = columnDataMap.get(pkColumns[i].getName());
types[i] = pkColumns[i].getTypeCode();
}
Map<String, Object> lobData = template
.queryForMap(sql, args, types);
Set<String> lobColumnNames = lobData.keySet();
for (String lobColumnName : lobColumnNames) {
Object value = lobData.get(lobColumnName);
String valueForCsv = null;
if (value instanceof byte[]) {
valueForCsv = dbDialect.encodeForCsv((byte[]) value);
} else if (value != null) {
valueForCsv = value.toString();
template.query(sql, args, types, new RowMapper<Object>() {
public Object mapRow(ResultSet rs, int rowNum)
throws SQLException {
LobHandler lobHandler = dbDialect.getLobHandler();
for (Column col : lobColumns) {
String valueForCsv = null;
if (dbDialect.isBlob(col.getTypeCode())) {
byte[] blobBytes = lobHandler.getBlobAsBytes(rs,
col.getName());
valueForCsv = dbDialect.encodeForCsv(blobBytes);
} else {
String clobText = lobHandler.getClobAsString(rs,
col.getName());
valueForCsv = clobText;
}
int index = ArrayUtils.indexOf(columnNames,
col.getName());
rowData[index] = valueForCsv;
}
return null;
}

int index = ArrayUtils.indexOf(columnNames, lobColumnName);
rowData[index] = valueForCsv;
}
});
data.setRowData(CsvUtils.escapeCsvData(rowData));
} catch (IncorrectResultSizeDataAccessException ex) {
// Row could have been deleted by the time we
// get around to extracting
log.warn("DataExtractorRowMissingCannotGetLobData",
data.getRowData());
} catch (Exception ex) {
throw new SymmetricException("DataExtractorTroubleExtractingLobData", data.getRowData());
throw new SymmetricException("DataExtractorTroubleExtractingLobData", ex, data.getRowData());
}

}
Expand Down
Expand Up @@ -6,8 +6,7 @@
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd" default-lazy-init="true">

<bean id="oracleDialect" class="org.jumpmind.symmetric.db.oracle.OracleDbDialect" init-method="initLobHandler"
scope="prototype">
<bean id="oracleDialect" class="org.jumpmind.symmetric.db.oracle.OracleDbDialect" scope="prototype">
<property name="tablePrefix" value="$[sym.sync.table.prefix]" />
<property name="parameterService" ref="parameterService" />
<property name="defaultSchema" value="$[sym.db.default.schema]" />
Expand Down

0 comments on commit f65a716

Please sign in to comment.