Skip to content

Commit

Permalink
0004544: Batch acknowledgement lookup failed data ID only once
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Sep 12, 2020
1 parent 5b7ccc4 commit d06144c
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 27 deletions.
Expand Up @@ -152,6 +152,8 @@ public String toString() {
private long extractDeleteRowCount;

private long failedDataId;

private long failedLineNumber;

private transient Map<String, Integer> tableCounts = new LinkedHashMap<String, Integer>();

Expand Down Expand Up @@ -700,6 +702,14 @@ public void setFailedDataId(long failedDataId) {
this.failedDataId = failedDataId;
}

public void setFailedLineNumber(long failedLineNumber) {
this.failedLineNumber = failedLineNumber;
}

public long getFailedLineNumber() {
return failedLineNumber;
}

public long getProcessedRowCount() {
return processedRowCount;
}
Expand Down
Expand Up @@ -36,8 +36,6 @@ public class IncomingBatch extends AbstractBatch {

private long failedRowNumber;

private long failedLineNumber;

private long startTime;

private boolean retry;
Expand Down Expand Up @@ -71,7 +69,7 @@ public void setValues(Statistics readerStatistics, Statistics writerStatistics,
setLastUpdatedTime(new Date());
if (!isSuccess) {
failedRowNumber = getLoadRowCount();
failedLineNumber = writerStatistics.get(DataWriterStatisticConstants.LINENUMBER);
setFailedLineNumber(writerStatistics.get(DataWriterStatisticConstants.LINENUMBER));
}

setLoadInsertRowCount(writerStatistics.get(DataWriterStatisticConstants.INSERTCOUNT));
Expand Down Expand Up @@ -148,14 +146,6 @@ public boolean isPersistable() {
return getBatchId() >= 0;
}

public void setFailedLineNumber(long failedLineNumber) {
this.failedLineNumber = failedLineNumber;
}

public long getFailedLineNumber() {
return failedLineNumber;
}

@Override
public String toString() {
return "IncomingBatch " + getBatchId();
Expand Down
Expand Up @@ -116,7 +116,7 @@ public BatchAckResult ack(final BatchAck batch) {
if (!batch.isOk() && batch.getErrorLine() != 0) {
if (outgoingBatch.isLoadFlag()) {
isNewError = outgoingBatch.getSentCount() == 1;
} else {
} else if (batch.getErrorLine() != outgoingBatch.getFailedLineNumber()){
String sql = getSql("selectDataIdSql");
if (parameterService.is(ParameterConstants.DBDIALECT_ORACLE_SEQUENCE_NOORDER, false)) {
sql = getSql("selectDataIdByCreateTimeSql");
Expand All @@ -132,6 +132,7 @@ public BatchAckResult ack(final BatchAck batch) {
}
outgoingBatch.setFailedDataId(failedDataId);
}
outgoingBatch.setFailedLineNumber(batch.getErrorLine());
}
}

Expand Down
Expand Up @@ -206,8 +206,8 @@ public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo
outgoingBatch.getIgnoreCount(), outgoingBatch.getRouterMillis(), outgoingBatch.getNetworkMillis(),
outgoingBatch.getFilterMillis(), outgoingBatch.getLoadMillis(), outgoingBatch.getExtractMillis(),
outgoingBatch.getExtractStartTime(), outgoingBatch.getTransferStartTime(), outgoingBatch.getLoadStartTime(),
outgoingBatch.getSqlState(), outgoingBatch.getSqlCode(),
FormatUtils.abbreviateForLogging(outgoingBatch.getSqlMessage()), outgoingBatch.getFailedDataId(),
outgoingBatch.getSqlState(), outgoingBatch.getSqlCode(), FormatUtils.abbreviateForLogging(outgoingBatch.getSqlMessage()),
outgoingBatch.getFailedDataId(), outgoingBatch.getFailedLineNumber(),
outgoingBatch.getLastUpdatedHostName(), outgoingBatch.getSummary(), outgoingBatch.getLoadRowCount(),
outgoingBatch.getLoadInsertRowCount(), outgoingBatch.getLoadUpdateRowCount(), outgoingBatch.getLoadDeleteRowCount(),
outgoingBatch.getFallbackInsertCount(), outgoingBatch.getFallbackUpdateCount(), outgoingBatch.getIgnoreRowCount(),
Expand All @@ -218,7 +218,7 @@ public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo
new int[] { Types.CHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.TIMESTAMP, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR, Types.NUMERIC,
Types.TIMESTAMP, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR, Types.NUMERIC, Types.NUMERIC,
Types.VARCHAR, Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Expand All @@ -229,7 +229,7 @@ public void updateOutgoingBatches(ISqlTransaction transaction, List<OutgoingBatc
int[] types = new int[] { Types.CHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.TIMESTAMP, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR, Types.NUMERIC,
Types.TIMESTAMP, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR, Types.NUMERIC, Types.NUMERIC,
Types.VARCHAR, Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Expand All @@ -249,8 +249,8 @@ public void updateOutgoingBatches(ISqlTransaction transaction, List<OutgoingBatc
outgoingBatch.getIgnoreCount(), outgoingBatch.getRouterMillis(), outgoingBatch.getNetworkMillis(),
outgoingBatch.getFilterMillis(), outgoingBatch.getLoadMillis(), outgoingBatch.getExtractMillis(),
outgoingBatch.getExtractStartTime(), outgoingBatch.getTransferStartTime(), outgoingBatch.getLoadStartTime(),
outgoingBatch.getSqlState(), outgoingBatch.getSqlCode(),
FormatUtils.abbreviateForLogging(outgoingBatch.getSqlMessage()), outgoingBatch.getFailedDataId(),
outgoingBatch.getSqlState(), outgoingBatch.getSqlCode(), FormatUtils.abbreviateForLogging(outgoingBatch.getSqlMessage()),
outgoingBatch.getFailedDataId(), outgoingBatch.getFailedLineNumber(),
outgoingBatch.getLastUpdatedHostName(), outgoingBatch.getSummary(), outgoingBatch.getLoadRowCount(),
outgoingBatch.getLoadInsertRowCount(), outgoingBatch.getLoadUpdateRowCount(), outgoingBatch.getLoadDeleteRowCount(),
outgoingBatch.getFallbackInsertCount(), outgoingBatch.getFallbackUpdateCount(), outgoingBatch.getIgnoreRowCount(),
Expand Down Expand Up @@ -839,6 +839,7 @@ public OutgoingBatch mapRow(Row rs) {
batch.setSqlCode(rs.getInt("sql_code"));
batch.setSqlMessage(rs.getString("sql_message"));
batch.setFailedDataId(rs.getLong("failed_data_id"));
batch.setFailedLineNumber(rs.getLong("failed_line_number"));
batch.setLastUpdatedHostName(rs.getString("last_update_hostname"));
batch.setLastUpdatedTime(rs.getDateTime("last_update_time"));
batch.setCreateTime(rs.getDateTime("create_time"));
Expand All @@ -854,7 +855,6 @@ public OutgoingBatch mapRow(Row rs) {
batch.setIgnoreRowCount(rs.getLong("ignore_row_count"));
batch.setMissingDeleteCount(rs.getLong("missing_delete_count"));
batch.setSkipCount(rs.getLong("skip_count"));

}
return batch;
} else {
Expand Down
Expand Up @@ -57,7 +57,7 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform,
+ " ignore_count=?, router_millis=?, network_millis=?, filter_millis=?, "
+ " load_millis=?, extract_millis=?, extract_start_time=?, transfer_start_time=?, load_start_time=?, "
+ " sql_state=?, sql_code=?, sql_message=?, "
+ " failed_data_id=?, last_update_hostname=?, last_update_time=current_timestamp, summary=?, "
+ " failed_data_id=?, failed_line_number=?, last_update_hostname=?, last_update_time=current_timestamp, summary=?, "
+ " load_row_count=?, load_insert_row_count=?, load_update_row_count=?, load_delete_row_count=?, "
+ " fallback_insert_count=?, fallback_update_count=?, ignore_row_count=?, missing_delete_count=?, "
+ " skip_count=?, extract_row_count=?, extract_insert_row_count=?, extract_update_row_count=?, extract_delete_row_count=?, "
Expand Down Expand Up @@ -107,18 +107,18 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform,
"where node_id=? and channel_id=? and create_time >= ? and create_time <= ? ");

putSql("selectOutgoingBatchPrefixSql",
"select b.node_id, b.channel_id, b.status, "
+ " b.byte_count, b.extract_count, b.sent_count, b.load_count, b.data_row_count, "
+ " b.reload_row_count, b.data_insert_row_count, b.data_update_row_count, b.data_delete_row_count, b.other_row_count, "
"select b.node_id, b.channel_id, b.status, "
+ " b.byte_count, b.extract_count, b.sent_count, b.load_count, b.data_row_count, "
+ " b.reload_row_count, b.data_insert_row_count, b.data_update_row_count, b.data_delete_row_count, b.other_row_count, "
+ " b.ignore_count, b.router_millis, b.network_millis, b.filter_millis, b.load_millis, b.extract_millis, "
+ " b.extract_start_time, b.transfer_start_time, b.load_start_time, b.sql_state, b.sql_code, "
+ " b.extract_start_time, b.transfer_start_time, b.load_start_time, b.sql_state, b.sql_code, "
+ " b.sql_message, b.load_insert_row_count, b.load_update_row_count, b.load_delete_row_count, b.load_row_count, "
+ " b.extract_insert_row_count, b.extract_update_row_count, b.extract_delete_row_count, b.extract_row_count, "
+ " b.transform_extract_millis, b.transform_load_millis, b.fallback_insert_count, b.fallback_update_count, "
+ " b.ignore_row_count, b.missing_delete_count, b.skip_count, "
+ " b.failed_data_id, b.last_update_hostname, b.last_update_time, b.create_time, b.batch_id, b.extract_job_flag, "
+ " b.load_flag, b.error_flag, b.common_flag, b.load_id, b.create_by, b.summary from "
+ " $(outgoing_batch) b ");
+ " b.failed_data_id, b.failed_line_number, b.last_update_hostname, b.last_update_time, b.create_time, b.batch_id, "
+ " b.extract_job_flag, b.load_flag, b.error_flag, b.common_flag, b.load_id, b.create_by, b.summary from "
+ " $(outgoing_batch) b ");

putSql("selectOutgoingBatchErrorsSql", " where error_flag=1 order by batch_id ");

Expand Down

0 comments on commit d06144c

Please sign in to comment.