Skip to content

Commit

Permalink
add pk column names
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Mar 23, 2012
1 parent 90fd525 commit aadb5bc
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 5 deletions.
Expand Up @@ -48,6 +48,8 @@ public class IncomingError implements Serializable {

private String columnNames;

private String primaryKeyColumnNames;

private CsvData csvData = new CsvData();

private boolean resolveIgnore = false;
Expand Down Expand Up @@ -212,4 +214,12 @@ public void setColumnNames(String columnNames) {
this.columnNames = columnNames;
}

/**
 * Returns the comma-separated list of primary key column names recorded for
 * the table that was being loaded when this error occurred.
 *
 * @return CSV string of primary key column names, or {@code null} if not set
 */
public String getPrimaryKeyColumnNames() {
    return this.primaryKeyColumnNames;
}

/**
 * Sets the comma-separated list of primary key column names for the table
 * that was being loaded when this error occurred.
 *
 * @param primaryKeyColumnNames CSV string of primary key column names
 */
public void setPrimaryKeyColumnNames(String primaryKeyColumnNames) {
    this.primaryKeyColumnNames = primaryKeyColumnNames;
}

}
Expand Up @@ -517,7 +517,8 @@ public void insertIncomingError(IncomingError incomingError) {
incomingError.getNodeId(), incomingError.getFailedRowNumber(),
incomingError.getFailedLineNumber(), incomingError.getTargetCatalogName(),
incomingError.getTargetSchemaName(), incomingError.getTargetTableName(),
incomingError.getEventType().getCode(), incomingError.getColumnNames(),
incomingError.getPrimaryKeyColumnNames(), incomingError.getRowData(),
// NOTE(review): getResolveData() is passed twice below. The matching insert
// SQL lists its columns as (... old_data, resolve_data, resolve_ignore,
// create_time ...), so the second occurrence binds to the resolve_ignore
// column — verify whether it should be isResolveIgnore() instead.
incomingError.getOldData(), incomingError.getResolveData(),
incomingError.getResolveData(), incomingError.getCreateTime(),
incomingError.getLastUpdateBy(), incomingError.getLastUpdateTime());
Expand Down Expand Up @@ -573,6 +574,7 @@ public IncomingError mapRow(Row rs) {
incomingError.setTargetTableName(rs.getString("target_table_name"));
incomingError.setEventType(DataEventType.getEventType(rs.getString("event_type")));
incomingError.setColumnNames(rs.getString("column_names"));
incomingError.setPrimaryKeyColumnNames(rs.getString("pk_column_names"));
incomingError.setRowData(rs.getString("row_data"));
incomingError.setOldData(rs.getString("old_data"));
incomingError.setResolveData(rs.getString("resolve_data"));
Expand Down Expand Up @@ -736,6 +738,7 @@ public void batchInError(DataContext context, Exception ex) {
error.setBatchId(this.currentBatch.getBatchId());
error.setNodeId(this.currentBatch.getNodeId());
error.setColumnNames(Table.getCommaDeliminatedColumns(context.getTable().getColumns()));
error.setPrimaryKeyColumnNames(Table.getCommaDeliminatedColumns(context.getTable().getPrimaryKeyColumns()));
error.setCsvData(context.getData());
error.setEventType(context.getData().getDataEventType());
error.setFailedLineNumber(this.currentBatch.getFailedLineNumber());
Expand Down
Expand Up @@ -40,12 +40,12 @@ public class DataLoaderServiceSqlMap extends AbstractSqlMap {

putSql("selectIncomingErrorSql",
"select batch_id, node_id, failed_row_number, failed_line_number, target_catalog_name, target_schema_name, " +
"target_table_name, event_type, column_names, row_data, old_data, resolve_data, resolve_ignore, " +
"target_table_name, event_type, column_names, pk_column_names, row_data, old_data, resolve_data, resolve_ignore, " +
"create_time, last_update_by, last_update_time from $(incoming_error) where batch_id = ? and node_id = ?");

putSql("selectCurrentIncomingErrorSql",
"select e.batch_id, e.node_id, e.failed_row_number, e.failed_line_number, e.target_catalog_name, e.target_schema_name, " +
"e.target_table_name, e.event_type, e.column_names, e.row_data, e.old_data, e.resolve_data, e.resolve_ignore, " +
"e.target_table_name, e.event_type, e.column_names, e.pk_column_names, e.row_data, e.old_data, e.resolve_data, e.resolve_ignore, " +
"e.create_time, e.last_update_by, e.last_update_time " +
"from $(incoming_error) e inner join $(incoming_batch) b on b.batch_id = e.batch_id " +
"and b.node_id = e.node_id and b.failed_row_number = e.failed_row_number " +
Expand All @@ -54,8 +54,8 @@ public class DataLoaderServiceSqlMap extends AbstractSqlMap {
// Inserts one incoming_error row. 17 columns are listed and matched by 17 '?'
// placeholders; the placeholder bind order must stay in step with the argument
// order used by insertIncomingError(IncomingError) when it runs this statement.
putSql("insertIncomingErrorSql",
"insert into $(incoming_error) " +
"(batch_id, node_id, failed_row_number, failed_line_number, target_catalog_name, target_schema_name, " +
"target_table_name, event_type, column_names, pk_column_names, row_data, old_data, resolve_data, resolve_ignore, " +
"create_time, last_update_by, last_update_time) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");

putSql("updateIncomingErrorSql",
"update $(incoming_error) set resolve_data = ?, resolve_ignore = ? " +
Expand Down
Expand Up @@ -457,6 +457,7 @@
<column name="target_table_name" type="VARCHAR" size="50" required="true" description="The table name for the table being loaded." />
<column name="event_type" type="CHAR" size="1" required="true" description="The type of event captured by this entry. For triggers, this is the change that occurred, which is 'I' for insert, 'U' for update, or 'D' for delete. Other events include: 'R' for reloading the entire table (or subset of the table) to the node; 'S' for running dynamic SQL at the node, which is used for adhoc administration." />
<column name="column_names" type="LONGVARCHAR" required="true" description="The column names defined on the table. The column names are stored in comma-separated values (CSV) format." />
<column name="pk_column_names" type="LONGVARCHAR" required="true" description="The primary key column names defined on the table. The column names are stored in comma-separated values (CSV) format." />
<column name="row_data" type="LONGVARCHAR" description="The row data from the batch as captured from the source. The column values are stored in comma-separated values (CSV) format." />
<column name="old_data" type="LONGVARCHAR" description="The old row data prior to update from the batch as captured from the source. The column values are stored in CSV format." />
<column name="resolve_data" type="LONGVARCHAR" description="The capture data change from the user that is used instead of row_data. This is useful when resolving a conflict manually by specifying the data that should load." />
Expand Down

0 comments on commit aadb5bc

Please sign in to comment.