Skip to content
Permalink
Browse files

0003809: Data extract fallback to contains_big_lob

  • Loading branch information...
erilong committed Nov 28, 2018
1 parent 46a01dd commit 39c7f7b3b65d9203004b5e6ea5b652b61f9cc149
@@ -26,7 +26,6 @@
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.AbstractSymmetricDialect;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.service.IParameterService;

@@ -68,7 +67,7 @@ protected boolean doesTriggerExistOnPlatform(String catalog, String schema, Stri
}

@Override
public String massageDataExtractionSql(String sql, Channel channel) {
public String massageDataExtractionSql(String sql, boolean isContainsBigLob) {
/*
* Remove tranaction_id from the sql because DB2 doesn't support
* transactions. In fact, DB2 iSeries does return results because the
@@ -79,7 +78,7 @@ public String massageDataExtractionSql(String sql, Channel channel) {
if (!this.getParameterService().is(ParameterConstants.DB2_CAPTURE_TRANSACTION_ID, false)) {
sql = sql.replace("d.transaction_id, ", "");
}
return super.massageDataExtractionSql(sql, channel);
return super.massageDataExtractionSql(sql, isContainsBigLob);
}

protected String getSystemSchemaName() {
@@ -30,7 +30,6 @@
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.AbstractSymmetricDialect;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.service.IParameterService;
import org.springframework.jdbc.UncategorizedSQLException;
@@ -158,8 +157,8 @@ public void cleanupTriggers() {
}

@Override
public String massageDataExtractionSql(String sql, Channel channel) {
if (channel != null && !channel.isContainsBigLob()) {
public String massageDataExtractionSql(String sql, boolean isContainsBigLob) {
if (!isContainsBigLob) {
String[] sizes = parameterService.getString(ParameterConstants.FIREBIRD_EXTRACT_VARCHAR_ROW_OLD_PK_DATA, "20000,20000,1000").split(",");
sql = StringUtils.replace(sql, "d.row_data", "cast(d.row_data as varchar(" + sizes[0] + "))");
sql = StringUtils.replace(sql, "d.old_data", "cast(d.old_data as varchar(" + sizes[1] + "))");
@@ -30,7 +30,6 @@
import org.jumpmind.symmetric.db.AbstractSymmetricDialect;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.db.SequenceIdentifier;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.service.IParameterService;
import org.springframework.jdbc.UncategorizedSQLException;
@@ -216,8 +215,8 @@ public void cleanupTriggers() {
}

@Override
public String massageDataExtractionSql(String sql, Channel channel) {
if (channel != null && !channel.isContainsBigLob()) {
public String massageDataExtractionSql(String sql, boolean isContainsBigLob) {
if (!isContainsBigLob) {
sql = StringUtils.replace(sql, "d.row_data", "cast(d.row_data as varchar(10000))");
sql = StringUtils.replace(sql, "d.old_data", "cast(d.old_data as varchar(10000))");
sql = StringUtils.replace(sql, "d.pk_data", "cast(d.pk_data as varchar(500))");
@@ -349,22 +349,22 @@ public boolean supportsTransactionViews() {
}

@Override
public String massageDataExtractionSql(String sql, Channel channel) {
if (channel != null && !channel.isContainsBigLob()) {
public String massageDataExtractionSql(String sql, boolean isContainsBigLob) {
if (!isContainsBigLob) {
sql = StringUtils.replace(sql, "d.row_data", "dbms_lob.substr(d.row_data, 4000, 1 )");
sql = StringUtils.replace(sql, "d.old_data", "dbms_lob.substr(d.old_data, 4000, 1 )");
sql = StringUtils.replace(sql, "d.pk_data", "dbms_lob.substr(d.pk_data, 4000, 1 )");
}
sql = super.massageDataExtractionSql(sql, channel);
sql = super.massageDataExtractionSql(sql, isContainsBigLob);
return sql;
}

@Override
public String massageForLob(String sql, Channel channel) {
if (channel != null && !channel.isContainsBigLob()) {
public String massageForLob(String sql, boolean isContainsBigLob) {
if (!isContainsBigLob) {
return String.format("dbms_lob.substr(%s, 4000, 1)", sql);
} else {
return super.massageForLob(sql, channel);
return super.massageForLob(sql, isContainsBigLob);
}
}

@@ -312,22 +312,22 @@ public boolean supportsTransactionViews() {
}

@Override
public String massageDataExtractionSql(String sql, Channel channel) {
if (channel != null && !channel.isContainsBigLob()) {
public String massageDataExtractionSql(String sql, boolean isContainsBigLob) {
if (!isContainsBigLob) {
sql = StringUtils.replace(sql, "d.row_data", "dbms_lob.substr(d.row_data, 4000, 1 )");
sql = StringUtils.replace(sql, "d.old_data", "dbms_lob.substr(d.old_data, 4000, 1 )");
sql = StringUtils.replace(sql, "d.pk_data", "dbms_lob.substr(d.pk_data, 4000, 1 )");
}
sql = super.massageDataExtractionSql(sql, channel);
sql = super.massageDataExtractionSql(sql, isContainsBigLob);
return sql;
}

@Override
public String massageForLob(String sql, Channel channel) {
if (channel != null && !channel.isContainsBigLob()) {
public String massageForLob(String sql, boolean isContainsBigLob) {
if (!isContainsBigLob) {
return String.format("dbms_lob.substr(%s, 4000, 1)", sql);
} else {
return super.massageForLob(sql, channel);
return super.massageForLob(sql, isContainsBigLob);
}
}

@@ -836,7 +836,7 @@ public boolean canGapsOccurInCapturedDataIds() {
return true;
}

public String massageDataExtractionSql(String sql, Channel channel) {
public String massageDataExtractionSql(String sql, boolean isContainsBigLob) {
String textColumnExpression = parameterService.getString(ParameterConstants.DATA_EXTRACTOR_TEXT_COLUMN_EXPRESSION);
if (isNotBlank(textColumnExpression)) {
sql = sql.replace("d.old_data", textColumnExpression.replace("$(columnName)", "d.old_data"));
@@ -854,7 +854,7 @@ public String getDriverVersion() {
return driverVersion;
}

public String massageForLob(String sql, Channel channel) {
public String massageForLob(String sql, boolean isContainsBigLob) {
return sql;
}

@@ -962,7 +962,7 @@ else if (column.getJdbcTypeName() != null
symmetricDialect.getMasterCollation(), formattedColumnText);

if (isLob) {
formattedColumnText = symmetricDialect.massageForLob(formattedColumnText, channel);
formattedColumnText = symmetricDialect.massageForLob(formattedColumnText, channel != null ? channel.isContainsBigLob() : true);
}

formattedColumnText = FormatUtils.replace("origTableAlias", origTableAlias,
@@ -223,9 +223,9 @@ public void removeTrigger(StringBuilder sqlBuffer, String catalogName, String sc
*/
public boolean canGapsOccurInCapturedDataIds();

public String massageDataExtractionSql(String sql, Channel channel);
public String massageDataExtractionSql(String sql, boolean isContainsBigLob);

public String massageForLob(String sql, Channel channel);
public String massageForLob(String sql, boolean isContainsBigLob);

public boolean isInitialLoadTwoPassLob(Table table);

@@ -28,9 +28,9 @@
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.ProtocolException;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.OutgoingBatch;
@@ -225,7 +225,7 @@ protected void testColumnNamesMatchValues(DataMetaData dataMetaData, ISymmetricD
dataMetaData.getData().getTableName(),
additionalErrorMessage,
ArrayUtils.toString(columnNames), ArrayUtils.toString(values));
throw new SymmetricException(message);
throw new ProtocolException(message);
}
}

