Commit 6202884

0004302: Conflict resolution NEWER_TIME based on capture time of row

erilong committed Mar 6, 2020
1 parent f9917ae commit 6202884

Showing 19 changed files with 295 additions and 147 deletions.
6 changes: 6 additions & 0 deletions symmetric-assemble/src/asciidoc/appendix/dataformat.ad
@@ -15,6 +15,12 @@ Identifies which channel a batch belongs to. The SymmetricDS data loader expects
batch, {batch_id}::
Uniquely identifies a batch. Used to track whether a batch has been loaded before. A batch of -9999 is considered a virtual batch and will be loaded, but will not be recorded in incoming_batch.

basetime, {unix_timestamp}::
Base create time (as a unix timestamp integer) of the first row (insert, update, or delete) in the batch.

ts, {unix_timestamp}::
Offset create time (as a unix timestamp integer) for the current row (insert, update, or delete). This is a small integer offset that is added to the base time.

schema, {schema name}::
The name of the schema that is being targeted.

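As a sketch of how these entries fit into a batch (the values and the table are illustrative, not taken from a real batch; the other tokens follow the existing format in this appendix), basetime appears once per batch and a ts offset precedes each row:

----
batch, 1234
basetime, 1583476980000
table, item
keys, item_id
columns, item_id, price
ts, 2
insert, "1", "4.25"
commit, 1234
----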
62 changes: 28 additions & 34 deletions symmetric-assemble/src/asciidoc/configuration/conflicts.ad
@@ -52,14 +52,16 @@ then no conflict is detected during an update or a delete. If a row already exis
|USE_TIMESTAMP|Indicates that the primary key plus a timestamp column (as configured in detect_expression) will indicate whether
a conflict has occurred. If the target timestamp column is not equal to the old source timestamp column, then a conflict has been detected.
If a row already exists during an insert, then a conflict has been detected.

ifndef::pro[]
_You must specify the name of the column containing a timestamp to use in the detect_expression._
endif::pro[]

|USE_VERSION|Indicates that the primary key plus a version column (as configured in detect_expression) will indicate whether a conflict
has occurred. If the target version column is not equal to the old source version column, then a conflict has been detected. If a row already
exists during an insert, then a conflict has been detected.

ifndef::pro[]
_You must specify the name of the column containing a version number to use in the detect_expression._
endif::pro[]

|===

@@ -69,29 +71,28 @@ Resolution Type:: The choice of how to resolve a detected conflict is configured
[cols=".^2,8"]
|===

|FALLBACK|Indicates that when a conflict is detected the system should automatically apply the changes anyway. If the source
operation was an insert, then an update will be attempted. If the source operation was an update and the row does not exist, then
an insert will be attempted. If the source operation was a delete and the row does not exist, then the delete will be ignored. The
resolve_changes_only flag controls whether all columns will be updated or only columns that have changed will be updated during a fallback operation.

|NEWER_WINS|Indicates that when a conflict is detected, either the source or the target will win
based on which side has the newer timestamp or higher version number. With USE_TIMESTAMP detection, the column specified
in detect_expression is used; otherwise the time of capture is used.

|IGNORE|Indicates that when a conflict is detected the system should automatically ignore the incoming change. The resolve_row_only
column controls whether the entire batch should be ignored or just the row in conflict.

|FALLBACK|Indicates that when a conflict is detected the system should automatically apply the changes anyway. If the source
operation was an insert, then an update will be attempted. If the source operation was an update and the row does not exist, then
an insert will be attempted. If the source operation was a delete and the row does not exist, then the delete will be ignored.

|MANUAL|Indicates that when a conflict is detected the batch will remain in error until manual intervention occurs. A row in error
is inserted into the INCOMING_ERROR table. The conflict detection id that detected the conflict (i.e., the conflict_id value from CONFLICT) is recorded,
along with the old data, new data, and the "current data" (the unexpected data at the target that doesn't match the old data as expected)
in the old_data, new_data, and cur_data columns. To resolve, the resolve_data column can be manually filled out, and it will be used on
the next load attempt instead of the original source data. The resolve_ignore flag can also be used to indicate that the row should be ignored on the next load attempt.
|IGNORE|Indicates that when a conflict is detected the system should automatically ignore the incoming change. Use IGNORE between two node groups
in one direction, and FALLBACK in the other direction to establish which group wins a conflict.

|NEWER_WINS|Indicates that when a conflict is detected by USE_TIMESTAMP or USE_VERSION, either the source or the target will win
based on which side has the newer timestamp or higher version number. The resolve_row_only column controls whether the entire batch
should be ignored or just the row in conflict.
|MANUAL|Indicates that when a conflict is detected, the batch will remain in error until manual intervention occurs. A row in error
is inserted into the INCOMING_ERROR table, which includes the conflict ID, old data, new data, and current data at the target.
The user can specify the resolve data to use on the next load attempt.
The resolve_ignore flag can also be used to indicate that the row should be ignored.

|===

TIP: To make a primary node group always win a conflict, use a "fallback" resolution on group links where primary is the source
and an "ignore" resolution on group links where primary is the target.

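For example, the following SQL is a minimal sketch of a newest-change-wins setup using timestamp detection. It assumes the usual SYM_CONFLICT columns; the conflict id, node group ids, and the last_update_time detection column are illustrative:

----
insert into SYM_CONFLICT
  (conflict_id, source_node_group_id, target_node_group_id,
   detect_type, detect_expression, resolve_type, ping_back,
   resolve_changes_only, resolve_row_only, create_time, last_update_time)
values
  ('corp-2-store-newer-wins', 'corp', 'store',
   'USE_TIMESTAMP', 'last_update_time', 'NEWER_WINS', 'OFF',
   0, 1, current_timestamp, current_timestamp);
----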
ifndef::pro[]
Ping Back:: For each configured conflict, you also have the ability to control if and how much "resolved" data is sent back to the
node whose data change is in conflict. This "ping back" behavior is specified by the following options.

@@ -101,24 +102,28 @@ node whose data change is in conflict. This "ping back" behavior is specified by

|REMAINING_ROWS|The resolved data of the single row in the batch in conflict, along with the entire remainder of the batch, is sent back to the originating node.

|SINGLE_ROW|The resolved data of the single row in the batch that caused the conflict is sent back to the originating node.
|SINGLE_ROW|The resolved data of the single row in the batch that caused the conflict is sent back to the originating node. Recommended with MANUAL resolution, so that once a user resolves the conflict the corrected data is returned to the originating node.

|OFF|No data is sent back to the originating node, even if the resolved data doesn't match the data the node sent.
|OFF|No data is sent back to the originating node, even if the resolved data doesn't match the data the node sent. Recommended with resolution types that choose a winning row, such as NEWER_WINS, or when IGNORE and FALLBACK are used on opposing group links.

|===

ifdef::pro[]
.Advanced Options
endif::pro[]

Detection Expression:: An expression that provides additional information about the detection mechanism. If the detection
mechanism is use_timestamp or use_version, then this expression will be the name of the timestamp or version column.
The detect_expression can also be used to exclude certain column names from detection. For example, to exclude column1 and column2,
the expression is "excluded_column_names=column1,column2".

ifdef::pro[]
.Advanced Options
endif::pro[]

ifndef::pro[]
Resolve Changes Only:: Indicates that when applying changes during an update, only the data that has changed should be applied.
Otherwise, all columns will be updated. This only applies to updates.
Resolve Row Only:: When 'resolve row only' is set to true, the system will ignore only the rows in conflict. When set to false, the system will ignore the entire batch.
This applies to a resolve type of 'ignore'.
Resolve Row Only:: Ignore only the row in conflict when true, or ignore the entire batch when false. Used by the IGNORE and NEWER_WINS resolvers. The recommended setting is true.
endif::pro[]

Channel:: Optional channel that this setting will be applied to.
Target Catalog:: Optional database catalog that the target table belongs to. Only use this if the target table is not in the default catalog.
Target Schema:: Optional database schema that the target table belongs to. Only use this if the target table is not in the default schema.
@@ -130,14 +135,3 @@ In addition, some databases do not allow comparisons of binary columns whether u
WARNING: Some platforms do not support comparisons of binary columns. Conflicts in binary column values will not be detected on the
following platforms: DB2, DERBY, ORACLE, and SQLSERVER.

@@ -51,7 +51,7 @@ public void open() {
try {
transport = new HttpOutgoingTransport(new HttpTransportManager(), new URL(buildUrl()), 30000, true, 0, -1, null,
null, false, -1, false);
writer = new ProtocolDataWriter(nodeId, transport.openWriter(), false);
writer = new ProtocolDataWriter(nodeId, transport.openWriter(), false, false, false);
writer.start(batch);
} catch (Exception ex) {
throw new IoException(ex);
Expand Down
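The two boolean arguments added to the ProtocolDataWriter constructor appear to control capture-time output: judging from the extract changes below, the first reflects whether the target node's version allows capture time in the protocol, and the second whether extract.row.capture.time is enabled. This standalone transport passes false for both.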
@@ -178,6 +178,7 @@ private ParameterConstants() {

public final static String EXTRACT_CHECK_ROW_SIZE = "extract.check.row.size";
public final static String EXTRACT_ROW_MAX_LENGTH = "extract.row.max.length";
public final static String EXTRACT_ROW_CAPTURE_TIME = "extract.row.capture.time";

public final static String CREATE_TABLE_WITHOUT_DEFAULTS = "create.table.without.defaults";
public final static String CREATE_TABLE_WITHOUT_FOREIGN_KEYS = "create.table.without.foreign.keys";
@@ -290,7 +290,11 @@ public boolean requires13Compatiblity() {
}
return false;
}


public boolean allowCaptureTimeInProtocol() {
return isVersionGreaterThanOrEqualTo(3, 12);
}

public boolean isVersionGreaterThanOrEqualTo(int... targetVersion) {
if (symmetricVersion != null) {
if (symmetricVersion.equals("development")) {
@@ -401,7 +401,7 @@ public void extractConfigurationStandalone(Node targetNode, Writer writer,
this.symmetricDialect.getPlatform(), source);

ProtocolDataWriter dataWriter = new ProtocolDataWriter(
nodeService.findIdentityNodeId(), writer, targetNode.requires13Compatiblity());
nodeService.findIdentityNodeId(), writer, targetNode.requires13Compatiblity(), false, false);
DataProcessor processor = new DataProcessor(dataReader, dataWriter,
"configuration extract");
DataContext ctx = new DataContext();
@@ -561,7 +561,8 @@ public List<OutgoingBatch> extract(ProcessInfo extractInfo, Node targetNode, Str
if (activeBatches.size() > 0) {
BufferedWriter writer = transport.openWriter();
IDataWriter dataWriter = new ProtocolDataWriter(nodeService.findIdentityNodeId(),
writer, targetNode.requires13Compatiblity());
writer, targetNode.requires13Compatiblity(), targetNode.allowCaptureTimeInProtocol(),
parameterService.is(ParameterConstants.EXTRACT_ROW_CAPTURE_TIME));

return extract(extractInfo, targetNode, activeBatches, dataWriter, writer, ExtractMode.FOR_SYM_CLIENT);
}
@@ -649,7 +650,8 @@ public boolean extractOnlyOutgoingBatch(String nodeId, long batchId, Writer writ
OutgoingBatch batch = outgoingBatchService.findOutgoingBatch(batchId, nodeId);
if (batch != null) {
IDataWriter dataWriter = new ProtocolDataWriter(nodeService.findIdentityNodeId(),
writer, targetNode.requires13Compatiblity());
writer, targetNode.requires13Compatiblity(), targetNode.allowCaptureTimeInProtocol(),
parameterService.is(ParameterConstants.EXTRACT_ROW_CAPTURE_TIME));
List<OutgoingBatch> batches = new ArrayList<OutgoingBatch>(1);
batches.add(batch);
batches = extract(new ProcessInfo(), targetNode, batches, dataWriter, null,
@@ -1294,7 +1296,8 @@ protected IDataWriter wrapWithTransformWriter(Node sourceNode, Node targetNode,
targetNode,
new ProcessInfoDataWriter(new StagingDataWriter(memoryThresholdInBytes, true, nodeService
.findIdentityNodeId(), Constants.STAGING_CATEGORY_OUTGOING,
stagingManager), processInfo));
stagingManager, targetNode.allowCaptureTimeInProtocol(),
parameterService.is(ParameterConstants.EXTRACT_ROW_CAPTURE_TIME)), processInfo));
} else {
transformExtractWriter = createTransformDataWriter(sourceNode, targetNode,
new ProcessInfoDataWriter(dataWriter, processInfo));
@@ -1731,7 +1734,7 @@ public boolean extractBatchRange(Writer writer, String nodeId, long startBatchId
new DataProcessor(dataReader, createTransformDataWriter(
nodeService.findIdentity(), targetNode,
new ProtocolDataWriter(nodeService.findIdentityNodeId(), writer,
targetNode.requires13Compatiblity())), "extract range").process(ctx);
targetNode.requires13Compatiblity(), false, false)), "extract range").process(ctx);
foundBatch = true;
}
}
@@ -1760,7 +1763,7 @@ public boolean extractBatchRange(Writer writer, String nodeId, Date startBatchTi
ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, nodeService.findIdentity());
new DataProcessor(dataReader, createTransformDataWriter(nodeService.findIdentity(),
targetNode, new ProtocolDataWriter(nodeService.findIdentityNodeId(),
writer, targetNode.requires13Compatiblity())), "extract range").process(ctx);
writer, targetNode.requires13Compatiblity(), false, false)), "extract range").process(ctx);
foundBatch = true;
}
}
@@ -122,7 +122,7 @@ public void open(DataContext context) {

protected IDataWriter buildWriter() {
return new StagingDataWriter(memoryThresholdInBytes, false, sourceNodeId, Constants.STAGING_CATEGORY_OUTGOING, stagingManager,
(IProtocolDataWriterListener[]) null);
false, false, (IProtocolDataWriterListener[]) null);
}

@Override
12 changes: 11 additions & 1 deletion symmetric-core/src/main/resources/symmetric-default.properties
Original file line number Diff line number Diff line change
@@ -2826,4 +2826,14 @@ extract.check.row.size=false
# DatabaseOverridable: true
# Tags: extract
extract.row.max.length=1000000000


# Extract the capture time of each row and put it in the batch to be used by the
# conflict manager for picking the winning row during a conflict. Enable for the best
# precision in resolution, which includes a unix timestamp for each insert, update,
# and delete. Disable to include a single unix timestamp representing the entire batch,
# when precision is less important or conflict management isn't needed.
#
# DatabaseOverridable: true
# Tags: extract
# Type: boolean
extract.row.capture.time=true
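To make the basetime/ts mechanics concrete, here is a minimal standalone sketch, not the SymmetricDS implementation (class and variable names are invented, and times are assumed to be epoch milliseconds), of how an extractor could emit one batch-level base time plus a small per-row offset, which the ProtocolDataReader changes below reverse:

----
import java.io.PrintWriter;
import java.io.StringWriter;

public class CaptureTimeSketch {
    public static void main(String[] args) {
        // Capture times (epoch milliseconds) of two rows in a batch.
        long[] rowCaptureTimes = { 1583476980000L, 1583476980250L };

        StringWriter buffer = new StringWriter();
        PrintWriter out = new PrintWriter(buffer);

        // Emit the base time once per batch...
        long baseTime = rowCaptureTimes[0];
        out.println("basetime, " + baseTime);

        // ...then a small "ts" offset before each row.
        for (long captureTime : rowCaptureTimes) {
            out.println("ts, " + (captureTime - baseTime));
            out.println("insert, ..."); // row data elided
        }
        out.flush();
        System.out.print(buffer);
    }
}
----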
@@ -69,4 +69,8 @@ private CsvConstants() {

public static final String STATS_COLUMNS = "stats_columns";

public static final String BASETIME = "basetime";

public static final String TIME = "ts";

}
@@ -28,6 +28,7 @@
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringReader;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -74,6 +75,8 @@ public class ProtocolDataReader extends AbstractDataReader implements IDataReade
protected int lineNumber = 0;
protected String[] tokens;
protected boolean streamToFile = true;
protected long baseTime;
protected Timestamp createTime;

public ProtocolDataReader(BatchType batchType, String targetNodeId, StringBuilder input) {
this(batchType, targetNodeId, new BufferedReader(new StringReader(input.toString())));
@@ -172,6 +175,9 @@ public Object readNext() {
data.setNoBinaryOldData(noBinaryOldData);
data.setDataEventType(DataEventType.INSERT);
data.putParsedData(CsvData.ROW_DATA, CollectionUtils.copyOfRange(tokens, 1, tokens.length));
if (createTime != null) {
data.putAttribute(CsvData.ATTRIBUTE_CREATE_TIME, createTime);
}
tokens = null;
return data;
} else if (tokens[0].equals(CsvConstants.OLD)) {
@@ -193,6 +199,9 @@
data.putParsedData(CsvData.ROW_DATA, CollectionUtils.copyOfRange(tokens, 1, columnCount + 1));
data.putParsedData(CsvData.PK_DATA, CollectionUtils.copyOfRange(tokens, columnCount + 1, tokens.length));
data.putParsedData(CsvData.OLD_DATA, parsedOldData);
if (createTime != null) {
data.putAttribute(CsvData.ATTRIBUTE_CREATE_TIME, createTime);
}
tokens = null;
return data;
} else if (tokens[0].equals(CsvConstants.DELETE)) {
@@ -201,9 +210,13 @@
data.setDataEventType(DataEventType.DELETE);
data.putParsedData(CsvData.PK_DATA, CollectionUtils.copyOfRange(tokens, 1, tokens.length));
data.putParsedData(CsvData.OLD_DATA, parsedOldData);
if (createTime != null) {
data.putAttribute(CsvData.ATTRIBUTE_CREATE_TIME, createTime);
}
tokens = null;
return data;

} else if (tokens[0].equals(CsvConstants.TIME)) {
createTime = new Timestamp(Long.parseLong(tokens[1]) + baseTime);
} else if (tokens[0].equals(CsvConstants.BATCH) || tokens[0].equals(CsvConstants.RETRY)) {

Batch batch = new Batch(batchType, Long.parseLong(tokens[1]), channelId, binaryEncoding, sourceNodeId, targetNodeId,
@@ -298,6 +311,9 @@
statsValues = CollectionUtils.copyOfRange(tokens, 1, tokens.length);
stats = stats != null ? stats : new DataReaderStatistics();
putStats(stats, statsColumns, statsValues);
} else if (tokens[0].equals(CsvConstants.BASETIME)) {
baseTime = Long.parseLong(tokens[1]);
createTime = new Timestamp(baseTime);
} else {
log.info("Unable to handle unknown csv values: " + Arrays.toString(tokens));
}
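With the capture time attached to each row as ATTRIBUTE_CREATE_TIME, a resolver can compare source and target capture times directly when no timestamp column is configured. A hedged sketch of that comparison (method and class names are illustrative, not the SymmetricDS API):

----
import java.sql.Timestamp;

public class NewerWinsSketch {
    // Returns true when the incoming (source) row should win the conflict.
    static boolean sourceWins(Timestamp sourceCaptureTime, Timestamp targetCaptureTime) {
        if (sourceCaptureTime == null || targetCaptureTime == null) {
            // Without both capture times there is nothing to compare;
            // fall back to applying the incoming change.
            return true;
        }
        return sourceCaptureTime.after(targetCaptureTime);
    }

    public static void main(String[] args) {
        Timestamp source = new Timestamp(1583476980250L);
        Timestamp target = new Timestamp(1583476980000L);
        System.out.println(sourceWins(source, target)); // true: the source row is newer
    }
}
----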
@@ -39,7 +39,7 @@
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict.DetectConflict;
import org.jumpmind.symmetric.io.data.writer.Conflict.PingBack;
import org.jumpmind.symmetric.io.data.writer.Conflict.ResolveConflict;
import org.jumpmind.util.Statistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -239,7 +239,8 @@ public void write(CsvData data) {
statistics.get(batch).increment(DataWriterStatisticConstants.IGNOREROWCOUNT);
} else if (conflictResolver != null && resolvedData != null) {
Conflict conflict = new Conflict();
conflict.setPingBack(PingBack.REMAINING_ROWS);
conflict.setDetectType(DetectConflict.USE_PK_DATA);
conflict.setResolveType(ResolveConflict.FALLBACK);
conflictResolver.attemptToResolve(resolvedData, data, this, conflict);
} else {
if (filterError(data, ex)) {
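This last change appears to make a manually resolved row apply via fallback: instead of setting a ping back of REMAINING_ROWS on the synthetic conflict, the writer now marks it as detected by primary key data and resolved by FALLBACK, so the resolve data from INCOMING_ERROR is applied like an ordinary conflicting change.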