From c80948ee68c8bf47af58875144c05ddaa729f33f Mon Sep 17 00:00:00 2001 From: Jon Kessler Date: Tue, 12 Nov 2019 13:59:23 +0000 Subject: [PATCH 1/2] NIFI-6831 Creating a global priority-aware flowfile queue implementation. --- .../org/apache/nifi/action/Component.java | 3 +- .../org/apache/nifi/util/NiFiProperties.java | 10 + .../nifi/web/api/dto/PriorityRuleDTO.java | 87 +++ .../web/api/entity/CurrentUserEntity.java | 13 + .../web/api/entity/PriorityRuleEntity.java | 42 ++ .../resource/ResourceFactory.java | 41 ++ .../authorization/resource/ResourceType.java | 1 + .../endpoints/CurrentUserEndpointMerger.java | 1 + .../CurrentUserEndpointMergerTest.java | 2 + .../nifi-framework-core/pom.xml | 12 + .../nifi/controller/FlowController.java | 30 +- .../queue/GlobalPriorityFlowFileQueue.java | 658 ++++++++++++++++++ .../java/org/apache/nifi/priority/Rule.java | 218 ++++++ .../apache/nifi/priority/RulesManager.java | 337 +++++++++ .../GlobalPriorityFlowFileQueueTest.java | 144 ++++ .../nifi/priority/RulesManagerTest.java | 143 ++++ .../src/test/resources/priorityRules.json | 15 + .../nifi-framework/nifi-resources/pom.xml | 5 + .../src/main/resources/conf/nifi.properties | 6 + .../nifi/prioritizer/BucketPrioritizer.java | 27 + ...g.apache.nifi.flowfile.FlowFilePrioritizer | 3 +- .../nifi/audit/PriorityRulesAuditor.java | 114 +++ .../authorization/AuthorizableLookup.java | 7 + .../StandardAuthorizableLookup.java | 21 +- .../nifi/web/NiFiWebApiResourceConfig.java | 1 + .../nifi/web/StandardNiFiServiceFacade.java | 4 + .../nifi/web/api/PriorityRuleResource.java | 198 ++++++ .../main/resources/nifi-web-api-context.xml | 13 + .../nifi-web/nifi-web-ui/pom.xml | 40 ++ .../filters/priority-rules-min.properties | 21 + .../filters/priority-rules.properties | 27 + .../webapp/WEB-INF/pages/priority-rules.jsp | 66 ++ .../WEB-INF/partials/canvas/canvas-header.jsp | 8 + .../priority-rules/priority-rules-content.jsp | 62 ++ .../src/main/webapp/WEB-INF/web.xml | 10 + .../src/main/webapp/css/priority-rules.css | 98 +++ .../nf-ng-canvas-global-menu-controller.js | 13 + .../js/nf/canvas/nf-policy-management.js | 5 +- .../src/main/webapp/js/nf/nf-common.js | 12 + .../priority-rules/nf-priority-rules-table.js | 390 +++++++++++ .../js/nf/priority-rules/nf-priority-rules.js | 351 ++++++++++ 41 files changed, 3252 insertions(+), 7 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/PriorityRuleDTO.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PriorityRuleEntity.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/GlobalPriorityFlowFileQueue.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/priority/Rule.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/priority/RulesManager.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/GlobalPriorityFlowFileQueueTest.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/priority/RulesManagerTest.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/priorityRules.json create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/BucketPrioritizer.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/PriorityRulesAuditor.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/PriorityRuleResource.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/filters/priority-rules-min.properties create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/filters/priority-rules.properties create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/pages/priority-rules.jsp create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/priority-rules/priority-rules-content.jsp create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/priority-rules.css create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/priority-rules/nf-priority-rules-table.js create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/priority-rules/nf-priority-rules.js diff --git a/nifi-api/src/main/java/org/apache/nifi/action/Component.java b/nifi-api/src/main/java/org/apache/nifi/action/Component.java index 95d8ba066dae..d8221083178f 100644 --- a/nifi-api/src/main/java/org/apache/nifi/action/Component.java +++ b/nifi-api/src/main/java/org/apache/nifi/action/Component.java @@ -34,5 +34,6 @@ public enum Component { ParameterContext, AccessPolicy, User, - UserGroup; + UserGroup, + PriorityRule; } diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index b67c61a03233..991dac43daf6 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -249,6 +249,11 @@ public abstract class NiFiProperties { public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME= "nifi.analytics.connection.model.score.name"; public static final String ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD = "nifi.analytics.connection.model.score.threshold"; + // global priority flowfile queue properties + public static final String CONTROLLER_FLOWFILEQUEUE_BUCKETS = "nifi.controller.flowfilequeue.buckets"; + public static final String CONTROLLER_FLOWFILEQUEUE_BUCKETS_CACHE_EXPIRATION_MINUTES = "nifi.controller.flowfilequeue.buckets.cache.expiration.minutes"; + public static final String PRIORITY_RULESMANAGER_CONFIG = "nifi.priority.rulesmanager.config"; + // defaults public static final Boolean DEFAULT_AUTO_RESUME_STATE = true; public static final String DEFAULT_AUTHORIZER_CONFIGURATION_FILE = "conf/authorizers.xml"; @@ -326,6 +331,11 @@ public abstract class NiFiProperties { public static final String DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME = "rSquared"; public static final double DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD = .90; + // global priority queue defaults + public static final String DEFAULT_CONTROLLER_FLOWFILEQUEUE_BUCKETS = "false"; + public static final int DEFAULT_CONTROLLER_FLOWFILEQUEUE_BUCKETS_CACHE_EXPIRATION_MINUTES = 5; + public static final String DEFAULT_PRIORITY_RULESMANAGER_CONFIG = "./conf/priorityRules.json"; + /** * Retrieves the property value for the given property key. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/PriorityRuleDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/PriorityRuleDTO.java new file mode 100644 index 000000000000..26dedd355538 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/PriorityRuleDTO.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.dto; + +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlType; + +@XmlType(name = "priorityRule") +public class PriorityRuleDTO { + private String id; + private String expression; + private String label; + private int rateOfThreadUsage; + private boolean expired; + + @ApiModelProperty(value = "The id of the priority rule") + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + @ApiModelProperty(value = "The expression of the priority rule") + public String getExpression() { + return expression; + } + + public void setExpression(String expression) { + this.expression = expression; + } + + @ApiModelProperty(value = "An optional human readable label for the priority rule") + public String getLabel() { + return label; + } + + public void setLabel(String label) { + this.label = label; + } + + @ApiModelProperty(value = "The % of the time files of this priority will be given to an available thread for processing when polled") + public int getRateOfThreadUsage() { + return rateOfThreadUsage; + } + + public void setRateOfThreadUsage(int rateOfThreadUsage) { + this.rateOfThreadUsage = rateOfThreadUsage; + } + + @ApiModelProperty(value = "Whether or not the priority rule is expired") + public boolean isExpired() { + return expired; + } + + public void setExpired(boolean expired) { + this.expired = expired; + } + + @Override + public String toString() { + return "PriorityRuleDTO{" + + "id='" + id + '\'' + + ", expression='" + expression + '\'' + + ", label='" + label + '\'' + + ", rateOfThreadUsage=" + rateOfThreadUsage + + ", expired=" + expired + + '}'; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/CurrentUserEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/CurrentUserEntity.java index a020a487b5d8..1be0ec02d543 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/CurrentUserEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/CurrentUserEntity.java @@ -40,6 +40,7 @@ public class CurrentUserEntity extends Entity { private PermissionsDTO systemPermissions; private PermissionsDTO parameterContextPermissions; private PermissionsDTO restrictedComponentsPermissions; + private PermissionsDTO priorityRulesPermissions; private Set componentRestrictionPermissions; private boolean canVersionFlows; @@ -80,6 +81,18 @@ public void setProvenancePermissions(PermissionsDTO provenancePermissions) { this.provenancePermissions = provenancePermissions; } + /* + * @return permissions for accessing priority rules + */ + @ApiModelProperty("Permissions for accessing priority rules") + public PermissionsDTO getPriorityRulesPermissions() { + return priorityRulesPermissions; + } + + public void setPriorityRulesPermissions(PermissionsDTO priorityRulesPermissions) { + this.priorityRulesPermissions = priorityRulesPermissions; + } + /** * @return permissions for accessing counters */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PriorityRuleEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PriorityRuleEntity.java new file mode 100644 index 000000000000..68d5ad90994a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PriorityRuleEntity.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.entity; + +import org.apache.nifi.web.api.dto.PriorityRuleDTO; + +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "priorityRuleEntity") +public class PriorityRuleEntity { + private PriorityRuleDTO priorityRule; + + public PriorityRuleDTO getPriorityRule() { + return priorityRule; + } + + public void setPriorityRule(PriorityRuleDTO priorityRule) { + this.priorityRule = priorityRule; + } + + @Override + public String toString() { + return "PriorityRuleEntity{" + + "priorityRule=" + priorityRule + + '}'; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/resource/ResourceFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/resource/ResourceFactory.java index 386f90a30c1e..3335b1a49590 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/resource/ResourceFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/resource/ResourceFactory.java @@ -74,6 +74,23 @@ public String getSafeDescription() { } }; + private final static Resource PRIORITY_RULES_RESOURCE = new Resource() { + @Override + public String getIdentifier() { + return ResourceType.PriorityRules.getValue(); + } + + @Override + public String getName() { + return "Priority Rules for "; + } + + @Override + public String getSafeDescription() { + return "priority rules"; + } + }; + private final static Resource COUNTERS_RESOURCE = new Resource() { @Override public String getIdentifier() { @@ -468,6 +485,30 @@ public String getSafeDescription() { }; } + /** + * Gets a Resource for accessing priority rules. + * + * @return The resource + */ + public static Resource getPriorityRulesResource() { + return new Resource() { + @Override + public String getIdentifier() { + return PRIORITY_RULES_RESOURCE.getIdentifier(); + } + + @Override + public String getName() { + return PRIORITY_RULES_RESOURCE.getName(); + } + + @Override + public String getSafeDescription() { + return PRIORITY_RULES_RESOURCE.getSafeDescription(); + } + }; + } + /** * Gets a Resource for accessing component operations. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/resource/ResourceType.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/resource/ResourceType.java index 45582fceeaf7..ab390b50933a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/resource/ResourceType.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/resource/ResourceType.java @@ -26,6 +26,7 @@ public enum ResourceType { Label("/labels"), OutputPort("/output-ports"), Policy("/policies"), + PriorityRules("/priority-rules"), Processor("/processors"), ProcessGroup("/process-groups"), Provenance("/provenance"), diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMerger.java index 03691c4bc717..0920a58cd469 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMerger.java @@ -56,6 +56,7 @@ protected void mergeResponses(final CurrentUserEntity clientEntity, final Map clientEntityComponentRestrictionsPermissions = clientEntity.getComponentRestrictionPermissions(); final Set entityComponentRestrictionsPermissions = entity.getComponentRestrictionPermissions(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMergerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMergerTest.java index a93cd68e6650..024b3b8dc0ae 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMergerTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMergerTest.java @@ -48,6 +48,7 @@ public void testMergeUserPermissions() { userNode1.setRestrictedComponentsPermissions(buildPermissions(false, false)); userNode1.setSystemPermissions(buildPermissions(true, true)); userNode1.setTenantsPermissions(buildPermissions(false, true)); + userNode1.setPriorityRulesPermissions(buildPermissions(true, true)); final Set componentRestrictionsNode1 = new HashSet<>(); componentRestrictionsNode1.add(buildComponentRestriction(RequiredPermission.ACCESS_KEYTAB, true, true)); @@ -64,6 +65,7 @@ public void testMergeUserPermissions() { userNode2.setRestrictedComponentsPermissions(buildPermissions(true, true)); userNode2.setSystemPermissions(buildPermissions(false, false)); userNode2.setTenantsPermissions(buildPermissions(true, true)); + userNode2.setPriorityRulesPermissions(buildPermissions(true, true)); final Set componentRestrictionsNode2 = new HashSet<>(); componentRestrictionsNode2.add(buildComponentRestriction(RequiredPermission.ACCESS_KEYTAB, true, false)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml index d9b6659ec400..f548d33d9f24 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml @@ -247,6 +247,17 @@ 1.0.1 compile + + com.google.code.gson + gson + 2.8.2 + + + org.apache.nifi + nifi-standard-prioritizers + 1.10.0-SNAPSHOT + compile + @@ -262,6 +273,7 @@ src/test/resources/old-swap-file.swap src/test/resources/xxe_template.xml src/test/resources/swap/444-old-swap-file.swap + src/test/resources/priorityRules.json diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 86cfab9da17c..866cd5f8ae61 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -17,6 +17,12 @@ package org.apache.nifi.controller; import static java.util.Objects.requireNonNull; +import static org.apache.nifi.util.NiFiProperties.CONTROLLER_FLOWFILEQUEUE_BUCKETS; +import static org.apache.nifi.util.NiFiProperties.CONTROLLER_FLOWFILEQUEUE_BUCKETS_CACHE_EXPIRATION_MINUTES; +import static org.apache.nifi.util.NiFiProperties.DEFAULT_CONTROLLER_FLOWFILEQUEUE_BUCKETS; +import static org.apache.nifi.util.NiFiProperties.DEFAULT_CONTROLLER_FLOWFILEQUEUE_BUCKETS_CACHE_EXPIRATION_MINUTES; +import static org.apache.nifi.util.NiFiProperties.DEFAULT_PRIORITY_RULESMANAGER_CONFIG; +import static org.apache.nifi.util.NiFiProperties.PRIORITY_RULESMANAGER_CONFIG; import java.io.ByteArrayInputStream; import java.io.File; @@ -98,6 +104,7 @@ import org.apache.nifi.controller.queue.ConnectionEventListener; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueueFactory; +import org.apache.nifi.controller.queue.GlobalPriorityFlowFileQueue; import org.apache.nifi.controller.queue.LoadBalanceStrategy; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.queue.StandardFlowFileQueue; @@ -177,6 +184,7 @@ import org.apache.nifi.parameter.ParameterContextManager; import org.apache.nifi.parameter.ParameterLookup; import org.apache.nifi.parameter.StandardParameterContextManager; +import org.apache.nifi.priority.RulesManager; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.ComponentIdentifierLookup; @@ -267,6 +275,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node private final StateManagerProvider stateManagerProvider; private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started private final VariableRegistry variableRegistry; + private final RulesManager rulesManager; private final ConnectionLoadBalanceServer loadBalanceServer; private final NioAsyncLoadBalanceClientRegistry loadBalanceClientRegistry; @@ -502,6 +511,13 @@ private FlowController( throw new RuntimeException(e); } + String rulesManagerConfigString = nifiProperties.getProperty(PRIORITY_RULESMANAGER_CONFIG, DEFAULT_PRIORITY_RULESMANAGER_CONFIG); + File rulesManagerConfigFile = new File(rulesManagerConfigString); + this.rulesManager = new RulesManager(rulesManagerConfigFile, stateManagerProvider); + // The following purge only does something if stateManagerProvider is not null + this.rulesManager.purgeExpiredRulesFromState(); + this.rulesManager.readRules(); + processScheduler = new StandardProcessScheduler(timerDrivenEngineRef.get(), this, encryptor, stateManagerProvider, this.nifiProperties); eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler); @@ -734,6 +750,10 @@ public void run() { } } + public RulesManager getRulesManager() { + return this.rulesManager; + } + @Override public Authorizable getParentAuthorizable() { return null; @@ -1861,7 +1881,15 @@ public EventReporter getEventReporter() { public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ConnectionEventListener eventListener) { final FlowFileQueue flowFileQueue; - if (clusterCoordinator == null) { + String useBucketQueues = nifiProperties.getProperty(CONTROLLER_FLOWFILEQUEUE_BUCKETS, DEFAULT_CONTROLLER_FLOWFILEQUEUE_BUCKETS); + + if(useBucketQueues.equalsIgnoreCase("true")) { + int cacheExpiration = nifiProperties.getIntegerProperty(CONTROLLER_FLOWFILEQUEUE_BUCKETS_CACHE_EXPIRATION_MINUTES, + DEFAULT_CONTROLLER_FLOWFILEQUEUE_BUCKETS_CACHE_EXPIRATION_MINUTES); + flowFileQueue = new GlobalPriorityFlowFileQueue(id, processScheduler, flowFileRepository, provenanceRepository, resourceClaimManager, rulesManager, cacheExpiration, + nifiProperties.getDefaultBackPressureObjectThreshold(), nifiProperties.getDefaultBackPressureDataSizeThreshold(), swapManager, nifiProperties.getQueueSwapThreshold(), + eventReporter); + } else if (clusterCoordinator == null) { flowFileQueue = new StandardFlowFileQueue(id, eventListener, flowFileRepository, provenanceRepository, resourceClaimManager, processScheduler, swapManager, eventReporter, nifiProperties.getQueueSwapThreshold(), nifiProperties.getDefaultBackPressureObjectThreshold(), nifiProperties.getDefaultBackPressureDataSizeThreshold()); } else { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/GlobalPriorityFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/GlobalPriorityFlowFileQueue.java new file mode 100644 index 000000000000..f37b051865c5 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/GlobalPriorityFlowFileQueue.java @@ -0,0 +1,658 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.queue; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileRepository; +import org.apache.nifi.controller.repository.FlowFileSwapManager; +import org.apache.nifi.controller.repository.SwapContents; +import org.apache.nifi.controller.repository.SwapSummary; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.swap.StandardSwapSummary; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.prioritizer.BucketPrioritizer; +import org.apache.nifi.priority.Rule; +import org.apache.nifi.priority.RulesManager; +import org.apache.nifi.processor.FlowFileFilter; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.reporting.Severity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Queue; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/* + * This connection works by sorting records based on satisfying globally defined priority rules. Those rule evaluations are tracked + * statically so that they persist as records move from queue to queue unless they are forced to be re-evaluated. When a poll + * occurs this connection will consider the highest priority record it currently holds and determine whether or not data with a higher + * priority was recently polled elsewhere. If that is the case there is a chance that this connection will give up its thread rather + * than returning a record so as not to thread-starve higher priority data elsewhere in the system. + */ +public class GlobalPriorityFlowFileQueue extends AbstractFlowFileQueue { + private static final Logger LOGGER = LoggerFactory.getLogger(GlobalPriorityFlowFileQueue.class); + // Manages the set of rules against which flowfiles are evaluated + private final RulesManager rulesManager; + + // This cache will track what rule a FlowFileRecord satisfied the last time it was evaluated by any GlobalPriorityFlowFileQueue + static final AtomicReference> FLOW_FILE_RULE_CACHE = new AtomicReference<>(); + // ruleBuckets has essentially replaced the priority queue from other connection implementations. This is where we will track all + // FlowFileRecords that this connection is aware of. + private final AtomicReference>> ruleBuckets = new AtomicReference<>(new ConcurrentHashMap<>()); + + private final AtomicLong bytesInBuckets = new AtomicLong(0L); + private final AtomicInteger objectsInBuckets = new AtomicInteger(0); + private final AtomicInteger numUnacknowledgedFlowFiles = new AtomicInteger(0); + private final AtomicLong numUnacknowledgedBytes = new AtomicLong(0L); + private final AtomicInteger numSwapFlowFiles = new AtomicInteger(0); + private final AtomicLong numSwapFileBytes = new AtomicLong(0L); + + // Members used to slow processing of lower priority files while higher priority files exist + private static final Map RULE_LAST_POLL_TIME = new ConcurrentHashMap<>(); + private static final Random RANDOM = new Random(); + + private volatile List prioritizers = Collections.emptyList(); + + // When set to true all incoming records will be re-evaluated against the global ruleset during a put operation + volatile boolean alwaysReevaluate = false; + + private final FlowFileSwapManager flowFileSwapManager; + + final static int MAX_EXPIRED_RECORDS = 10_000; + private static final int SWAP_RECORD_POLL_SIZE = 10_000; + private static final int SWAP_RECORD_PUSH_SIZE = SWAP_RECORD_POLL_SIZE / 2; + private final int queueSwapThreshold; + private final EventReporter eventReporter; + + private final Queue swapLocations = new LinkedBlockingQueue<>(); + + public GlobalPriorityFlowFileQueue(String identifier, ProcessScheduler processScheduler, FlowFileRepository flowFileRepository, + ProvenanceEventRepository provenanceEventRepository, ResourceClaimManager resourceClaimManager, + RulesManager rulesManager, long cacheAccessExpiration, long defaultBackPressureObjectThreshold, + final String defaultBackPressureDataSizeThreshold, FlowFileSwapManager flowFileSwapManager, + int queueSwapThreshold, EventReporter eventReporter) { + super(identifier, processScheduler, flowFileRepository, provenanceEventRepository, resourceClaimManager); + this.rulesManager = rulesManager; + this.flowFileSwapManager = flowFileSwapManager; + this.queueSwapThreshold = queueSwapThreshold; + this.eventReporter = eventReporter; + setBackPressureDataSizeThreshold(defaultBackPressureDataSizeThreshold); + setBackPressureObjectThreshold(defaultBackPressureObjectThreshold); + if(FLOW_FILE_RULE_CACHE.get() == null) { + FLOW_FILE_RULE_CACHE.compareAndSet(null, CacheBuilder.newBuilder() + .expireAfterAccess(cacheAccessExpiration, TimeUnit.MINUTES) + .build(new CacheLoader() { + @Override + public Rule load(FlowFileRecord flowFileRecord) throws Exception { + return Rule.UNEVALUATED; + } + })); + } + } + + /** + * @return A list of FlowFileRecords that this connection contains sorted in order by priority + */ + @Override + protected List getListableFlowFiles() { + List listableFlowFiles = new ArrayList<>(objectsInBuckets.get()); + rulesManager.getRuleList().forEach(rule -> { + if(ruleBuckets.get().containsKey(rule)) { + listableFlowFiles.addAll(ruleBuckets.get().get(rule)); + } + }); + return listableFlowFiles; + } + + @Override + protected void dropFlowFiles(DropFlowFileRequest dropFlowFileRequest, String requestor) { + dropFlowFiles(dropFlowFileRequest, requestor, true); + } + + protected void dropFlowFiles(DropFlowFileRequest dropFlowFileRequest, String requestor, boolean setCompletionState) { + // Drop files from the system in batches of this size + int batchSize = 1000; + Map> currentRuleBuckets; + + currentRuleBuckets = ruleBuckets.getAndSet(new ConcurrentHashMap<>()); + + LOGGER.debug("Executing DropFlowFileRequest for requestor {}", requestor); + try { + dropFlowFileRequest.setState(DropFlowFileState.DROPPING_FLOWFILES); + + // Used to track how many flowfiles we've batched so far to determine whether or not its time to perform a drop + long index = 0; + List recordsToDrop = new ArrayList<>(batchSize); + final FlowFileRecord[] flowFileRecord = new FlowFileRecord[1]; + + // Start dropping flowfiles from these queues in batches of batchSize + for(Queue recordQueue : currentRuleBuckets.values()) { + while(!dropFlowFileRequest.getState().equals(DropFlowFileState.CANCELED) && (flowFileRecord[0] = recordQueue.poll()) != null) { + recordsToDrop.add(flowFileRecord[0]); + // We've hit batchSize. Perform a drop. + if(++index % batchSize == 0) { + dropFlowFileRequest.setDroppedSize(dropFlowFileRequest.getDroppedSize().add(dropFlowFiles(recordsToDrop, requestor))); + recordsToDrop.clear(); + } + } + + // Drop was cancelled, return these records to the appropriate bucket + if(dropFlowFileRequest.getState().equals(DropFlowFileState.CANCELED)) { + recordsToDrop.forEach(record -> put(record, false)); + return; + } + } + + if(!recordsToDrop.isEmpty()) { + dropFlowFileRequest.setDroppedSize(dropFlowFileRequest.getDroppedSize().add(dropFlowFiles(recordsToDrop, requestor))); + } + + if(!dropFlowFileRequest.getState().equals(DropFlowFileState.CANCELED) && swapInFilesIfNecessary()) { + dropFlowFiles(dropFlowFileRequest, requestor, false); + } + + // We only want the top level of recursion to set this state + if(setCompletionState) { + dropFlowFileRequest.setState(DropFlowFileState.COMPLETE); + } + } catch(IOException e) { + LOGGER.error("Exception during dropRequest {}", dropFlowFileRequest, e); + dropFlowFileRequest.setState(DropFlowFileState.FAILURE, e.getMessage()); + } finally { + // If this collection contains any records at this point then the operation was either cancelled + // or there was an exception. Return the remaining records from whence they came + currentRuleBuckets.values().stream().flatMap(Queue::stream).forEach(record -> put(record, false)); + } + } + + private QueueSize dropFlowFiles(List recordsToDrop, String requestor) throws IOException { + QueueSize droppedQueueSize = drop(recordsToDrop, requestor); + LOGGER.debug("objectsInBuckets: {}, bytesInBuckets: {}\nobjectsDropped: {}, bytesDropped: {}", + objectsInBuckets.get(), bytesInBuckets.get(), droppedQueueSize.getObjectCount(), droppedQueueSize.getByteCount()); + objectsInBuckets.addAndGet(-droppedQueueSize.getObjectCount()); + bytesInBuckets.addAndGet(-droppedQueueSize.getByteCount()); + + return droppedQueueSize; + } + + @Override + public List getPriorities() { + return prioritizers; + } + + @Override + public SwapSummary recoverSwappedFlowFiles() { + try { + List swapLocations = flowFileSwapManager.recoverSwapLocations(this, null); + AtomicInteger queueSize = new AtomicInteger(); + AtomicLong totalQueueBytes = new AtomicLong(); + AtomicReference maxFlowFileId = new AtomicReference<>(null); + List resourceClaims = new ArrayList<>(); + swapLocations.forEach(swapLocation -> { + try { + SwapSummary swapSummary = flowFileSwapManager.getSwapSummary(swapLocation); + LOGGER.info("Located {} flowfiles, {} bytes in swap location {} for queue {}", + swapSummary.getQueueSize().getByteCount(), swapSummary.getQueueSize().getByteCount(), + swapLocation, getIdentifier()); + queueSize.addAndGet(swapSummary.getQueueSize().getObjectCount()); + totalQueueBytes.addAndGet(swapSummary.getQueueSize().getByteCount()); + final Long maxSwapRecordId = swapSummary.getMaxFlowFileId(); + if(maxSwapRecordId != null) { + if(maxFlowFileId.get() == null || maxSwapRecordId > maxFlowFileId.get()) { + maxFlowFileId.set(maxSwapRecordId); + } + } + } catch (IOException e) { + LOGGER.error("Error getting swap summary for location {}", swapLocation, e); + if(eventReporter != null) { + eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", + "Failed to recover FlowFiles from SwapFile " + swapLocation + + "; the file appears to be corrupt. See logs for more details."); + } + } + }); + + this.swapLocations.addAll(swapLocations); + numSwapFlowFiles.addAndGet(queueSize.get()); + numSwapFileBytes.addAndGet(totalQueueBytes.get()); + LOGGER.info("Located {} files, {} bytes swapped for queue {}.", queueSize.get(), totalQueueBytes.get(), getIdentifier()); + return new StandardSwapSummary(new QueueSize(queueSize.get(), totalQueueBytes.get()), maxFlowFileId.get(), resourceClaims); + } catch (IOException e) { + LOGGER.error("Error recovering swap locations for queue {}.", getIdentifier(), e); + if(eventReporter != null) { + eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", + "Failed to determine whether or not any Swap Files exist for FlowFile Queue " + + this.getIdentifier() + "; see logs for more details."); + } + return null; + } + } + + @Override + public void purgeSwapFiles() { + flowFileSwapManager.purge(); + } + + @Override + public void setPriorities(List newPriorities) { + prioritizers = newPriorities; + alwaysReevaluate = prioritizers.stream().anyMatch(prioritizer -> prioritizer instanceof BucketPrioritizer); + } + + @Override + public QueueSize size() { + return new QueueSize(objectsInBuckets.get() + numSwapFlowFiles.get(), bytesInBuckets.get() + numSwapFlowFiles.get()); + } + + @Override + public boolean isEmpty() { + return objectsInBuckets.get() == 0; + } + + @Override + public boolean isActiveQueueEmpty() { + return isEmpty(); + } + + @Override + public void acknowledge(FlowFileRecord flowFileRecord) { + numUnacknowledgedFlowFiles.decrementAndGet(); + numUnacknowledgedBytes.addAndGet(-flowFileRecord.getSize()); + } + + @Override + public void acknowledge(Collection flowFileRecords) { + numUnacknowledgedFlowFiles.addAndGet(-flowFileRecords.size()); + numUnacknowledgedBytes.addAndGet(-flowFileRecords.stream().mapToLong(FlowFile::getSize).sum()); + } + + @Override + public boolean isUnacknowledgedFlowFile() { + return numUnacknowledgedFlowFiles.get() > 0; + } + + @Override + public void put(FlowFileRecord flowFileRecord) { + put(flowFileRecord, true); + } + + /** Adds a FlowFileRecord to this connection's backing collection + * @param flowFileRecord The record to add + * @param updateMetrics Whether or not to adjust the object and byte counts of this connection using this flowFileRecord + */ + public void put(FlowFileRecord flowFileRecord, boolean updateMetrics) { + Rule rule = FLOW_FILE_RULE_CACHE.get().getUnchecked(flowFileRecord); + + if(alwaysReevaluate || rule.isExpired()) { + rule = rulesManager.getRule(flowFileRecord); + } + + FLOW_FILE_RULE_CACHE.get().put(flowFileRecord, rule); + + ruleBuckets.get().computeIfAbsent(rule, r-> new LinkedBlockingQueue<>()).add(flowFileRecord); + + if(updateMetrics) { + objectsInBuckets.incrementAndGet(); + bytesInBuckets.addAndGet(flowFileRecord.getSize()); + } + + swapOutFilesIfNecessary(); + } + + @Override + public void putAll(Collection flowFileRecords) { + flowFileRecords.forEach(this::put); + } + + public void putAll(Collection flowFileRecords, boolean updateMetrics) { + flowFileRecords.forEach(flowFileRecord -> put(flowFileRecord, updateMetrics)); + } + + /** Will return the highest priority FlowFileRecord from the backing collection that is not expired or penalized + * @param expiredRecords Up to 10,000 expired records that were encountered, at least in part, as part of this poll operation + * @param penalizedFlowFiles Any amount of penalized records that were encountered, at least in part, during this poll operation. + * These records MUST be returned to the collection when polling is done. + * @param stagger Whether or not to consider giving up this thread based on if higher priority data was recently processed + * elsewhere. Set this to false in order to always use the available thread. + * @return An unexpired, un-penalized FlowFileRecord or null if none was found + */ + public FlowFileRecord poll(Set expiredRecords, Set penalizedFlowFiles, boolean stagger) { + swapInFilesIfNecessary(); + + if(objectsInBuckets.get() == 0) { + return null; + } + + // Rules in the RulesManager are ordered by priority + List rules = rulesManager.getRuleList(); + + // Used to track whether or not higher priority data was recently polled so we know whether or not to reduce the odds + // of this thread receiving a FlowFileRecord + boolean higherPriorityDataRecentlyPolled = false; + + // Attempt to poll rules in order of priority + for(Rule rule: rules) { + if(ruleBuckets.get().containsKey(rule) && ruleBuckets.get().get(rule).size() > 0) { + // If higher priority data was recently polled elsewhere, reduce the odds of this thread processing data. + // If we've already encountered expiredRecords then we already beat the odds. Continue purging expired records + // until we find a good one or hit the limit. + // We do not reduce odds if we've already encountered expired or penalized records because that means we have already + // beaten the odds to earn a thread. + if(stagger && higherPriorityDataRecentlyPolled && expiredRecords.isEmpty() && penalizedFlowFiles.isEmpty()) { + LOGGER.debug("Polling while higher priority files are waiting elsewhere."); + + if(RANDOM.nextInt(100) >= rule.getRateOfThreadUsage()) { + LOGGER.debug("Random int didn't allow this to process, better luck next time."); + return null; + } + } + + Queue queue = ruleBuckets.get().get(rule); + FlowFileRecord recordToReturn = queue != null ? queue.poll() : null; + + // Aggregate expired and penalized records as we encounter them + while(recordToReturn != null && (isExpired(recordToReturn) || isPenalized(recordToReturn))) { + // Though we are adjusting counts for the penalized records here they will be adjusted again + // later when the penalized records are put back in. + objectsInBuckets.decrementAndGet(); + bytesInBuckets.addAndGet(-recordToReturn.getSize()); + + // Expired takes precedence as the record will be removed from the system + if(isExpired(recordToReturn)) { + expiredRecords.add(recordToReturn); + if(expiredRecords.size() >= MAX_EXPIRED_RECORDS) { + return null; + } + } else { + // Whether this is part of a single poll or ap oll operation that will return multiple results we will + // aggregate a collection of any penalized flowfiles we encounter. These MUST be returned to their buckets when the operation + // is complete. + penalizedFlowFiles.add(recordToReturn); + } + + recordToReturn = queue.poll(); + } + + if(recordToReturn != null) { + objectsInBuckets.decrementAndGet(); + bytesInBuckets.addAndGet(-recordToReturn.getSize()); + + RULE_LAST_POLL_TIME.put(rule, System.currentTimeMillis()); + numUnacknowledgedFlowFiles.incrementAndGet(); + numUnacknowledgedBytes.addAndGet(recordToReturn.getSize()); + + return recordToReturn; + } + } + + // For each rule for which we do not have data, track the last time anyone polled a file of that priority. Once we know that any + // data of higher priority was recently polled then we no longer have to do this check. + // Do not do this for UNEVALUATED files as we do not want that to cause other files to be staggered. + if(stagger && rule != Rule.UNEVALUATED && !higherPriorityDataRecentlyPolled && RULE_LAST_POLL_TIME.containsKey(rule)) { + long ruleLastPollTime = RULE_LAST_POLL_TIME.get(rule); + long currentTime = System.currentTimeMillis(); + long timeDiff = currentTime - ruleLastPollTime; + + if(timeDiff < TimeUnit.SECONDS.toMillis(5)) { + higherPriorityDataRecentlyPolled = true; + } + } + } + + return null; + } + + private boolean swapInFilesIfNecessary() { + boolean swappedInFiles = false; + List failedSwapLocations = new ArrayList<>(); + // Only swap files in while we have files to swap in and we have room to do so. We don't want to create a situation where we swap in 10,000 files and then + // immediately swap them back out. + while(objectsInBuckets.get() < queueSwapThreshold && numSwapFlowFiles.get() > 0 && !swapLocations.isEmpty() + && (objectsInBuckets.get() < Math.max(queueSwapThreshold/2, queueSwapThreshold - SWAP_RECORD_PUSH_SIZE))) { + String swapLocation = swapLocations.remove(); + try { + SwapContents swapContents = flowFileSwapManager.swapIn(swapLocation, this); + LOGGER.info("Swapped in {} flowfiles", swapContents.getFlowFiles().size()); + putAll(swapContents.getFlowFiles()); + QueueSize swapQueueSize = swapContents.getSummary().getQueueSize(); + numSwapFlowFiles.addAndGet(-swapQueueSize.getObjectCount()); + numSwapFileBytes.addAndGet(-swapQueueSize.getByteCount()); + swappedInFiles = true; + } catch (IOException e) { + LOGGER.error("Error while attempting to swap in files from location {}. Will try again later as necessary.", swapLocation, e); + failedSwapLocations.add(swapLocation); + } + } + + swapLocations.addAll(failedSwapLocations); + + if(failedSwapLocations.size() > 0) { + return false; + } else { + return swappedInFiles; + } + } + + private void swapOutFilesIfNecessary() { + LOGGER.debug("{} objects in buckets, swapping out if >= {}", objectsInBuckets.get(), (queueSwapThreshold + SWAP_RECORD_POLL_SIZE)); + // Swap until we are no longer at or above the queue swap threshold plus poll size + while(objectsInBuckets.get() >= (queueSwapThreshold + SWAP_RECORD_POLL_SIZE)) { + LOGGER.info("Beginning swap out of {} flowfiles.", SWAP_RECORD_POLL_SIZE); + List filesToSwap = getFilesToSwap(); + try { + swapOutFiles(filesToSwap); + LOGGER.info("Completed swapping out {} flowfiles.", filesToSwap.size()); + } catch(IOException e) { + LOGGER.error("Error while attempting to swap flowfiles. Returning flowfiles to buckets.", e); + // Because these were not retrieved via the normal poll method the object and byte counts were + // not updated. Therefore return them without updating said counts. + putAll(filesToSwap, false); + // We don't want to be stuck in an infinite loop. If there's an exception then bail. + return; + } + } + } + + private List getFilesToSwap() { + List filesToSwap = new ArrayList<>(); + + List ruleList = rulesManager.getRuleList(); + ListIterator ruleListIterator = ruleList.listIterator(); + // Start at the end of the list as we want to swap files in order of least to greatest priority + while(ruleListIterator.hasNext()) { + ruleListIterator.next(); + } + + Rule rule; + while(ruleListIterator.hasPrevious() && filesToSwap.size() < SWAP_RECORD_POLL_SIZE) { + rule = ruleListIterator.previous(); + Queue bucket = ruleBuckets.get().get(rule); + if(bucket != null) { + FlowFileRecord flowFileToSwap; + + // The order of boolean clauses matters here. We don't want to poll a file to swap but then forget about it due + // to having already hit our SWAP_RECORD_POLL_SIZE + while(filesToSwap.size() < SWAP_RECORD_POLL_SIZE && (flowFileToSwap = bucket.poll()) != null) { + filesToSwap.add(flowFileToSwap); + } + } + } + + return filesToSwap; + } + + private void swapOutFiles(List filesToSwap) throws IOException { + long bytesToSwap = filesToSwap.stream().mapToLong(FlowFile::getSize).sum(); + String swapLocation = flowFileSwapManager.swapOut(filesToSwap, this, null); + swapLocations.add(swapLocation); + objectsInBuckets.addAndGet(-filesToSwap.size()); + numSwapFlowFiles.addAndGet(filesToSwap.size()); + bytesInBuckets.addAndGet(-bytesToSwap); + numSwapFileBytes.addAndGet(bytesToSwap); + // As these files are swapped back in later they will once again be considered unacknowledged. + acknowledge(filesToSwap); + } + + @Override + public FlowFileRecord poll(Set expiredRecords) { + Set penalizedFlowFiles = new HashSet<>(); + + try { + return poll(expiredRecords, penalizedFlowFiles, true); + } finally { + LOGGER.debug("Encountered {} penalized flowfiles during poll operation.", penalizedFlowFiles.size()); + putAll(penalizedFlowFiles); + } + } + + private boolean isExpired(FlowFileRecord flowFileRecord) { + return getFlowFileExpiration(TimeUnit.MILLISECONDS) != 0 && (System.currentTimeMillis() - flowFileRecord.getEntryDate()) > getFlowFileExpiration(TimeUnit.MILLISECONDS); + } + + private boolean isPenalized(FlowFileRecord flowFileRecord) { + long penaltyTime = flowFileRecord.getPenaltyExpirationMillis(); + return penaltyTime > 0 && System.currentTimeMillis() <= penaltyTime; + } + + @Override + public List poll(int maxResults, Set expiredRecords) { + if(maxResults == 0) { + return Collections.emptyList(); + } else { + Set penalizedFlowFiles = new HashSet<>(); + boolean stagger = true; + List results = new ArrayList<>(); + FlowFileRecord lastPolled; + while((lastPolled = poll(expiredRecords, penalizedFlowFiles, stagger)) != null && results.size() <= maxResults) { + results.add(lastPolled); + // We beat the odds at least once so we've earned a thread + stagger = false; + } + + LOGGER.debug("Encountered {} penalized flowfiles during poll operation.", penalizedFlowFiles.size()); + putAll(penalizedFlowFiles); + + return results; + } + } + + /** Returns a list of all unexpired, un-penalized FlowFileRecords that this collection contains that pass the provided filter. + * + * @param filter The filter to apply + * @param expiredRecords Up to 10,000 expired records that were encountered, at least in past, as part of this poll operation + * @return A list of unexpired un-penalized FlowFileRecords that pass the filter + */ + @Override + public List poll(FlowFileFilter filter, Set expiredRecords) { + List acceptedFiles = new ArrayList<>(); + List rejectedFiles = new ArrayList<>(); + Set penalizedFlowFiles = new HashSet<>(); + + boolean stagger = true; + FlowFileRecord flowFileRecord; + while((flowFileRecord = poll(expiredRecords, penalizedFlowFiles, stagger)) != null) { + if(filter.filter(flowFileRecord).isAccept()) { + acceptedFiles.add(flowFileRecord); + } else { + rejectedFiles.add(flowFileRecord); + } + + // We beat the odds at least once so we've earned a thread + stagger = false; + } + + // Return the files that were not accepted by the filter + putAll(rejectedFiles); + + LOGGER.debug("Encountered {} penalized flowfiles during poll operation.", penalizedFlowFiles.size()); + putAll(penalizedFlowFiles); + + return acceptedFiles; + } + + @Override + public FlowFileRecord getFlowFile(String flowFileUuid) throws IOException { + if(flowFileUuid == null) { + return null; + } else { + return getListableFlowFiles() + .stream() + .filter(flowFileRecord -> flowFileUuid.equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key()))) + .findAny() + .orElse(null); + } + } + + @Override + public QueueDiagnostics getQueueDiagnostics() { + FlowFileQueueSize flowFileQueueSize = new FlowFileQueueSize(objectsInBuckets.get(), bytesInBuckets.get(), 0, 0, 0, numUnacknowledgedFlowFiles.get(), numUnacknowledgedBytes.get()); + LocalQueuePartitionDiagnostics localQueuePartitionDiagnostics = new StandardLocalQueuePartitionDiagnostics(flowFileQueueSize, false, false); + return new StandardQueueDiagnostics(localQueuePartitionDiagnostics, Collections.emptyList()); + } + + @Override + public void lock() { + } + + @Override + public void unlock() { + } + + @Override + public void offloadQueue() { + } + + @Override + public void resetOffloadedQueue() { + } + + @Override + public void startLoadBalancing() { + } + + @Override + public void stopLoadBalancing() { + } + + @Override + public boolean isActivelyLoadBalancing() { + return false; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/priority/Rule.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/priority/Rule.java new file mode 100644 index 000000000000..40b683387216 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/priority/Rule.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.priority; + +import org.apache.nifi.attribute.expression.language.PreparedQuery; +import org.apache.nifi.attribute.expression.language.Query; +import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageParsingException; + +import java.util.Objects; +import java.util.UUID; + +public class Rule implements Comparable { + public final static int DEFAULT_RATE_OF_THREAD_USAGE = 20; + + // UNEVALUATED is meant to be used when a FlowFileRecord has yet to be evaluated against any rules. Typically when it + // first enters the system. Do not stagger these files. + public static final Rule UNEVALUATED = new Rule("unevaluated", 100) { + @Override + public Rule setExpired(boolean expired) { + return this; + } + }; + + // DEFAULT is meant to be used when a FlowFileRecord has been evaluated against all rules but did not match any. + public static final Rule DEFAULT = new Rule("default", DEFAULT_RATE_OF_THREAD_USAGE) { + @Override + public Rule setExpired(boolean expired) { + return this; + } + }; + private final String uuid; + private final String expression; + private final PreparedQuery query; + private volatile String label; + // How frequently to allow a thread to process data associated with this rule. + // A rateOfThreadUsage of 100 or greater is the equivalent to always process. + private volatile int rateOfThreadUsage; + private volatile boolean expired = false; + + public Rule(String label, int rateOfThreadUsage) { + expression = null; + query = null; + this.label = label; + this.rateOfThreadUsage = rateOfThreadUsage; + this.uuid = UUID.randomUUID().toString(); + } + + public Rule(String expression, String label, int rateOfThreadUsage, String uuid, boolean expired) { + this.expression = expression; + this.label = label; + this.rateOfThreadUsage = rateOfThreadUsage; + this.uuid = uuid; + this.query = Query.prepare(expression); + this.expired = expired; + } + + public String getUuid() { + return uuid; + } + + public String getExpression() { + return expression; + } + + public PreparedQuery getQuery() { + return query; + } + + public String getLabel() { + return label; + } + + public Rule setLabel(String label) { + this.label = label; + return this; + } + + public int getRateOfThreadUsage() { + return rateOfThreadUsage; + } + + public boolean isExpired() { + return expired; + } + + public Rule setExpired(boolean expired) { + this.expired = expired; + return this; + } + + public Rule setRateOfThreadUsage(int rateOfThreadUsage) { + if(rateOfThreadUsage > 0) { + this.rateOfThreadUsage = rateOfThreadUsage; + } + + return this; + } + + @Override + public String toString() { + return "Rule{" + + "uuid='" + uuid + '\'' + + ", expression='" + expression + '\'' + + ", query=" + query + + ", label='" + label + '\'' + + ", rateOfThreadUsage=" + rateOfThreadUsage + + ", expired=" + expired + + '}'; + } + + public boolean equivalent(Object o) { + if(this == o) return true; + if(o == null || getClass() != o.getClass()) return false; + Rule rule = (Rule)o; + return rateOfThreadUsage == rule.getRateOfThreadUsage() && expression.equals(rule.getExpression()); + } + + @Override + public boolean equals(Object o) { + if(this == o) return true; + if(o == null || getClass() != o.getClass()) return false; + Rule rule = (Rule)o; + return Objects.equals(uuid, rule.getUuid()); + } + + @Override + public int hashCode() { + return Objects.hash(uuid); + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public int compareTo(Rule rule) { + if(rule == null) { + return 1; + } + + return Integer.compare(rule.getRateOfThreadUsage(), rateOfThreadUsage); + } + + public static class Builder { + String expression = ""; + String label; + int rateOfThreadUsage = Rule.DEFAULT_RATE_OF_THREAD_USAGE; + String uuid; + boolean expired = false; + + public Builder() { } + + public Builder(String expression) { + this.expression = expression; + } + + public Builder(Rule originalRule) { + this.expression = originalRule.getExpression(); + this.label = originalRule.getLabel(); + this.rateOfThreadUsage = originalRule.getRateOfThreadUsage(); + } + + public Builder expression(String expression) { + this.expression = expression; + return this; + } + + public Builder label(String label) { + this.label = label; + return this; + } + + public Builder uuid(String uuid) { + this.uuid = uuid; + return this; + } + + public Builder rateOfThreadUsage(int rateOfThreadUsage) { + this.rateOfThreadUsage = rateOfThreadUsage; + return this; + } + + public Builder expired(boolean expired) { + this.expired = expired; + return this; + } + + public Rule build() throws IllegalArgumentException { + try { + Query.validateExpression(expression, false); + if(rateOfThreadUsage <= 0) { + throw new IllegalArgumentException("Priority value must be greater than 0"); + } + } catch (AttributeExpressionLanguageParsingException e) { + throw new IllegalArgumentException(e); + } + label = label == null ? "" : label; + if(uuid == null) { + uuid = UUID.randomUUID().toString(); + } + return new Rule(expression, label, rateOfThreadUsage, uuid, expired); + } + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/priority/RulesManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/priority/RulesManager.java new file mode 100644 index 000000000000..6bc7c67ec4f1 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/priority/RulesManager.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.priority; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.apache.commons.io.FileUtils; +import org.apache.nifi.attribute.expression.language.EvaluationContext; +import org.apache.nifi.attribute.expression.language.StandardEvaluationContext; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManagerProvider; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.processor.exception.ProcessException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +// This class will be used to manage the global ruleset for use with the GlobalPriorityFlowFileQueue. +public class RulesManager { + private static final Logger LOGGER = LoggerFactory.getLogger(RulesManager.class); + + private volatile List ruleList = Collections.synchronizedList(new ArrayList<>()); + private final File configFile; + private static final Gson GSON = new Gson(); + private static final JsonParser JSON_PARSER = new JsonParser(); + private static final String UUID = "uuid"; + private static final String EXPRESSION = "expression"; + private static final String RATE_OF_THREAD_USAGE = "rateOfThreadUsage"; + private static final String LABEL = "label"; + private static final String EXPIRED = "expired"; + private final ReentrantReadWriteLock.WriteLock writeLock = new ReentrantReadWriteLock(true).writeLock(); + private final StateManagerProvider stateManagerProvider; + private final String stateId = "RulesManagerID"; + private final Scope stateScope = Scope.CLUSTER; + private ScheduledExecutorService executorService; + private static final long STATE_CHECK_FREQUENCY = TimeUnit.SECONDS.toMillis(1); + private volatile boolean useState; + private volatile Map lastStateMapPolled; + + public RulesManager(File configFile, StateManagerProvider stateManagerProvider) { + addRules(Arrays.asList(Rule.DEFAULT, Rule.UNEVALUATED)); + this.configFile = configFile; + this.stateManagerProvider = stateManagerProvider; + useState = stateManagerProvider != null; + } + + public void readRules() { + if(useState) { + if(executorService == null) { + executorService = Executors.newSingleThreadScheduledExecutor(); + executorService.scheduleAtFixedRate(this::readRulesFromStateManagerProvider, 0, STATE_CHECK_FREQUENCY, TimeUnit.MILLISECONDS); + } else { + // Assume we're already monitoring rule changes so by calling this method we want to force a re-read of what is there + readRulesFromStateManagerProvider(); + } + } else if(configFile != null && configFile.exists() && configFile.canRead()) { + readRulesFromConfigFile(); + } + } + + private void readRulesFromStateManagerProvider() { + LOGGER.trace("readRulesFromStateManagerProvider"); + + try { + Map ruleSet = stateManagerProvider.getStateManager(stateId).getState(stateScope).toMap(); + if(LOGGER.isDebugEnabled()) { + LOGGER.debug("Received the following rules from state: {}", GSON.toJson(ruleList)); + } + + // No need to update our ruleset if the state hasn't changed + if(!ruleSet.equals(lastStateMapPolled)) { + ruleSet.values().stream().map(this::convertJsonStringToRule).filter(Objects::nonNull).forEach(this::addOrModifyRule); + LOGGER.debug("Loaded {} rules to add or update.", ruleSet.size()); + lastStateMapPolled = ruleSet; + } + } catch (Exception e) { + LOGGER.error("Exception occurred when trying to update rules from the stateManagerProvider.", e); + } + } + + private void readRulesFromConfigFile() { + try { + String configString = FileUtils.readFileToString(configFile, "UTF-8"); + JsonArray jsonArray = GSON.fromJson(configString, JsonArray.class).getAsJsonArray(); + List ruleList = new ArrayList<>(jsonArray.size()); + jsonArray.forEach(jsonElement -> { + if(jsonElement.isJsonObject()) { + JsonObject jsonObject = jsonElement.getAsJsonObject(); + String uuid = jsonObject.has(UUID) ? jsonObject.get(UUID).getAsString() : null; + String expression = jsonObject.has(EXPRESSION) ? jsonObject.get(EXPRESSION).getAsString() : null; + String label = jsonObject.has(LABEL) ? jsonObject.get(LABEL).getAsString() : ""; + int rateOfThreadUsage = jsonObject.has(RATE_OF_THREAD_USAGE) ? jsonObject.get(RATE_OF_THREAD_USAGE).getAsInt() : Rule.DEFAULT_RATE_OF_THREAD_USAGE; + try { + ruleList.add(Rule.builder() + .uuid(uuid) + .expression(expression) + .label(label) + .rateOfThreadUsage(rateOfThreadUsage) + .build()); + } catch (IllegalArgumentException e) { + LOGGER.warn("Unable to create rule using the following due to an exception:\n" + + "uuid: {}\n" + + "label: {}\n" + + "expression: {}\n" + + "rateOfThreadUsage: {}", uuid, label, expression, rateOfThreadUsage, e); + } + } + }); + + LOGGER.debug("Loaded {} rules into memory.", ruleList.size()); + + addRules(ruleList); + } catch (Exception e) { + LOGGER.error("Exception while trying to load ruleset. Any attempt to updates rules in the system may overwrite " + + "the existing rules on disk at {}. If this is not desired, stop the system and address the error.", + configFile.getAbsolutePath(), e); + } + } + + public void purgeExpiredRulesFromState() { + if(useState) { + try { + Map rulesFromStateManagement = new HashMap<>(stateManagerProvider.getStateManager(stateId).getState(stateScope).toMap()); + List expiredRuleList = rulesFromStateManagement.values().stream() + .map(this::convertJsonStringToRule) + .filter(Objects::nonNull) + .filter(Rule::isExpired) + .map(Rule::getUuid) + .collect(Collectors.toList()); + expiredRuleList.forEach(rulesFromStateManagement::remove); + stateManagerProvider.getStateManager(stateId).setState(rulesFromStateManagement, stateScope); + } catch(Exception e) { + LOGGER.warn("Exception occurred when attempting to purge expired rules from state management.", e); + } + } + } + + Rule convertJsonStringToRule(String ruleString) { + try { + JsonObject jsonObject = JSON_PARSER.parse(ruleString).getAsJsonObject(); + boolean expired = jsonObject.has(EXPIRED) && jsonObject.get(EXPIRED).getAsBoolean(); + String uuid = jsonObject.has(UUID) ? jsonObject.get(UUID).getAsString() : null; + String expression = jsonObject.has((EXPRESSION)) ? jsonObject.get(EXPRESSION).getAsString() : null; + String label = jsonObject.has(LABEL) ? jsonObject.get(LABEL).getAsString() : ""; + int rateOfThreadUsage = jsonObject.has(RATE_OF_THREAD_USAGE) ? jsonObject.get(RATE_OF_THREAD_USAGE).getAsInt() : Rule.DEFAULT_RATE_OF_THREAD_USAGE; + + Rule newRule = Rule.builder() + .uuid(uuid) + .expression(expression) + .expired(expired) + .label(label) + .rateOfThreadUsage(rateOfThreadUsage) + .build(); + + LOGGER.debug("convertJsonStringToRule(): Parsed Rule {} from String {}", newRule, ruleString); + + return newRule; + } catch(Exception e) { + LOGGER.warn("Cannot convert \"{}\" to a Rule due to exception.", ruleString, e); + return null; + } + } + + String convertRuleToJsonString(Rule rule) { + JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty(UUID, rule.getUuid()); + jsonObject.addProperty(EXPRESSION, rule.getExpression()); + jsonObject.addProperty(LABEL, rule.getLabel()); + jsonObject.addProperty(RATE_OF_THREAD_USAGE, rule.getRateOfThreadUsage()); + jsonObject.addProperty(EXPIRED, rule.isExpired()); + return GSON.toJson(jsonObject); + } + + public void writeRules() { + if(useState) { + writeRulesToState(); + } else { + writeRulesToConfigFile(); + } + } + + private void writeRulesToState() { + try { + writeLock.lock(); + Map ruleMap = ruleList.stream().filter(rule -> rule != Rule.UNEVALUATED && rule != Rule.DEFAULT) + .collect(Collectors.toMap(Rule::getUuid, this::convertRuleToJsonString)); + stateManagerProvider.getStateManager(stateId).setState(ruleMap, stateScope); + } catch (IOException e) { + LOGGER.error("Exception when writing rules to state manager: ", e); + } finally { + writeLock.unlock(); + } + } + + private void writeRulesToConfigFile() { + if(configFile != null) { + writeLock.lock(); + try { + JsonArray jsonArray = new JsonArray(ruleList.size()); + ruleList.stream().filter(rule -> (rule != Rule.DEFAULT && rule != Rule.UNEVALUATED && !rule.isExpired())) + .forEach(rule -> { + JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty(UUID, rule.getUuid()); + jsonObject.addProperty(EXPRESSION, rule.getExpression()); + jsonObject.addProperty(RATE_OF_THREAD_USAGE, rule.getRateOfThreadUsage()); + jsonObject.addProperty(LABEL, rule.getLabel()); + jsonArray.add(jsonObject); + }); + + FileUtils.write(configFile, jsonArray.toString(), "UTF-8"); + } catch (IOException e) { + LOGGER.error("Exception when writing rules to config file:", e); + } finally { + writeLock.unlock(); + } + } + } + + public List getRuleList() { + return Collections.unmodifiableList(ruleList); + } + + public List getActiveRuleList() { + return ruleList.stream().filter(rule -> !rule.isExpired()).collect(Collectors.toList()); + } + + public void addOrModifyRule(Rule targetRule) { + if(ruleList.stream().noneMatch(targetRule::equals)) { + addRule(targetRule); + } else { + modifyRule(targetRule); + } + } + + public void addRule(Rule newRule) { + writeLock.lock(); + try { + ruleList.add(newRule); + ruleList.sort(Comparator.naturalOrder()); + } finally { + writeLock.unlock(); + } + } + + public void addRules(List newRules) { + writeLock.lock(); + try { + ruleList.addAll(newRules); + ruleList.sort(Comparator.naturalOrder()); + } finally { + writeLock.unlock(); + } + } + + public void modifyRule(Rule modifiedRule) { + writeLock.lock(); + try { + Optional targetRuleOptional = ruleList.stream().filter(modifiedRule::equals).findFirst(); + if(!targetRuleOptional.isPresent()) { + // Nothing to modify, no-op + return; + } + + Rule targetRule = targetRuleOptional.get(); + + // We are not changing the expression so we may modify the existing rule versus replacing it + if(targetRule.getExpression().equals(modifiedRule.getExpression())) { + ruleList.stream().filter(modifiedRule::equals).findAny().ifPresent(rule -> + rule.setLabel(modifiedRule.getLabel()).setRateOfThreadUsage(modifiedRule.getRateOfThreadUsage()).setExpired(modifiedRule.isExpired())); + } else { + // Because we are modifying the expression what we really need is a new rule as any flowfiles already associated with the existing + // rule now need to be re-evaluated + Rule newRule = Rule.builder() + .expression(modifiedRule.getExpression()) + .rateOfThreadUsage(modifiedRule.getRateOfThreadUsage()) + .label(modifiedRule.getLabel()) + .expired(modifiedRule.isExpired()) + .build(); + targetRule.setExpired(true); + ruleList.add(newRule); + } + } finally { + writeLock.unlock(); + } + } + + public Rule getRule(FlowFileRecord flowFileRecord) { + for(Rule rule: ruleList) { + if(rule != Rule.DEFAULT && rule != Rule.UNEVALUATED && !rule.isExpired()) { + try { + final EvaluationContext evaluationContext = new StandardEvaluationContext(flowFileRecord.getAttributes()); + + final String evaluated = rule.getQuery().evaluateExpressions(evaluationContext, null); + if(Boolean.parseBoolean(evaluated)) { + return rule; + } + } catch(ProcessException e) { + LOGGER.warn("Exception while evaluating expression for rule {}", rule, e); + } + } + } + + return Rule.DEFAULT; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/GlobalPriorityFlowFileQueueTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/GlobalPriorityFlowFileQueueTest.java new file mode 100644 index 000000000000..0aac1ef4e04d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/GlobalPriorityFlowFileQueueTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +import org.apache.nifi.controller.MockFlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.priority.Rule; +import org.apache.nifi.priority.RulesManager; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.apache.nifi.controller.queue.GlobalPriorityFlowFileQueue.FLOW_FILE_RULE_CACHE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class GlobalPriorityFlowFileQueueTest { + private GlobalPriorityFlowFileQueue globalPriorityFlowFileQueue; + + @Before + public void before() { + globalPriorityFlowFileQueue = new GlobalPriorityFlowFileQueue("id", null, null, null, null, new RulesManager(null, null), 5, 1000L, "0 B", null, 100, null); + } + + @Test + // In this test we'll load the connection up with expired flowfiles and verify that as soon as an expired flowfile is encountered + // that poll continues to burn off expired records even if they are of a lower priority until it hits the limit. + public void testExpiredLimit() { + // Expire everything quickly + globalPriorityFlowFileQueue.setFlowFileExpiration("1ms"); + + Map highPriority = new HashMap<>(1); + highPriority.put("priority", "0"); + FlowFileRecord highPriorityRecord = new MockFlowFileRecord(highPriority, 0L); + Map mediumPriority = new HashMap<>(1); + mediumPriority.put("priority", "1"); + + globalPriorityFlowFileQueue.put(highPriorityRecord); + List flowFileList = new ArrayList<>(GlobalPriorityFlowFileQueue.MAX_EXPIRED_RECORDS); + for(int i = 0; i < GlobalPriorityFlowFileQueue.MAX_EXPIRED_RECORDS; i++) { + FlowFileRecord flowFileRecord = new MockFlowFileRecord(mediumPriority, 0L); + flowFileList.add(flowFileRecord); + } + globalPriorityFlowFileQueue.putAll(flowFileList); + + // Sanity check, ensure we have GlobalPriorityFlowFileQueue.MAX_EXPIRED_RECORDS + 1 files in the connection + assertEquals(GlobalPriorityFlowFileQueue.MAX_EXPIRED_RECORDS + 1, globalPriorityFlowFileQueue.size().getObjectCount()); + Set expiredFlowFileSet = new HashSet<>(GlobalPriorityFlowFileQueue.MAX_EXPIRED_RECORDS); + + // We do not have any unexpired flowfiles in the connection + assertNull(globalPriorityFlowFileQueue.poll(expiredFlowFileSet)); + + // There were GlobalPriorityFlowFileQueue.MAX_EXPIRED_RECORDS expired flowfiles when we polled (the limit) + assertEquals(GlobalPriorityFlowFileQueue.MAX_EXPIRED_RECORDS, expiredFlowFileSet.size()); + expiredFlowFileSet.clear(); + + // 1 should be left after we burned off the max expired + assertEquals(1, globalPriorityFlowFileQueue.size().getObjectCount()); + } + + @Test + // Verify that when there's an expired record ahead of an unexpired record, the unexpired one is returned and the expired + // one is added to the appropriate set + public void testExpired() throws InterruptedException { + Set expiredFlowFileSet = new HashSet<>(1); + globalPriorityFlowFileQueue.setFlowFileExpiration("500ms"); + + Map highPriority = new HashMap<>(1); + highPriority.put("priority", "0"); + FlowFileRecord expiredRecord = new MockFlowFileRecord(highPriority, 0L); + + globalPriorityFlowFileQueue.put(expiredRecord); + + Thread.sleep(globalPriorityFlowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS) + 1); + + FlowFileRecord unexpiredRecord = new MockFlowFileRecord(highPriority, 0L); + globalPriorityFlowFileQueue.put(unexpiredRecord); + + assertEquals(unexpiredRecord, globalPriorityFlowFileQueue.poll(expiredFlowFileSet)); + assertEquals(1, expiredFlowFileSet.size()); + assertEquals(0, globalPriorityFlowFileQueue.size().getObjectCount()); + } + + @Test + public void testPut() throws ExecutionException { + MockFlowFileRecord flowFileNotAssociatedWithRule = new MockFlowFileRecord(0L); + MockFlowFileRecord flowFileForExpiredRule = new MockFlowFileRecord(1L); + MockFlowFileRecord flowFileForUnexpiredRule1 = new MockFlowFileRecord(2L); + MockFlowFileRecord flowFileForUnexpiredRule2 = new MockFlowFileRecord(3L); + + Rule expiredRule = Rule.builder().expression("${x:equals('expired')}").expired(true).label("expired").build(); + FLOW_FILE_RULE_CACHE.get().put(flowFileForExpiredRule, expiredRule); + Rule unexpiredRule = Rule.builder().expression("${x:equals('unexpired')}").label("unexpired").build(); + FLOW_FILE_RULE_CACHE.get().put(flowFileForUnexpiredRule1, unexpiredRule); + FLOW_FILE_RULE_CACHE.get().put(flowFileForUnexpiredRule2, unexpiredRule); + + // Case 1: Rule has yet to be evaluated and alwaysReevaluate is false so should map to UNEVALUATED + globalPriorityFlowFileQueue.put(flowFileNotAssociatedWithRule); + assert(FLOW_FILE_RULE_CACHE.get().asMap().containsKey(flowFileNotAssociatedWithRule)); + assertEquals(Rule.UNEVALUATED, FLOW_FILE_RULE_CACHE.get().get(flowFileNotAssociatedWithRule)); + assertEquals(1, globalPriorityFlowFileQueue.size().getObjectCount()); + + // Case 2: Rule exists and is not expired. Rule should not change in the map + globalPriorityFlowFileQueue.put(flowFileForUnexpiredRule1); + assertEquals(unexpiredRule, FLOW_FILE_RULE_CACHE.get().get(flowFileForUnexpiredRule1)); + assertEquals(2, globalPriorityFlowFileQueue.size().getObjectCount()); + + // Case 3: Rule exists and is expired. Map should be updated with new appropriate rule (DEFAULT in this case) + globalPriorityFlowFileQueue.put(flowFileForExpiredRule); + assertEquals(Rule.DEFAULT, FLOW_FILE_RULE_CACHE.get().get(flowFileForExpiredRule)); + assertEquals(3, globalPriorityFlowFileQueue.size().getObjectCount()); + + // Case 4: Rule exists and is not expired but alwaysReevaluate is true + globalPriorityFlowFileQueue.alwaysReevaluate = true; + globalPriorityFlowFileQueue.put(flowFileForUnexpiredRule2); + // Even though this file was already in the global static map, the rule manager is unaware of it so when its reevaluated + // you should get the DEFAULT rule in return since there's no priority attribute on this file. + assertEquals(Rule.DEFAULT, FLOW_FILE_RULE_CACHE.get().get(flowFileForUnexpiredRule2)); + assertEquals(4, globalPriorityFlowFileQueue.size().getObjectCount()); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/priority/RulesManagerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/priority/RulesManagerTest.java new file mode 100644 index 000000000000..089a8c7575a4 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/priority/RulesManagerTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.priority; + +import org.apache.nifi.controller.MockFlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.junit.Before; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class RulesManagerTest { + private RulesManager rulesManager; + private Rule high, medium, low; + + @Before + public void before() { + rulesManager = new RulesManager(null, null); + high = Rule.builder().expression("${priority:equals('0')}").label("high").rateOfThreadUsage(101).build(); + medium = Rule.builder().expression("${priority:equals('1')}").label("medium").rateOfThreadUsage(40).build(); + low = Rule.builder().expression("${priority:equals('2')}").label("low").rateOfThreadUsage(25).build(); + } + + @Test + public void testGetRule() { + rulesManager.addRule(high); + Map attributes = new HashMap(1) {{put("priority", "0");}}; + FlowFileRecord flowFileRecord = new MockFlowFileRecord(attributes, 0L); + Rule rule = rulesManager.getRule(flowFileRecord); + assertNotNull(rule); + assertEquals("high", rule.getLabel()); + } + + @Test + public void testAddRules() { + rulesManager.addRules(Arrays.asList(high, medium)); + rulesManager.addRule(low); + + // Rule.DEFAULT and Rule.UNEVALUATED will also be in the list + assertEquals(5, rulesManager.getRuleList().size()); + assertTrue(rulesManager.getRuleList().stream().noneMatch(Rule::isExpired)); + } + + @Test + public void testModifyRule() { + String shmedium = "shmedium"; + rulesManager.addRules(Arrays.asList(high, medium, low)); + + // Case 1: UUID of modifiedRule does not match anything in the list. Therefore nothing was modified or added to the list + Rule modifiedRule = Rule.builder().expression(medium.getExpression()).label(shmedium).rateOfThreadUsage(50).build(); + rulesManager.modifyRule(modifiedRule); + assertEquals(5, rulesManager.getRuleList().size()); + assertNotEquals(shmedium, medium.getLabel()); + assertNotEquals(50, medium.getRateOfThreadUsage()); + + // Case 2: UUID matched and so did expression. The existing rule is modified + modifiedRule = Rule.builder().uuid(medium.getUuid()).expression(medium.getExpression()).label(shmedium).rateOfThreadUsage(50).build(); + rulesManager.modifyRule(modifiedRule); + assertEquals(5, rulesManager.getRuleList().size()); + assertEquals(shmedium, medium.getLabel()); + assertEquals(50, medium.getRateOfThreadUsage()); + + // Case 3: UUID matched but the expression did not. The existing rule will be marked as expired and a new one is added + Rule finalModifiedRule = Rule.builder().uuid(medium.getUuid()).expression("${x:equals('10')}").label("new label").rateOfThreadUsage(55).build(); + rulesManager.modifyRule(finalModifiedRule); + assertEquals(6, rulesManager.getRuleList().size()); + assertTrue(medium.isExpired()); + + // The list will NOT contain modifiedRule (using == is deliberate) because a new rule had to be created. One should exist, + // however, with the correct label, expression and rate` + assertTrue(rulesManager.getRuleList().stream().noneMatch(rule -> rule == finalModifiedRule)); + assertNotEquals(medium.getExpression(), finalModifiedRule.getExpression()); + assertNotEquals(medium.getLabel(), finalModifiedRule.getLabel()); + assertNotEquals(medium.getRateOfThreadUsage(), finalModifiedRule.getRateOfThreadUsage()); + + assertTrue(rulesManager.getRuleList().stream().anyMatch(rule -> finalModifiedRule.getExpression().equals(rule.getExpression()) + && finalModifiedRule.getLabel().equals(rule.getLabel()) + && finalModifiedRule.getRateOfThreadUsage() == rule.getRateOfThreadUsage())); + } + + @Test + public void testReadRules() { + RulesManager rulesManager = new RulesManager(new File("src/test/resources/priorityRules.json"), null); + rulesManager.readRules(); + assertEquals(5, rulesManager.getRuleList().size()); + } + + @Test + public void testWriteRules() throws IOException { + TemporaryFolder temporaryFolder = new TemporaryFolder(); + + try { + temporaryFolder.create(); + File tempFile = temporaryFolder.newFile(); + + List newRules = new ArrayList<>(1); + newRules.add(medium); + RulesManager rulesManager1 = new RulesManager(tempFile, null); + rulesManager1.addRules(newRules); + rulesManager1.writeRules(); + RulesManager rulesManager2 = new RulesManager(tempFile, null); + rulesManager2.readRules(); + + // Validate that we successfully wrote and then read the equivalent rule to/from disk + assertTrue(rulesManager2.getRuleList().stream().anyMatch(newRules.get(0)::equivalent)); + } finally { + temporaryFolder.delete(); + } + } + + @Test + public void testConvertMethods() { + String test = rulesManager.convertRuleToJsonString(high); + Rule newHigh = rulesManager.convertJsonStringToRule(test); + assertEquals(high, newHigh); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/priorityRules.json b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/priorityRules.json new file mode 100644 index 000000000000..56da18cc1c18 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/priorityRules.json @@ -0,0 +1,15 @@ +[ + { + "expression":"${priority:equals('0')}", + "rateOfThreadUsage": 100, + "label": "high" + }, { + "expression":"${priority:equals('0')}", + "rateOfThreadUsage": 40, + "label": "medium" + }, { + "expression":"${priority:equals('0')}", + "rateOfThreadUsage": 25, + "label": "low" + } +] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml index 81e5343193d3..b63697946cb9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml @@ -216,6 +216,11 @@ org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares rSquared .90 + + + false + 5 + ./conf/priorityRules.json diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index 617e7225c069..e80977d92460 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -264,3 +264,9 @@ nifi.analytics.query.interval=${nifi.analytics.query.interval} nifi.analytics.connection.model.implementation=${nifi.analytics.connection.model.implementation} nifi.analytics.connection.model.score.name=${nifi.analytics.connection.model.score.name} nifi.analytics.connection.model.score.threshold=${nifi.analytics.connection.model.score.threshold} + +# global priority queue properties # +nifi.controller.flowfilequeue.buckets=${nifi.controller.flowfilequeue.buckets} +nifi.controller.flowfilequeue.buckets.cache.expiration.minutes=${nifi.controller.flowfilequeue.buckets.cache.expiration.minutes} +# only used when no state manager is defined, otherwise rules are stored in nifi's state manager. +nifi.priority.rulesmanager.config=${nifi.priority.rulesmanager.config} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/BucketPrioritizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/BucketPrioritizer.java new file mode 100644 index 000000000000..f48c8a46ea15 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/BucketPrioritizer.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.prioritizer; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.FlowFilePrioritizer; + +public class BucketPrioritizer implements FlowFilePrioritizer { + @Override + public int compare(FlowFile flowFile, FlowFile t1) { + return 0; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/resources/META-INF/services/org.apache.nifi.flowfile.FlowFilePrioritizer b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/resources/META-INF/services/org.apache.nifi.flowfile.FlowFilePrioritizer index 1c3cd25d6c56..4c2ad580c71b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/resources/META-INF/services/org.apache.nifi.flowfile.FlowFilePrioritizer +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/resources/META-INF/services/org.apache.nifi.flowfile.FlowFilePrioritizer @@ -15,4 +15,5 @@ org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer org.apache.nifi.prioritizer.OldestFlowFileFirstPrioritizer -org.apache.nifi.prioritizer.PriorityAttributePrioritizer \ No newline at end of file +org.apache.nifi.prioritizer.PriorityAttributePrioritizer +org.apache.nifi.prioritizer.BucketPrioritizer \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/PriorityRulesAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/PriorityRulesAuditor.java new file mode 100644 index 000000000000..8604fb74e783 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/PriorityRulesAuditor.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.audit; + +import org.apache.nifi.action.Action; +import org.apache.nifi.action.Component; +import org.apache.nifi.action.FlowChangeAction; +import org.apache.nifi.action.Operation; +import org.apache.nifi.action.details.ActionDetails; +import org.apache.nifi.action.details.FlowChangeConfigureDetails; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.authorization.user.NiFiUserUtils; +import org.apache.nifi.web.api.entity.PriorityRuleEntity; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.Response; +import java.util.Date; + +@Aspect +public class PriorityRulesAuditor extends NiFiAuditor { + public static final Logger LOGGER = LoggerFactory.getLogger(PriorityRulesAuditor.class); + + @Around("within(org.apache.nifi.web.api.PriorityRuleResource+) && " + + "execution(javax.ws.rs.core.Response addRule(org.apache.nifi.web.api.entity.PriorityRuleEntity))") + public Response addPriorityRuleAdvice(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { + Response response = (Response)proceedingJoinPoint.proceed(); + + // We'll only audit something if the operation was successful + if(response.getStatus() == 200) { + Object argObject = proceedingJoinPoint.getArgs()[0]; + if(argObject instanceof PriorityRuleEntity) { + PriorityRuleEntity priorityRuleEntity = (PriorityRuleEntity)argObject; + + FlowChangeConfigureDetails flowChangeConfigureDetails = new FlowChangeConfigureDetails(); + flowChangeConfigureDetails.setName("Add"); + flowChangeConfigureDetails.setValue(priorityRuleEntity.getPriorityRule().toString()); + + Action action = generateAuditRecord(priorityRuleEntity, Operation.Add, flowChangeConfigureDetails); + + saveAction(action, LOGGER); + } + } + + return response; + } + + @Around("within(org.apache.nifi.web.api.PriorityRuleResource+) && " + + "execution(javax.ws.rs.core.Response editRule(org.apache.nifi.web.api.entity.PriorityRuleEntity))") + public Response editPriorityRuleAdvice(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { + Response response = (Response)proceedingJoinPoint.proceed(); + + // We'll only audit something if the operation was successful + if(response.getStatus() == 200) { + Object argObject = proceedingJoinPoint.getArgs()[0]; + if(argObject instanceof PriorityRuleEntity) { + PriorityRuleEntity priorityRuleEntity = (PriorityRuleEntity)argObject; + Operation operation = priorityRuleEntity.getPriorityRule().isExpired() ? Operation.Remove : Operation.Configure; + + FlowChangeConfigureDetails flowChangeConfigureDetails = new FlowChangeConfigureDetails(); + flowChangeConfigureDetails.setName(operation.toString()); + flowChangeConfigureDetails.setValue(priorityRuleEntity.getPriorityRule().toString()); + + Action action = generateAuditRecord(priorityRuleEntity, operation, flowChangeConfigureDetails); + + saveAction(action, LOGGER); + } + } + + return response; + } + + public Action generateAuditRecord(PriorityRuleEntity priorityRuleEntity, Operation operation, ActionDetails actionDetails) { + FlowChangeAction action = null; + + // get the current user + NiFiUser user = NiFiUserUtils.getNiFiUser(); + + // Ensure the user was found + if(user != null) { + // create the port action for adding this processor + action = new FlowChangeAction(); + action.setUserIdentity(user.getIdentity()); + action.setOperation(operation); + action.setTimestamp(new Date()); + action.setSourceId(priorityRuleEntity.getPriorityRule().getId()); + action.setSourceName(priorityRuleEntity.getPriorityRule().getLabel()); + action.setSourceType(Component.PriorityRule); + + if(actionDetails != null) { + action.setActionDetails(actionDetails); + } + } + + return action; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/AuthorizableLookup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/AuthorizableLookup.java index f3285847d651..7d35add12b64 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/AuthorizableLookup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/AuthorizableLookup.java @@ -250,6 +250,13 @@ public interface AuthorizableLookup { */ Authorizable getPolicies(); + /** + * Get the authorizable for all priority rules + * + * @return authorizable + */ + Authorizable getPriorityRules(); + /** * Get the authorizable for the policy of the policy id. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java index 4594ee05fa96..443791126041 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java @@ -72,7 +72,6 @@ class StandardAuthorizableLookup implements AuthorizableLookup { - private static final TenantAuthorizable TENANT_AUTHORIZABLE = new TenantAuthorizable(); private static final Authorizable POLICIES_AUTHORIZABLE = new Authorizable() { @@ -99,6 +98,18 @@ public Resource getResource() { } }; + private static final Authorizable PRIORITY_RULES_AUTHORIZABLE = new Authorizable() { + @Override + public Authorizable getParentAuthorizable() { + return null; + } + + @Override + public Resource getResource() { + return ResourceFactory.getPriorityRulesResource(); + } + }; + private static final Authorizable COUNTERS_AUTHORIZABLE = new Authorizable() { @Override public Authorizable getParentAuthorizable() { @@ -159,7 +170,10 @@ public Resource getResource() { } }; - + @Override + public Authorizable getPriorityRules() { + return PRIORITY_RULES_AUTHORIZABLE; + } // nifi core components private ControllerFacade controllerFacade; @@ -661,6 +675,9 @@ public Resource getResource() { case Tenant: authorizable = getTenant(); break; + case PriorityRules: + authorizable = getPriorityRules(); + break; case ParameterContext: authorizable = getParameterContexts(); break; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiWebApiResourceConfig.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiWebApiResourceConfig.java index 4f1959675821..03bf7fda8f5d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiWebApiResourceConfig.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiWebApiResourceConfig.java @@ -100,6 +100,7 @@ public NiFiWebApiResourceConfig(@Context ServletContext servletContext) { register(ctx.getBean("tenantsResource")); register(ctx.getBean("versionsResource")); register(ctx.getBean("parameterContextResource")); + register(ctx.getBean("priorityRuleResource")); // exception mappers register(AccessDeniedExceptionMapper.class); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 6532f51e232b..63a2e53f27e4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -4105,6 +4105,7 @@ public CurrentUserEntity getCurrentUser() { entity.setSystemPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getSystem())); entity.setParameterContextPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getParameterContexts())); entity.setCanVersionFlows(CollectionUtils.isNotEmpty(flowRegistryClient.getRegistryIdentifiers())); + entity.setPriorityRulesPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getPriorityRules())); entity.setRestrictedComponentsPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getRestrictedComponents())); @@ -5083,6 +5084,9 @@ private AuthorizationResult authorizeAction(final Action action) { case UserGroup: authorizable = authorizableLookup.getTenant(); break; + case PriorityRule: + authorizable = authorizableLookup.getPriorityRules(); + break; default: throw new WebApplicationException(Response.serverError().entity("An unexpected type of component is the source of this action.").build()); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/PriorityRuleResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/PriorityRuleResource.java new file mode 100644 index 000000000000..fad4cedbc39d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/PriorityRuleResource.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.authorization.user.NiFiUserUtils; +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.priority.Rule; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.api.dto.PriorityRuleDTO; +import org.apache.nifi.web.api.entity.PriorityRuleEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.util.List; + +@Path("/priority-rules") +@Api(value = "/priority-rules", description = "Endpoint for managing priority rules") +public class PriorityRuleResource extends ApplicationResource { + private static Logger LOGGER = LoggerFactory.getLogger(PriorityRuleResource.class); + private static final Gson GSON = new GsonBuilder().create(); + + private final NiFiServiceFacade serviceFacade; + private final Authorizer authorizer; + private final FlowController flowController; + + public PriorityRuleResource(NiFiServiceFacade serviceFacade, Authorizer authorizer, FlowController flowController, NiFiProperties properties, RequestReplicator requestReplicator, + ClusterCoordinator clusterCoordinator) { + this.serviceFacade = serviceFacade; + this.authorizer = authorizer; + this.flowController = flowController; + setProperties(properties); + setRequestReplicator(requestReplicator); + setClusterCoordinator(clusterCoordinator); + setFlowController(flowController); + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("getRuleList") + @ApiOperation(value = "Retrieves the current rule list", notes = NON_GUARANTEED_ENDPOINT, response = String.class) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response getRuleList() { + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable accessPolicy = lookup.getPriorityRules(); + accessPolicy.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + + return generateOkResponse(getRuleListJSON()).build(); + } + + private String getRuleListJSON() { + List ruleList = flowController.getRulesManager().getActiveRuleList(); + JsonObject priorityRules = new JsonObject(); + JsonArray jsonArray = new JsonArray(); + priorityRules.add("priorityRules", jsonArray); + + ruleList.stream().filter(rule -> (rule != Rule.DEFAULT && rule != Rule.UNEVALUATED)).forEach(rule -> { + JsonObject jsonObjectRule = new JsonObject(); + jsonArray.add(jsonObjectRule); + jsonObjectRule.addProperty("id", rule.getUuid()); + jsonObjectRule.addProperty("label", rule.getLabel()); + jsonObjectRule.addProperty("expression", rule.getExpression()); + jsonObjectRule.addProperty("rateOfThreadUsage", rule.getRateOfThreadUsage()); + }); + return GSON.toJson(priorityRules); + } + + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("addRule") + @ApiOperation(value = "Add a new rule to the existing list", notes = NON_GUARANTEED_ENDPOINT) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response addRule(@ApiParam(value = "the rule to add", required = true)PriorityRuleEntity priorityRuleEntity) { + LOGGER.debug("addRule({})", priorityRuleEntity); + + // Authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable accessPolicy = lookup.getPriorityRules(); + accessPolicy.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }); + + PriorityRuleDTO priorityRuleDTO = priorityRuleEntity.getPriorityRule(); + Rule rule = Rule.builder() + .label(priorityRuleDTO.getLabel()) + .expression(priorityRuleDTO.getExpression()) + .rateOfThreadUsage(priorityRuleDTO.getRateOfThreadUsage()) + .build(); + + try { + flowController.getRulesManager().addOrModifyRule(rule); + flowController.getRulesManager().writeRules(); + // Set the id on the argument for auditing purposes (See PriorityRulesAuditor) + priorityRuleDTO.setId(rule.getUuid()); + + return generateOkResponse(priorityRuleEntity).status(200).build(); + } catch(Exception e) { + LOGGER.error("Exception when trying to add rule: {}", priorityRuleEntity, e); + return generateOkResponse("FAILURE, " + e.getMessage()).build(); + } + } + + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("editRule") + @ApiOperation(value = "Add a new rule to the existing list", notes = NON_GUARANTEED_ENDPOINT) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response editRule(@ApiParam(value = "the rule to edit", required = true)PriorityRuleEntity priorityRuleEntity) { + LOGGER.debug("editRule({})", priorityRuleEntity); + + // Authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable accessPolicy = lookup.getPriorityRules(); + accessPolicy.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }); + + PriorityRuleDTO priorityRuleDTO = priorityRuleEntity.getPriorityRule(); + Rule rule = Rule.builder() + .uuid(priorityRuleDTO.getId()) + .label(priorityRuleDTO.getLabel()) + .expression(priorityRuleDTO.getExpression()) + .rateOfThreadUsage(priorityRuleDTO.getRateOfThreadUsage()) + .expired(priorityRuleDTO.isExpired()) + .build(); + + try { + flowController.getRulesManager().modifyRule(rule); + flowController.getRulesManager().writeRules(); + + return generateOkResponse(priorityRuleEntity).build(); + } catch(Exception e) { + LOGGER.error("Exception when trying to edit rule: {}", priorityRuleEntity, e); + return generateOkResponse("FAILURE, " + e.getMessage()).build(); + } + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml index 56b8d4d1c1ea..02675704e5f2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml @@ -458,6 +458,14 @@ + + + + + + + + @@ -485,6 +493,11 @@ + + + + + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml index 25711e66f61a..59387b140d35 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml @@ -33,6 +33,7 @@ counters.properties cluster.properties templates.properties + priority-rules.properties users.properties bulletin-board.properties login.properties @@ -63,6 +64,7 @@ src/main/resources/filters/${counters.filter} src/main/resources/filters/${cluster.filter} src/main/resources/filters/${templates.filter} + src/main/resources/filters/${priority.rules.filter} src/main/resources/filters/${users.filter} src/main/resources/filters/${bulletin.board.filter} src/main/resources/filters/${login.filter} @@ -94,6 +96,7 @@ **/message-page.jsp, **/canvas.jsp, **/summary.jsp, + **/priority-rules.jsp, **/history.jsp, **/provenance.jsp, **/counters.jsp, @@ -337,6 +340,14 @@ true + + src/main/webapp/WEB-INF/pages + WEB-INF/pages + + priority-rules.jsp + + true + src/main/webapp/WEB-INF/pages WEB-INF/pages @@ -443,6 +454,7 @@ counters-min.properties cluster-min.properties templates-min.properties + priority-rules-min.properties users-min.properties bulletin-board-min.properties login-min.properties @@ -659,6 +671,20 @@ ${staging.dir}/js/nf/templates/nf-templates.js + + true + ${project.build.directory}/${project.build.finalName}/js/nf/priority-rules/nf-priority-rules-all.js + + ${staging.dir}/js/nf/nf-dialog.js + ${staging.dir}/js/nf/nf-storage.js + ${staging.dir}/js/nf/nf-common.js + ${staging.dir}/js/nf/nf-error-handler.js + ${staging.dir}/js/nf/nf-universal-capture.js + ${staging.dir}/js/nf/nf-ajax-setup.js + ${staging.dir}/js/nf/priority-rules/nf-priority-rules-table.js + ${staging.dir}/js/nf/priority-rules/nf-priority-rules.js + + true ${project.build.directory}/${project.build.finalName}/js/nf/cluster/nf-cluster-all.js @@ -820,6 +846,16 @@ ${staging.dir}/css/templates.css + + true + ${project.build.directory}/${project.build.finalName}/css/nf-priority-rules-all.css + + ${staging.dir}/css/main.css + ${staging.dir}/css/banner.css + ${staging.dir}/css/dialog.css + ${staging.dir}/css/priority-rules.css + + true ${project.build.directory}/${project.build.finalName}/css/nf-bulletin-board-all.css @@ -882,6 +918,8 @@ css/nf-users-all.css.gz, css/nf-templates-all.css, css/nf-templates-all.css.gz, + css/nf-priority-rules-all.css, + css/nf-priority-rules-all.css.gz, css/nf-bulletin-board-all.css, css/nf-bulletin-board-all.css.gz, css/nf-login-all.css, @@ -920,6 +958,8 @@ js/nf/cluster/nf-cluster-all.js.gz, js/nf/users/nf-users-all.js, js/nf/users/nf-users-all.js.gz, + js/nf/priority-rules/nf-priority-rules-all.js, + js/nf/priority-rules/nf-priority-rules-all.js.gz, js/nf/templates/nf-templates-all.js, js/nf/templates/nf-templates-all.js.gz, js/nf/bulletin-board/nf-bulletin-board-all.js, diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/filters/priority-rules-min.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/filters/priority-rules-min.properties new file mode 100644 index 000000000000..eb2524a1d2b9 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/filters/priority-rules-min.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +nf.priority-rules.script.tags= + +nf.priority-rules.style.tags=\n\ +\n\ +\n\ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/filters/priority-rules.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/filters/priority-rules.properties new file mode 100644 index 000000000000..c6ac65029237 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/filters/priority-rules.properties @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +nf.priority-rules.script.tags=\n\ +\n\ +\n\ +\n\ +\n\ + +nf.priority-rules.style.tags=\n\ +\n\ +\n\ +\n\ +\n\ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/pages/priority-rules.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/pages/priority-rules.jsp new file mode 100644 index 000000000000..2be085b31fa8 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/pages/priority-rules.jsp @@ -0,0 +1,66 @@ +<%-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--%> +<%@ page contentType="text/html" pageEncoding="UTF-8" session="false" %> + + + + NiFi Priority Rules + + + + ${nf.priority-rules.style.tags} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + ${nf.priority-rules.script.tags} + + + + + + + + + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/canvas-header.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/canvas-header.jsp index 928e8f5be801..70f85e0cf4da 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/canvas-header.jsp +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/canvas-header.jsp @@ -168,6 +168,14 @@ + + + Priority Rules + + + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/priority-rules/priority-rules-content.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/priority-rules/priority-rules-content.jsp new file mode 100644 index 000000000000..e5f6cc905e34 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/priority-rules/priority-rules-content.jsp @@ -0,0 +1,62 @@ +<%-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--%> +<%@ page contentType="text/html" pageEncoding="UTF-8" session="false" %> +<%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %> +
+
+
NiFi Priority Rules
+
+
+ Displaying  of  +
+
+ +
+
+ +
+
+
+
+
+
+
+
+ +
+
+ +
+
+ +
+
+
+
+ +
+
+ +
+
+ +
+
+ +
+
+
\ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/web.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/web.xml index 6b8f97edd739..cae6dac7eb23 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/web.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/web.xml @@ -46,6 +46,16 @@ /history + + + NiFiPriorityRules + /WEB-INF/pages/priority-rules.jsp + + + NiFiPriorityRules + /priorityRules + + NiFiProvenance diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/priority-rules.css b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/priority-rules.css new file mode 100644 index 000000000000..0befe3727256 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/priority-rules.css @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + #priority-rules { + position: absolute; + top: 0x; + bottom: 0x; + left: 20px; + right: 20px; + height: 100%; + overflow: auto; + } + + button.priority-button { + float: right; + margin-top: 4px; + } + + #priority-rules-header-and-filter { + height: 108px; + } + + #priority-rules-refresh-container { + position: absolute; + bottom: 0px; + left: 20px; + right: 20px; + } + + #priority-rules-loading-container { + float: left; + width: 16px; + height: 16px; + background-color: transparent; + margin-top: 4px; + margin-left: 3px; + } + + #priority-rules-last-refreshed { + font-weight: 500; + } + + #priority-rules-header { + padding-top: 10px; + } + + #priority-rules-filter-container { + height: 32px; + width: 100% + } + + #priority-rules-filter { + width: 173px; + margin-right: 3px; + float: left; + } + + #priority-rules-filter-type { + float: left; + } + + /* priority-rules table */ +#priority-rules-table { + position: absolute; + top: 98px; + left: 0px; + bottom: 47px; + right: 0px; + min-height: 150px; +} + +#priority-rules-table div.slick-viewport { + overflow-x: hidden !important; +} + +#add-priority-rule-dialog { + width: 450px; + height: 450px; +} + +#edit-priority-rule-dialog { + width: 450px; + height: 450px; +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/controllers/nf-ng-canvas-global-menu-controller.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/controllers/nf-ng-canvas-global-menu-controller.js index 88dc30decce9..ed5f555f9d8d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/controllers/nf-ng-canvas-global-menu-controller.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/controllers/nf-ng-canvas-global-menu-controller.js @@ -280,6 +280,19 @@ } }; + // The priority rules menu item controller + this.priorityRules = { + // The priority rules menu item shell controller + shell: { + // Launch the priority rules shell + launch: function() { + if(nfCommon.canModifyPriorityRules()) { + nfShell.showPage('priorityRules'); + } + } + } + }; + /** * The templates menu item controller. */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-policy-management.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-policy-management.js index 5bcc4b6612f0..862fdd94aa84 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-policy-management.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-policy-management.js @@ -338,7 +338,7 @@ * @returns {boolean} whether the policy supports read/write options */ var globalPolicySupportsReadWrite = function (policyType) { - return policyType === 'controller' || policyType === 'parameter-contexts' || policyType === 'counters' || policyType === 'policies' || policyType === 'tenants'; + return policyType === 'controller' || policyType === 'parameter-contexts' || policyType === 'counters' || policyType === 'policies' || policyType === 'tenants' || policyType == 'priority-rules'; }; /** @@ -416,7 +416,8 @@ nfCommon.getPolicyTypeListing('site-to-site'), nfCommon.getPolicyTypeListing('system'), nfCommon.getPolicyTypeListing('proxy'), - nfCommon.getPolicyTypeListing('counters')], + nfCommon.getPolicyTypeListing('counters'), + nfCommon.getPolicyTypeListing('priority-rules')], select: function (option) { if (initialized) { // record the policy type diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js index 77d58475bc0c..bcd2681fbc65 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js @@ -159,6 +159,10 @@ text: 'access counters', value: 'counters', description: 'Allows users to view/modify Counters' + }, { + text: 'access priority rules', + value: 'priority-rules', + description: 'Allows users to view/modify Priority Rules' }]; var nfCommon = { @@ -745,6 +749,14 @@ } }, + canModifyPriorityRules: function() { + if(nfCommon.isDefinedAndNotNull(nfCommon.currentUser)) { + return nfCommon.currentUser.priorityRulesPermissions.canRead === true && nfCommon.currentUser.priorityRulesPermissions.canWrite === true; + } else { + return true; + } + }, + /** * Determines whether the current user can modify counters. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/priority-rules/nf-priority-rules-table.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/priority-rules/nf-priority-rules-table.js new file mode 100644 index 000000000000..0ce38031a879 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/priority-rules/nf-priority-rules-table.js @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +(function (root, factory) { + if(typeof define === 'function' && define.amd) { + define(['jquery', 'Slick', 'nf.Common', 'nf.Dialog', 'nf.ErrorHandler'], + function ($, Slick, nfCommon, nfDialog, nfErrorHandler) { + return (nf.PriorityRulesTable = factory($, Slick, nfCommon, nfDialog, nfErrorHandler)); + }); + } else if (typeof exports === 'object' && typeof module === 'object') { + module.exports = (nf.PriorityRulesTable = + factory(require('jquery'), + require('Slick'), + require('nf.Common'), + require('nf.Dialog'), + require('nf.ErrorHandler') ) ); + } else { + nf.PriorityRulesTable = factory(root.$, + root.Slick, + root.nf.Common, + root.nf.Dialog, + root.nf.ErrorHandler); + } +}(this, function($, Slick, nfCommon, nfDialog, nfErrorHandler) { + 'use strict'; + + var isDisconnectionAcknowledged = false; + + var config = { + urls: { + editRule: "../nifi-api/priority-rules/editRule", + getPriorityRules: "../nifi-api/priority-rules/getRuleList" + } + }; + + var sort = function(sortDetails, data) { + var comparer = function(a, b) { + var aString = nfCommon.isDefinedAndNotNull(a[sortDetails.columnId]) ? a[sortDetails.columnId] : ''; + var bString = nfCommon.isDefinedAndNotNull(b[sortDetails.columnId]) ? b[sortDetails.columnId] : ''; + return aString === bString ? 0 : aString > bString ? 1 : -1; + } + + data.sort(comparer, sortDetails.sortAsc); + }; + + /* + * Opens the access policies for the specified priorityRule + * + * @param priorityRuleEntity + */ + var openAccessPolicies = function (priorityRuleEntity) { + // only attempt this if we're within a frame + if(top !== window) { + // and our parent has canvas utils and shell defined + if(nfCommon.isDefinedAndNotNull(parent.nf) && nfCommon.isDefinedAndNotNull(parent.nf.PolicyManagement) && nfCommon.isDefinedAndNotNull(parent.nf.Shell)) { + parent.nf.PolicyManagement.showPriorityRulePolicy(priorityRuleEntity); + parent.$('#shell-close-button').click(); + } + } + }; + + /** + * Applies the filter found in the filter expression text field. + */ + var applyFilter = function () { + // get the dataview + var priorityRulesGrid = $('#priority-rules-table').data('gridInstance'); + + // ensure the grid has been initialized + if (nfCommon.isDefinedAndNotNull(priorityRulesGrid)) { + var priorityRulesData = priorityRulesGrid.getData(); + + // update the search criteria + priorityRulesData.setFilterArgs({ + searchString: getFilterText(), + property: $('#priority-rules-filter-type').combo('getSelectedOption').value + }); + priorityRulesData.refresh(); + priorityRulesGrid.invalidate(); + } + }; + + /** + * Get the text out of the filter field. If the filter field doesn't + * have any text it will contain the text 'filter list' so this method + * accounts for that. + */ + var getFilterText = function () { + return $('#priority-rules-filter').val(); + }; + + /** + * Performs the priorityRules filtering. + * + * @param {object} item The item subject to filtering + * @param {object} args Filter arguments + * @returns {Boolean} Whether or not to include the item + */ + var filter = function (item, args) { + if (args.searchString === '') { + return true; + } + + try { + // perform the row filtering + var filterExp = new RegExp(args.searchString, 'i'); + } catch (e) { + // invalid regex + return false; + } + + return item[args.property].search(filterExp) >= 0; + }; + + // Load the processor PriorityRules table + var loadPriorityRulesTable = function() { + return $.ajax({ + type: 'GET', + url: config.urls.getPriorityRules, + dataType: 'json' + }).done(function (response) { + // ensure there are groups specified + if(nfCommon.isDefinedAndNotNull(response.priorityRules)) { + var priorityRulesGrid = $('#priority-rules-table').data('gridInstance'); + var priorityRulesData = priorityRulesGrid.getData(); + + // set the items + priorityRulesData.setItems(response.priorityRules); + priorityRulesData.reSort(); + priorityRulesGrid.invalidate(); + + // update the stats last refreshed timestamp + $('#priorityRules-last-refreshed').text(response.generated); + + // update the total number of priority rules + $('#total-priorityRules').text(response.priorityRules.length); + } else { + $('#total-priorityRules').text('0'); + } + }).fail(nfErrorHandler.handleAjaxError); + }; + + /* + * Prompts the user before attempting to delete the specified priorityRule + * + * @argument {object} priorityRuleEntity The priorityRule + */ + var promptToDeletePriorityRule = function(priorityRuleEntity) { + // prompt for deletion + nfDialog.showYesNoDialog({ + headerText: 'Delete PriorityRule', + dialogContent: 'Delete priorityRule \'' + nfCommon.escapeHtml(priorityRuleEntity.label) + '(' + nfCommon.escapeHtml(priorityRuleEntity.id) + ')\'?', + yesHandler: function() { + deletePriorityRule(priorityRuleEntity); + } + }); + }; + + /* + * Deletes the priorityRule with the specified id + * + * @argument {string} priorityRuleEntity The priorityRule + */ + var deletePriorityRule = function(entity) { + entity.expired = true; + var priorityRuleEntity = {}; + priorityRuleEntity.priorityRule = entity; + + $.ajax({ + type: 'POST', + url: config.urls.editRule, + data: JSON.stringify(priorityRuleEntity), + dataType: 'json', + contentType: 'application/json' + }).done(function () { + loadPriorityRulesTable(); + }).fail(nfErrorHandler.handleAjaxError); + }; + + return { + /** + * Initializes the priorityRules list + */ + + init: function(disconnectionAcknowledged) { + isDisconnectionAcknowledged = disconnectionAcknowledged; + + // define the function for filtering the list + $('#priority-rules-filter').keyup(function() { + applyFilter(); + }); + + // filter type + $('#priority-rules-filter-type').combo({ + options: [{ + text: 'by label', + value: 'label' + },{ + text: 'by expression', + value: 'expression' + }], + select: function(option) { + applyFilter(); + } + }); + + var labelFormatter = function(row, cell, value, columnDef, dataContext) { + return nfCommon.escapeHtml(dataContext.label); + }; + + var expressionFormatter = function(row, cell, value, columnDef, dataContext) { + return nfCommon.escapeHtml(dataContext.expression); + }; + + var rateOfThreadUsageFormatter = function(row, cell, value, columnDef, dataContext) { + return nfCommon.escapeHtml(dataContext.rateOfThreadUsage); + }; + + // function for formatting the actions column + var actionFormatter = function(row, cell, value, columnDef, dataContext) { + var markup = ''; + markup += '
'; + markup += '
'; + + return markup; + }; + + var priorityRulesColumns = [ + { + id: 'label', + name: 'Label', + sortable: true, + resizable: true, + formatter: labelFormatter + },{ + id: 'expression', + name: 'Expression', + sortable: true, + resizable: true, + formatter: expressionFormatter + },{ + id: 'rateOfThreadUsage', + name: 'Priority Value (%)', + sortable: true, + resizable: true, + formatter: rateOfThreadUsageFormatter + },{ + id: 'actions', + name: ' ', + sortable: false, + resizable: false, + formatter: actionFormatter, + width: 100, + maxWidth: 100 + } + ]; + + var priorityRulesOptions = { + forceFitColumns: true, + enableTextSelectionOnCells: true, + enableCellNavigation: false, + enableColumnReorder: false, + autoEdit: false, + rowHeight: 24 + }; + + var priorityRulesData = new Slick.Data.DataView({ + inlineFilters: false + }); + priorityRulesData.setItems([]); + priorityRulesData.setFilterArgs({ + searchString: '', + property: 'label' + }); + priorityRulesData.setFilter(filter); + + // initialize the sort + sort({ + columnId: 'rateOfThreadUsage', + sortAsc: false + }, priorityRulesData); + + // initialize the grid + var priorityRulesGrid = new Slick.Grid('#priority-rules-table', priorityRulesData, priorityRulesColumns, priorityRulesOptions); + priorityRulesGrid.setSelectionModel(new Slick.RowSelectionModel()); + priorityRulesGrid.registerPlugin(new Slick.AutoTooltips()); + priorityRulesGrid.setSortColumn('rateOfThreadUsage', false); + priorityRulesGrid.onSort.subscribe(function (e, args) { + sort({ + columnId: args.sortCol.id, + sortAsc: args.sortAsc + }, priorityRulesData); + this.invalidate(); + }); + + priorityRulesGrid.onClick.subscribe(function(e, args) { + var target = $(e.target); + + // get the node at this row + var item = priorityRulesData.getItem(args.row); + + // determine the desired action + if(priorityRulesGrid.getColumns()[args.cell].id === 'actions') { + if(target.hasClass('prompt-to-delete-priorityRule')) { + promptToDeletePriorityRule(item); + } else if(target.hasClass('edit-priorityRule')) { + $('#edit-priority-rule-uuid-field').val(item.id); + $('#edit-priority-rule-label-field').val(item.label); + $('#edit-priority-rule-expression-field').val(item.expression); + $('#edit-priority-rule-rate-field').val(item.rateOfThreadUsage); + $('#edit-priority-rule-dialog').modal('show'); + } + } + }); + + // wire up the dataview to the grid + priorityRulesData.onRowCountChanged.subscribe(function(e, args) { + priorityRulesGrid.updateRowCount(); + priorityRulesGrid.render(); + + // update the total number of displayed processors + $('#displayed-priority-rules').text(args.current); + }); + priorityRulesData.onRowsChanged.subscribe(function(e, args) { + priorityRulesGrid.invalidateRows(args.row); + priorityRulesGrid.render(); + }); + + // hold onto an instance of the grid + $('#priority-rules-table').data('gridInstance', priorityRulesGrid); + + // initialize the number of displayed items + $('#displayed-priority-rules').text('0'); + }, + + /** + * Update the size of the grid based on its container's current size + */ + resetTableSize: function() { + var priorityRulesGrid = $('#priority-rules-table').data('gridInstance'); + if(nfCommon.isDefinedAndNotNull(priorityRulesGrid)) { + priorityRulesGrid.resizeCanvas(); + } + }, + + /** + * Load the processor priorityRules table + */ + loadPriorityRulesTable: function() { + return $.ajax({ + type: 'GET', + url: config.urls.getPriorityRules, + dataType: 'json' + }).done(function(response) { + // Ensure there are groups specified + if(nfCommon.isDefinedAndNotNull(response.priorityRules)) { + var priorityRulesGrid = $('#priority-rules-table').data('gridInstance'); + var priorityRulesData = priorityRulesGrid.getData(); + + // set the items + priorityRulesData.setItems(response.priorityRules); + priorityRulesData.reSort(); + priorityRulesGrid.invalidate(); + + // update the stats last refreshed timestamp + $('#priorityRules-last-refreshed').text(response.generated); + + // update the total number of processors + $('#total-priority-rules').text(response.priorityRules.length); + } else { + $('#total-priority-rules').text('0'); + } + }).fail(nfErrorHandler.handleAjaxError); + } + }; +})); \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/priority-rules/nf-priority-rules.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/priority-rules/nf-priority-rules.js new file mode 100644 index 000000000000..b41f46374d77 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/priority-rules/nf-priority-rules.js @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +(function (root, factory) { + if(typeof define === 'function' && define.amd) { + define(['jquery', 'nf.Common', 'nf.Dialog', 'nf.PriorityRulesTable', 'nf.ErrorHandler', 'nf.Storage'], + function($, nfCommon, nfDialog, nfPriorityRulesTable, nfErrorHandler, nfStorage) { + return(nf.PriorityRulesTable = factory($, nfCommon, nfDialog, nfPriorityRulesTable, nfErrorHandler, nfStorage)); + }); + } else if(typeof exports === 'object' && typeof module === 'object') { + module.exports = (nf.PriorityRules = + factory(require('jquery'), + require('nf.Common'), + require('nf.Dialog'), + require('nf.PriorityRulesTable'), + require('nf.ErrorHandler'), + require('nf.Storage') + ) + ); + } else { + nf.PriorityRules = factory(root.$, + root.nf.Common, + root.nf.Dialog, + root.nf.PriorityRulesTable, + root.nf.ErrorHandler, + root.nf.Storage + ); + } +}(this, function($, nfCommon, nfDialog, nfPriorityRulesTable, nfErrorHandler, nfStorage) { + 'use strict'; + + $(document).ready(function () { + nfPriorityRules.init(); + }); + + var config = { + urls: { + banners: '../nifi-api/flow/banners', + about: '../nifi-api/flow/about', + currentUser: '../nifi-api/flow/current-user', + addPriorityRule: '../nifi-api/priority-rules/addRule', + editPriorityRule: '../nifi-api/priority-rules/editRule' + } + }; + + var loadCurrentUser = function() { + return $.ajax({ + type: 'GET', + url: config.urls.currentUser, + dataType: 'json' + }).done(function(currentUser) { + nfCommon.setCurrentUser(currentUser); + }).fail(nfErrorHandler.handleAjaxError); + }; + + var addPriorityRule = function() { + var entity = {}; + entity.label = $('#add-priority-rule-label-field').val(); + entity.expression = $('#add-priority-rule-expression-field').val(); + entity.rateOfThreadUsage = $('#add-priority-rule-rate-field').val(); + entity.expired = false; + var priorityRuleEntity = {}; + priorityRuleEntity.priorityRule = entity; + + $.ajax({ + type: 'POST', + url: config.urls.addPriorityRule, + data: JSON.stringify(priorityRuleEntity), + dataType: 'json', + contentType: 'application/json' + }).done(function (rulesEntity) { + nfPriorityRulesTable.loadPriorityRulesTable(); + $('#add-priority-rule-label-field').val(''); + $('#add-priority-rule-expression-field').val(''); + $('#add-priority-rule-rate-field').val(''); + $('#add-priority-rule-dialog').modal('hide'); + }).fail(nfErrorHandler.handleAjaxError); + }; + + var editPriorityRule = function() { + var entity = {}; + entity.id = $('#edit-priority-rule-uuid-field').val(); + entity.label = $('#edit-priority-rule-label-field').val(); + entity.expression = $('#edit-priority-rule-expression-field').val(); + entity.rateOfThreadUsage = $('#edit-priority-rule-rate-field').val(); + entity.expired = false; + var priorityRuleEntity = {}; + priorityRuleEntity.priorityRule = entity; + + $.ajax({ + type: 'POST', + url: config.urls.editPriorityRule, + data: JSON.stringify(priorityRuleEntity), + dataType: 'json', + contentType: 'application/json' + }).done(function (rulesEntity) { + nfPriorityRulesTable.loadPriorityRulesTable(); + $('#edit-priority-rule-label-field').val(''); + $('#edit-priority-rule-uuid-field').val(''); + $('#edit-priority-rule-expression-field').val(''); + $('#edit-priority-rule-rate-field').val(''); + $('#edit-priority-rule-dialog').modal('hide'); + }).fail(nfErrorHandler.handleAjaxError); + }; + + var initAddPriorityRuleDialog = function() { + $('#new-priority-rule-button').on('click', function() { + $('#add-priority-rule-dialog').modal('show'); + }); + + $('#add-priority-rule-dialog').modal({ + scrollableContentStyle: 'scrollable', + headerText: 'Add Priority Rule', + buttons: [{ + buttonText: 'Add', + color: { + base: '#728E9B', + hover: '#004849', + text: '#FFFFFF' + }, + handler: { + click: function() { + addPriorityRule(); + } + } + },{ + buttonText: 'Cancel', + color: { + base: '#728E9B', + hover: '#004849', + text: '#FFFFFF' + }, + handler: { + click: function() { + $('#add-priority-rule-dialog').modal('hide'); + } + } + }] + }); + + $('#edit-priority-rule-button').on('click', function() { + $('#edit-priority-rule-dialog').modal('show'); + }); + + $('#edit-priority-rule-dialog').modal({ + scrollableContentStyle: 'scrollable', + headerText: 'Edit Priority Rule', + buttons: [{ + buttonText: 'Edit', + color: { + base: '#728E9B', + hover: '#004849', + text: '#FFFFFF' + }, + handler: { + click: function() { + editPriorityRule(); + } + } + },{ + buttonText: 'Cancel', + color: { + base: '#728E9B', + hover: '#004849', + text: '#FFFFFF' + }, + handler: { + click: function() { + $('#edit-priority-rule-dialog').modal('hide'); + } + } + }] + }); + }; + + var verifyDisconnectedCluster = function() { + return $.Deferred(function(deferred) { + if(top !== window && nfCommon.isDefinedAndNotNull(parent.nf) && nfCommon.isDefinedAndNotNull(parent.nf.Storage)) { + deferred.resolve(parent.nf.Storage.isDisconnectionAcknowledged()); + } else { + $.ajax({ + type: 'GET', + url: '../nifi-api/flow/cluster/summary', + dataType: 'json' + }).done(function(clearSummaryResults) { + var clusterSummary = clusterSummaryResults.clusterSummary; + if(clusterSummary.connectedToCluster) { + deferred.resolve(false); + } else { + nfDialog.showDisconnectedFromClusterMessage(function() { + deferred.resolve(); + }); + } + }).fail(nfErrorHandler.handleAjaxError).fail(function() { + deferred.reject(); + }); + } + }).promise(); + }; + + var initializePriorityRulesPage = function() { + $('#priority-refresh-button').on('click', function() { + nfPriorityRulesTable.loadPriorityRulesTable(); + }) + + return $.Deferred(function(deferred) { + if(top === window) { + $.ajax({ + type: 'GET', + url: config.urls.banners, + dataType: 'json' + }).done(function(response) { + // ensure the banners response is specified + if(nfCommon.isDefinedAndNotNull(response.banners)) { + if(nfCommon.isDefinedAndNotNull(response.banners.headerText) && response.banners.headerText !== '') { + // update the header text + var bannerHeader = $('#banner-header').text(response.banners.headerText).show; + + // show the banner + var updateTop = function(elementId) { + var element = $('#' + elementId); + element.css('top', (parseInt(bannerHeader.css('height'), 10) + parseInt(element.css('top'), 10)) + 'px'); + }; + + // update the position of elements affected by top banners + updateTop('priorityRules'); + } + + if(nfCommon.isDefinedAndNotNull(response.banners.footerText) && response.banners.footerText !== '') { + // update the footer text and show it + var bannerFooter = $('#banner-footer').text(response.banners.footerText).show; + + var updateBottom = function(elementId) { + var element = $('#' + elementId); + element.css('bottom', (parseInt(bannerFooter.css('height'), 10)) + 'px'); + }; + + // update the position of elements affected by bottom banners + updateBottom('priorityRules'); + } + } + + deferred.resolve(); + }).fail(function(xhr, status, error) { + nfErrorHandler.handleAjaxError(xhr, status, error); + deferred.reject(); + }); + } else { + deferred.resolve(); + } + }).promise(); + }; + + var nfPriorityRules = { + init: function() { + initAddPriorityRuleDialog(); + nfStorage.init(); + + $.when(verifyDisconnectedCluster(), loadCurrentUser()).done(function (verifyDisconnectedClusterResult) { + nfPriorityRulesTable.init(verifyDisconnectedClusterResult); + + nfPriorityRulesTable.loadPriorityRulesTable().done(function () { + initializePriorityRulesPage().done(function() { + var setBodySize = function() { + // alter styles if we're not in the shell + if(top === window) { + $('body').css({ + 'height':$(window).height() + 'px', + 'width':$(window).width() + 'px' + }); + + $('#priorityRules').css('margin', 40); + $('#priorityRules-table').css('bottom', 127); + $('#priorityRules-refresh-container').css('margin', 40); + } + + nfPriorityRulesTable.resetTableSize(); + }; + + $.ajax({ + type: 'GET', + url: config.urls.about, + dataType: 'json' + }).done(function(response) { + var aboutDetails = response.about; + var priorityRulesTitle = aboutDetails.title + ' PriorityRules'; + + // set the document title and the about title + document.title = priorityRulesTitle; + $('#priorityRules-header-text').text(priorityRulesTitle); + + // Set the initial size + setBodySize(); + }).fail(nfErrorHandler.handleAjaxError); + + $(window).on('resize', function (e) { + setBodySize(); + // resize dialogs when appropriate + var dialogs = $('.dialog'); + for (var i = 0, len = dialogs.length; i < len; i++) { + if ($(dialogs[i]).is(':visible')) { + setTimeout(function (dialog) { + dialog.modal('resize'); + }, 50, $(dialogs[i])); + } + } + + // resize grids when appropriate + var gridElements = $('*[class*="slickgrid_"]'); + for (var j = 0, len = gridElements.length; j < len; j++) { + if ($(gridElements[j]).is(':visible')) { + setTimeout(function (gridElement) { + gridElement.data('gridInstance').resizeCanvas(); + }, 50, $(gridElements[j])); + } + } + + // toggle tabs .scrollable when appropriate + var tabsContainers = $('.tab-container'); + var tabsContents = []; + for (var k = 0, len = tabsContainers.length; k < len; k++) { + if ($(tabsContainers[k]).is(':visible')) { + tabsContents.push($('#' + $(tabsContainers[k]).attr('id') + '-content')); + } + } + $.each(tabsContents, function (index, tabsContent) { + nfCommon.toggleScrollable(tabsContent.get(0)); + }); + }); + }) + }); + }); + } + }; + + return nfPriorityRules; +})); From 3ab154449b96eae3559d911ede156e705906bc5c Mon Sep 17 00:00:00 2001 From: Jon Kessler Date: Tue, 12 Nov 2019 19:20:58 +0000 Subject: [PATCH 2/2] NIFI-6831 Bumping stray SNAPSHOT reference that I missed earlier. --- .../nifi-framework/nifi-framework-core/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml index f548d33d9f24..9a296c369d8a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml @@ -255,7 +255,7 @@ org.apache.nifi nifi-standard-prioritizers - 1.10.0-SNAPSHOT + 1.11.0-SNAPSHOT compile