0003809: Data extract and routing fallback to contains_big_lob
erilong committed Nov 29, 2018
1 parent 39c7f7b commit b25fc1a
Showing 5 changed files with 42 additions and 30 deletions.
@@ -25,9 +25,7 @@
import java.util.Map;
import java.util.Set;

- import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
- import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.ProtocolException;
@@ -99,7 +97,7 @@ protected Map<String, String> getDataAsString(String prefix, DataMetaData dataMe
String[] columns = dataMetaData.getTriggerHistory().getParsedColumnNames();
Map<String, String> map = new LinkedCaseInsensitiveMap<String>(columns.length * 2);
if (rowData != null) {
- testColumnNamesMatchValues(dataMetaData, symmetricDialect, columns, rowData);
+ testColumnNamesMatchValues(dataMetaData, columns, rowData);
for (int i = 0; i < columns.length; i++) {
String columnName = columns[i];
columnName = prefix != null ? prefix + columnName : columnName;
@@ -115,7 +113,7 @@ protected Map<String, String> getPkDataAsString(DataMetaData dataMetaData, ISymm
String[] rowData = dataMetaData.getData().toParsedPkData();
Map<String, String> map = new LinkedCaseInsensitiveMap<String>(columns.length);
if (rowData != null) {
- testColumnNamesMatchValues(dataMetaData, symmetricDialect, columns, rowData);
+ testColumnNamesMatchValues(dataMetaData, columns, rowData);
for (int i = 0; i < columns.length; i++) {
String columnName = columns[i].toUpperCase();
map.put(columnName, rowData[i]);
@@ -198,7 +196,7 @@ protected Map<String, Object> getDataAsObject(String prefix, DataMetaData dataMe
Object[] objects = symmetricDialect.getPlatform().getObjectValues(
symmetricDialect.getBinaryEncoding(), dataMetaData.getTable(), columnNames,
rowData);
- testColumnNamesMatchValues(dataMetaData, symmetricDialect, columnNames, objects);
+ testColumnNamesMatchValues(dataMetaData, columnNames, objects);
for (int i = 0; i < columnNames.length; i++) {
String colName = upperCase ? columnNames[i].toUpperCase() : columnNames[i];
data.put(prefix != null ? (prefix + colName) : colName, objects[i]);
@@ -209,22 +207,11 @@ protected Map<String, Object> getDataAsObject(String prefix, DataMetaData dataMe
}
}

- protected void testColumnNamesMatchValues(DataMetaData dataMetaData, ISymmetricDialect symmetricDialect, String[] columnNames, Object[] values) {
+ protected void testColumnNamesMatchValues(DataMetaData dataMetaData, String[] columnNames, Object[] values) {
if (columnNames.length != values.length) {
- String additionalErrorMessage = "";
- if (symmetricDialect != null &&
- symmetricDialect.getPlatform().getName().equals(DatabaseNamesConstants.ORACLE)) {
- boolean isContainsBigLobs = dataMetaData.getNodeChannel().isContainsBigLob();
- additionalErrorMessage += String.format("\nOne possible cause of this issue is when channel.contains_big_lobs=0 and the captured row_data size exceeds 4k, captured data will be truncated at 4k. channel.contains_big_lobs is currently set to %s.", isContainsBigLobs ? "1" : "0");
- }
String message = String.format(
- "The number of recorded column names (%d) did not match the number of captured data values (%d). The data_id %d failed for an %s on %s. %s\ncolumn_names:\n%s\nvalues:\n%s",
- columnNames.length, values.length,
- dataMetaData.getData().getDataId(),
- dataMetaData.getData().getDataEventType().name(),
- dataMetaData.getData().getTableName(),
- additionalErrorMessage,
- ArrayUtils.toString(columnNames), ArrayUtils.toString(values));
+ "The router row for table %s had %d columns but expected %d.",
+ dataMetaData.getData().getTableName(), values.length, columnNames.length);
throw new ProtocolException(message);
}
}
@@ -238,7 +225,7 @@ protected Map<String, Object> getPkDataAsObject(DataMetaData dataMetaData,
Object[] objects = symmetricDialect.getPlatform().getObjectValues(
symmetricDialect.getBinaryEncoding(), dataMetaData.getTable(), columnNames,
rowData);
- testColumnNamesMatchValues(dataMetaData, symmetricDialect, columnNames, objects);
+ testColumnNamesMatchValues(dataMetaData, columnNames, objects);
for (int i = 0; i < columnNames.length; i++) {
data.put(columnNames[i].toUpperCase(), objects[i]);
}
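Note: the simplified check above only reports the column/value mismatch; the advice about contains_big_lob now lives with the routing code further down, which can react to the failure. A minimal, self-contained sketch of the same validation, using hypothetical class and table names rather than the SymmetricDS sources:

    public class ColumnValueCheckSketch {

        static class ProtocolException extends RuntimeException {
            ProtocolException(String message) {
                super(message);
            }
        }

        // Fails fast when the captured values do not line up with the recorded column names.
        static void testColumnNamesMatchValues(String table, String[] columnNames, Object[] values) {
            if (columnNames.length != values.length) {
                throw new ProtocolException(String.format(
                        "The router row for table %s had %d columns but expected %d.",
                        table, values.length, columnNames.length));
            }
        }

        public static void main(String[] args) {
            String[] columns = { "ID", "NAME", "NOTES" };
            Object[] truncated = { 1, "widget" }; // e.g. row_data cut off at 4k when contains_big_lob is off
            testColumnNamesMatchValues("ITEM", columns, truncated); // throws ProtocolException
        }
    }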
@@ -65,6 +65,7 @@ public class ChannelRouterContext extends SimpleRouterContext {
private List<DataEvent> dataEventsToSend = new ArrayList<DataEvent>();
private boolean produceCommonBatches = false;
private boolean onlyDefaultRoutersAssigned = false;
+ private boolean overrideContainsBigLob = false;
private long lastLoadId = -1;
private long startDataId;
private long endDataId;
@@ -286,5 +287,13 @@ public boolean isOnlyDefaultRoutersAssigned() {
public List<Long> getDataIds() {
return dataIds;
}

+ public boolean isOverrideContainsBigLob() {
+ return overrideContainsBigLob;
+ }

+ public void setOverrideContainsBigLob(boolean overrideContainsBigLob) {
+ this.overrideContainsBigLob = overrideContainsBigLob;
+ }

}
@@ -309,15 +309,14 @@ protected ISqlReadCursor<Data> prepareCursor() {
}

if (useGreaterThanDataId) {
sql = getSql("selectDataUsingStartDataId", context.getChannel().getChannel());
sql = getSql("selectDataUsingStartDataId");
if (!lastSelectUsedGreaterThanQuery) {
log.info("Switching to select from the data table where data_id >= start gap because there were {} gaps found "
+ "which was more than the configured threshold of {}", dataGaps.size(), maxGapsBeforeGreaterThanQuery);
lastSelectUsedGreaterThanQueryByEngineName.put(parameterService.getEngineName(), Boolean.TRUE);
}
} else {
- sql = qualifyUsingDataGaps(dataGaps, numberOfGapsToQualify,
- getSql("selectDataUsingGapsSql", context.getChannel().getChannel()));
+ sql = qualifyUsingDataGaps(dataGaps, numberOfGapsToQualify, getSql("selectDataUsingGapsSql"));
if (lastSelectUsedGreaterThanQuery) {
log.info("Switching to select from the data table where data_id between gaps");
lastSelectUsedGreaterThanQueryByEngineName.put(parameterService.getEngineName(), Boolean.FALSE);
@@ -413,8 +412,9 @@ protected String qualifyUsingDataGaps(List<DataGap> dataGaps, int numberOfGapsTo
return FormatUtils.replace("dataRange", gapClause.toString(), sql);
}

- protected String getSql(String sqlName, Channel channel) {
+ protected String getSql(String sqlName) {
String select = engine.getRouterService().getSql(sqlName);
+ Channel channel = context.getChannel().getChannel();
if (!channel.isUseOldDataToRoute() || context.isOnlyDefaultRoutersAssigned()) {
select = select.replace("d.old_data", "''");
}
@@ -425,7 +425,7 @@ protected String getSql(String sqlName, Channel channel) {
select = select.replace("d.pk_data", "''");
}
return engine.getSymmetricDialect().massageDataExtractionSql(
- select, channel.isContainsBigLob());
+ select, context.isOverrideContainsBigLob() || channel.isContainsBigLob());
}

protected boolean fillPeekAheadQueue(List<Data> peekAheadQueue, int peekAheadCount,
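Note: massageDataExtractionSql is dialect-specific and is not part of this commit; the behavior that matters here (per the error text removed above) is that with contains_big_lob off, captured row_data larger than 4k can come back truncated on Oracle. A hedged sketch of how such a dialect hook might branch on the combined flag — illustrative only, with the dbms_lob.substr rewrite and the 4000-character figure as assumptions, not the actual SymmetricDS dialect code:

    // Illustrative only: a dialect hook that widens the extraction SQL when big LOBs
    // must be read whole. Real SymmetricDS dialects differ from this sketch.
    public class ExtractionSqlSketch {

        static String massageDataExtractionSql(String sql, boolean containsBigLob) {
            if (!containsBigLob) {
                // Cheap path: read at most the first 4000 characters of each captured column.
                sql = sql.replace("d.row_data", "dbms_lob.substr(d.row_data, 4000, 1)")
                         .replace("d.old_data", "dbms_lob.substr(d.old_data, 4000, 1)")
                         .replace("d.pk_data", "dbms_lob.substr(d.pk_data, 4000, 1)");
            }
            return sql;
        }

        public static void main(String[] args) {
            String base = "select d.data_id, d.row_data, d.old_data, d.pk_data from sym_data d";
            boolean channelContainsBigLob = false;  // sym_channel.contains_big_lob
            boolean overrideContainsBigLob = true;  // set on the retry pass after a ProtocolException
            // Mirrors the change above: the per-pass override forces the full-LOB select.
            System.out.println(massageDataExtractionSql(base,
                    overrideContainsBigLob || channelContainsBigLob));
        }
    }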
@@ -2326,16 +2326,16 @@ public CsvData next() {
outgoingBatch.incrementExtractRowCount(data.getDataEventType());

if (!data.getDataEventType().equals(DataEventType.DELETE)) {
- int expectedCommaCount = triggerHistory.getParsedColumnNames().length - 1;
- int commaCount = StringUtils.countMatches(data.getRowData(), ",");
+ int expectedCommaCount = triggerHistory.getParsedColumnNames().length;
+ int commaCount = StringUtils.countMatches(data.getRowData(), ",") + 1;
if (commaCount < expectedCommaCount) {
String message = "The extracted row for table %s had %d columns but expected %d. ";
if (containsBigLob) {
message += "Corrupted row for data ID " + data.getDataId() + ": " + data.getRowData();
} else {
message += "If this happens often, it might be better to isolate the table with sym_channel.contains_big_lobs enabled.";
}
- throw new ProtocolException(message, data.getTableName(), commaCount + 1, expectedCommaCount);
+ throw new ProtocolException(message, data.getTableName(), commaCount, expectedCommaCount);
}
}

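Note: the corrected arithmetic compares column counts with column counts. Commas inside quoted values only inflate the count, so commaCount (commas + 1) is never less than the real field count, and the check fires only when the row is unmistakably short — the truncation symptom this commit targets. A standalone sketch of the heuristic, with hypothetical class and data rather than the DataExtractorService code:

    public class RowWidthCheckSketch {

        // Commas inside quoted values inflate this count, so it never underestimates the
        // field count; a row is flagged only when it is clearly missing columns.
        static int roughFieldCount(String rowData) {
            int commas = 0;
            for (int i = 0; i < rowData.length(); i++) {
                if (rowData.charAt(i) == ',') {
                    commas++;
                }
            }
            return commas + 1;
        }

        public static void main(String[] args) {
            String[] parsedColumnNames = { "ID", "NAME", "NOTES" };
            String rowData = "1,\"widg";                        // a row cut off mid-value
            int expectedCommaCount = parsedColumnNames.length;  // 3 columns expected
            int commaCount = roughFieldCount(rowData);          // 2
            if (commaCount < expectedCommaCount) {
                throw new RuntimeException(String.format(
                        "The extracted row for table %s had %d columns but expected %d.",
                        "ITEM", commaCount, expectedCommaCount));
            }
        }
    }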
@@ -49,6 +49,7 @@
import org.jumpmind.symmetric.common.ContextConstants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.io.data.DataEventType;
+ import org.jumpmind.symmetric.io.data.ProtocolException;
import org.jumpmind.symmetric.load.DefaultReloadGenerator;
import org.jumpmind.symmetric.load.IReloadGenerator;
import org.jumpmind.symmetric.model.AbstractBatch.Status;
@@ -501,7 +502,13 @@ protected int routeDataForEachChannel() {
engine.getClusterService().refreshLock(ClusterConstants.ROUTE);
if (nodeChannel.isEnabled() && (readyChannels == null || readyChannels.contains(nodeChannel.getChannelId()))) {
processInfo.setCurrentChannelId(nodeChannel.getChannelId());
- dataCount += routeDataForChannel(processInfo, nodeChannel, sourceNode);
+ int count = routeDataForChannel(processInfo, nodeChannel, sourceNode, false);
+ if (count >= 0) {
+ dataCount += count;
+ } else {
+ log.info("Re-attempting routing with contains_big_lobs enabled for channel " + nodeChannel.getChannelId());
+ dataCount += routeDataForChannel(processInfo, nodeChannel, sourceNode, true);
+ }
} else if (!nodeChannel.isEnabled()) {
gapDetector.setIsAllDataRead(false);
if (log.isDebugEnabled()) {
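Note: the loop above is the fallback named in the commit title: each channel is first routed with its own contains_big_lob setting, and only re-routed with big LOBs forced on when that pass reports a ProtocolException as -1 (see the catch block added further down). A compact, self-contained sketch of the control flow, with a hypothetical class and stubbed routing rather than the RouterService code itself:

    public class BigLobFallbackSketch {

        static class ProtocolException extends RuntimeException {
            ProtocolException(String message) {
                super(message);
            }
        }

        // Returns the routed row count, or -1 when a ProtocolException interrupts the pass.
        static int routeDataForChannel(String channelId, boolean overrideContainsBigLob) {
            try {
                if (!overrideContainsBigLob) {
                    // Pretend the cheap (possibly truncating) pass trips over an oversized row.
                    throw new ProtocolException("The extracted row for table ITEM had 2 columns but expected 3.");
                }
                return 42; // rows routed on the full-LOB pass
            } catch (ProtocolException ex) {
                return -1;
            }
        }

        public static void main(String[] args) {
            String channelId = "default";
            int count = routeDataForChannel(channelId, false);
            if (count < 0) {
                System.out.println("Re-attempting routing with contains_big_lobs enabled for channel " + channelId);
                count = routeDataForChannel(channelId, true);
            }
            System.out.println("Routed " + count + " rows");
        }
    }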
@@ -691,7 +698,7 @@ protected boolean onlyDefaultRoutersAssigned(Channel channel, String nodeGroupId
return onlyDefaultRoutersAssigned;
}

- protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nodeChannel, final Node sourceNode) {
+ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nodeChannel, final Node sourceNode, boolean isOverrideContainsBigLob) {
ChannelRouterContext context = null;
long ts = System.currentTimeMillis();
int dataCount = -1;
@@ -707,6 +714,7 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod
context.setProduceCommonBatches(producesCommonBatches);
context.setOnlyDefaultRoutersAssigned(onlyDefaultRoutersAssigned);
context.setDataGaps(gapDetector.getDataGaps());
+ context.setOverrideContainsBigLob(isOverrideContainsBigLob);

dataCount = selectDataAndRoute(processInfo, context);
return dataCount;
@@ -731,6 +739,8 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod
context.rollback();
}
return 0;
+ } catch (ProtocolException ex) {
+ return -1;
} catch (Throwable ex) {
log.error(
String.format("Failed to route and batch data on '%s' channel",
Expand Down Expand Up @@ -1024,6 +1034,12 @@ protected int routeData(ProcessInfo processInfo, Data data, ChannelRouterContext
} catch (DelayRoutingException ex) {
throw ex;
} catch (RuntimeException ex) {
+ if (ex instanceof ProtocolException && !context.getChannel().getChannel().isContainsBigLob()
+ && !context.isOverrideContainsBigLob()) {
+ log.warn(ex.getMessage() + " If this happens often, it might be better to isolate the table with sym_channel.contains_big_lobs enabled.");
+ throw ex;
+ }

StringBuilder failureMessage = new StringBuilder(
"Failed to route data: ");
failureMessage.append(data.getDataId());
