Skip to content

Commit

Permalink
0003142: Sync Columns Between Incoming and Outgoing Batch
Browse files Browse the repository at this point in the history
Use the extract_row_count to seend the process info object with the
correct number of total rows that will be loaded.
  • Loading branch information
chenson42 committed Sep 13, 2017
1 parent b3cbcb3 commit 39d85e6
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
Expand Up @@ -170,7 +170,9 @@ public void setBatchCount(long batchCount) {

public void incrementCurrentDataCount() {
this.currentDataCount++;
this.totalDataCount++;
if (totalDataCount < currentDataCount) {
totalDataCount = currentDataCount;
}
}

public void incrementBatchCount() {
Expand Down
Expand Up @@ -526,7 +526,7 @@ public void loadDataFromConfig(Node remote, RemoteNodeStatus status, boolean for
*/
protected List<IncomingBatch> loadDataFromTransport(final ProcessInfo processInfo,
final Node sourceNode, IIncomingTransport transport, OutputStream out) throws IOException {
final ManageIncomingBatchListener listener = new ManageIncomingBatchListener();
final ManageIncomingBatchListener listener = new ManageIncomingBatchListener(processInfo);
final DataContext ctx = new DataContext();
Throwable error = null;
try {
Expand Down Expand Up @@ -1050,6 +1050,12 @@ class ManageIncomingBatchListener implements IDataProcessorListener {
protected List<IncomingBatch> batchesProcessed = new ArrayList<IncomingBatch>();

protected IncomingBatch currentBatch;

protected ProcessInfo processInfo;

public ManageIncomingBatchListener(ProcessInfo processInfo) {
this.processInfo = processInfo;
}

public void beforeBatchEnd(DataContext context) {
enableSyncTriggers(context);
Expand All @@ -1074,6 +1080,12 @@ public boolean beforeBatchStarted(DataContext context) {
}
IncomingBatch incomingBatch = new IncomingBatch(batch);
this.batchesProcessed.add(incomingBatch);

if (context.getStatistics() != null) {
incomingBatch.mergeInjectedBatchStatistics(context.getStatistics());
processInfo.setTotalDataCount(incomingBatch.getExtractRowCount());
}

if (incomingBatchService.acquireIncomingBatch(incomingBatch)) {
this.currentBatch = incomingBatch;
context.put("currentBatch", this.currentBatch);
Expand Down Expand Up @@ -1101,10 +1113,6 @@ public void batchSuccessful(DataContext context) {
statisticManager.incrementDataBytesLoaded(this.currentBatch.getChannelId(),
this.currentBatch.getByteCount());
Status oldStatus = this.currentBatch.getStatus();

if (context.getStatistics() != null) {
currentBatch.mergeInjectedBatchStatistics(context.getStatistics());
}

try {
this.currentBatch.setStatus(Status.OK);
Expand Down

0 comments on commit 39d85e6

Please sign in to comment.