@@ -425,7 +425,7 @@ protected String getSql(String sqlName, Channel channel) {
select = select.replace("d.pk_data", "''");
}
return engine.getSymmetricDialect().massageDataExtractionSql(
select, channel);
select, channel.isContainsBigLob());
}

protected boolean fillPeekAheadQueue(List<Data> peekAheadQueue, int peekAheadCount,
@@ -179,6 +179,8 @@ public void insertScriptEvent(ISqlTransaction transaction, String channelId,
public ISqlReadCursor<Data> selectDataFor(Batch batch);

public ISqlReadCursor<Data> selectDataFor(Long batchId, String channelId);

public ISqlReadCursor<Data> selectDataFor(Long batchId, String targetNodeId, boolean isContainsBigLob);

public Map<String, Date> getLastDataCaptureByChannel();
}
@@ -791,6 +791,9 @@ public boolean extractOnlyOutgoingBatch(String nodeId, long batchId, Writer writ
+ currentBatch.getChannelId() + "' channel needs to be turned on.",
e.getCause() != null ? e.getCause() : e);
}
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
}
throw new RuntimeException(e.getCause() != null ? e.getCause() : e);
} else if (!(e instanceof RuntimeException)) {
throw new RuntimeException(e);
@@ -1024,7 +1027,19 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo extractInfo, Node targe
currentBatch.resetStats();

IDataReader dataReader = buildExtractDataReader(sourceNode, targetNode, currentBatch, extractInfo);
new DataProcessor(dataReader, writer, listener, "extract").process(ctx);
try {
new DataProcessor(dataReader, writer, listener, "extract").process(ctx);
} catch (ProtocolException e) {
if (!configurationService.getNodeChannel(currentBatch.getChannelId(), false).getChannel().isContainsBigLob()) {
log.warn(e.getMessage());
log.info("Re-attempting extraction with contains_big_lobs enabled for channel " + currentBatch.getChannelId());
currentBatch.resetStats();
dataReader = buildExtractDataReader(sourceNode, targetNode, currentBatch, extractInfo, true);
new DataProcessor(dataReader, writer, listener, "extract").process(ctx);
} else {
throw e;
}
}
extractTimeInMs = System.currentTimeMillis() - ts;
Statistics stats = getExtractStats(writer);
if (stats != null) {
@@ -1213,6 +1228,12 @@ protected ExtractDataReader buildExtractDataReader(Node sourceNode, Node targetN
new SelectFromSymDataSource(currentBatch, sourceNode, targetNode, processInfo));
}

protected ExtractDataReader buildExtractDataReader(Node sourceNode, Node targetNode, OutgoingBatch currentBatch, ProcessInfo processInfo,
boolean containsBigLob) {
return new ExtractDataReader(symmetricDialect.getPlatform(),
new SelectFromSymDataSource(currentBatch, sourceNode, targetNode, processInfo, containsBigLob));
}

protected Statistics getExtractStats(IDataWriter writer) {
Map<Batch, Statistics> statisticsMap = null;
if (writer instanceof TransformWriter) {
@@ -2156,9 +2177,11 @@ public Table lookup(String routerId, TriggerHistory triggerHistory, boolean setT
private ProcessInfo processInfo;

private ColumnsAccordingToTriggerHistory columnsAccordingToTriggerHistory;

private boolean containsBigLob;

public SelectFromSymDataSource(OutgoingBatch outgoingBatch,
Node sourceNode, Node targetNode, ProcessInfo processInfo) {
Node sourceNode, Node targetNode, ProcessInfo processInfo, boolean containsBigLob) {
this.processInfo = processInfo;
this.outgoingBatch = outgoingBatch;
this.batch = new Batch(BatchType.EXTRACT, outgoingBatch.getBatchId(),
@@ -2167,6 +2190,13 @@ public SelectFromSymDataSource(OutgoingBatch outgoingBatch,
this.targetNode = targetNode;
this.columnsAccordingToTriggerHistory = new ColumnsAccordingToTriggerHistory(sourceNode, targetNode);
this.outgoingBatch.resetExtractRowStats();
this.containsBigLob = containsBigLob;
}

public SelectFromSymDataSource(OutgoingBatch outgoingBatch,
Node sourceNode, Node targetNode, ProcessInfo processInfo) {
this(outgoingBatch, sourceNode, targetNode, processInfo,
configurationService.getNodeChannel(outgoingBatch.getChannelId(), false).getChannel().isContainsBigLob());
}

public Batch getBatch() {
@@ -2183,7 +2213,7 @@ public Table getTargetTable() {

public CsvData next() {
if (this.cursor == null) {
this.cursor = dataService.selectDataFor(batch);
this.cursor = dataService.selectDataFor(batch.getBatchId(), batch.getTargetNodeId(), containsBigLob);
}

Data data = null;
@@ -2294,6 +2324,20 @@ public CsvData next() {

outgoingBatch.incrementExtractRowCount();
outgoingBatch.incrementExtractRowCount(data.getDataEventType());

if (!data.getDataEventType().equals(DataEventType.DELETE)) {
int expectedCommaCount = triggerHistory.getParsedColumnNames().length - 1;
int commaCount = StringUtils.countMatches(data.getRowData(), ",");
if (commaCount < expectedCommaCount) {
String message = "The extracted row for table %s had %d columns but expected %d. ";
if (containsBigLob) {
message += "Corrupted row for data ID " + data.getDataId() + ": " + data.getRowData();
} else {
message += "If this happens often, it might be better to isolate the table with sym_channel.contains_big_lobs enabled.";
}
throw new ProtocolException(message, data.getTableName(), commaCount + 1, expectedCommaCount);
}
}

if (data.getDataEventType() == DataEventType.CREATE && StringUtils.isBlank(data.getCsvData(CsvData.ROW_DATA))) {

@@ -2304,9 +2304,14 @@ public Data mapData(Row row) {
}

public ISqlReadCursor<Data> selectDataFor(Batch batch) {
return selectDataFor(batch.getBatchId(), batch.getTargetNodeId(), engine.getConfigurationService()
.getNodeChannel(batch.getChannelId(), false).getChannel().isContainsBigLob());
}

public ISqlReadCursor<Data> selectDataFor(Long batchId, String targetNodeId, boolean isContainsBigLob) {
return sqlTemplateDirty.queryForCursor(
getDataSelectSql(batch.getBatchId(), -1l, batch.getChannelId()), dataMapper,
new Object[] { batch.getBatchId(), batch.getTargetNodeId() },
getDataSelectSql(batchId, -1l, isContainsBigLob),
dataMapper, new Object[] { batchId, targetNodeId },
new int[] { symmetricDialect.getSqlTypeForIds(), Types.VARCHAR });
}

@@ -2319,14 +2324,18 @@ protected String getDataSelectByBatchSql(long batchId, long startDataId, String
String startAtDataIdSql = startDataId >= 0l ? " and d.data_id >= ? " : "";
return symmetricDialect.massageDataExtractionSql(
getSql("selectEventDataByBatchIdSql", startAtDataIdSql, getDataOrderBy()),
engine.getConfigurationService().getNodeChannel(channelId, false).getChannel());
engine.getConfigurationService().getNodeChannel(channelId, false).getChannel().isContainsBigLob());
}

protected String getDataSelectSql(long batchId, long startDataId, String channelId) {
return getDataSelectSql(batchId, startDataId,
engine.getConfigurationService().getNodeChannel(channelId, false).getChannel().isContainsBigLob());
}

protected String getDataSelectSql(long batchId, long startDataId, boolean isContainsBigLob) {
String startAtDataIdSql = startDataId >= 0l ? " and d.data_id >= ? " : "";
return symmetricDialect.massageDataExtractionSql(
getSql("selectEventDataToExtractSql", startAtDataIdSql, getDataOrderBy()),
engine.getConfigurationService().getNodeChannel(channelId, false).getChannel());
getSql("selectEventDataToExtractSql", startAtDataIdSql, getDataOrderBy()), isContainsBigLob);
}

protected String getDataOrderBy() {

0 comments on commit 39c7f7b

Please sign in to comment.
You can’t perform that action at this time.