Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion nifi-api/src/main/java/org/apache/nifi/action/Component.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ public enum Component {
ParameterContext,
AccessPolicy,
User,
UserGroup;
UserGroup,
PriorityRule;
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ public abstract class NiFiProperties {
public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME= "nifi.analytics.connection.model.score.name";
public static final String ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD = "nifi.analytics.connection.model.score.threshold";

// global priority flowfile queue properties
public static final String CONTROLLER_FLOWFILEQUEUE_BUCKETS = "nifi.controller.flowfilequeue.buckets";
public static final String CONTROLLER_FLOWFILEQUEUE_BUCKETS_CACHE_EXPIRATION_MINUTES = "nifi.controller.flowfilequeue.buckets.cache.expiration.minutes";
public static final String PRIORITY_RULESMANAGER_CONFIG = "nifi.priority.rulesmanager.config";

// defaults
public static final Boolean DEFAULT_AUTO_RESUME_STATE = true;
public static final String DEFAULT_AUTHORIZER_CONFIGURATION_FILE = "conf/authorizers.xml";
Expand Down Expand Up @@ -326,6 +331,11 @@ public abstract class NiFiProperties {
public static final String DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME = "rSquared";
public static final double DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD = .90;

// global priority queue defaults
public static final String DEFAULT_CONTROLLER_FLOWFILEQUEUE_BUCKETS = "false";
public static final int DEFAULT_CONTROLLER_FLOWFILEQUEUE_BUCKETS_CACHE_EXPIRATION_MINUTES = 5;
public static final String DEFAULT_PRIORITY_RULESMANAGER_CONFIG = "./conf/priorityRules.json";

/**
* Retrieves the property value for the given property key.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.web.api.dto;

import io.swagger.annotations.ApiModelProperty;

import javax.xml.bind.annotation.XmlType;

@XmlType(name = "priorityRule")
public class PriorityRuleDTO {
private String id;
private String expression;
private String label;
private int rateOfThreadUsage;
private boolean expired;

@ApiModelProperty(value = "The id of the priority rule")
public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

@ApiModelProperty(value = "The expression of the priority rule")
public String getExpression() {
return expression;
}

public void setExpression(String expression) {
this.expression = expression;
}

@ApiModelProperty(value = "An optional human readable label for the priority rule")
public String getLabel() {
return label;
}

public void setLabel(String label) {
this.label = label;
}

@ApiModelProperty(value = "The % of the time files of this priority will be given to an available thread for processing when polled")
public int getRateOfThreadUsage() {
return rateOfThreadUsage;
}

public void setRateOfThreadUsage(int rateOfThreadUsage) {
this.rateOfThreadUsage = rateOfThreadUsage;
}

@ApiModelProperty(value = "Whether or not the priority rule is expired")
public boolean isExpired() {
return expired;
}

public void setExpired(boolean expired) {
this.expired = expired;
}

@Override
public String toString() {
return "PriorityRuleDTO{" +
"id='" + id + '\'' +
", expression='" + expression + '\'' +
", label='" + label + '\'' +
", rateOfThreadUsage=" + rateOfThreadUsage +
", expired=" + expired +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class CurrentUserEntity extends Entity {
private PermissionsDTO systemPermissions;
private PermissionsDTO parameterContextPermissions;
private PermissionsDTO restrictedComponentsPermissions;
private PermissionsDTO priorityRulesPermissions;
private Set<ComponentRestrictionPermissionDTO> componentRestrictionPermissions;

private boolean canVersionFlows;
Expand Down Expand Up @@ -80,6 +81,18 @@ public void setProvenancePermissions(PermissionsDTO provenancePermissions) {
this.provenancePermissions = provenancePermissions;
}

/*
* @return permissions for accessing priority rules
*/
@ApiModelProperty("Permissions for accessing priority rules")
public PermissionsDTO getPriorityRulesPermissions() {
return priorityRulesPermissions;
}

public void setPriorityRulesPermissions(PermissionsDTO priorityRulesPermissions) {
this.priorityRulesPermissions = priorityRulesPermissions;
}

/**
* @return permissions for accessing counters
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.web.api.entity;

import org.apache.nifi.web.api.dto.PriorityRuleDTO;

import javax.xml.bind.annotation.XmlRootElement;

@XmlRootElement(name = "priorityRuleEntity")
public class PriorityRuleEntity {
private PriorityRuleDTO priorityRule;

public PriorityRuleDTO getPriorityRule() {
return priorityRule;
}

public void setPriorityRule(PriorityRuleDTO priorityRule) {
this.priorityRule = priorityRule;
}

@Override
public String toString() {
return "PriorityRuleEntity{" +
"priorityRule=" + priorityRule +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,23 @@ public String getSafeDescription() {
}
};

private final static Resource PRIORITY_RULES_RESOURCE = new Resource() {
@Override
public String getIdentifier() {
return ResourceType.PriorityRules.getValue();
}

@Override
public String getName() {
return "Priority Rules for ";
}

@Override
public String getSafeDescription() {
return "priority rules";
}
};

private final static Resource COUNTERS_RESOURCE = new Resource() {
@Override
public String getIdentifier() {
Expand Down Expand Up @@ -468,6 +485,30 @@ public String getSafeDescription() {
};
}

/**
* Gets a Resource for accessing priority rules.
*
* @return The resource
*/
public static Resource getPriorityRulesResource() {
return new Resource() {
@Override
public String getIdentifier() {
return PRIORITY_RULES_RESOURCE.getIdentifier();
}

@Override
public String getName() {
return PRIORITY_RULES_RESOURCE.getName();
}

@Override
public String getSafeDescription() {
return PRIORITY_RULES_RESOURCE.getSafeDescription();
}
};
}

/**
* Gets a Resource for accessing component operations.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public enum ResourceType {
Label("/labels"),
OutputPort("/output-ports"),
Policy("/policies"),
PriorityRules("/priority-rules"),
Processor("/processors"),
ProcessGroup("/process-groups"),
Provenance("/provenance"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ protected void mergeResponses(final CurrentUserEntity clientEntity, final Map<No
mergePermissions(clientEntity.getTenantsPermissions(), entity.getTenantsPermissions());
mergePermissions(clientEntity.getSystemPermissions(), entity.getSystemPermissions());
mergePermissions(clientEntity.getTenantsPermissions(), entity.getTenantsPermissions());
mergePermissions(clientEntity.getPriorityRulesPermissions(), entity.getPriorityRulesPermissions());

final Set<ComponentRestrictionPermissionDTO> clientEntityComponentRestrictionsPermissions = clientEntity.getComponentRestrictionPermissions();
final Set<ComponentRestrictionPermissionDTO> entityComponentRestrictionsPermissions = entity.getComponentRestrictionPermissions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public void testMergeUserPermissions() {
userNode1.setRestrictedComponentsPermissions(buildPermissions(false, false));
userNode1.setSystemPermissions(buildPermissions(true, true));
userNode1.setTenantsPermissions(buildPermissions(false, true));
userNode1.setPriorityRulesPermissions(buildPermissions(true, true));

final Set<ComponentRestrictionPermissionDTO> componentRestrictionsNode1 = new HashSet<>();
componentRestrictionsNode1.add(buildComponentRestriction(RequiredPermission.ACCESS_KEYTAB, true, true));
Expand All @@ -64,6 +65,7 @@ public void testMergeUserPermissions() {
userNode2.setRestrictedComponentsPermissions(buildPermissions(true, true));
userNode2.setSystemPermissions(buildPermissions(false, false));
userNode2.setTenantsPermissions(buildPermissions(true, true));
userNode2.setPriorityRulesPermissions(buildPermissions(true, true));

final Set<ComponentRestrictionPermissionDTO> componentRestrictionsNode2 = new HashSet<>();
componentRestrictionsNode2.add(buildComponentRestriction(RequiredPermission.ACCESS_KEYTAB, true, false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,17 @@
<version>1.0.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-prioritizers</artifactId>
<version>1.11.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand All @@ -262,6 +273,7 @@
<exclude>src/test/resources/old-swap-file.swap</exclude>
<exclude>src/test/resources/xxe_template.xml</exclude>
<exclude>src/test/resources/swap/444-old-swap-file.swap</exclude>
<exclude>src/test/resources/priorityRules.json</exclude>
</excludes>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
package org.apache.nifi.controller;

import static java.util.Objects.requireNonNull;
import static org.apache.nifi.util.NiFiProperties.CONTROLLER_FLOWFILEQUEUE_BUCKETS;
import static org.apache.nifi.util.NiFiProperties.CONTROLLER_FLOWFILEQUEUE_BUCKETS_CACHE_EXPIRATION_MINUTES;
import static org.apache.nifi.util.NiFiProperties.DEFAULT_CONTROLLER_FLOWFILEQUEUE_BUCKETS;
import static org.apache.nifi.util.NiFiProperties.DEFAULT_CONTROLLER_FLOWFILEQUEUE_BUCKETS_CACHE_EXPIRATION_MINUTES;
import static org.apache.nifi.util.NiFiProperties.DEFAULT_PRIORITY_RULESMANAGER_CONFIG;
import static org.apache.nifi.util.NiFiProperties.PRIORITY_RULESMANAGER_CONFIG;

import java.io.ByteArrayInputStream;
import java.io.File;
Expand Down Expand Up @@ -98,6 +104,7 @@
import org.apache.nifi.controller.queue.ConnectionEventListener;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueFactory;
import org.apache.nifi.controller.queue.GlobalPriorityFlowFileQueue;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.StandardFlowFileQueue;
Expand Down Expand Up @@ -177,6 +184,7 @@
import org.apache.nifi.parameter.ParameterContextManager;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.parameter.StandardParameterContextManager;
import org.apache.nifi.priority.RulesManager;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ComponentIdentifierLookup;
Expand Down Expand Up @@ -267,6 +275,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
private final StateManagerProvider stateManagerProvider;
private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started
private final VariableRegistry variableRegistry;
private final RulesManager rulesManager;

private final ConnectionLoadBalanceServer loadBalanceServer;
private final NioAsyncLoadBalanceClientRegistry loadBalanceClientRegistry;
Expand Down Expand Up @@ -502,6 +511,13 @@ private FlowController(
throw new RuntimeException(e);
}

String rulesManagerConfigString = nifiProperties.getProperty(PRIORITY_RULESMANAGER_CONFIG, DEFAULT_PRIORITY_RULESMANAGER_CONFIG);
File rulesManagerConfigFile = new File(rulesManagerConfigString);
this.rulesManager = new RulesManager(rulesManagerConfigFile, stateManagerProvider);
// The following purge only does something if stateManagerProvider is not null
this.rulesManager.purgeExpiredRulesFromState();
this.rulesManager.readRules();

processScheduler = new StandardProcessScheduler(timerDrivenEngineRef.get(), this, encryptor, stateManagerProvider, this.nifiProperties);
eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler);

Expand Down Expand Up @@ -734,6 +750,10 @@ public void run() {
}
}

public RulesManager getRulesManager() {
return this.rulesManager;
}

@Override
public Authorizable getParentAuthorizable() {
return null;
Expand Down Expand Up @@ -1861,7 +1881,15 @@ public EventReporter getEventReporter() {
public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ConnectionEventListener eventListener) {
final FlowFileQueue flowFileQueue;

if (clusterCoordinator == null) {
String useBucketQueues = nifiProperties.getProperty(CONTROLLER_FLOWFILEQUEUE_BUCKETS, DEFAULT_CONTROLLER_FLOWFILEQUEUE_BUCKETS);

if(useBucketQueues.equalsIgnoreCase("true")) {
int cacheExpiration = nifiProperties.getIntegerProperty(CONTROLLER_FLOWFILEQUEUE_BUCKETS_CACHE_EXPIRATION_MINUTES,
DEFAULT_CONTROLLER_FLOWFILEQUEUE_BUCKETS_CACHE_EXPIRATION_MINUTES);
flowFileQueue = new GlobalPriorityFlowFileQueue(id, processScheduler, flowFileRepository, provenanceRepository, resourceClaimManager, rulesManager, cacheExpiration,
nifiProperties.getDefaultBackPressureObjectThreshold(), nifiProperties.getDefaultBackPressureDataSizeThreshold(), swapManager, nifiProperties.getQueueSwapThreshold(),
eventReporter);
} else if (clusterCoordinator == null) {
flowFileQueue = new StandardFlowFileQueue(id, eventListener, flowFileRepository, provenanceRepository, resourceClaimManager, processScheduler, swapManager,
eventReporter, nifiProperties.getQueueSwapThreshold(), nifiProperties.getDefaultBackPressureObjectThreshold(), nifiProperties.getDefaultBackPressureDataSizeThreshold());
} else {
Expand Down
Loading