Skip to content

Commit

Permalink
Fixed PutElasticsearchHttpRecord, same bug as JoltTransformRecord
Browse files Browse the repository at this point in the history
  • Loading branch information
mattyb149 committed Dec 4, 2019
1 parent 43f8f9d commit d807aee
Showing 1 changed file with 9 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -576,17 +576,6 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
i++;
}
}

session.putAttribute(successFlowFile, "record.count", Integer.toString(recordCount - failures.size()));

// Normal behavior is to output with record.count. In order to not break backwards compatibility, set both here.
session.putAttribute(failedFlowFile, "record.count", Integer.toString(failures.size()));
session.putAttribute(failedFlowFile, "failure.count", Integer.toString(failures.size()));

session.transfer(successFlowFile, REL_SUCCESS);
session.transfer(failedFlowFile, REL_FAILURE);
session.remove(inputFlowFile);

} catch (final IOException | SchemaNotFoundException | MalformedRecordException e) {
// We failed while handling individual failures. Not much else we can do other than log, and route the whole thing to failure.
getLogger().error("Failed to process {} during individual record failure handling; route whole FF to failure", new Object[] {flowFile, e});
Expand All @@ -597,7 +586,16 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
if (failedFlowFile != null) {
session.remove(failedFlowFile);
}
return;
}
session.putAttribute(successFlowFile, "record.count", Integer.toString(recordCount - failures.size()));

// Normal behavior is to output with record.count. In order to not break backwards compatibility, set both here.
session.putAttribute(failedFlowFile, "record.count", Integer.toString(failures.size()));
session.putAttribute(failedFlowFile, "failure.count", Integer.toString(failures.size()));
session.transfer(successFlowFile, REL_SUCCESS);
session.transfer(failedFlowFile, REL_FAILURE);
session.remove(inputFlowFile);
}
}

Expand Down

0 comments on commit d807aee

Please sign in to comment.