NIFI-4651 Add error message and code to PutSQL FlowFile attributes#7007
NIFI-4651 Add error message and code to PutSQL FlowFile attributes#7007krisztina-zsihovszki wants to merge 2 commits intoapache:mainfrom
Conversation
|
recheck |
fbbe9e8 to
f277264
Compare
|
Thanks for the contribution @krisztina-zsihovszki. Reviewing. |
| switch (errorTypesResult.destination()) { | ||
| case Failure: | ||
| getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {i, e}, e); | ||
| getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, exception}, exception); |
There was a problem hiding this comment.
The logger can handle arbitrary number of arguments.
| getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, exception}, exception); | |
| getLogger().error("Failed to update database for {}; routing to failure", flowFile, exception); |
| getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", | ||
| new Object[] {i, e}, e); | ||
| new Object[] {flowFile, exception}, exception); |
| break; | ||
| case Self: | ||
| getLogger().error("Failed to update database for {} due to {};", new Object[] {i, e}, e); | ||
| getLogger().error("Failed to update database for {} due to {};", new Object[] {flowFile, exception}, exception); |
| private ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup> onGroupError(final ProcessContext context, final ProcessSession session, final RoutingResult result) { | ||
| ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup> onGroupError | ||
| = ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY); | ||
| onGroupError = onGroupError.andThen((ctx, flowFileGroup, errorTypesResult, exception) -> { | ||
|
|
||
| switch (errorTypesResult.destination()) { | ||
| case Failure: | ||
| List<FlowFile> flowFilesToFailure = getFlowFilesOnRelationShip(result, REL_FAILURE); | ||
| Optional.ofNullable(flowFilesToFailure).map(flowFiles -> | ||
| result.getRoutedFlowFiles().put(REL_FAILURE, addErrorAttributesToFlowFilesInGroup(session, flowFiles, flowFileGroup.getFlowFiles(), exception))); | ||
| break; | ||
| case Retry: | ||
| List<FlowFile> flowFilesToRetry = getFlowFilesOnRelationShip(result, REL_RETRY); | ||
| Optional.ofNullable(flowFilesToRetry).map(flowFiles -> | ||
| result.getRoutedFlowFiles().put(REL_RETRY, addErrorAttributesToFlowFilesInGroup(session, flowFiles, flowFileGroup.getFlowFiles(), exception))); | ||
| break; | ||
| } | ||
| }); | ||
| return onGroupError; | ||
| } |
There was a problem hiding this comment.
The RoutingResult::getRoutedFlowFiles already returns an empty map, so the getFlowFilesOnRelationShip does not need to use Optional and should return an empty list in case the flowfiles for the specified relationship are missing. This means that flowFilesToFailure variable in onGroupError cannot be null and also doesn't need to be wrapped in Optionals. I'd also recommend simplifying the 2-branched switch to an if statement for visibility and extracting the condition on getting the Relationship based on the ErrorTypesResult.Destination since both branches depend on that value i.e. -> the if statement is not needed.
| private ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup> onGroupError(final ProcessContext context, final ProcessSession session, final RoutingResult result) { | |
| ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup> onGroupError | |
| = ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY); | |
| onGroupError = onGroupError.andThen((ctx, flowFileGroup, errorTypesResult, exception) -> { | |
| switch (errorTypesResult.destination()) { | |
| case Failure: | |
| List<FlowFile> flowFilesToFailure = getFlowFilesOnRelationShip(result, REL_FAILURE); | |
| Optional.ofNullable(flowFilesToFailure).map(flowFiles -> | |
| result.getRoutedFlowFiles().put(REL_FAILURE, addErrorAttributesToFlowFilesInGroup(session, flowFiles, flowFileGroup.getFlowFiles(), exception))); | |
| break; | |
| case Retry: | |
| List<FlowFile> flowFilesToRetry = getFlowFilesOnRelationShip(result, REL_RETRY); | |
| Optional.ofNullable(flowFilesToRetry).map(flowFiles -> | |
| result.getRoutedFlowFiles().put(REL_RETRY, addErrorAttributesToFlowFilesInGroup(session, flowFiles, flowFileGroup.getFlowFiles(), exception))); | |
| break; | |
| } | |
| }); | |
| return onGroupError; | |
| } | |
| private ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup> onGroupError(final ProcessContext context, final ProcessSession session, final RoutingResult result) { | |
| ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup> onGroupError = | |
| ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY); | |
| onGroupError = onGroupError.andThen((ctx, flowFileGroup, errorTypesResult, exception) -> { | |
| Relationship relationship = errorTypesResult.destination() == ErrorTypes.Destination.Failure ? REL_FAILURE : REL_RETRY; | |
| List<FlowFile> flowFilesToRelationship = getFlowFilesOnRelationship(result, relationship); | |
| result.getRoutedFlowFiles().put(relationship, addErrorAttributesToFlowFilesInGroup(session, flowFilesToRelationship, flowFileGroup.getFlowFiles(), exception)); | |
| }); | |
| return onGroupError; | |
| } |
| private List<FlowFile> getFlowFilesOnRelationShip(RoutingResult result, final Relationship relationship) { | ||
| return Optional.of(result.getRoutedFlowFiles()) | ||
| .orElse(emptyMap()) | ||
| .get(relationship); | ||
| } |
There was a problem hiding this comment.
Since the getRoutedFlowFiles already returns an empty map, the optional is not necessary here.
| private List<FlowFile> getFlowFilesOnRelationShip(RoutingResult result, final Relationship relationship) { | |
| return Optional.of(result.getRoutedFlowFiles()) | |
| .orElse(emptyMap()) | |
| .get(relationship); | |
| } | |
| private List<FlowFile> getFlowFilesOnRelationship(RoutingResult result, final Relationship relationship) { | |
| return Optional.ofNullable(result.getRoutedFlowFiles().get(relationship)) | |
| .orElse(emptyList()); | |
| } |
| attributes.put(ERROR_SQL_STATE_ATTR, valueOf(((SQLException) exception).getSQLState())); | ||
| } | ||
|
|
||
| return session.putAllAttributes(flowFile, attributes); |
There was a problem hiding this comment.
Is it guaranteed that the flowfile doesn't have original attributes before being updated with the error ones or does the method only update and not replace the flowfile attributes?
There was a problem hiding this comment.
The method adds extra attributes to the FlowFile, it does not remove any existing attribute. (Added a check in unit tests to verify this.)
Lehel44
left a comment
There was a problem hiding this comment.
Thanks for making the changes @krisztina-zsihovszki!
LGTM+1
|
+1 LGTM, Merging to main and support/nifi-1.x |
Summary
NIFI-4651
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000NIFI-00000Pull Request Formatting
mainbranchVerification
Please indicate the verification steps performed prior to pull request creation.
Build
mvn clean install -P contrib-checkLicensing
LICENSEandNOTICEfilesDocumentation