Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ public void purge() {
for (final ParameterContext parameterContext : parameterContextManager.getParameterContexts()) {
parameterContextManager.removeParameterContext(parameterContext.getIdentifier());
}

LogRepositoryFactory.purge();
}

private void verifyCanPurge() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ public void synchronize(final ProcessGroup group, final VersionedExternalFlow ve
final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Proposed Flow", versionedExternalFlow.getFlowContents());

final PropertyDecryptor decryptor = options.getPropertyDecryptor();
final FlowComparator flowComparator = new StandardFlowComparator(proposedFlow, localFlow, group.getAncestorServiceIds(), new StaticDifferenceDescriptor(), decryptor::decrypt);
final FlowComparator flowComparator = new StandardFlowComparator(proposedFlow, localFlow, group.getAncestorServiceIds(),
new StaticDifferenceDescriptor(), decryptor::decrypt, options.getComponentComparisonIdLookup());
final FlowComparison flowComparison = flowComparator.compare();

updatedVersionedComponentIds.clear();
Expand All @@ -184,12 +185,6 @@ public void synchronize(final ProcessGroup group, final VersionedExternalFlow ve
if (FlowDifferenceFilters.isScheduledStateNew(diff)) {
continue;
}
// If the difference type is a Scheduled State Change, we want to ignore it, because we are just trying to
// find components that need to be stopped in order to be updated. We don't need to stop a component in order
// to change its Scheduled State.
if (diff.getDifferenceType() == DifferenceType.SCHEDULED_STATE_CHANGED) {
continue;
}

// If this update adds a new Controller Service, then we need to check if the service already exists at a higher level
// and if so compare our VersionedControllerService to the existing service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer;
Expand Down Expand Up @@ -3780,6 +3781,7 @@ public void updateFlow(final VersionedExternalFlow proposedSnapshot, final Strin

final FlowSynchronizationOptions synchronizationOptions = new FlowSynchronizationOptions.Builder()
.componentIdGenerator(idGenerator)
.componentComparisonIdLookup(VersionedComponent::getIdentifier)
.componentScheduler(retainExistingStateScheduler)
.ignoreLocalModifications(!verifyNotDirty)
.updateDescendantVersionedFlows(updateDescendantVersionedFlows)
Expand Down Expand Up @@ -3904,7 +3906,8 @@ private Set<FlowDifference> getModifications() {
final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
final ComparableDataFlow snapshotFlow = new StandardComparableDataFlow("Versioned Flow", vci.getFlowSnapshot());

final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorServiceIds(), new EvolvingDifferenceDescriptor(), encryptor::decrypt);
final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorServiceIds(),
new EvolvingDifferenceDescriptor(), encryptor::decrypt, VersionedComponent::getIdentifier);
final FlowComparison comparison = flowComparator.compare();
final Set<FlowDifference> differences = comparison.getDifferences().stream()
.filter(difference -> !FlowDifferenceFilters.isEnvironmentalChange(difference, versionedGroup, flowManager))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.nifi.flow.ConnectableComponentType;
import org.apache.nifi.flow.Position;
import org.apache.nifi.flow.ScheduledState;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedParameter;
Expand Down Expand Up @@ -191,6 +192,7 @@ public void setup() {

synchronizationOptions = new FlowSynchronizationOptions.Builder()
.componentIdGenerator(componentIdGenerator)
.componentComparisonIdLookup(VersionedComponent::getIdentifier)
.componentScheduler(componentScheduler)
.build();

Expand All @@ -202,6 +204,7 @@ public void setup() {
private FlowSynchronizationOptions createQuickFailSynchronizationOptions(final FlowSynchronizationOptions.ComponentStopTimeoutAction timeoutAction) {
return new FlowSynchronizationOptions.Builder()
.componentIdGenerator(componentIdGenerator)
.componentComparisonIdLookup(VersionedComponent::getIdentifier)
.componentScheduler(componentScheduler)
.componentStopTimeout(Duration.ofMillis(10))
.componentStopTimeoutAction(timeoutAction)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

package org.apache.nifi.groups;

import org.apache.nifi.flow.VersionedComponent;

import java.time.Duration;
import java.util.function.Function;

public class FlowSynchronizationOptions {
private final ComponentIdGenerator componentIdGenerator;
private final Function<VersionedComponent, String> componentComparisonIdLookup;
private final ComponentScheduler componentScheduler;
private final PropertyDecryptor propertyDecryptor;
private final boolean ignoreLocalModifications;
Expand All @@ -34,6 +38,7 @@ public class FlowSynchronizationOptions {

private FlowSynchronizationOptions(final Builder builder) {
this.componentIdGenerator = builder.componentIdGenerator;
this.componentComparisonIdLookup = builder.componentComparisonIdLookup;
this.componentScheduler = builder.componentScheduler;
this.propertyDecryptor = builder.propertyDecryptor;
this.ignoreLocalModifications = builder.ignoreLocalModifications;
Expand All @@ -50,6 +55,10 @@ public ComponentIdGenerator getComponentIdGenerator() {
return componentIdGenerator;
}

public Function<VersionedComponent, String> getComponentComparisonIdLookup() {
return componentComparisonIdLookup;
}

public ComponentScheduler getComponentScheduler() {
return componentScheduler;
}
Expand Down Expand Up @@ -92,6 +101,7 @@ public ComponentStopTimeoutAction getComponentStopTimeoutAction() {

public static class Builder {
private ComponentIdGenerator componentIdGenerator;
private Function<VersionedComponent, String> componentComparisonIdLookup;
private ComponentScheduler componentScheduler;
private boolean ignoreLocalModifications = false;
private boolean updateSettings = true;
Expand All @@ -114,6 +124,17 @@ public Builder componentIdGenerator(final ComponentIdGenerator componentIdGenera
return this;
}

/**
* When comparing two flows, the components in those two flows must be matched up by their ID's. This specifies how to determine the ID for a given
* Versioned Component
* @param idLookup the lookup that indicates the ID to use for components
* @return the builder
*/
public Builder componentComparisonIdLookup(final Function<VersionedComponent, String> idLookup) {
this.componentComparisonIdLookup = idLookup;
return this;
}

/**
* Specifies the ComponentScheduler to use for starting connectable components
* @param componentScheduler the ComponentScheduler to use
Expand Down Expand Up @@ -231,6 +252,9 @@ public FlowSynchronizationOptions build() {
if (componentIdGenerator == null) {
throw new IllegalStateException("Must set Component ID Generator");
}
if (componentComparisonIdLookup == null) {
throw new IllegalStateException("Must set the Component Comparison ID Lookup");
}
if (componentScheduler == null) {
throw new IllegalStateException("Must set Component Scheduler");
}
Expand All @@ -241,6 +265,7 @@ public FlowSynchronizationOptions build() {
public static Builder from(final FlowSynchronizationOptions options) {
final Builder builder = new Builder();
builder.componentIdGenerator = options.getComponentIdGenerator();
builder.componentComparisonIdLookup = options.getComponentComparisonIdLookup();
builder.componentScheduler = options.getComponentScheduler();
builder.ignoreLocalModifications = options.isIgnoreLocalModifications();
builder.updateSettings = options.isUpdateSettings();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,10 @@ public AffectedComponentSet toActiveSet() {

private boolean isActive(final ProcessorNode processor) {
// We consider component active if it's starting, running, or has active threads. The call to ProcessorNode.isRunning() will only return true if it has active threads or a scheduled
// state of RUNNING but not if it has a scheduled state of STARTING.
// state of RUNNING but not if it has a scheduled state of STARTING. We also consider if the processor is to be started once the flow controller has been fully initialized, as
// the state of the processor may not yet have been set
final ScheduledState scheduledState = processor.getPhysicalScheduledState();
return scheduledState == ScheduledState.STARTING || scheduledState == ScheduledState.RUNNING || processor.isRunning();
return scheduledState == ScheduledState.STARTING || scheduledState == ScheduledState.RUNNING || processor.isRunning() || flowController.isStartAfterInitialization(processor);
}

private boolean isStopped(final ProcessorNode processor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.ScheduledState;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedParameter;
Expand Down Expand Up @@ -334,6 +335,7 @@ private void synchronizeFlow(final FlowController controller, final DataFlow exi
// Synchronize the root group
final FlowSynchronizationOptions syncOptions = new FlowSynchronizationOptions.Builder()
.componentIdGenerator(componentIdGenerator)
.componentComparisonIdLookup(VersionedComponent::getInstanceIdentifier) // compare components by Instance ID because both versioned flows are derived from instantiated flows
.componentScheduler(componentScheduler)
.ignoreLocalModifications(true)
.updateGroupSettings(true)
Expand Down Expand Up @@ -379,7 +381,8 @@ private FlowComparison compareFlows(final DataFlow existingFlow, final DataFlow
final ComparableDataFlow clusterDataFlow = new StandardComparableDataFlow("Cluster Flow", clusterVersionedFlow.getRootGroup(), toSet(clusterVersionedFlow.getControllerServices()),
toSet(clusterVersionedFlow.getReportingTasks()), toSet(clusterVersionedFlow.getParameterContexts()));

final FlowComparator flowComparator = new StandardFlowComparator(localDataFlow, clusterDataFlow, Collections.emptySet(), differenceDescriptor, encryptor::decrypt);
final FlowComparator flowComparator = new StandardFlowComparator(localDataFlow, clusterDataFlow, Collections.emptySet(),
differenceDescriptor, encryptor::decrypt, VersionedComponent::getInstanceIdentifier);
final FlowComparison flowComparison = flowComparator.compare();
return flowComparison;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
Expand Down Expand Up @@ -738,7 +739,8 @@ private Set<FlowDifference> getLocalModifications(final ProcessGroup processGrou
final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryGroup);

final Set<String> ancestorServiceIds = processGroup.getAncestorServiceIds();
final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor(), Function.identity());
final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor(), Function.identity(),
VersionedComponent::getIdentifier);
final FlowComparison flowComparison = flowComparator.compare();
final Set<FlowDifference> differences = flowComparison.getDifferences().stream()
.filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4889,7 +4889,8 @@ public FlowComparisonEntity getLocalModifications(final String processGroupId) {
final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryGroup);

final Set<String> ancestorServiceIds = processGroup.getAncestorServiceIds();
final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor(), Function.identity());
final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor(),
Function.identity(), VersionedComponent::getIdentifier);
final FlowComparison flowComparison = flowComparator.compare();

final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtosForLocalModifications(flowComparison, localGroup, controllerFacade.getFlowManager());
Expand Down Expand Up @@ -5001,7 +5002,8 @@ public Set<AffectedComponentEntity> getComponentsAffectedByFlowUpdate(final Stri
final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("New Flow", updatedSnapshot.getFlowContents());

final Set<String> ancestorServiceIds = group.getAncestorServiceIds();
final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, ancestorServiceIds, new StaticDifferenceDescriptor(), Function.identity());
final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, ancestorServiceIds, new StaticDifferenceDescriptor(),
Function.identity(), VersionedComponent::getIdentifier);
final FlowComparison comparison = flowComparator.compare();

final FlowManager flowManager = controllerFacade.getFlowManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,16 @@ public class StandardFlowComparator implements FlowComparator {
private final Set<String> externallyAccessibleServiceIds;
private final DifferenceDescriptor differenceDescriptor;
private final Function<String, String> propertyDecryptor;
private final Function<VersionedComponent, String> idLookup;

public StandardFlowComparator(final ComparableDataFlow flowA, final ComparableDataFlow flowB,
final Set<String> externallyAccessibleServiceIds, final DifferenceDescriptor differenceDescriptor, final Function<String, String> propertyDecryptor) {
public StandardFlowComparator(final ComparableDataFlow flowA, final ComparableDataFlow flowB, final Set<String> externallyAccessibleServiceIds,
final DifferenceDescriptor differenceDescriptor, final Function<String, String> propertyDecryptor, final Function<VersionedComponent, String> idLookup) {
this.flowA = flowA;
this.flowB = flowB;
this.externallyAccessibleServiceIds = externallyAccessibleServiceIds;
this.differenceDescriptor = differenceDescriptor;
this.propertyDecryptor = propertyDecryptor;
this.idLookup = idLookup;
}

@Override
Expand All @@ -93,6 +95,13 @@ private Set<FlowDifference> compare(final VersionedProcessGroup groupA, final Ve
return differences;
}

private boolean allHaveInstanceId(Set<? extends VersionedComponent> components) {
if (components == null) {
return false;
}

return components.stream().allMatch(component -> component.getInstanceIdentifier() != null);
}

private <T extends VersionedComponent> Set<FlowDifference> compareComponents(final Set<T> componentsA, final Set<T> componentsB, final ComponentComparator<T> comparator) {
final Map<String, T> componentMapA = byId(componentsA == null ? Collections.emptySet() : componentsA);
Expand Down Expand Up @@ -515,11 +524,7 @@ private void compare(final VersionedConnection connectionA, final VersionedConne


private <T extends VersionedComponent> Map<String, T> byId(final Set<T> components) {
return components.stream().collect(Collectors.toMap(VersionedComponent::getIdentifier, Function.identity()));
}

private Map<String, VersionedParameterContext> parameterContextsById(final Set<VersionedParameterContext> contexts) {
return contexts.stream().collect(Collectors.toMap(VersionedParameterContext::getIdentifier, Function.identity()));
return components.stream().collect(Collectors.toMap(idLookup::apply, Function.identity()));
}

private <T extends VersionedComponent> void addIfDifferent(final Set<FlowDifference> differences, final DifferenceType type, final T componentA, final T componentB,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

package org.apache.nifi.registry.flow.diff;

import org.apache.nifi.flow.VersionedComponent;

import java.util.Objects;
import java.util.Optional;

import org.apache.nifi.flow.VersionedComponent;

public class StandardFlowDifference implements FlowDifference {
private final DifferenceType type;
private final VersionedComponent componentA;
Expand Down Expand Up @@ -91,6 +91,8 @@ public String toString() {
public int hashCode() {
return 31 + 17 * (componentA == null ? 0 : componentA.getIdentifier().hashCode()) +
17 * (componentB == null ? 0 : componentB.getIdentifier().hashCode()) +
15 * (componentA == null ? 0 : Objects.hash(componentA.getInstanceIdentifier())) +
15 * (componentB == null ? 0 : Objects.hash(componentB.getInstanceIdentifier())) +
Objects.hash(description, type, valueA, valueB);
}

Expand All @@ -112,6 +114,18 @@ public boolean equals(final Object obj) {
final String componentBId = componentB == null ? null : componentB.getIdentifier();
final String otherComponentBId = other.componentB == null ? null : other.componentB.getIdentifier();

// If both flows have a component A with an instance identifier, the instance ID's must be the same.
if (componentA != null && componentA.getInstanceIdentifier() != null && other.componentA != null && other.componentA.getInstanceIdentifier() != null
&& !componentA.getInstanceIdentifier().equals(other.componentA.getInstanceIdentifier())) {
return false;
}

// If both flows have a component B with an instance identifier, the instance ID's must be the same.
if (componentB != null && componentB.getInstanceIdentifier() != null && other.componentB != null && other.componentB.getInstanceIdentifier() != null
&& !componentB.getInstanceIdentifier().equals(other.componentB.getInstanceIdentifier())) {
return false;
}

return Objects.equals(componentAId, otherComponentAId) && Objects.equals(componentBId, otherComponentBId)
&& Objects.equals(description, other.description) && Objects.equals(type, other.type)
&& Objects.equals(valueA, other.valueA) && Objects.equals(valueB, other.valueB);
Expand Down
Loading