NIFI-6638: Empty multiple queues at once at different flow levels #3700

Closed
VinceCastro wants to merge 6 commits into apache:master from VinceCastro:empty-queues

@VinceCastro VinceCastro commented Sep 6, 2019

This PR also refers to NIFI-3632, NIFI-5329 and NIFI-4308.

Description of PR

Replaced the current "Empty queue" button in the context menu with an "Empty queues" sub-menu providing different options to empty multiple queues at different levels of the flow depending on the selected components.

In particular, different buttons have been added to the new "Empty queues" sub-menu:

  • selected queue: empty the selected queue
  • selected queues: empty the selected queues
  • current process group: empty all queues inside the current process group
  • current process group (recursive): empty all queues inside the current process group recursively, which means all queues inside the current process group, and its sub process groups, and their sub process groups [and so on] will be emptied
  • selected process group: empty all queues inside the selected process group
  • selected process group (recursive): empty all queues inside the selected process group recursively
  • selected process groups: empty all queues inside the selected process groups
  • selected process groups (recursive): empty all queues inside the selected process groups recursively

Error handling: if the emptying process fails on one or more queues, it keeps going until all the selected queues have been processed. The final summary dialog then reports any errors encountered while emptying the queues.
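
The keep-going behaviour described above could be sketched roughly as follows. This is a minimal illustration and not the code from this PR: `emptyQueue` is a hypothetical callback that returns a Promise for a single connection, and the shape of the returned summary is an assumption.

```javascript
// Hypothetical sketch: empty each queue in turn, collecting failures
// instead of stopping at the first one.
async function emptyQueues(connectionIds, emptyQueue) {
    const errors = [];
    let emptied = 0;

    for (const id of connectionIds) {
        try {
            // emptyQueue is an assumed callback that resolves once the queue is empty
            await emptyQueue(id);
            emptied++;
        } catch (e) {
            // record the failure and keep going with the remaining queues
            errors.push({ connectionId: id, message: e.message });
        }
    }

    // the caller can render this summary in a final dialog
    return { emptied: emptied, errors: errors };
}
```

A caller would show the `errors` array in the summary dialog only when it is non-empty.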

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?

  • Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically master)?

  • Is your initial contribution a single, squashed commit? Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not squash or use --force when pushing to allow for clean monitoring of changes.

For code changes:

  • Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
  • Have you written or updated unit tests to verify your changes?
  • Have you verified that the full build is successful on both JDK 8 and JDK 11?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
  • If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
  • If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

Added methods "containsConnections" and "containsProcessGroups" to check whether a selection contains one or more connections and one or more process groups, respectively.

Added method "toReadableBytes" to convert a byte count to a human-readable string.
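
For illustration, a byte formatter of this kind might look like the sketch below. It is not the PR's implementation: the unit list matches the one visible in the review further down, but the base of 1024, the two-decimal rounding, and the assumption that the input is a non-negative integer are choices made for this example.

```javascript
// Sketch of a byte formatter; base 1024 and two decimals are assumptions.
// Expects a non-negative integer byte count.
function toReadableBytes(bytes) {
    var sizes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'];

    if (bytes === 0) {
        return '0 B';
    }

    // pick the largest unit that keeps the value at or above 1,
    // capped at the last entry of the unit list
    var i = Math.min(Math.floor(Math.log(bytes) / Math.log(1024)), sizes.length - 1);
    var value = bytes / Math.pow(1024, i);

    // parseFloat drops trailing zeros such as "1.50" -> 1.5
    return parseFloat(value.toFixed(2)) + ' ' + sizes[i];
}
```

Under these assumptions, `toReadableBytes(1536)` yields `1.5 KB` and `toReadableBytes(500)` yields `500 B`.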

Added methods to check if one or more queues can be emptied or not.
Enriched the context menu with functions to empty queues at different levels of the flow.

Added methods to get all connections from one or more process groups.
Added a method to empty one or more connections.
Added actions to empty the selected connections, the connections of one or more process groups, and the connections of one or more process groups recursively.

Widened the context menu to make room for the new empty queue menu entry labels.

Small CSS change so that dialog headers always center their text automatically and can wrap onto a new line when the allowed width is exceeded.

Updated doc with images showing the menu supporting the new empty queues functionality.
@adarmiento
Contributor

adarmiento commented Sep 9, 2019

Are we sure we need that many functionalities?

selected queue: empty the selected queue
selected queues: empty the selected queues
current process group: empty all queues inside the current process group
current process group (recursive): empty all queues inside the current process group recursively, which means all queues inside the current process group, and its sub process groups, and their sub process groups [and so on] will be emptied
selected process group: empty all queues inside the selected process group
selected process group (recursive): empty all queues inside the selected process group recursively
selected process groups: empty all queues inside the selected process groups
selected process groups (recursive): empty all queues inside the selected process groups recursively

Are there use cases in which you want to empty all queues of a process group that is not the current one AND, at the same time, do not want to empty the queues recursively?

I think that the most useful features are:

selected queue: empty the selected queue
selected queues: empty the selected queues
current process group (recursive): empty all queues inside the current process group recursively

I'd therefore start by introducing these (which happen to be the most comprehensive, so it would be easy to add the others later) and potentially add the other cases in another PR.

* @param {string} actionName
* @param {Array} connections
* @param {Array} errors
*
Contributor

nit: remove empty line

Suggested change
*

@@ -564,7 +564,7 @@ div.nifi-tooltip {
background-color:rgba(249,250,251,0.97); /*tint base-color 96%*/
Contributor

Suggested change
background-color:rgba(249,250,251,0.97); /*tint base-color 96%*/
background-color: rgba(249,250,251,0.97); /*tint base-color 96%*/

@@ -564,7 +564,7 @@ div.nifi-tooltip {
background-color:rgba(249,250,251,0.97); /*tint base-color 96%*/
border:1px solid #004849; /*link-color*/
Contributor

Suggested change
border:1px solid #004849; /*link-color*/
border: 1px solid #004849; /*link-color*/

width: 215px;
width: 235px;
max-height: inherit;
color:#004849
Contributor

Suggested change
color:#004849
color: #004849;


// set the progress bar to a certain percentage
var setProgressBar = function (percentComplete) {
if($("#drop-request-percent-complete .progress-label").length) {
Contributor

Suggested change
if($("#drop-request-percent-complete .progress-label").length) {
if ($("#drop-request-percent-complete .progress-label").length) {

var actionName = '';
var dialogContent = '';

if(selectionSize === 0) {
Contributor

Suggested change
if(selectionSize === 0) {
if (selectionSize === 0) {

dialogContent = 'Are you sure you want to empty all queues inside the current process group and all its sub process groups (recursive)? All FlowFiles waiting at the time of the request will be removed.';
connections = d3.selectAll('g.connection').data();
}
else if(selectionSize === 1) {
Contributor

Suggested change
else if(selectionSize === 1) {
else if (selectionSize === 1) {

.map(function (processGroup) {
return processGroup.id;
})
:
Contributor

nit

Author

Would you prefer the following?

var processGroupIDs;

if (selectionSize === 0) {
   processGroupIDs = ...
}
else {
   processGroupIDs = ...
}

Contributor

This is more readable and coherent with the rest of the codebase IMO

nfErrorHandler.handleAjaxError(xhr, status, error);
} else {
completeDropRequest()
if(connections.length === 0 && errors.length === 0) {
Contributor

Suggested change
if(connections.length === 0 && errors.length === 0) {
if (connections.length === 0 && errors.length === 0) {

toReadableBytes: function (bytes) {
var sizes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'];

if(bytes === 0) {
Contributor

Suggested change
if(bytes === 0) {
if (bytes === 0) {

@adarmiento
Contributor

Some minor style fixes in the suggestions

@mcgilman
Contributor

@VinceCastro Thanks for the PR! This capability is important and is something that many folks have requested. The proposed solution will perform an unbounded number of requests in order to clear all nested queues. I think we need to change this as this solution likely won't scale well with larger data flows.

When the subject of the action is a process group, we should issue a single request to the backend with a flag indicating whether the request is recursive. The clear queue action is an asynchronous request that the UI polls while the action completes. When the action encompasses multiple queues, we should be able to provide this context with the same asynchronous request/response model.
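
The request/poll model described here can be sketched as follows. This is only an illustration under stated assumptions: `submit` and `poll` are hypothetical functions standing in for the HTTP calls, and the status object is assumed to carry an `id` and a `finished` flag.

```javascript
// Hypothetical sketch of the request/poll model: submit one drop request,
// then poll its status until the back end reports it finished.
async function runDropRequest(submit, poll, intervalMs) {
    // submit is assumed to resolve to a status object with `id` and `finished`
    let status = await submit();

    while (!status.finished) {
        // wait before asking again so the back end is not flooded
        await new Promise(function (resolve) { setTimeout(resolve, intervalMs); });
        // poll is assumed to resolve to the latest status for the given request id
        status = await poll(status.id);
    }

    return status;
}
```

With this shape the UI issues one submission regardless of how many queues the process group contains, and the number of polls depends only on how long the back end takes.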

@VinceCastro
Author

Got it! So you suggest moving the logic for emptying multiple queues from the front end to the back end by adding one or more REST APIs to the existing endpoints. We may have cases where the user wants to empty several selected process groups at once, or several selected queues at once, so I would suggest adding two REST APIs to the "FlowFile Queues" endpoint: the first should accept a list of process group IDs in order to empty the queues inside them (recursively or not); the second should accept a list of connection IDs to empty. We could collect all the IDs in a JSON body.
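
As a rough illustration of the two request bodies proposed here, the sketch below builds them as plain objects. The field names (`processGroupIds`, `recursive`, `connectionIds`) are assumptions made for this example; they are not taken from the NiFi REST API or from this PR.

```javascript
// Hypothetical payload builders; all field names are illustrative only.

// Body for emptying every queue inside the given process groups.
function buildProcessGroupDropRequest(processGroupIds, recursive) {
    return {
        processGroupIds: processGroupIds.slice(), // copy so the caller's array is not shared
        recursive: Boolean(recursive)             // true: also descend into sub process groups
    };
}

// Body for emptying an explicit list of connections (queues).
function buildConnectionDropRequest(connectionIds) {
    return { connectionIds: connectionIds.slice() };
}

// Serialized form that would be sent as the JSON body of a POST.
var body = JSON.stringify(buildProcessGroupDropRequest(['pg-1', 'pg-2'], true));
```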

This way we can efficiently cover all cases where the user wants to:

  • empty the selected queue
  • empty the selected queues
  • empty the selected process group (recursively or not)
  • empty the selected process groups (recursively or not)
  • empty the current process group (recursively or not)

What do you think?

@adarmiento you stated:

I think that the most useful features are:

selected queue: empty the selected queue
selected queues: empty the selected queues
current process group (recursive): empty all queues inside the current process group recursively

What if I want to empty only a subset of the flow I'm currently viewing? In that case features like "empty the selected process group" or "empty the selected process groups" are necessary, do you agree?

However, as mentioned by @adarmiento

Are there use cases in which you want to empty all queues of a process group that is not the current one AND, at the same time, do not want to empty the queues recursively?

I have doubts about the usefulness of the non-recursive actions. Are there use cases where we do not need to empty a process group recursively? @mcgilman @adarmiento, do you suggest adding the non-recursive actions now or postponing them to another PR?

@adarmiento
Contributor

To empty a subset of the flow I am currently viewing, I'd go for "selected queues: empty the selected queues"; however, I understand your urge to add more convenience methods.
I would add all the non-core features in one or more separate PRs, keeping this "core" implementation as simple as possible.

@patricker
Contributor

@VinceCastro I'd leave the choice to keep it as a single PR, or split it, up to your comfort level with the back-end code. This is your first contribution, have you looked at the back-end layer?

A few file pointers in case you want to look into this:

  • The main end point is in nifi-web-api/org.apache.nifi.web.api.FlowFileQueueResource
  • Do a quick search for @Path("{id}/drop-requests") and you'll find the end point.

This is a POST for a single queue. The ID of the queue goes in the URL. You might be able to replace this with the ID of a process group and then do a lookup, but it is probably easier to create a new endpoint such as @Path("{id}/process-group-drop-requests").

But what about submitting multiple queues async? I think you would need another endpoint there too, and have the body of the post contain some JSON describing the list.

Interestingly, the DropRequestDTO does not specify which Queue it's for, so you wouldn't need to worry about that part too much. The harder part is probably the actual drop logic in NiFi, and updating the other internal objects like StandardNiFiServiceFacade.createFlowFileDropRequest.

@VinceCastro
Author

Hi, and thanks for the useful tips! I'm currently looking at the back end; I'll decide later whether or not to split the PR.

@VinceCastro
Author

@adarmiento @mcgilman @patricker I moved the multiple queues emptying logic from the front-end to the back-end side, could you please check the changes out?

@adarmiento
Contributor

Could you please add some more test cases enforcing your expected behavior?

DropFlowFileState state = null;
boolean allFinished = true;
String failureReason = null;
List<DropRequestDTO.FailureReason> failureReason = null;
Contributor

If this is a List, it would be clearer to rename it to failureReasons


public DropFlowFileRequest(final String identifier) {
this.identifier = identifier;
public DropFlowFileRequest(final String requestIdentifier,final String connectionIdentifier) {
Contributor

nit

Suggested change
public DropFlowFileRequest(final String requestIdentifier,final String connectionIdentifier) {
public DropFlowFileRequest(final String requestIdentifier, final String connectionIdentifier) {

Comment on lines +39 to +45
import java.util.Set;
import java.util.HashSet;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.Collections;
import java.util.Comparator;
Contributor

Nit: imports are usually kept in alphabetical order (package, then classes)

}
}

multiQueueDropRequestMap.put(requestIdentifier,new HashSet<>());
Contributor

Suggested change
multiQueueDropRequestMap.put(requestIdentifier,new HashSet<>());
multiQueueDropRequestMap.put(requestIdentifier, new HashSet<>());

public synchronized LoadBalanceCompression getLoadBalanceCompression() {
return compression;
}

Contributor

Suggested change

.max()
.orElseGet(() -> dto.getSubmissionTime().getTime()))
);
//dto.setState(dropRequest.getState().toString());
Contributor

ditto

dto.setFailureReasons(dropRequests.stream()
.filter(dropRequest -> dropRequest.getFailureReason() != null)
.map(dropRequest -> {
return new DropRequestDTO.FailureReason(dropRequest.getConnectionIdentifier(),dropRequest.getFailureReason()); })
Contributor

Suggested change
return new DropRequestDTO.FailureReason(dropRequest.getConnectionIdentifier(),dropRequest.getFailureReason()); })
return new DropRequestDTO.FailureReason(dropRequest.getConnectionIdentifier(), dropRequest.getFailureReason()); })

dto.setFinished(isDropRequestComplete(
dropRequests.stream()
.map(DropFlowFileStatus::getState)
.reduce(DropFlowFileState.COMPLETE,(a,b) -> {
Contributor

Suggested change
.reduce(DropFlowFileState.COMPLETE,(a,b) -> {
.reduce(DropFlowFileState.COMPLETE, (a,b) -> {

return dropRequest;
}

public static Set<DropFlowFileStatus> dropFlowFiles(final Set<FlowFileQueue> flowFileQueues, final String requestIdentifier, final String requestor) {
Contributor

Could you add a little doc to this method?

if (multiQueueDropRequestMap.size() > 50) {
final Set<String> toDrop = new HashSet<>();

for (final Map.Entry<String, Set<DropFlowFileRequest>> entry : multiQueueDropRequestMap.entrySet()) {
Contributor

This part looks very complex. Could you please add a little comment?

@joewitt
Contributor

joewitt commented Mar 25, 2021

closing due to inactivity. but looks like it was a great effort/contrib. hopefully restored at some point

@joewitt joewitt closed this Mar 25, 2021