Skip to content

Commit

Permalink
Added logging to conflict resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Mar 26, 2012
1 parent 3281ff2 commit effa0da
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 30 deletions.
Expand Up @@ -3,6 +3,7 @@
import java.sql.Timestamp;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.sql.DmlStatement;
Expand All @@ -11,6 +12,7 @@
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.Conflict.DetectConflict;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriter.LoadStatus;
import org.jumpmind.util.Statistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -22,15 +24,37 @@ public void needsResolved(DatabaseWriter writer, CsvData data, LoadStatus loadSt
DataEventType originalEventType = data.getDataEventType();
DatabaseWriterSettings writerSettings = writer.getWriterSettings();
Conflict conflict = writerSettings.pickConflict(writer.getTargetTable(), writer.getBatch());
long statementCount = writer.getStatistics().get(writer.getBatch())
Statistics statistics = writer.getStatistics().get(writer.getBatch());
long statementCount = statistics
.get(DataWriterStatisticConstants.STATEMENTCOUNT);
long lineNumber = statistics.get(DataWriterStatisticConstants.LINENUMBER);
ResolvedData resolvedData = writerSettings.getResolvedData(statementCount);

if (log.isDebugEnabled()) {
log.debug(
"Conflict detected: {} in batch {} at line {} for table {}",
new Object[] { conflict.getConflictId() == null ? "default" : conflict
.getConflictId(), writer.getBatch().getBatchId(), lineNumber, writer.getTargetTable().getFullyQualifiedTableName() });
String csvData = data.getCsvData(CsvData.ROW_DATA);
if (StringUtils.isNotBlank(csvData)) {
log.debug("Row data: {}", csvData);
}

csvData = data.getCsvData(CsvData.OLD_DATA);
if (StringUtils.isNotBlank(csvData)) {
log.debug("Old data: {}", csvData);
}

csvData = resolvedData != null ? resolvedData.getResolvedData() : null;
if (StringUtils.isNotBlank(csvData)) {
log.debug("Resolve data: {}", csvData);
}

}

switch (originalEventType) {
case INSERT:
switch (conflict.getResolveType()) {
case MANUAL:
attemptToResolve(resolvedData, data, writer, conflict);
break;
case FALLBACK:
performFallbackToUpdate(writer, data, conflict.isResolveChangesOnly());
break;
Expand All @@ -51,22 +75,18 @@ public void needsResolved(DatabaseWriter writer, CsvData data, LoadStatus loadSt
}
break;
case IGNORE:
ignore(writer, conflict);
break;
case MANUAL:
default:
if (conflict.isResolveRowOnly()) {
writer.getStatistics().get(writer.getBatch())
.increment(DataWriterStatisticConstants.IGNORECOUNT);
} else {
throw new IgnoreBatchException();
}
attemptToResolve(resolvedData, data, writer, conflict);
break;

}
break;

case UPDATE:
switch (conflict.getResolveType()) {
case MANUAL:
attemptToResolve(resolvedData, data, writer, conflict);
break;
case FALLBACK:
performFallbackToInsert(writer, data);
break;
Expand All @@ -83,20 +103,25 @@ public void needsResolved(DatabaseWriter writer, CsvData data, LoadStatus loadSt
}
break;
case IGNORE:
ignore(writer, conflict);
break;
case MANUAL:
default:
if (conflict.isResolveRowOnly()) {
writer.getStatistics().get(writer.getBatch())
.increment(DataWriterStatisticConstants.IGNORECOUNT);
} else {
throw new IgnoreBatchException();
}
attemptToResolve(resolvedData, data, writer, conflict);
break;

}
break;

case DELETE:
switch (conflict.getResolveType()) {
case IGNORE:
writer.getStatistics().get(writer.getBatch())
.increment(DataWriterStatisticConstants.MISSINGDELETECOUNT);
ignore(writer, conflict);
break;
case MANUAL:
default:
if (resolvedData != null) {
if (!resolvedData.isIgnoreRow()) {
writer.delete(data, false);
Expand All @@ -106,17 +131,8 @@ public void needsResolved(DatabaseWriter writer, CsvData data, LoadStatus loadSt
}
}
}
default:
case IGNORE:
if (conflict.isResolveRowOnly()) {
writer.getStatistics().get(writer.getBatch())
.increment(DataWriterStatisticConstants.IGNORECOUNT);
writer.getStatistics().get(writer.getBatch())
.increment(DataWriterStatisticConstants.MISSINGDELETECOUNT);
break;
} else {
throw new IgnoreBatchException();
}
break;

}
break;

Expand All @@ -125,6 +141,15 @@ public void needsResolved(DatabaseWriter writer, CsvData data, LoadStatus loadSt
}
}

protected void ignore(DatabaseWriter writer, Conflict conflict) {
if (conflict.isResolveRowOnly()) {
writer.getStatistics().get(writer.getBatch())
.increment(DataWriterStatisticConstants.IGNORECOUNT);
} else {
throw new IgnoreBatchException();
}
}

protected void attemptToResolve(ResolvedData resolvedData, CsvData data, DatabaseWriter writer,
Conflict conflict) {
if (resolvedData != null) {
Expand Down
Expand Up @@ -43,6 +43,7 @@ public void notExpectingError() {
@Test
public void testUpdateDetectTimestampNewerWins() {
Conflict setting = new Conflict();
setting.setConflictId("unit.test");
setting.setDetectType(DetectConflict.USE_TIMESTAMP);
setting.setDetectExpression("time_value");
setting.setResolveRowOnly(true);
Expand Down Expand Up @@ -76,6 +77,7 @@ public void testUpdateDetectTimestampNewerWins() {
@Test
public void testInsertDetectTimestampNewerWins() {
Conflict setting = new Conflict();
setting.setConflictId("unit.test");
setting.setDetectType(DetectConflict.USE_TIMESTAMP);
setting.setDetectExpression("time_value");
setting.setResolveRowOnly(true);
Expand Down Expand Up @@ -107,6 +109,7 @@ public void testInsertDetectTimestampNewerWins() {
@Test
public void testUpdateDetectVersionNewWins() {
Conflict setting = new Conflict();
setting.setConflictId("unit.test");
setting.setDetectType(DetectConflict.USE_VERSION);
setting.setDetectExpression("integer_value");
setting.setResolveRowOnly(true);
Expand Down Expand Up @@ -139,6 +142,7 @@ public void testUpdateDetectVersionNewWins() {
@Test
public void testUpdateDetectVersionIgnoreBatch() {
Conflict setting = new Conflict();
setting.setConflictId("unit.test");
setting.setDetectType(DetectConflict.USE_VERSION);
setting.setDetectExpression("integer_value");
setting.setResolveRowOnly(false);
Expand Down
4 changes: 4 additions & 0 deletions symmetric/symmetric-util/src/test/resources/log4j.xml
Expand Up @@ -13,6 +13,10 @@
<priority value="INFO" />
</category>

<category name="org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriterConflictResolver">
<priority value="DEBUG" />
</category>

<category name="org.jumpmind.symmetric.ClientSymmetricEngine$PropertiesFactoryBean">
<priority value="ERROR" />
</category>
Expand Down

0 comments on commit effa0da

Please sign in to comment.