Skip to content

Commit

Permalink
0003256: Extract stats aren't updated on common outgoing batch rows for
Browse files Browse the repository at this point in the history
nodes that did not do the actual extract
  • Loading branch information
chenson42 committed Sep 22, 2017
1 parent 5e15b69 commit b00627a
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 5 deletions.
Expand Up @@ -80,6 +80,8 @@ public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, N

public void updateOutgoingBatch(OutgoingBatch batch);

public void updateCommonBatchExtractStatistics(OutgoingBatch batch);

public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgoingBatch);

public void updateOutgoingBatches(List<OutgoingBatch> batches);
Expand Down
Expand Up @@ -637,15 +637,16 @@ protected List<OutgoingBatch> extract(final ProcessInfo extractInfo, final Node
Future<FutureOutgoingBatch> future = futures.get(i);
currentBatch = activeBatchIter.next();
transferInfo.setTotalDataCount(currentBatch.getExtractRowCount());
if (i == futures.size() - 1) {
extractInfo.setStatus(ProcessStatus.OK);
}
boolean isProcessed = false;
while (!isProcessed) {
try {
FutureOutgoingBatch extractBatch = future.get(keepAliveMillis, TimeUnit.MILLISECONDS);
currentBatch = extractBatch.getOutgoingBatch();

if (i == futures.size() - 1) {
extractInfo.setStatus(ProcessStatus.OK);
}

if (extractBatch.isExtractSkipped) {
break;
}
Expand All @@ -663,7 +664,7 @@ protected List<OutgoingBatch> extract(final ProcessInfo extractInfo, final Node

if (currentBatch.getStatus() != Status.OK) {
currentBatch.setLoadCount(currentBatch.getLoadCount() + 1);
changeBatchStatus(Status.LD, currentBatch, mode);
changeBatchStatus(Status.LD, currentBatch, mode);
}

transferInfo.setCurrentTableName(currentBatch.getSummary());
Expand Down Expand Up @@ -937,6 +938,11 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo extractInfo, Node targe
statisticManager.incrementDataBytesExtracted(currentBatch.getChannelId(), byteCount);
statisticManager.incrementDataExtracted(currentBatch.getChannelId(),
stats.get(DataWriterStatisticConstants.ROWCOUNT));
currentBatch.setByteCount(byteCount);

if (currentBatch.isCommonFlag()) {
outgoingBatchService.updateCommonBatchExtractStatistics(currentBatch);
}
}

}
Expand Down
Expand Up @@ -203,6 +203,16 @@ public void updateOutgoingBatch(OutgoingBatch outgoingBatch) {
close(transaction);
}
}

public void updateCommonBatchExtractStatistics(OutgoingBatch outgoingBatch) {
sqlTemplate.update(getSql("updateCommonBatchExtractStatsSql"),
new Object[] { outgoingBatch.getByteCount(), outgoingBatch.getDataRowCount(), outgoingBatch.getDataInsertRowCount(), outgoingBatch.getDataUpdateRowCount(),
outgoingBatch.getDataDeleteRowCount(), outgoingBatch.getOtherRowCount(), outgoingBatch.getExtractRowCount(), outgoingBatch.getExtractInsertRowCount(),
outgoingBatch.getExtractUpdateRowCount(), outgoingBatch.getExtractDeleteRowCount(), outgoingBatch.getBatchId(),
outgoingBatch.getNodeId() },
new int[] { Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, symmetricDialect.getSqlTypeForIds(), Types.VARCHAR });
}

public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgoingBatch) {
outgoingBatch.setLastUpdatedTime(new Date());
Expand Down Expand Up @@ -231,7 +241,6 @@ public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo
Types.VARCHAR, Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC,

symmetricDialect.getSqlTypeForIds(), Types.VARCHAR });
}

Expand Down
Expand Up @@ -66,6 +66,12 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform,
+ " fallback_insert_count=?, fallback_update_count=?, ignore_row_count=?, missing_delete_count=?, "
+ " skip_count=?, extract_row_count=?, extract_insert_row_count=?, extract_update_row_count=?, extract_delete_row_count=? "
+ " where batch_id=? and node_id=? ");

putSql("updateCommonBatchExtractStatsSql",
"update $(outgoing_batch) set byte_count=?, data_row_count=?, "
+ " data_insert_row_count=?, data_update_row_count=?, data_delete_row_count=?, other_row_count=?, "
+ " extract_row_count=?, extract_insert_row_count=?, extract_update_row_count=?, extract_delete_row_count=? "
+ " where batch_id=? and node_id != ? ");

putSql("findOutgoingBatchSql", "where batch_id=? and node_id=? ");

Expand Down

0 comments on commit b00627a

Please sign in to comment.