Skip to content

Commit

Permalink
0001040: When use_stream_lobs is on, use_changed_data conflict resolution doesn't work because the lobs' old data is always null
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Feb 8, 2013
1 parent cb380e1 commit ffcdc99
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 37 deletions.
Expand Up @@ -34,6 +34,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.sql.ISqlReadCursor;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.Row;
Expand Down Expand Up @@ -783,6 +784,9 @@ public CsvData next() {
routerId, triggerHistory, true);
this.requiresLobSelectedFromSource = trigger.isUseStreamLobs();
}

data.setNoBinaryOldData(requiresLobSelectedFromSource ||
symmetricDialect.getName().equals(DatabaseNamesConstants.MSSQL));
} else {
log.error(
"Could not locate a trigger with the id of {} for {}. It was recorded in the hist table with a hist id of {}",
Expand Down
Expand Up @@ -25,6 +25,8 @@ final public class CsvConstants {
// Private constructor: CsvConstants is a holder of protocol token constants
// and must never be instantiated.
private CsvConstants() {
}

public static final String NO_BINARY_OLD_DATA = "no_binary_old_data";

public static final String BINARY = "binary";

public static final String NODEID = "nodeid";
Expand Down
Expand Up @@ -42,15 +42,17 @@ public class CsvData {
public static final String ATTRIBUTE_TX_ID = "transactionId";
public static final String ATTRIBUTE_SOURCE_NODE_ID = "sourceNodeId";
public static final String ATTRIBUTE_EXTERNAL_DATA = "externalData";
public static final String ATTRIBUTE_ROUTER_ID = "routerId";
public static final String ATTRIBUTE_ROUTER_ID = "routerId";
public static final String ATTRIBUTE_DATA_ID = "dataId";
public static final String ATTRIBUTE_CREATE_TIME = "createTime";

private Map<String, String[]> parsedCsvData = null;

private Map<String, String> csvData = null;

private Map<String, Object> attributes;

private boolean noBinaryOldData = false;

protected DataEventType dataEventType;

Expand Down Expand Up @@ -236,4 +238,12 @@ public Map<String, String> toColumnNameValuePairs(String[] keyNames, String key)
/**
 * Indicates whether this event must be associated with table metadata when it
 * is processed.  Every concrete event type except CREATE needs a table; an
 * unset event type does not.
 */
public boolean requiresTable() {
    if (dataEventType == null) {
        return false;
    }
    return dataEventType != DataEventType.CREATE;
}

/**
 * Whether the "old" (before-image) data for this event was captured without
 * binary/lob column values.  When true, consumers exclude binary columns from
 * conflict-detection lookups because their old values are not available.
 */
public boolean isNoBinaryOldData() {
    return noBinaryOldData;
}

/**
 * Marks whether the old data for this event lacks binary/lob values (set, for
 * example, when the source trigger uses stream lobs or the dialect cannot
 * capture binary old data).
 */
public void setNoBinaryOldData(boolean noBinaryOldData) {
    this.noBinaryOldData = noBinaryOldData;
}
}
Expand Up @@ -70,6 +70,7 @@ public class ProtocolDataReader extends AbstractDataReader implements IDataReade
protected String sourceNodeId;
protected String targetNodeId;
protected BinaryEncoding binaryEncoding;
protected boolean noBinaryOldData = false;
protected BatchType batchType;
protected int lineNumber = 0;

Expand All @@ -81,12 +82,13 @@ public ProtocolDataReader(BatchType batchType, String targetNodeId, InputStream
this(batchType, targetNodeId, toReader(is));
}

/**
 * Builds a reader that pulls protocol CSV content from a staged resource.
 * (The page rendering had duplicated the signature line; the constructor is
 * declared exactly once here.)
 *
 * @param batchType      the type of batch being read
 * @param targetNodeId   the node the batch is destined for
 * @param stagedResource the staged content to read from
 */
public ProtocolDataReader(BatchType batchType, String targetNodeId,
        IStagedResource stagedResource) {
    this.stagedResource = stagedResource;
    this.targetNodeId = targetNodeId;
    this.batchType = batchType;
}

/**
 * Builds a reader over an in-memory protocol CSV payload by delegating to the
 * Reader-based constructor.
 */
public ProtocolDataReader(BatchType batchType, String targetNodeId, String input) {
    this(batchType, targetNodeId, new BufferedReader(new StringReader(input)));
}
Expand Down Expand Up @@ -136,15 +138,48 @@ protected Object readNext() {
String[] tokens = csvReader.getValues();
bytesRead += logDebugAndCountBytes(tokens);
if (batch != null) {
statistics.get(batch).increment(DataReaderStatistics.READ_BYTE_COUNT,
bytesRead);
statistics.get(batch)
.increment(DataReaderStatistics.READ_BYTE_COUNT, bytesRead);
bytesRead = 0;
}
if (tokens[0].equals(CsvConstants.BATCH)) {
Batch batch = new Batch(batchType, Long.parseLong(tokens[1]), channelId, binaryEncoding,
sourceNodeId, targetNodeId, false);
if (tokens[0].equals(CsvConstants.INSERT)) {
CsvData data = new CsvData();
data.setNoBinaryOldData(noBinaryOldData);
data.setDataEventType(DataEventType.INSERT);
data.putParsedData(CsvData.ROW_DATA,
CollectionUtils.copyOfRange(tokens, 1, tokens.length));
return data;
} else if (tokens[0].equals(CsvConstants.OLD)) {
parsedOldData = CollectionUtils.copyOfRange(tokens, 1, tokens.length);
} else if (tokens[0].equals(CsvConstants.UPDATE)) {
CsvData data = new CsvData();
data.setNoBinaryOldData(noBinaryOldData);
data.setDataEventType(DataEventType.UPDATE);
// TODO check for invalid range and print results
data.putParsedData(CsvData.ROW_DATA,
CollectionUtils.copyOfRange(tokens, 1, table.getColumnCount() + 1));
data.putParsedData(CsvData.PK_DATA, CollectionUtils.copyOfRange(tokens,
table.getColumnCount() + 1, tokens.length));
data.putParsedData(CsvData.OLD_DATA, parsedOldData);
return data;
} else if (tokens[0].equals(CsvConstants.DELETE)) {
CsvData data = new CsvData();
data.setNoBinaryOldData(noBinaryOldData);
data.setDataEventType(DataEventType.DELETE);
data.putParsedData(CsvData.PK_DATA,
CollectionUtils.copyOfRange(tokens, 1, tokens.length));
data.putParsedData(CsvData.OLD_DATA, parsedOldData);
return data;

} else if (tokens[0].equals(CsvConstants.BATCH)) {
Batch batch = new Batch(batchType, Long.parseLong(tokens[1]), channelId,
binaryEncoding, sourceNodeId, targetNodeId, false);
statistics.put(batch, new DataReaderStatistics());
return batch;
} else if (tokens[0].equals(CsvConstants.NO_BINARY_OLD_DATA)) {
if (tokens.length > 1) {
noBinaryOldData = Boolean.parseBoolean(tokens[1]);
}
} else if (tokens[0].equals(CsvConstants.NODEID)) {
this.sourceNodeId = tokens[1];
} else if (tokens[0].equals(CsvConstants.BINARY)) {
Expand Down Expand Up @@ -187,43 +222,21 @@ protected Object readNext() {
batch.setComplete(true);
}
return null;
} else if (tokens[0].equals(CsvConstants.INSERT)) {
CsvData data = new CsvData();
data.setDataEventType(DataEventType.INSERT);
data.putParsedData(CsvData.ROW_DATA,
CollectionUtils.copyOfRange(tokens, 1, tokens.length));
return data;
} else if (tokens[0].equals(CsvConstants.OLD)) {
parsedOldData = CollectionUtils.copyOfRange(tokens, 1, tokens.length);
} else if (tokens[0].equals(CsvConstants.UPDATE)) {
CsvData data = new CsvData();
data.setDataEventType(DataEventType.UPDATE);
// TODO check for invalid range and print results
data.putParsedData(CsvData.ROW_DATA,
CollectionUtils.copyOfRange(tokens, 1, table.getColumnCount() + 1));
data.putParsedData(CsvData.PK_DATA, CollectionUtils.copyOfRange(tokens,
table.getColumnCount() + 1, tokens.length));
data.putParsedData(CsvData.OLD_DATA, parsedOldData);
return data;
} else if (tokens[0].equals(CsvConstants.DELETE)) {
CsvData data = new CsvData();
data.setDataEventType(DataEventType.DELETE);
data.putParsedData(CsvData.PK_DATA,
CollectionUtils.copyOfRange(tokens, 1, tokens.length));
data.putParsedData(CsvData.OLD_DATA, parsedOldData);
return data;
} else if (tokens[0].equals(CsvConstants.SQL)) {
CsvData data = new CsvData();
data.setNoBinaryOldData(noBinaryOldData);
data.setDataEventType(DataEventType.SQL);
data.putParsedData(CsvData.ROW_DATA, new String[] { tokens[1] });
return data;
} else if (tokens[0].equals(CsvConstants.BSH)) {
CsvData data = new CsvData();
data.setNoBinaryOldData(noBinaryOldData);
data.setDataEventType(DataEventType.BSH);
data.putParsedData(CsvData.ROW_DATA, new String[] { tokens[1] });
return data;
} else if (tokens[0].equals(CsvConstants.CREATE)) {
CsvData data = new CsvData();
data.setNoBinaryOldData(noBinaryOldData);
data.setDataEventType(DataEventType.CREATE);
data.putParsedData(CsvData.ROW_DATA, new String[] { tokens[1] });
return data;
Expand Down Expand Up @@ -310,6 +323,6 @@ public void close() {

/**
 * Returns the per-batch read statistics collected so far.
 * NOTE(review): this exposes the internal mutable map directly, so callers can
 * modify it -- confirm that is intended before wrapping in an unmodifiable view.
 */
public Map<Batch, Statistics> getStatistics() {
    return statistics;
}
}

}
Expand Up @@ -58,6 +58,8 @@ abstract public class AbstractProtocolDataWriter implements IDataWriter {
protected List<IProtocolDataWriterListener> listeners;

protected String sourceNodeId;

protected boolean noBinaryOldData = false;

public AbstractProtocolDataWriter(String sourceNodeId,
List<IProtocolDataWriterListener> listeners) {
Expand Down Expand Up @@ -124,6 +126,12 @@ public boolean start(Table table) {

public void write(CsvData data) {
if (!batch.isIgnored()) {

if (noBinaryOldData != data.isNoBinaryOldData()) {
noBinaryOldData = data.isNoBinaryOldData();
println(CsvConstants.NO_BINARY_OLD_DATA, Boolean.toString(noBinaryOldData));
}

statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
switch (data.getDataEventType()) {
Expand Down
Expand Up @@ -539,7 +539,8 @@ protected LoadStatus delete(CsvData data, boolean useConflictDetection) {
lookupKeys = targetTable.getColumnsAsList();
}

if (!platform.getDatabaseInfo().isBlobsWorkInWhereClause()) {
if (!platform.getDatabaseInfo().isBlobsWorkInWhereClause()
|| data.isNoBinaryOldData()) {
Iterator<Column> it = lookupKeys.iterator();
while (it.hasNext()) {
Column col = it.next();
Expand Down Expand Up @@ -667,7 +668,8 @@ protected LoadStatus update(CsvData data, boolean applyChangesOnly, boolean useC
lookupKeys = targetTable.getColumnsAsList();
}

if (!platform.getDatabaseInfo().isBlobsWorkInWhereClause()) {
if (!platform.getDatabaseInfo().isBlobsWorkInWhereClause()
|| data.isNoBinaryOldData()) {
Iterator<Column> it = lookupKeys.iterator();
while (it.hasNext()) {
Column col = it.next();
Expand Down

0 comments on commit ffcdc99

Please sign in to comment.