Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-12735 Adding the option to execute flow analysis before committing snapshot to registry #8377

Closed
wants to merge 3 commits into from

Conversation

simonbence
Copy link
Contributor

@simonbence simonbence commented Feb 8, 2024

Summary

NIFI-12735

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using mvn clean install -P contrib-check
    • JDK 21

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

if (analyzeProcessGroupToRegister(snapshot)) {
return node.registerFlowSnapshot(context, flow, snapshot, externalControllerServices, parameterContexts, parameterProviderReferences, comments, expectedVersion);
} else {
throw new ViolatingFlowException("Cannot store flow version to registry due to rules violations");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently on the UI we get this error message:

Failed to register flow with Flow Registry due to Cannot store flow version to registry due to rules violations

which is a bit convoluted, something like this sounds more natural:

Failed to register flow with Flow Registry due to There are unresolved rule violations
Suggested change
throw new ViolatingFlowException("Cannot store flow version to registry due to rules violations");
throw new InvalidFlowException("There are unresolved rule violations");

package org.apache.nifi.registry.flow;

public class ViolatingFlowException extends FlowRegistryException {
public ViolatingFlowException(final String message) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'Violating' is not really an expression we use normally as an adjective so it's use like this feels unnatural.
I propose the name InvalidFlowException that has the added benefit of being more general, in case we later add other methods to validate flows.

Suggested change
public ViolatingFlowException(final String message) {
public InvalidFlowException(final String message) {

final FlowAnalyzer flowAnalyzer,
final RuleViolationsManager ruleViolationsManager,
final FlowManager flowManager,
final Supplier<NiFiRegistryFlowMapper> flowMapperSupplier
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a reason for this to be a Supplier. The mapper the flow analysis requires an implementation that doesn't rely on any configuration that can change runtime.
So we can just create it and pass on.

final InstantiatedVersionedProcessGroup nonVersionedProcessGroup = mapper.mapNonVersionedProcessGroup(flowManager.getGroup(snapshot.getInstanceIdentifier()), serviceProvider);

flowAnalyzer.analyzeProcessGroup(nonVersionedProcessGroup);
final Collection<RuleViolation> ruleViolations = ruleViolationsManager.getRuleViolationsForGroup(snapshot.getInstanceIdentifier());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably want only violations with ENFORCE policy to block committing to Registry.

Suggested change
final Collection<RuleViolation> ruleViolations = ruleViolationsManager.getRuleViolationsForGroup(snapshot.getInstanceIdentifier());
final List<RuleViolation> enforcingRuleViolations = ruleViolationsManager.getRuleViolationsForGroup(snapshot.getInstanceIdentifier()).stream()
.filter(ruleViolation -> EnforcementPolicy.ENFORCE.equals(ruleViolation.getEnforcementPolicy()))
.collect(Collectors.toList());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. What do you think, from rules and violation perspective, would it make sense to allow users to configure the level or violations which can prevents commit?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion on this. We could change the nifi.property entry from a boolean to accommodate for this but even that might be a much.

}

@Test
public void testWhenNoViolationFound() throws IOException, FlowRegistryException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test reports can help a lot understanding the various use cases if the method names describe the cases properly. The 'test' prefix on the other hand has no meaningful purpose and although doesn't hurt much, if the method name would become too long, it's better to omit it.

Suggested change
public void testWhenNoViolationFound() throws IOException, FlowRegistryException {
public void allowFlowRegistrationWhenNoEnforcingViolationFound() throws IOException, FlowRegistryException {

Comment on lines 421 to 432
.flowAnalysisAtRegistryCommit(nifiProperties.shouldCheckForRulesViolationWhenRegistryCommit());

if (getFlowAnalyzer().isPresent()) {
extensionBuilder.flowAnalyzer(getFlowAnalyzer().get());
}

if (getRuleViolationsManager().isPresent()) {
extensionBuilder.ruleViolationsManager(getRuleViolationsManager().get());
}

final FlowRegistryClientNode clientNode =
extensionBuilder.buildFlowRegistryClient();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.flowAnalysisAtRegistryCommit(nifiProperties.shouldCheckForRulesViolationWhenRegistryCommit());
if (getFlowAnalyzer().isPresent()) {
extensionBuilder.flowAnalyzer(getFlowAnalyzer().get());
}
if (getRuleViolationsManager().isPresent()) {
extensionBuilder.ruleViolationsManager(getRuleViolationsManager().get());
}
final FlowRegistryClientNode clientNode =
extensionBuilder.buildFlowRegistryClient();
.flowAnalyzer(getFlowAnalyzer().orElse(null))
.ruleViolationsManager(getRuleViolationsManager().orElse(null))

@@ -300,6 +300,9 @@ public class NiFiProperties extends ApplicationProperties {
// flow analysis properties
public static final String BACKGROUND_FLOW_ANALYSIS_SCHEDULE = "nifi.flow.analysis.background.task.schedule";

// registry client properties
public static final String CHECK_FOL_RULES_VIOLATION_WHEN_REGISTRY_COMMIT = "nifi.registry.check.for.rules.violation";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem to grasp the concept properly. It indicates only the fact that violations can be important but doesn't tell why or what the consequences are.

Suggested change
public static final String CHECK_FOL_RULES_VIOLATION_WHEN_REGISTRY_COMMIT = "nifi.registry.check.for.rules.violation";
public static final String NIFI_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT = "nifi.registry.check.for.rule.violations.before.commit";

@@ -401,6 +404,7 @@ public class NiFiProperties extends ApplicationProperties {
private static final String DEFAULT_SECURITY_USER_JWS_KEY_ROTATION_PERIOD = "PT1H";
public static final String DEFAULT_WEB_SHOULD_SEND_SERVER_VERSION = "true";
public static final int DEFAULT_LISTENER_BOOTSTRAP_PORT = 0;
public static final Boolean DEFAULT_CHECK_FOL_RULES_VIOLATION_WHEN_REGISTRY_COMMIT = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static final Boolean DEFAULT_CHECK_FOL_RULES_VIOLATION_WHEN_REGISTRY_COMMIT = false;
public static final Boolean DEFAULT_NIFI_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT = false;

@@ -1826,6 +1830,17 @@ public Set<String> getDirectSubsequentTokens(final String prefix) {
.collect(Collectors.toSet());
}

/**
* @return Returns true if NiFi should execute flow analysis check on the process group before committing it to a registry. Returns
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @return Returns true if NiFi should execute flow analysis check on the process group before committing it to a registry. Returns
* @return Returns true if NiFi should execute flow analysis on the process group before committing it to a registry. Returns

Comment on lines 1837 to 1865
public boolean shouldCheckForRulesViolationWhenRegistryCommit() {
final String shouldCheckForRulesViolationWhenRegistryCommit = getProperty(
CHECK_FOL_RULES_VIOLATION_WHEN_REGISTRY_COMMIT,
DEFAULT_CHECK_FOL_RULES_VIOLATION_WHEN_REGISTRY_COMMIT.toString());
return Boolean.parseBoolean(shouldCheckForRulesViolationWhenRegistryCommit);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public boolean shouldCheckForRulesViolationWhenRegistryCommit() {
final String shouldCheckForRulesViolationWhenRegistryCommit = getProperty(
CHECK_FOL_RULES_VIOLATION_WHEN_REGISTRY_COMMIT,
DEFAULT_CHECK_FOL_RULES_VIOLATION_WHEN_REGISTRY_COMMIT.toString());
return Boolean.parseBoolean(shouldCheckForRulesViolationWhenRegistryCommit);
}
public boolean nifiRegistryCheckForRuleViolationsBeforeCommit() {
final String nifiRegistryCheckForRuleViolationsBeforeCommit = getProperty(
NIFI_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT,
DEFAULT_NIFI_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT.toString());
if (!"true".equalsIgnoreCase(nifiRegistryCheckForRuleViolationsBeforeCommit) && !"false".equalsIgnoreCase(nifiRegistryCheckForRuleViolationsBeforeCommit)) {
throw new RuntimeException(String.format("%s was '%s', expected true or false", NiFiProperties.ZOOKEEPER_CLIENT_ENSEMBLE_TRACKER, nifiRegistryCheckForRuleViolationsBeforeCommit));
}
return Boolean.parseBoolean(nifiRegistryCheckForRuleViolationsBeforeCommit);
}

@@ -303,6 +303,9 @@ public class NiFiProperties extends ApplicationProperties {
// flow analysis properties
public static final String BACKGROUND_FLOW_ANALYSIS_SCHEDULE = "nifi.flow.analysis.background.task.schedule";

// registry client properties
public static final String FLOW_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT = "nifi.registry.check.for.rule.violation.before.commit";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add this property in nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties and in nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml as well.

Suggested change
public static final String FLOW_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT = "nifi.registry.check.for.rule.violation.before.commit";
public static final String FLOW_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT = "nifi.registry.check.for.rule.violations.before.commit";

@tpalfy
Copy link
Contributor

tpalfy commented Apr 3, 2024

LGTM
Thank you for your work @simonbence !
Merged into main.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants