Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -741,9 +741,11 @@ public void verifyCanSetParameters(final Map<ParameterDescriptor, Parameter> cur
for (final Map.Entry<String, Parameter> entry : updatedParameters.entrySet()) {
final String parameterName = entry.getKey();
final Parameter parameter = entry.getValue();
final Parameter currentParameter = currentParameters.get(new ParameterDescriptor.Builder().name(parameterName).build());

if (parameter == null) {
// parameter is being deleted.
validateReferencingComponents(parameterName, null, duringUpdate);
validateReferencingComponents(parameterName, currentParameter, null, duringUpdate);
continue;
}

Expand All @@ -752,7 +754,7 @@ public void verifyCanSetParameters(final Map<ParameterDescriptor, Parameter> cur
}

validateSensitiveFlag(currentParameters, parameter);
validateReferencingComponents(parameterName, parameter, duringUpdate);
validateReferencingComponents(parameterName, currentParameter, parameter, duringUpdate);
}
}

Expand All @@ -777,15 +779,18 @@ private void validateSensitiveFlag(final Map<ParameterDescriptor, Parameter> cur
}
}

private void validateReferencingComponents(final String parameterName, final Parameter parameter, final boolean duringUpdate) {
private void validateReferencingComponents(final String parameterName, final Parameter currentParameter, final Parameter parameter, final boolean duringUpdate) {
final boolean isDeletion = (parameter == null);
final String action = isDeletion ? "remove" : "update";
final boolean runtimeAffectingChange = isRuntimeAffectingChange(currentParameter, parameter);
final boolean enforceReferencingState = isDeletion || (duringUpdate && runtimeAffectingChange);

for (final ProcessorNode procNode : parameterReferenceManager.getProcessorsReferencing(this, parameterName)) {
if (procNode.isExtensionMissing()) {
continue;
}

if (procNode.isRunning() && (isDeletion || duringUpdate)) {
if (procNode.isRunning() && enforceReferencingState) {
throw new IllegalStateException("Cannot " + action + " parameter '" + parameterName + "' because it is referenced by " + procNode + ", which is currently running");
}

Expand All @@ -800,7 +805,7 @@ private void validateReferencingComponents(final String parameterName, final Par
}

final ControllerServiceState serviceState = serviceNode.getState();
if (serviceState != ControllerServiceState.DISABLED && (isDeletion || duringUpdate)) {
if (serviceState != ControllerServiceState.DISABLED && enforceReferencingState) {
throw new IllegalStateException("Cannot " + action + " parameter '" + parameterName + "' because it is referenced by "
+ serviceNode + ", which currently has a state of " + serviceState);
}
Expand All @@ -811,6 +816,24 @@ private void validateReferencingComponents(final String parameterName, final Par
}
}

/**
* Determines whether an update from {@code currentParameter} to {@code proposedParameter} would affect the runtime
* behavior of components that reference the parameter. Metadata-only changes such as description or tag updates do
* not affect runtime behavior and therefore do not require referencing components to be stopped or disabled.
*/
private boolean isRuntimeAffectingChange(final Parameter currentParameter, final Parameter proposedParameter) {
if (currentParameter == null || proposedParameter == null) {
return true;
}
if (currentParameter.getDescriptor().isSensitive() != proposedParameter.getDescriptor().isSensitive()) {
return true;
}
if (!Objects.equals(currentParameter.getValue(), proposedParameter.getValue())) {
return true;
}
return !Objects.equals(currentParameter.getReferencedAssets(), proposedParameter.getReferencedAssets());
}

private void validateParameterSensitivity(final Parameter parameter, final ComponentNode componentNode) {
final String paramName = parameter.getDescriptor().getName();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@

public class TestStandardParameterContext {

private static final String PARAM_NAME_ABC = "abc";
private static final String PARAM_VALUE = "value";
private static final String PARAM_VALUE_UPDATED = "new-value";
private static final String DESCRIPTION_ORIGINAL = "original description";
private static final String DESCRIPTION_UPDATED = "updated description";

@Test
public void testUpdatesApply() {
final ParameterReferenceManager referenceManager = new HashMapParameterReferenceManager();
Expand Down Expand Up @@ -549,6 +555,59 @@ public void testChangingNestedParameterForEnabledControllerService() {
assertThrows(IllegalStateException.class, () -> b.setInheritedParameterContexts(Collections.emptyList()));
}

@Test
public void testDescriptionOnlyUpdateAllowedWhileReferencingProcessorRunning() {
final HashMapParameterReferenceManager referenceManager = new HashMapParameterReferenceManager();
final ParameterContext context = createStandardParameterContext(referenceManager);
final ProcessorNode procNode = getProcessorNode(PARAM_NAME_ABC, referenceManager);

final ParameterDescriptor originalDescriptor = new ParameterDescriptor.Builder().name(PARAM_NAME_ABC).description(DESCRIPTION_ORIGINAL).build();
context.setParameters(Collections.singletonMap(PARAM_NAME_ABC, createParameter(originalDescriptor, PARAM_VALUE)));

startProcessor(procNode);

final ParameterDescriptor updatedDescriptor = new ParameterDescriptor.Builder().name(PARAM_NAME_ABC).description(DESCRIPTION_UPDATED).build();
context.setParameters(Collections.singletonMap(PARAM_NAME_ABC, createParameter(updatedDescriptor, PARAM_VALUE)));

final Parameter updatedParam = context.getParameter(PARAM_NAME_ABC).get();
assertEquals(DESCRIPTION_UPDATED, updatedParam.getDescriptor().getDescription());
assertEquals(PARAM_VALUE, updatedParam.getValue());

// Changing the value while the processor is running is still rejected
final Parameter valueChanged = createParameter(updatedDescriptor, PARAM_VALUE_UPDATED);
assertThrows(IllegalStateException.class, () -> context.setParameters(Collections.singletonMap(PARAM_NAME_ABC, valueChanged)));
assertEquals(PARAM_VALUE, context.getParameter(PARAM_NAME_ABC).get().getValue());
}

@Test
public void testDescriptionOnlyUpdateAllowedWhileReferencingServiceEnabled() {
final HashMapParameterReferenceManager referenceManager = new HashMapParameterReferenceManager();
final ParameterContext context = createStandardParameterContext(referenceManager);
final ControllerServiceNode serviceNode = mock(ControllerServiceNode.class);
setControllerServiceState(serviceNode, ControllerServiceState.DISABLED);
referenceManager.addControllerServiceReference(PARAM_NAME_ABC, serviceNode);

final ParameterDescriptor originalDescriptor = new ParameterDescriptor.Builder().name(PARAM_NAME_ABC).description(DESCRIPTION_ORIGINAL).build();
context.setParameters(Collections.singletonMap(PARAM_NAME_ABC, createParameter(originalDescriptor, PARAM_VALUE)));

for (final ControllerServiceState state : EnumSet.of(ControllerServiceState.ENABLED, ControllerServiceState.ENABLING, ControllerServiceState.DISABLING)) {
setControllerServiceState(serviceNode, state);

final String newDescription = "updated while " + state;
final ParameterDescriptor updatedDescriptor = new ParameterDescriptor.Builder().name(PARAM_NAME_ABC).description(newDescription).build();
context.setParameters(Collections.singletonMap(PARAM_NAME_ABC, createParameter(updatedDescriptor, PARAM_VALUE)));

final Parameter updatedParam = context.getParameter(PARAM_NAME_ABC).get();
assertEquals(newDescription, updatedParam.getDescriptor().getDescription());
assertEquals(PARAM_VALUE, updatedParam.getValue());

// Attempting to change the value while the referencing service is active still fails
final Parameter valueChange = createParameter(updatedDescriptor, PARAM_VALUE_UPDATED + "-" + state);
assertThrows(IllegalStateException.class, () -> context.setParameters(Collections.singletonMap(PARAM_NAME_ABC, valueChange)));
assertEquals(PARAM_VALUE, context.getParameter(PARAM_NAME_ABC).get().getValue());
}
}

@Test
public void testChangingParameterForEnabledControllerService() {
final HashMapParameterReferenceManager referenceManager = new HashMapParameterReferenceManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.FlowRegistryClientEntity;
import org.apache.nifi.web.api.entity.ParameterContextEntity;
import org.apache.nifi.web.api.entity.ParameterContextReferenceEntity;
Expand Down Expand Up @@ -70,6 +72,15 @@ class ParameterContextPreservationIT extends NiFiSystemIT {
private static final String PARAMETER_DESCRIPTION_V2 = "Description for version 2";
private static final String CONTEXT_DESCRIPTION_V1 = "Context description v1";
private static final String CONTEXT_DESCRIPTION_V2 = "Context description v2";
private static final String COUNT_PARAMETER_NAME = "countStart";
private static final String COUNT_PARAMETER_VALUE = "0";
private static final String COUNT_PARAMETER_REFERENCE = "#{" + COUNT_PARAMETER_NAME + "}";
private static final String COUNT_PARAMETER_DESCRIPTION_V1 = "Counter start description v1";
private static final String COUNT_PARAMETER_DESCRIPTION_V2 = "Counter start description v2";
private static final String COUNT_SERVICE_TYPE = "StandardCountService";
private static final String COUNT_SERVICE_START_VALUE_PROPERTY = "Start Value";
private static final String CONTROLLER_SERVICE_STATE_ENABLED = "ENABLED";
private static final String CONTROLLER_SERVICE_STATE_DISABLED = "DISABLED";

@Test
void testNewProcessGroupUsesCorrectParameterContextDuringUpgrade() throws NiFiClientException, IOException, InterruptedException {
Expand Down Expand Up @@ -235,15 +246,20 @@ void testNewParameterInInheritedContextSynchronizedDuringUpgrade() throws NiFiCl
}

/**
* Verifies that parameter descriptions are updated when upgrading a versioned process group from one version
* to the next, even when the parameter value itself remains unchanged.
* Verifies that parameter and parameter context descriptions are updated when upgrading a versioned
* process group from one version to the next, even when the parameter value itself remains unchanged
* and the parameter is referenced by a Controller Service that is currently ENABLED on the target.
* Description-only updates do not affect runtime behavior, so referencing components should not have
* to be disabled for the upgrade to succeed.
*/
@Test
void testParameterDescriptionUpdatedDuringVersionUpgrade() throws NiFiClientException, IOException, InterruptedException {
final FlowRegistryClientEntity clientEntity = registerClient();
final NiFiClientUtil util = getClientUtil();

final Set<ParameterEntity> parameterEntitiesV1 = Set.of(util.createParameterEntity(PARAMETER_NAME, PARAMETER_DESCRIPTION_V1, false, PARAMETER_VALUE));
final Set<ParameterEntity> parameterEntitiesV1 = Set.of(
util.createParameterEntity(PARAMETER_NAME, PARAMETER_DESCRIPTION_V1, false, PARAMETER_VALUE),
util.createParameterEntity(COUNT_PARAMETER_NAME, COUNT_PARAMETER_DESCRIPTION_V1, false, COUNT_PARAMETER_VALUE));
final ParameterContextEntity paramContext = getNifiClient().getParamContextClient().createParamContext(
util.createParameterContextEntity(PARAMETER_CONTEXT_NAME, CONTEXT_DESCRIPTION_V1, parameterEntitiesV1));

Expand All @@ -254,12 +270,23 @@ void testParameterDescriptionUpdatedDuringVersionUpgrade() throws NiFiClientExce
util.updateProcessorProperties(processor, Collections.singletonMap(PROCESSOR_PROPERTY_TEXT, PARAMETER_REFERENCE));
util.setAutoTerminatedRelationships(processor, RELATIONSHIP_SUCCESS);

final ControllerServiceEntity sourceService = util.createControllerService(COUNT_SERVICE_TYPE, groupA.getId());
util.updateControllerServiceProperties(sourceService, Map.of(COUNT_SERVICE_START_VALUE_PROPERTY, COUNT_PARAMETER_REFERENCE));

final VersionControlInformationEntity vciV1 = util.startVersionControl(groupA, clientEntity, TEST_FLOWS_BUCKET, DESCRIPTION_UPDATE_FLOW_NAME);
final String flowId = vciV1.getVersionControlInformation().getFlowId();

// Update the parameter description (keeping the same value) and context description, then save as version 2
// Enable the Controller Service on the source side so updating the parameter description exercises
// the "description-only update while a referencing service is ENABLED" path on both source and target.
final ControllerServiceEntity sourceServiceForEnable = getNifiClient().getControllerServicesClient().getControllerService(sourceService.getId());
util.enableControllerService(sourceServiceForEnable);
util.waitForControllerServicesEnabled(groupA.getId(), List.of(sourceService.getId()));

// Update both parameter descriptions (keeping values unchanged) plus the context description, then save as version 2
final ParameterContextEntity currentContext = getNifiClient().getParamContextClient().getParamContext(paramContext.getId(), false);
final Set<ParameterEntity> parameterEntitiesV2 = Set.of(util.createParameterEntity(PARAMETER_NAME, PARAMETER_DESCRIPTION_V2, false, PARAMETER_VALUE));
final Set<ParameterEntity> parameterEntitiesV2 = Set.of(
util.createParameterEntity(PARAMETER_NAME, PARAMETER_DESCRIPTION_V2, false, PARAMETER_VALUE),
util.createParameterEntity(COUNT_PARAMETER_NAME, COUNT_PARAMETER_DESCRIPTION_V2, false, COUNT_PARAMETER_VALUE));
final ParameterContextEntity entityUpdate = util.createParameterContextEntity(PARAMETER_CONTEXT_NAME, CONTEXT_DESCRIPTION_V2, parameterEntitiesV2);
entityUpdate.setId(currentContext.getId());
entityUpdate.setRevision(currentContext.getRevision());
Expand All @@ -271,6 +298,10 @@ void testParameterDescriptionUpdatedDuringVersionUpgrade() throws NiFiClientExce
util.saveFlowVersion(refreshedGroupA, clientEntity, vciV1);

// Clean up the original flow
final ControllerServiceEntity sourceServiceForDisable = getNifiClient().getControllerServicesClient().getControllerService(sourceService.getId());
util.disableControllerService(sourceServiceForDisable);
util.waitForControllerServiceState(groupA.getId(), CONTROLLER_SERVICE_STATE_DISABLED, List.of(sourceService.getId()));

final ProcessGroupEntity groupAForStopVc = getNifiClient().getProcessGroupClient().getProcessGroup(groupA.getId());
getNifiClient().getVersionsClient().stopVersionControl(groupAForStopVc);
util.deleteAll(groupA.getId());
Expand All @@ -288,22 +319,34 @@ void testParameterDescriptionUpdatedDuringVersionUpgrade() throws NiFiClientExce

// Verify version 1 descriptions
final ParameterContextEntity importedContext = getNifiClient().getParamContextClient().getParamContext(importedContextId, false);
final String descriptionAfterV1 = getParameterDescription(importedContext, PARAMETER_NAME);
assertEquals(PARAMETER_DESCRIPTION_V1, descriptionAfterV1);
assertEquals(PARAMETER_DESCRIPTION_V1, getParameterDescription(importedContext, PARAMETER_NAME));
assertEquals(COUNT_PARAMETER_DESCRIPTION_V1, getParameterDescription(importedContext, COUNT_PARAMETER_NAME));
assertEquals(CONTEXT_DESCRIPTION_V1, importedContext.getComponent().getDescription());

// Upgrade from version 1 to version 2
// Enable the imported Controller Service before the upgrade. With the description-only fix, the
// subsequent upgrade must succeed without requiring the service to be disabled first.
final ControllerServicesEntity servicesInImportedGroup = getNifiClient().getFlowClient().getControllerServices(importedGroup.getId());
assertEquals(1, servicesInImportedGroup.getControllerServices().size());
final ControllerServiceEntity importedService = servicesInImportedGroup.getControllerServices().iterator().next();
util.enableControllerService(importedService);
util.waitForControllerServicesEnabled(importedGroup.getId(), List.of(importedService.getId()));

// Upgrade from version 1 to version 2 while the referencing Controller Service is ENABLED
util.changeFlowVersion(importedGroup.getId(), VERSION_2);

// Verify descriptions were updated to version 2
final ParameterContextEntity contextAfterUpgrade = getNifiClient().getParamContextClient().getParamContext(importedContextId, false);
final String descriptionAfterV2 = getParameterDescription(contextAfterUpgrade, PARAMETER_NAME);
assertEquals(PARAMETER_DESCRIPTION_V2, descriptionAfterV2);
assertEquals(PARAMETER_DESCRIPTION_V2, getParameterDescription(contextAfterUpgrade, PARAMETER_NAME));
assertEquals(COUNT_PARAMETER_DESCRIPTION_V2, getParameterDescription(contextAfterUpgrade, COUNT_PARAMETER_NAME));
assertEquals(CONTEXT_DESCRIPTION_V2, contextAfterUpgrade.getComponent().getDescription());

// Verify the value was not changed
final String valueAfterUpgrade = getParameterValue(contextAfterUpgrade, PARAMETER_NAME);
assertEquals(PARAMETER_VALUE, valueAfterUpgrade);
// Verify the parameter values were not changed
assertEquals(PARAMETER_VALUE, getParameterValue(contextAfterUpgrade, PARAMETER_NAME));
assertEquals(COUNT_PARAMETER_VALUE, getParameterValue(contextAfterUpgrade, COUNT_PARAMETER_NAME));

// Verify the Controller Service remained ENABLED throughout the upgrade
final ControllerServiceEntity serviceAfterUpgrade = getNifiClient().getControllerServicesClient().getControllerService(importedService.getId());
assertEquals(CONTROLLER_SERVICE_STATE_ENABLED, serviceAfterUpgrade.getComponent().getState());
}

private String getParameterDescription(final ParameterContextEntity context, final String parameterName) {
Expand Down
Loading