Commit baed5d1

0000987: Add the "current" (conflicting) data to a column in sym_incoming_error

chenson42 committed Feb 4, 2013
1 parent a52c5c9 commit baed5d1
Showing 10 changed files with 122 additions and 29 deletions.
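With this change, sym_incoming_error records two new pieces of information alongside the failed row: cur_data, the row as it currently exists in the target database, and conflict_id, the conflict detection setting that flagged the failure. As an illustration (not part of the commit), unresolved errors could be reviewed with a query along these lines, where the batch and node ids are placeholders:

    select batch_id, node_id, failed_row_number, conflict_id,
           row_data, old_data, cur_data
    from sym_incoming_error
    where batch_id = 1234 and node_id = 'store-001';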
@@ -127,7 +127,7 @@ public void exportTestDatabaseSQL() throws Exception {
String output = export.exportTables(tables).toLowerCase();

Assert.assertEquals(output, 36, StringUtils.countMatches(output, "create table \"sym_"));
-Assert.assertEquals(34,
+Assert.assertEquals(35,
StringUtils.countMatches(output, "varchar(" + Integer.MAX_VALUE + ")"));
}

@@ -119,6 +119,7 @@ private ParameterConstants() {
public final static String INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED = "incoming.batches.skip.duplicates";
public final static String DATA_LOADER_ENABLED = "dataloader.enable";
public final static String DATA_LOADER_IGNORE_MISSING_TABLES = "dataloader.ignore.missing.tables";
+public final static String DATA_LOADER_ERROR_RECORD_CUR_VAL = "dataloader.error.save.curval";
public final static String DATA_LOADER_NUM_OF_ACK_RETRIES = "num.of.ack.retries";
public final static String DATA_LOADER_TIME_BETWEEN_ACK_RETRIES = "time.between.ack.retries.ms";
public final static String DATA_LOADER_MAX_ROWS_BEFORE_COMMIT = "dataloader.max.rows.before.commit";
@@ -29,6 +29,8 @@

public class IncomingError implements Serializable {

+private static final String CUR_DATA = "curData";
+
private static final long serialVersionUID = 1L;

private long batchId;
@@ -56,6 +58,8 @@ public class IncomingError implements Serializable {
private CsvData csvData = new CsvData();

private boolean resolveIgnore = false;
+
+private String conflictId;

private Date createTime = new Date();

@@ -94,6 +98,14 @@ public String getOldData() {
public void setOldData(String oldData) {
csvData.putCsvData(CsvData.OLD_DATA, oldData);
}
+
+public String getCurData() {
+return csvData.getCsvData(CUR_DATA);
+}
+
+public void setCurData(String curData) {
+csvData.putCsvData(CUR_DATA, curData);
+}

public String getResolveData() {
return csvData.getCsvData(CsvData.RESOLVE_DATA);
@@ -250,5 +262,13 @@ public void setBinaryEncoding(BinaryEncoding binaryEncoding) {
public BinaryEncoding getBinaryEncoding() {
return binaryEncoding;
}
+
+public void setConflictId(String conflictId) {
+this.conflictId = conflictId;
+}
+
+public String getConflictId() {
+return conflictId;
+}

}
@@ -37,8 +37,12 @@
import java.util.Map;

import org.apache.commons.lang.StringUtils;
+import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
+import org.jumpmind.db.sql.DmlStatement;
+import org.jumpmind.db.sql.DmlStatement.DmlType;
import org.jumpmind.db.sql.ISqlRowMapper;
+import org.jumpmind.db.sql.ISqlTemplate;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.Row;
import org.jumpmind.db.sql.SqlException;
@@ -51,6 +55,7 @@
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.Batch.BatchType;
+import org.jumpmind.symmetric.io.data.CsvUtils;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.DataProcessor;
@@ -108,6 +113,7 @@
import org.jumpmind.symmetric.transport.internal.InternalIncomingTransport;
import org.jumpmind.symmetric.web.WebConstants;
import org.jumpmind.util.AppUtils;
+import org.jumpmind.util.CollectionUtils;

/**
* Responsible for writing batch data to the database
@@ -597,8 +603,10 @@ public void insertIncomingError(ISqlTransaction transaction, IncomingError incom
incomingError.getEventType().getCode(), incomingError.getBinaryEncoding()
.name(), incomingError.getColumnNames(), incomingError
.getPrimaryKeyColumnNames(), incomingError.getRowData(), incomingError
-.getOldData(), incomingError.getResolveData(), incomingError
-.getResolveData(), incomingError.getCreateTime(), incomingError
+.getOldData(), incomingError.getCurData(),
+incomingError.getResolveData(), incomingError
+.getResolveData(), incomingError.getConflictId(),
+incomingError.getCreateTime(), incomingError
.getLastUpdateBy(), incomingError.getLastUpdateTime());
}
}
@@ -657,8 +665,10 @@ public IncomingError mapRow(Row rs) {
incomingError.setPrimaryKeyColumnNames(rs.getString("pk_column_names"));
incomingError.setRowData(rs.getString("row_data"));
incomingError.setOldData(rs.getString("old_data"));
+incomingError.setCurData(rs.getString("cur_data"));
incomingError.setResolveData(rs.getString("resolve_data"));
incomingError.setResolveIgnore(rs.getBoolean("resolve_ignore"));
+incomingError.setConflictId(rs.getString("conflict_id"));
incomingError.setCreateTime(rs.getDateTime("create_time"));
incomingError.setLastUpdateBy(rs.getString("last_update_by"));
incomingError.setLastUpdateTime(rs.getDateTime("last_update_time"));
@@ -836,13 +846,21 @@ public void batchInError(DataContext context, Exception ex) {
error.setPrimaryKeyColumnNames(Table.getCommaDeliminatedColumns(context
.getTable().getPrimaryKeyColumns()));
error.setCsvData(context.getData());
+error.setCurData(getCurData(error));
error.setBinaryEncoding(context.getBatch().getBinaryEncoding());
error.setEventType(context.getData().getDataEventType());
error.setFailedLineNumber(this.currentBatch.getFailedLineNumber());
error.setFailedRowNumber(this.currentBatch.getFailedRowNumber());
error.setTargetCatalogName(context.getTable().getCatalog());
error.setTargetSchemaName(context.getTable().getSchema());
error.setTargetTableName(context.getTable().getName());
+if (ex instanceof ConflictException) {
+ConflictException conflictEx = (ConflictException) ex;
+Conflict conflict = conflictEx.getConflict();
+if (conflict != null) {
+error.setConflictId(conflict.getConflictId());
+}
+}
if (transaction != null) {
insertIncomingError(transaction, error);
} else {
@@ -868,6 +886,55 @@ public void batchInError(DataContext context, Exception ex) {
.getBatch().getSourceNodeBatchId(), e);
}
}
+
+protected String getCurData(IncomingError error) {
+String curVal = null;
+if (parameterService.is(ParameterConstants.DATA_LOADER_ERROR_RECORD_CUR_VAL, false)) {
+String[] keyNames = error.getParsedPrimaryKeyColumnNames();
+String[] columnNames = error.getParsedColumnNames();
+
+org.jumpmind.db.model.Table targetTable = platform.getTableFromCache(
+error.getTargetCatalogName(), error.getTargetSchemaName(),
+error.getTargetTableName(), false);
+
+targetTable = targetTable.copyAndFilterColumns(columnNames, keyNames, true);
+
+String[] data = error.getParsedOldData();
+if (data == null) {
+data = error.getParsedRowData();
+}
+
+Column[] columns = targetTable.getColumns();
+
+Object[] objectValues = platform.getObjectValues(error.getBinaryEncoding(), data,
+columns);
+
+Map<String, Object> columnDataMap = CollectionUtils
+.toMap(columnNames, objectValues);
+
+Column[] pkColumns = targetTable.getPrimaryKeyColumns();
+Object[] args = new Object[pkColumns.length];
+for (int i = 0; i < pkColumns.length; i++) {
+args[i] = columnDataMap.get(pkColumns[i].getName());
+}
+
+DmlStatement sqlStatement = platform
+.createDmlStatement(DmlType.SELECT, targetTable);
+ISqlTemplate sqlTemplate = platform.getSqlTemplate();
+
+Row row = sqlTemplate.queryForRow(sqlStatement.getSql(), args);
+
+if (row != null) {
+String[] existData = platform.getStringValues(error.getBinaryEncoding(),
+columns, row, false);
+if (existData != null) {
+curVal = CsvUtils.escapeCsvData(existData);
+}
+}
+}
+return curVal;
+
+}

public List<IncomingBatch> getBatchesProcessed() {
return batchesProcessed;
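For a sense of what the new getCurData method runs: platform.createDmlStatement(DmlType.SELECT, targetTable) builds a primary-key lookup over the filtered column set, which queryForRow then executes with the key values pulled from the failed row's old data (or row data when no old data exists). For a hypothetical target table item with primary key item_id and columns price and description, the generated statement would be roughly:

    select item_id, price, description from item where item_id = ?

The exact SQL depends on the platform's identifier quoting and the table's column order; the matching row, if found, is CSV-escaped into cur_data.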
@@ -40,12 +40,14 @@ public class DataLoaderServiceSqlMap extends AbstractSqlMap {

putSql("selectIncomingErrorSql",
"select batch_id, node_id, failed_row_number, failed_line_number, target_catalog_name, target_schema_name, " +
"target_table_name, event_type, binary_encoding, column_names, pk_column_names, row_data, old_data, resolve_data, resolve_ignore, " +
"target_table_name, event_type, binary_encoding, column_names, pk_column_names, row_data, " +
"old_data, cur_data, resolve_data, resolve_ignore, conflict_id, " +
"create_time, last_update_by, last_update_time from $(incoming_error) where batch_id = ? and node_id = ?");

putSql("selectCurrentIncomingErrorSql",
"select e.batch_id, e.node_id, e.failed_row_number, e.failed_line_number, e.target_catalog_name, e.target_schema_name, " +
"e.target_table_name, e.event_type, e.binary_encoding, e.column_names, e.pk_column_names, e.row_data, e.old_data, e.resolve_data, e.resolve_ignore, " +
"e.target_table_name, e.event_type, e.binary_encoding, e.column_names, e.pk_column_names, e.row_data, " +
"e.old_data, e.cur_data, e.resolve_data, e.resolve_ignore, e.conflict_id, " +
"e.create_time, e.last_update_by, e.last_update_time " +
"from $(incoming_error) e inner join $(incoming_batch) b on b.batch_id = e.batch_id " +
"and b.node_id = e.node_id and b.failed_row_number = e.failed_row_number " +
@@ -54,8 +56,8 @@
putSql("insertIncomingErrorSql",
"insert into $(incoming_error) " +
"(batch_id, node_id, failed_row_number, failed_line_number, target_catalog_name, target_schema_name, " +
"target_table_name, event_type, binary_encoding, column_names, pk_column_names, row_data, old_data, resolve_data, resolve_ignore, " +
"create_time, last_update_by, last_update_time) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
"target_table_name, event_type, binary_encoding, column_names, pk_column_names, row_data, old_data, cur_data, resolve_data, resolve_ignore, conflict_id, " +
"create_time, last_update_by, last_update_time) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");

putSql("updateIncomingErrorSql",
"update $(incoming_error) set resolve_data = ?, resolve_ignore = ? " +
@@ -753,6 +753,13 @@ dataloader.max.rows.before.commit=10000
# Tags: load
dataloader.sleep.time.after.early.commit=5

+# Indicates that the current value of the row should be recorded in the incoming_error table
+#
+# DatabaseOverridable: true
+# Tags: load
+# Type: boolean
+dataloader.error.save.curval=false
+
# The number of milliseconds parameters will be cached by the ParameterService before they are reread from the
# file system and database.
#
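The new parameter defaults to off, so existing installations are unaffected. To capture current values on error, it can be enabled in an engine's properties file (or, since it is marked DatabaseOverridable, presumably through the parameter table as well):

    dataloader.error.save.curval=true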
21 changes: 5 additions & 16 deletions symmetric-core/src/main/resources/symmetric-schema.xml
@@ -523,9 +523,11 @@
<column name="column_names" type="LONGVARCHAR" required="true" description="The column names defined on the table. The column names are stored in comma-separated values (CSV) format." />
<column name="pk_column_names" type="LONGVARCHAR" required="true" description="The primary key column names defined on the table. The column names are stored in comma-separated values (CSV) format." />
<column name="row_data" type="LONGVARCHAR" description="The row data from the batch as captured from the source. The column values are stored in comma-separated values (CSV) format." />
<column name="old_data" type="LONGVARCHAR" description="The old row data prior to update from the batch as captured from the source. The column values are stored in CSV format." />
<column name="old_data" type="LONGVARCHAR" description="The old row data prior to update from the batch as captured from the source. The column values are stored in CSV format." />
<column name="cur_data" type="LONGVARCHAR" description="The current row data that caused the error to occur. The column values are stored in CSV format." />
<column name="resolve_data" type="LONGVARCHAR" description="The capture data change from the user that is used instead of row_data. This is useful when resolving a conflict manually by specifying the data that should load." />
<column name="resolve_ignore" type="BOOLEANINT" size="1" default="0" description="Indication from the user that the row_data should be ignored and the batch can continue loading with the next row." />
<column name="resolve_ignore" type="BOOLEANINT" size="1" default="0" description="Indication from the user that the row_data should be ignored and the batch can continue loading with the next row." />
<column name="conflict_id" type="VARCHAR" size="50" description="Unique identifier for the conflict detection setting that caused the error" />
<column name="create_time" type="TIMESTAMP" description="Timestamp when this entry was created." />
<column name="last_update_by" type="VARCHAR" size="50" description="The user who last updated this entry." />
<column name="last_update_time" type="TIMESTAMP" required="true" description="Timestamp when a user last updated this entry." />
@@ -617,18 +619,5 @@
<reference local="router_id" foreign="router_id" />
</foreign-key>
</table>
-
-<!--
-<table name="grouplet_defaults" description="This tables defines what grouplets a newly registered node should be assigned to.">
-<column name="grouplet_id" type="VARCHAR" size="50" required="true" primaryKey="true" description="Unique identifier for the grouplet." />
-<column name="node_group_id" type="VARCHAR" size="50" required="true" primaryKey="true" description="The node group that the registering node belongs to." />
-<column name="create_time" type="TIMESTAMP" required="true" description="Timestamp when this entry was created." />
-<column name="last_update_by" type="VARCHAR" size="50" description="The user who last updated this entry." />
-<column name="last_update_time" type="TIMESTAMP" required="true" description="Timestamp when a user last updated this entry." />
-<foreign-key foreignTable="grouplet" name="fk_gpltdft_2_gplt">
-<reference local="grouplet_id" foreign="grouplet_id" />
-</foreign-key>
-</table>
--->

</database>
@@ -34,12 +34,15 @@ public class ConflictException extends RuntimeException {
protected Table table;

protected boolean fallbackOperationFailed = false;
+
+protected Conflict conflict;

-public ConflictException(CsvData data, Table table, boolean fallbackOperationFailed) {
+public ConflictException(CsvData data, Table table, boolean fallbackOperationFailed, Conflict conflict) {
super(message(data, table, fallbackOperationFailed));
this.data = data;
this.table = table;
this.fallbackOperationFailed = fallbackOperationFailed;
+this.conflict = conflict;
}

protected static String message(CsvData data, Table table, boolean fallbackOperationFailed) {
@@ -66,6 +69,10 @@
public Table getTable() {
return table;
}
+
+public Conflict getConflict() {
+return conflict;
+}

public boolean isFallbackOperationFailed() {
return fallbackOperationFailed;
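With the Conflict now carried on the exception, a caller can report which conflict detection setting fired. A hypothetical handler, assuming an slf4j-style logger:

    try {
        // ... push a batch through the data loader ...
    } catch (ConflictException ex) {
        // getConflict() may return null; batchInError above guards the same way
        Conflict conflict = ex.getConflict();
        if (conflict != null) {
            log.warn("conflict setting '{}' rejected a row in table {}",
                    conflict.getConflictId(), ex.getTable().getName());
        }
    }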
@@ -182,7 +182,7 @@ public void write(CsvData data) {
if (conflictResolver != null) {
conflictResolver.needsResolved(this, data, loadStatus);
} else {
-throw new ConflictException(data, targetTable, false);
+throw new ConflictException(data, targetTable, false, writerSettings.pickConflict(targetTable, batch));
}
} else {
uncommittedCount++;
@@ -162,7 +162,7 @@ public void needsResolved(DatabaseWriter writer, CsvData data, LoadStatus loadSt
}
}
} else {
-throw new ConflictException(data, writer.getTargetTable(), false);
+throw new ConflictException(data, writer.getTargetTable(), false, conflict);
}
break;

@@ -226,7 +226,7 @@ protected void attemptToResolve(ResolvedData resolvedData, CsvData data, Databas
}
}
} else {
-throw new ConflictException(data, writer.getTargetTable(), false);
+throw new ConflictException(data, writer.getTargetTable(), false, conflict);
}
}

@@ -302,7 +302,7 @@ protected void performFallbackToUpdate(DatabaseWriter writer, CsvData data, Conf
beforeResolutionAttempt(conflict);
LoadStatus loadStatus = writer.update(data, conflict.isResolveChangesOnly(), false);
if (loadStatus != LoadStatus.SUCCESS) {
-throw new ConflictException(data, writer.getTargetTable(), true);
+throw new ConflictException(data, writer.getTargetTable(), true, conflict);
} else {
writer.getStatistics().get(writer.getBatch())
.increment(DataWriterStatisticConstants.FALLBACKUPDATECOUNT);
@@ -317,7 +317,7 @@ protected void performFallbackToInsert(DatabaseWriter writer, CsvData csvData, C
beforeResolutionAttempt(conflict);
LoadStatus loadStatus = writer.insert(csvData);
if (loadStatus != LoadStatus.SUCCESS) {
-throw new ConflictException(csvData, writer.getTargetTable(), true);
+throw new ConflictException(csvData, writer.getTargetTable(), true, conflict);
} else {
writer.getStatistics().get(writer.getBatch())
.increment(DataWriterStatisticConstants.FALLBACKINSERTCOUNT);
