Skip to content

Commit

Permalink
0003142: Sync Columns Between Incoming and Outgoing Batch
Browse files Browse the repository at this point in the history
  • Loading branch information
maxwellpettit committed Jun 30, 2017
1 parent bb6ea02 commit a76dac8
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 13 deletions.
Expand Up @@ -70,6 +70,10 @@ public void setValues(Statistics readerStatistics, Statistics writerStatistics,
failedRowNumber = getLoadRowCount();
failedLineNumber = writerStatistics.get(DataWriterStatisticConstants.LINENUMBER);
}

setLoadInsertRowCount(writerStatistics.get(DataWriterStatisticConstants.INSERTCOUNT));
setLoadUpdateRowCount(writerStatistics.get(DataWriterStatisticConstants.UPDATECOUNT));
setLoadDeleteRowCount(writerStatistics.get(DataWriterStatisticConstants.DELETECOUNT));
}
}

Expand Down
Expand Up @@ -943,7 +943,9 @@ public LoadIntoDatabaseOnArrivalListener(ProcessInfo processInfo, String sourceN
public void start(DataContext ctx, Batch batch) {
batchStartsToArriveTimeInMs = System.currentTimeMillis();
processInfo.setStatus(ProcessInfo.ProcessStatus.TRANSFERRING);
processInfo.setDataCount(ctx.getStatistics().get(DataReaderStatistics.DATA_ROW_COUNT));
if (processInfo != null && ctx != null && ctx.getStatistics() != null) {
processInfo.setDataCount(ctx.getStatistics().get(DataReaderStatistics.DATA_ROW_COUNT));
}
}

public void end(final DataContext ctx, final Batch batch, final IStagedResource resource) {
Expand Down
Expand Up @@ -256,14 +256,15 @@ public void insertIncomingBatch(ISqlTransaction transaction, IncomingBatch batch
batch.getTransformLoadMillis(), batch.getReloadRowCount(), batch.getOtherRowCount(), batch.getDataRowCount(),
batch.getDataInsertRowCount(), batch.getDataUpdateRowCount(), batch.getDataDeleteRowCount(),
batch.getExtractRowCount(), batch.getExtractInsertRowCount(), batch.getExtractUpdateRowCount(),
batch.getLoadInsertRowCount(), batch.getLoadUpdateRowCount(), batch.getLoadDeleteRowCount(),
batch.getExtractDeleteRowCount(), batch.getFailedDataId() },
new int[] { Types.NUMERIC, Types.VARCHAR, Types.VARCHAR, Types.CHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC });
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC });
}
}
}
Expand Down Expand Up @@ -336,15 +337,16 @@ public int updateIncomingBatch(ISqlTransaction transaction, IncomingBatch batch)
batch.getTransformExtractMillis(), batch.getTransformLoadMillis(), batch.getReloadRowCount(),
batch.getOtherRowCount(), batch.getDataRowCount(), batch.getDataInsertRowCount(), batch.getDataUpdateRowCount(),
batch.getDataDeleteRowCount(), batch.getExtractRowCount(), batch.getExtractInsertRowCount(),
batch.getExtractUpdateRowCount(), batch.getExtractDeleteRowCount(), batch.getFailedDataId(), batch.getBatchId(),
batch.getExtractUpdateRowCount(), batch.getExtractDeleteRowCount(), batch.getLoadInsertRowCount(),
batch.getLoadUpdateRowCount(), batch.getLoadDeleteRowCount(), batch.getFailedDataId(), batch.getBatchId(),
batch.getNodeId() },
new int[] { Types.CHAR, Types.SMALLINT, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
symmetricDialect.getSqlTypeForIds(), Types.VARCHAR });
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, symmetricDialect.getSqlTypeForIds(), Types.VARCHAR });
}
return count;
}
Expand Down
Expand Up @@ -71,17 +71,19 @@ public IncomingBatchServiceSqlMap(IDatabasePlatform platform, Map<String, String
" load_row_count, fallback_insert_count, fallback_update_count, ignore_count, ignore_row_count, missing_delete_count, skip_count, sql_state, sql_code, sql_message, " +
" last_update_hostname, last_update_time, summary, create_time, load_flag, extract_count, sent_count, load_count, load_id, common_flag, router_millis, extract_millis, " +
" transform_extract_millis, transform_load_millis, reload_row_count, other_row_count, data_row_count, data_insert_row_count, data_update_row_count, " +
" data_delete_row_count, extract_row_count, extract_insert_row_count, extract_update_row_count, extract_delete_row_count, failed_data_id) " +
" data_delete_row_count, extract_row_count, extract_insert_row_count, extract_update_row_count, extract_delete_row_count, load_insert_row_count, " +
" load_update_row_count, load_delete_row_count, failed_data_id) " +
" values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, current_timestamp, ?, current_timestamp, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, " +
" ?, ?, ?)");
" ?, ?, ?, ?, ?, ?)");

putSql("updateIncomingBatchSql" ,"" +
"update $(incoming_batch) set status = ?, error_flag=?, network_millis = ?, filter_millis = ?, load_millis = ?, failed_row_number = ?, failed_line_number = ?, byte_count = ?, " +
" load_row_count = ?, fallback_insert_count = ?, fallback_update_count = ?, ignore_count = ?, ignore_row_count = ?, missing_delete_count = ?, skip_count = ?, sql_state = ?, sql_code = ?, sql_message = ?, " +
" last_update_hostname = ?, last_update_time = current_timestamp, summary = ?, load_flag = ?, extract_count = ?, sent_count = ?, load_count = ?, load_id = ?, common_flag = ?, " +
" router_millis = ?, extract_millis = ?, transform_extract_millis = ?, transform_load_millis = ?, reload_row_count = ?, other_row_count = ?, data_row_count = ?, data_insert_row_count = ?, " +
" data_update_row_count = ?, data_delete_row_count = ?, extract_row_count = ?, extract_insert_row_count = ?, extract_update_row_count = ?, extract_delete_row_count = ?, failed_data_id = ? " +
" where batch_id = ? and node_id = ? " );
"update $(incoming_batch) set status = ?, error_flag=?, network_millis = ?, filter_millis = ?, load_millis = ?, failed_row_number = ?, failed_line_number = ?, byte_count = ?, " +
" load_row_count = ?, fallback_insert_count = ?, fallback_update_count = ?, ignore_count = ?, ignore_row_count = ?, missing_delete_count = ?, skip_count = ?, sql_state = ?, " +
" sql_code = ?, sql_message = ?, last_update_hostname = ?, last_update_time = current_timestamp, summary = ?, load_flag = ?, extract_count = ?, sent_count = ?, " +
" load_count = ?, load_id = ?, common_flag = ?, router_millis = ?, extract_millis = ?, transform_extract_millis = ?, transform_load_millis = ?, reload_row_count = ?, " +
" other_row_count = ?, data_row_count = ?, data_insert_row_count = ?, data_update_row_count = ?, data_delete_row_count = ?, extract_row_count = ?, " +
" extract_insert_row_count = ?, extract_update_row_count = ?, extract_delete_row_count = ?, load_insert_row_count = ?, load_update_row_count = ?, load_delete_row_count = ?, " +
" failed_data_id = ? where batch_id = ? and node_id = ? " );

putSql("deleteIncomingBatchSql" ,"" +
"delete from $(incoming_batch) where batch_id = ? and node_id = ? " );
Expand Down

0 comments on commit a76dac8

Please sign in to comment.