diff --git a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockNiFiConnectorWebContext.java b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockNiFiConnectorWebContext.java index 9e8891ee5270..ba585633edff 100644 --- a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockNiFiConnectorWebContext.java +++ b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockNiFiConnectorWebContext.java @@ -19,6 +19,7 @@ import org.apache.nifi.components.connector.ConnectorNode; import org.apache.nifi.components.connector.ConnectorRepository; +import org.apache.nifi.components.connector.ConnectorSyncMode; import org.apache.nifi.components.connector.components.FlowContext; import org.apache.nifi.web.NiFiConnectorWebContext; @@ -38,7 +39,8 @@ public MockNiFiConnectorWebContext(final ConnectorRepository connectorRepository @Override @SuppressWarnings("unchecked") public ConnectorWebContext getConnectorWebContext(final String connectorId) throws IllegalArgumentException { - final ConnectorNode connectorNode = connectorRepository.getConnector(connectorId); + // Test runner: there is no real ConnectorConfigurationProvider in this context, so syncing would be wasted work. + final ConnectorNode connectorNode = connectorRepository.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); if (connectorNode == null) { throw new IllegalArgumentException("Unable to find connector with id: " + connectorId); } diff --git a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/test/java/org/apache/nifi/mock/connector/server/MockNiFiConnectorWebContextTest.java b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/test/java/org/apache/nifi/mock/connector/server/MockNiFiConnectorWebContextTest.java index 659fe2e8fb73..115e306188a6 100644 --- a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/test/java/org/apache/nifi/mock/connector/server/MockNiFiConnectorWebContextTest.java +++ b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/test/java/org/apache/nifi/mock/connector/server/MockNiFiConnectorWebContextTest.java @@ -20,6 +20,7 @@ import org.apache.nifi.components.connector.Connector; import org.apache.nifi.components.connector.ConnectorNode; import org.apache.nifi.components.connector.ConnectorRepository; +import org.apache.nifi.components.connector.ConnectorSyncMode; import org.apache.nifi.components.connector.FrameworkFlowContext; import org.apache.nifi.web.NiFiConnectorWebContext; import org.apache.nifi.web.NiFiConnectorWebContext.ConnectorWebContext; @@ -55,7 +56,7 @@ class MockNiFiConnectorWebContextTest { @Test void testGetConnectorWebContextReturnsConnectorAndFlowContexts() { - when(connectorRepository.getConnector(CONNECTOR_ID)).thenReturn(connectorNode); + when(connectorRepository.getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode); when(connectorNode.getConnector()).thenReturn(connector); when(connectorNode.getWorkingFlowContext()).thenReturn(workingFlowContext); when(connectorNode.getActiveFlowContext()).thenReturn(activeFlowContext); @@ -71,7 +72,7 @@ void testGetConnectorWebContextReturnsConnectorAndFlowContexts() { @Test void testGetConnectorWebContextThrowsForUnknownConnector() { - when(connectorRepository.getConnector("unknown-id")).thenReturn(null); + when(connectorRepository.getConnector("unknown-id", ConnectorSyncMode.LOCAL_ONLY)).thenReturn(null); final NiFiConnectorWebContext context = new MockNiFiConnectorWebContext(connectorRepository); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java index 6c0a3ba43ea1..d8d06bc79efa 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java @@ -89,16 +89,26 @@ public interface ConnectorRepository { void removeConnector(String connectorId); /** - * Gets the Connector with the given identifier + * Gets the Connector with the given identifier. + * * @param identifier the identifier of the Connector to get + * @param syncMode whether to consult the {@link ConnectorConfigurationProvider} (if configured) + * to refresh local state from the external store before returning, or whether + * to return the in-memory copy as-is. See {@link ConnectorSyncMode}. * @return the Connector with the given identifier, or null if no such Connector exists */ - ConnectorNode getConnector(String identifier); + ConnectorNode getConnector(String identifier, ConnectorSyncMode syncMode); /** + * Returns all Connectors in the Repository. + * + * @param syncMode whether to consult the {@link ConnectorConfigurationProvider} (if configured) + * to refresh each connector's local state from the external store before + * returning, or whether to return the in-memory copies as-is. See + * {@link ConnectorSyncMode}. * @return all Connectors in the Repository */ - List getConnectors(); + List getConnectors(ConnectorSyncMode syncMode); /** * Starts the given Connector, managing any appropriate lifecycle events. diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorSyncMode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorSyncMode.java new file mode 100644 index 000000000000..9d970eee044c --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorSyncMode.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +/** + * Indicates whether retrieval of {@link ConnectorNode} instances from the + * {@link ConnectorRepository} should consult the configured + * {@link ConnectorConfigurationProvider} to refresh the local state from the + * external store, or whether the in-memory copy should be returned as-is. + * + *

Choosing {@link #SYNC_WITH_PROVIDER} causes the repository to call into + * the configuration provider (which may perform network I/O, secret lookups, + * and other potentially expensive or failure-prone operations) before returning + * the connector. This is appropriate for user-facing reads that must reflect + * the latest external state, such as REST API responses. Callers operating on + * critical paths that must not block on or fail because of the external store + * (for example, periodic flow serialization) should use {@link #LOCAL_ONLY} + * instead.

+ */ +public enum ConnectorSyncMode { + + /** + * Consult the {@link ConnectorConfigurationProvider} (when configured) to + * refresh the local connector state from the external store before + * returning. This may perform network I/O and may throw if the external + * store is unavailable. + */ + SYNC_WITH_PROVIDER, + + /** + * Return the in-memory connector state without consulting the + * {@link ConnectorConfigurationProvider}. No external I/O is performed. + */ + LOCAL_ONLY +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java index ffd8f2e7a764..3e2d8ede2eab 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java @@ -459,19 +459,23 @@ private void purgeConnectorAndAwait(final ConnectorNode connector) { } @Override - public ConnectorNode getConnector(final String identifier) { + public ConnectorNode getConnector(final String identifier, final ConnectorSyncMode syncMode) { + Objects.requireNonNull(syncMode, "syncMode is required"); final ConnectorNode connector = connectors.get(identifier); - if (connector != null) { + if (connector != null && syncMode == ConnectorSyncMode.SYNC_WITH_PROVIDER) { syncFromProvider(connector); } return connector; } @Override - public List getConnectors() { + public List getConnectors(final ConnectorSyncMode syncMode) { + Objects.requireNonNull(syncMode, "syncMode is required"); final List connectorList = List.copyOf(connectors.values()); - for (final ConnectorNode connector : connectorList) { - syncFromProvider(connector); + if (syncMode == ConnectorSyncMode.SYNC_WITH_PROVIDER) { + for (final ConnectorNode connector : connectorList) { + syncFromProvider(connector); + } } return connectorList; } @@ -665,7 +669,9 @@ private void collectReferencedAssetIds(final FrameworkFlowContext flowContext, f @Override public void updateConnector(final ConnectorNode connector, final String name) { if (configurationProvider != null) { - final ConnectorWorkingConfiguration workingConfiguration = buildWorkingConfiguration(connector); + // Load the latest provider state so that other in-flight working changes are not overwritten by a rename. + final Optional externalConfig = configurationProvider.load(connector.getIdentifier()); + final ConnectorWorkingConfiguration workingConfiguration = externalConfig.orElseGet(() -> buildWorkingConfiguration(connector)); workingConfiguration.setName(name); configurationProvider.save(connector.getIdentifier(), workingConfiguration); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 8b92b5e411ed..bbdb6d1898dc 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -55,6 +55,7 @@ import org.apache.nifi.components.connector.ConnectorRepository; import org.apache.nifi.components.connector.ConnectorRepositoryInitializationContext; import org.apache.nifi.components.connector.ConnectorRequestReplicator; +import org.apache.nifi.components.connector.ConnectorSyncMode; import org.apache.nifi.components.connector.ConnectorValidationTrigger; import org.apache.nifi.components.connector.FrameworkFlowContext; import org.apache.nifi.components.connector.StandardConnectorConfigurationProviderInitializationContext; @@ -1050,7 +1051,7 @@ public Connection findConnectionIncludingConnectorManaged(final String connectio return connection; } - for (final ConnectorNode connector : connectorRepository.getConnectors()) { + for (final ConnectorNode connector : connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY)) { final FrameworkFlowContext flowContext = connector.getActiveFlowContext(); if (flowContext != null) { final ProcessGroup managedGroup = flowContext.getManagedProcessGroup(); @@ -1077,7 +1078,7 @@ public RemoteGroupPort findRemoteGroupPortIncludingConnectorManaged(final String return remoteGroupPort; } - for (final ConnectorNode connector : connectorRepository.getConnectors()) { + for (final ConnectorNode connector : connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY)) { final FrameworkFlowContext flowContext = connector.getActiveFlowContext(); if (flowContext != null) { final ProcessGroup managedGroup = flowContext.getManagedProcessGroup(); @@ -1507,7 +1508,7 @@ public void trigger(final ComponentNode component) { LOG.info("Starting {} Connectors", startConnectorsAfterInitialization.size()); for (final ConnectorNode connectorNode : startConnectorsAfterInitialization) { try { - final ConnectorNode existingConnector = connectorRepository.getConnector(connectorNode.getIdentifier()); + final ConnectorNode existingConnector = connectorRepository.getConnector(connectorNode.getIdentifier(), ConnectorSyncMode.LOCAL_ONLY); if (existingConnector == null) { LOG.debug("Will not start {} because it no longer exists", connectorNode); continue; @@ -1539,7 +1540,7 @@ public void trigger(final ComponentNode component) { // Explicitly stop Connectors so that their state is properly transitioned from UPDATED to STOPPED. for (final ConnectorNode connectorNode : startConnectorsAfterInitialization) { try { - final ConnectorNode existingConnector = connectorRepository.getConnector(connectorNode.getIdentifier()); + final ConnectorNode existingConnector = connectorRepository.getConnector(connectorNode.getIdentifier(), ConnectorSyncMode.LOCAL_ONLY); if (existingConnector != null) { connectorRepository.stopConnector(connectorNode); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java index 144f3cde8c56..0e54e27c0634 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java @@ -32,6 +32,7 @@ import org.apache.nifi.components.connector.ConnectorNode; import org.apache.nifi.components.connector.ConnectorRepository; import org.apache.nifi.components.connector.ConnectorStateTransition; +import org.apache.nifi.components.connector.ConnectorSyncMode; import org.apache.nifi.components.connector.FlowContextFactory; import org.apache.nifi.components.connector.ProcessGroupFactory; import org.apache.nifi.components.connector.StandardComponentBundleLookup; @@ -853,12 +854,12 @@ private void gatherParameterContexts(final ProcessGroup sourceGroup, final Map getAllConnectors() { - return flowController.getConnectorRepository().getConnectors(); + return flowController.getConnectorRepository().getConnectors(ConnectorSyncMode.LOCAL_ONLY); } @Override public ConnectorNode getConnector(final String id) { - return flowController.getConnectorRepository().getConnector(id); + return flowController.getConnectorRepository().getConnector(id, ConnectorSyncMode.LOCAL_ONLY); } @Override diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedDataflowMapper.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedDataflowMapper.java index ccf5391707e9..7e74a7f2389b 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedDataflowMapper.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedDataflowMapper.java @@ -18,6 +18,7 @@ package org.apache.nifi.controller.serialization; import org.apache.nifi.components.connector.ConnectorNode; +import org.apache.nifi.components.connector.ConnectorSyncMode; import org.apache.nifi.connectable.Port; import org.apache.nifi.controller.FlowAnalysisRuleNode; import org.apache.nifi.controller.FlowController; @@ -97,7 +98,7 @@ public VersionedDataflow createMapping() { private List mapConnectors() { final List connectors = new ArrayList<>(); - for (final ConnectorNode connectorNode : flowController.getConnectorRepository().getConnectors()) { + for (final ConnectorNode connectorNode : flowController.getConnectorRepository().getConnectors(ConnectorSyncMode.LOCAL_ONLY)) { final VersionedConnector versionedConnector = flowMapper.mapConnector(connectorNode); if (flowController.isStartAfterInitialization(connectorNode)) { versionedConnector.setScheduledState(ScheduledState.RUNNING); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java index f2893fca8b11..52fc732be7ad 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java @@ -28,6 +28,7 @@ import org.apache.nifi.cluster.protocol.StandardDataFlow; import org.apache.nifi.components.connector.ConnectorNode; import org.apache.nifi.components.connector.ConnectorRepository; +import org.apache.nifi.components.connector.ConnectorSyncMode; import org.apache.nifi.components.connector.ConnectorSyncResult; import org.apache.nifi.components.validation.ValidationStatus; import org.apache.nifi.connectable.Connectable; @@ -1058,7 +1059,7 @@ private void inheritConnectors(final FlowController flowController, final Versio } } - for (final ConnectorNode existingConnector : connectorRepository.getConnectors()) { + for (final ConnectorNode existingConnector : connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY)) { if (!proposedConnectorIds.contains(existingConnector.getIdentifier())) { logger.info("Connector [{}] (state={}) is no longer part of the proposed flow. Stopping and removing.", existingConnector.getIdentifier(), existingConnector.getCurrentState()); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ConnectionDiagnosticTask.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ConnectionDiagnosticTask.java index a7294307ac4b..eff5c724a579 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ConnectionDiagnosticTask.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ConnectionDiagnosticTask.java @@ -18,6 +18,7 @@ import org.apache.nifi.components.connector.ConnectorNode; import org.apache.nifi.components.connector.ConnectorRepository; +import org.apache.nifi.components.connector.ConnectorSyncMode; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.queue.FlowFileQueue; @@ -60,7 +61,7 @@ public DiagnosticsDumpElement captureDump(final boolean verbose) { details.add(""); final ConnectorRepository connectorRepository = flowController.getConnectorRepository(); - final List connectors = connectorRepository != null ? connectorRepository.getConnectors() : List.of(); + final List connectors = connectorRepository != null ? connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY) : List.of(); if (connectors.isEmpty()) { details.add("This instance has no Connectors."); } else { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java index 6b321eca8e86..0ae7e272a895 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java @@ -263,7 +263,7 @@ private ConnectorNode initializeDynamicFlowConnector() { final DynamicFlowConnector flowConnector = (DynamicFlowConnector) connector; assertTrue(flowConnector.isInitialized()); - assertEquals(List.of(connectorNode), connectorRepository.getConnectors()); + assertEquals(List.of(connectorNode), connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY)); final ProcessGroup rootGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup(); assertEquals(3, rootGroup.getProcessGroups().size()); @@ -282,7 +282,7 @@ private ConnectorNode initializeDynamicFlowConnector() { private ConnectorNode initializeParameterConnector() { final ConnectorNode connectorNode = flowManager.createConnector(ParameterConnector.class.getName(), "parameter-connector", SystemBundle.SYSTEM_BUNDLE_COORDINATE, true, true); assertNotNull(connectorNode); - assertEquals(List.of(connectorNode), connectorRepository.getConnectors()); + assertEquals(List.of(connectorNode), connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY)); final ProcessGroup rootGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup(); assertEquals(3, rootGroup.getProcessors().size()); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java index 0bcfd2736b2e..8560cb6f8bca 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java @@ -75,7 +75,7 @@ public void testAddAndGetConnectors() { repository.addConnector(connector1); repository.addConnector(connector2); - final List connectors = repository.getConnectors(); + final List connectors = repository.getConnectors(ConnectorSyncMode.SYNC_WITH_PROVIDER); assertEquals(2, connectors.size()); assertTrue(connectors.contains(connector1)); assertTrue(connectors.contains(connector2)); @@ -98,8 +98,8 @@ public void testRemoveConnector() { repository.addConnector(connector); repository.removeConnector("connector-1"); - assertEquals(0, repository.getConnectors().size()); - assertNull(repository.getConnector("connector-1")); + assertEquals(0, repository.getConnectors(ConnectorSyncMode.SYNC_WITH_PROVIDER).size()); + assertNull(repository.getConnector("connector-1", ConnectorSyncMode.SYNC_WITH_PROVIDER)); } @Test @@ -110,8 +110,8 @@ public void testRestoreConnector() { when(connector.getIdentifier()).thenReturn("connector-1"); repository.restoreConnector(connector); - assertEquals(1, repository.getConnectors().size()); - assertEquals(connector, repository.getConnector("connector-1")); + assertEquals(1, repository.getConnectors(ConnectorSyncMode.SYNC_WITH_PROVIDER).size()); + assertEquals(connector, repository.getConnector("connector-1", ConnectorSyncMode.SYNC_WITH_PROVIDER)); } @Test @@ -126,8 +126,8 @@ public void testGetConnectorsReturnsNewListInstances() { repository.addConnector(connector1); repository.addConnector(connector2); - final List connectors1 = repository.getConnectors(); - final List connectors2 = repository.getConnectors(); + final List connectors1 = repository.getConnectors(ConnectorSyncMode.SYNC_WITH_PROVIDER); + final List connectors2 = repository.getConnectors(ConnectorSyncMode.SYNC_WITH_PROVIDER); assertEquals(2, connectors1.size()); assertEquals(2, connectors2.size()); @@ -147,9 +147,9 @@ public void testAddConnectorWithDuplicateIdReplaces() { repository.addConnector(connector1); repository.addConnector(connector2); - final List connectors = repository.getConnectors(); + final List connectors = repository.getConnectors(ConnectorSyncMode.SYNC_WITH_PROVIDER); assertEquals(1, connectors.size()); - assertEquals(connector2, repository.getConnector("same-id")); + assertEquals(connector2, repository.getConnector("same-id", ConnectorSyncMode.SYNC_WITH_PROVIDER)); } @Test @@ -233,7 +233,7 @@ public void testGetConnectorWithProviderOverridesWorkingConfig() { externalConfig.setWorkingFlowConfiguration(List.of(externalStep)); when(provider.load("connector-1")).thenReturn(Optional.of(externalConfig)); - final ConnectorNode result = repository.getConnector("connector-1"); + final ConnectorNode result = repository.getConnector("connector-1", ConnectorSyncMode.SYNC_WITH_PROVIDER); assertNotNull(result); verify(connector).setName("External Name"); @@ -250,7 +250,7 @@ public void testGetConnectorWithProviderReturnsEmpty() { when(provider.load("connector-1")).thenReturn(Optional.empty()); - final ConnectorNode result = repository.getConnector("connector-1"); + final ConnectorNode result = repository.getConnector("connector-1", ConnectorSyncMode.SYNC_WITH_PROVIDER); assertNotNull(result); verify(connector, never()).setName(anyString()); @@ -266,7 +266,7 @@ public void testGetConnectorWithProviderThrowsException() { when(provider.load("connector-1")).thenThrow(new ConnectorConfigurationProviderException("Provider failure")); - assertThrows(ConnectorConfigurationProviderException.class, () -> repository.getConnector("connector-1")); + assertThrows(ConnectorConfigurationProviderException.class, () -> repository.getConnector("connector-1", ConnectorSyncMode.SYNC_WITH_PROVIDER)); verify(connector, never()).setName(anyString()); } @@ -277,12 +277,45 @@ public void testGetConnectorWithNullProvider() { final ConnectorNode connector = createSimpleConnectorNode("connector-1", "Original Name"); repository.addConnector(connector); - final ConnectorNode result = repository.getConnector("connector-1"); + final ConnectorNode result = repository.getConnector("connector-1", ConnectorSyncMode.SYNC_WITH_PROVIDER); assertNotNull(result); verify(connector, never()).setName(anyString()); } + @Test + public void testGetConnectorLocalOnlyDoesNotCallProvider() { + final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class); + final StandardConnectorRepository repository = createRepositoryWithProvider(provider); + + final ConnectorNode connector = createSimpleConnectorNode("connector-1", "Local Name"); + repository.restoreConnector(connector); + + final ConnectorNode result = repository.getConnector("connector-1", ConnectorSyncMode.LOCAL_ONLY); + + assertNotNull(result); + verifyNoInteractions(provider); + verify(connector, never()).setName(anyString()); + } + + @Test + public void testGetConnectorsLocalOnlyDoesNotCallProvider() { + final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class); + final StandardConnectorRepository repository = createRepositoryWithProvider(provider); + + final ConnectorNode connector1 = createSimpleConnectorNode("connector-1", "Local Name 1"); + final ConnectorNode connector2 = createSimpleConnectorNode("connector-2", "Local Name 2"); + repository.restoreConnector(connector1); + repository.restoreConnector(connector2); + + final List results = repository.getConnectors(ConnectorSyncMode.LOCAL_ONLY); + + assertEquals(2, results.size()); + verifyNoInteractions(provider); + verify(connector1, never()).setName(anyString()); + verify(connector2, never()).setName(anyString()); + } + @Test public void testGetConnectorsWithProviderOverrides() { final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class); @@ -301,7 +334,7 @@ public void testGetConnectorsWithProviderOverrides() { when(provider.load("connector-1")).thenReturn(Optional.of(externalConfig1)); when(provider.load("connector-2")).thenReturn(Optional.empty()); - final List results = repository.getConnectors(); + final List results = repository.getConnectors(ConnectorSyncMode.SYNC_WITH_PROVIDER); assertEquals(2, results.size()); verify(connector1).setName("External Name 1"); @@ -675,7 +708,7 @@ public void testSyncFromProviderAppliesNifiUuidsDirectly() { config.setWorkingFlowConfiguration(List.of(step)); when(provider.load("connector-1")).thenReturn(Optional.of(config)); - repository.getConnector("connector-1"); + repository.getConnector("connector-1", ConnectorSyncMode.SYNC_WITH_PROVIDER); // Working config is updated with NiFi UUIDs as-is -- no translation in the repository verify(workingConfigContext).replaceProperties(eq("step1"), any(StepConfiguration.class)); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizerTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizerTest.java index 47b5c2c02c1a..2684627afc3b 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizerTest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizerTest.java @@ -20,6 +20,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.connector.ConnectorNode; import org.apache.nifi.components.connector.ConnectorRepository; +import org.apache.nifi.components.connector.ConnectorSyncMode; import org.apache.nifi.components.connector.ConnectorSyncResult; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ReportingTaskNode; @@ -289,7 +290,7 @@ private void setFlowController(final ConnectorRepository connectorRepository) { when(flowController.createVersionedComponentStateLookup(any())).thenReturn(stateLookup); when(flowController.getControllerServiceProvider()).thenReturn(controllerServiceProvider); - when(connectorRepository.getConnectors()).thenReturn(Collections.emptyList()); + when(connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY)).thenReturn(Collections.emptyList()); when(flowController.getConnectorRepository()).thenReturn(connectorRepository); } @@ -376,7 +377,7 @@ void testSyncOrphanConnectorIsRemoved() { .thenReturn(java.util.concurrent.CompletableFuture.completedFuture(null)); setFlowController(connectorRepository); - when(connectorRepository.getConnectors()).thenReturn(List.of(orphanConnector)); + when(connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY)).thenReturn(List.of(orphanConnector)); when(versionedDataflow.getConnectors()).thenReturn(List.of(proposedConnector)); versionedFlowSynchronizer.sync(flowController, dataFlow, flowService, BundleUpdateStrategy.USE_SPECIFIED_OR_GHOST); @@ -410,7 +411,7 @@ void testSyncOrphanConnectorNotRemovedWhenInProposedFlow() { .thenReturn(java.util.concurrent.CompletableFuture.completedFuture(null)); setFlowController(connectorRepository); - when(connectorRepository.getConnectors()).thenReturn(List.of(existingConnector)); + when(connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY)).thenReturn(List.of(existingConnector)); when(versionedDataflow.getConnectors()).thenReturn(List.of(versionedConnector)); versionedFlowSynchronizer.sync(flowController, dataFlow, flowService, BundleUpdateStrategy.USE_SPECIFIED_OR_GHOST); @@ -447,7 +448,7 @@ void testSyncOrphanRemovalFailureMarksInvalid() { .thenReturn(java.util.concurrent.CompletableFuture.completedFuture(null)); setFlowController(connectorRepository); - when(connectorRepository.getConnectors()).thenReturn(List.of(orphanConnector)); + when(connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY)).thenReturn(List.of(orphanConnector)); when(versionedDataflow.getConnectors()).thenReturn(List.of(proposedConnector)); versionedFlowSynchronizer.sync(flowController, dataFlow, flowService, BundleUpdateStrategy.USE_SPECIFIED_OR_GHOST); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ConnectorAuditor.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ConnectorAuditor.java index 57c762451fe3..9108e3a3b349 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ConnectorAuditor.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ConnectorAuditor.java @@ -27,6 +27,7 @@ import org.apache.nifi.components.connector.ConnectorConfiguration; import org.apache.nifi.components.connector.ConnectorNode; import org.apache.nifi.components.connector.ConnectorState; +import org.apache.nifi.components.connector.ConnectorSyncMode; import org.apache.nifi.components.connector.ConnectorValueReference; import org.apache.nifi.components.connector.NamedStepConfiguration; import org.apache.nifi.components.connector.SecretReference; @@ -95,7 +96,7 @@ public ConnectorNode createConnectorAdvice(final ProceedingJoinPoint proceedingJ + "args(connectorId) && " + "target(connectorDAO)") public void removeConnectorAdvice(final ProceedingJoinPoint proceedingJoinPoint, final String connectorId, final ConnectorDAO connectorDAO) throws Throwable { - final ConnectorNode connector = connectorDAO.getConnector(connectorId); + final ConnectorNode connector = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); proceedingJoinPoint.proceed(); @@ -118,7 +119,7 @@ public void removeConnectorAdvice(final ProceedingJoinPoint proceedingJoinPoint, + "args(connectorId) && " + "target(connectorDAO)") public void startConnectorAdvice(final ProceedingJoinPoint proceedingJoinPoint, final String connectorId, final ConnectorDAO connectorDAO) throws Throwable { - final ConnectorNode connector = connectorDAO.getConnector(connectorId); + final ConnectorNode connector = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); final ConnectorState previousState = connector.getCurrentState(); proceedingJoinPoint.proceed(); @@ -144,7 +145,7 @@ public void startConnectorAdvice(final ProceedingJoinPoint proceedingJoinPoint, + "args(connectorId) && " + "target(connectorDAO)") public void stopConnectorAdvice(final ProceedingJoinPoint proceedingJoinPoint, final String connectorId, final ConnectorDAO connectorDAO) throws Throwable { - final ConnectorNode connector = connectorDAO.getConnector(connectorId); + final ConnectorNode connector = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); final ConnectorState previousState = connector.getCurrentState(); proceedingJoinPoint.proceed(); @@ -173,9 +174,8 @@ public void stopConnectorAdvice(final ProceedingJoinPoint proceedingJoinPoint, f + "target(connectorDAO)") public void updateConfigurationStepAdvice(final ProceedingJoinPoint proceedingJoinPoint, final String connectorId, final String configurationStepName, final ConfigurationStepConfigurationDTO configurationStepConfiguration, final ConnectorDAO connectorDAO) throws Throwable { - final ConnectorNode connector = connectorDAO.getConnector(connectorId); + final ConnectorNode connector = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); - // Capture the current property values before the update (flat map: property name -> value) final Map previousValues = extractCurrentPropertyValues(connector, configurationStepName); proceedingJoinPoint.proceed(); @@ -346,7 +346,7 @@ private String formatPropertyName(final String stepName, final String groupName, + "args(connectorId) && " + "target(connectorDAO)") public void applyConnectorUpdateAdvice(final ProceedingJoinPoint proceedingJoinPoint, final String connectorId, final ConnectorDAO connectorDAO) throws Throwable { - final ConnectorNode connector = connectorDAO.getConnector(connectorId); + final ConnectorNode connector = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); proceedingJoinPoint.proceed(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java index 9e9339493cc6..be703d121f43 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java @@ -31,6 +31,7 @@ import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.connector.ConnectorSyncMode; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Port; @@ -660,7 +661,7 @@ public Authorizable getAuthorizableFromResource(final String resource) { @Override public Authorizable getConnector(final String connectorId) { - return connectorDAO.getConnector(connectorId); + return connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); } private Authorizable handleResourceTypeContainingOtherResourceType(final String resource, final ResourceType resourceType) { diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 1b03128abe42..a510e774c2b8 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -81,6 +81,7 @@ import org.apache.nifi.components.connector.Connector; import org.apache.nifi.components.connector.ConnectorNode; import org.apache.nifi.components.connector.ConnectorState; +import org.apache.nifi.components.connector.ConnectorSyncMode; import org.apache.nifi.components.connector.ConnectorUpdateContext; import org.apache.nifi.components.connector.Secret; import org.apache.nifi.components.connector.secrets.AuthorizableSecret; @@ -2135,7 +2136,7 @@ public ComponentStateDTO clearConnectorControllerServiceState(final String conne } private ProcessorNode locateConnectorProcessor(final String connectorId, final String processorId) { - final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId); + final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); final ProcessGroup managedGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup(); final ProcessorNode processor = managedGroup.findProcessor(processorId); if (processor == null) { @@ -2145,7 +2146,7 @@ private ProcessorNode locateConnectorProcessor(final String connectorId, final S } private ControllerServiceNode locateConnectorControllerService(final String connectorId, final String controllerServiceId) { - final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId); + final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); final ProcessGroup managedGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup(); final ControllerServiceNode controllerService = managedGroup.findControllerService(controllerServiceId, false, true); if (controllerService == null) { @@ -3558,7 +3559,7 @@ public ConnectorEntity createConnector(final Revision revision, final ConnectorD return new StandardRevisionUpdate<>(dto, lastMod); }); - final ConnectorNode connector = connectorDAO.getConnector(snapshot.getComponent().getId()); + final ConnectorNode connector = connectorDAO.getConnector(snapshot.getComponent().getId(), ConnectorSyncMode.LOCAL_ONLY); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connector); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(connector)); final ConnectorStatusDTO status = createConnectorStatusDto(connector); @@ -3627,13 +3628,13 @@ public ConnectorEntity updateConnector(final Revision revision, final ConnectorD controllerFacade.save(); - final ConnectorNode node = connectorDAO.getConnector(connectorDTO.getId()); + final ConnectorNode node = connectorDAO.getConnector(connectorDTO.getId(), ConnectorSyncMode.LOCAL_ONLY); final ConnectorDTO dto = dtoFactory.createConnectorDto(node); final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); return new StandardRevisionUpdate<>(dto, lastMod); }); - final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId()); + final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId(), ConnectorSyncMode.LOCAL_ONLY); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(node); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(node)); final ConnectorStatusDTO statusDto = createConnectorStatusDto(node); @@ -3683,13 +3684,13 @@ public ConnectorEntity scheduleConnector(final Revision revision, final String i } controllerFacade.save(); - final ConnectorNode node = connectorDAO.getConnector(id); + final ConnectorNode node = connectorDAO.getConnector(id, ConnectorSyncMode.LOCAL_ONLY); final ConnectorDTO dto = dtoFactory.createConnectorDto(node); final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); return new StandardRevisionUpdate<>(dto, lastMod); }); - final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId()); + final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId(), ConnectorSyncMode.LOCAL_ONLY); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(node); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(node)); final ConnectorStatusDTO statusDto = createConnectorStatusDto(node); @@ -3698,7 +3699,7 @@ public ConnectorEntity scheduleConnector(final Revision revision, final String i @Override public void verifyDrainConnector(final String id) { - final ConnectorNode connector = connectorDAO.getConnector(id); + final ConnectorNode connector = connectorDAO.getConnector(id, ConnectorSyncMode.LOCAL_ONLY); final ConnectorState currentState = connector.getCurrentState(); if (currentState != ConnectorState.STOPPED) { throw new IllegalStateException("Cannot drain FlowFiles for Connector " + id + " because it is not currently stopped. Current state: " + currentState); @@ -3714,13 +3715,13 @@ public ConnectorEntity drainConnector(final Revision revision, final String id) connectorDAO.drainFlowFiles(id); controllerFacade.save(); - final ConnectorNode node = connectorDAO.getConnector(id); + final ConnectorNode node = connectorDAO.getConnector(id, ConnectorSyncMode.LOCAL_ONLY); final ConnectorDTO dto = dtoFactory.createConnectorDto(node); final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); return new StandardRevisionUpdate<>(dto, lastMod); }); - final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId()); + final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId(), ConnectorSyncMode.LOCAL_ONLY); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(node); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(node)); final ConnectorStatusDTO statusDto = createConnectorStatusDto(node); @@ -3741,13 +3742,13 @@ public ConnectorEntity cancelConnectorDrain(final Revision revision, final Strin connectorDAO.cancelDrainFlowFiles(id); controllerFacade.save(); - final ConnectorNode node = connectorDAO.getConnector(id); + final ConnectorNode node = connectorDAO.getConnector(id, ConnectorSyncMode.LOCAL_ONLY); final ConnectorDTO dto = dtoFactory.createConnectorDto(node); final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); return new StandardRevisionUpdate<>(dto, lastMod); }); - final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId()); + final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId(), ConnectorSyncMode.LOCAL_ONLY); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(node); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(node)); final ConnectorStatusDTO statusDto = createConnectorStatusDto(node); @@ -3778,12 +3779,10 @@ public ConfigurationStepEntity updateConnectorConfigurationStep(final Revision r final RevisionClaim claim = new StandardRevisionClaim(revision); final RevisionUpdate snapshot = revisionManager.updateRevision(claim, user, () -> { - // Update the configuration step connectorDAO.updateConnectorConfigurationStep(id, configurationStepName, configurationStepConfiguration); controllerFacade.save(); - // Return updated connector DTO - final ConnectorNode node = connectorDAO.getConnector(id); + final ConnectorNode node = connectorDAO.getConnector(id, ConnectorSyncMode.LOCAL_ONLY); final ConnectorDTO dto = dtoFactory.createConnectorDto(node); final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); return new StandardRevisionUpdate<>(dto, lastMod); @@ -3803,13 +3802,13 @@ public ConnectorEntity applyConnectorUpdate(final Revision revision, final Strin final RevisionUpdate snapshot = revisionManager.updateRevision(claim, user, () -> { connectorDAO.applyConnectorUpdate(connectorId, updateContext); - final ConnectorNode node = connectorDAO.getConnector(connectorId); + final ConnectorNode node = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); final ConnectorDTO dto = dtoFactory.createConnectorDto(node); final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); return new StandardRevisionUpdate<>(dto, lastMod); }); - final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId()); + final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId(), ConnectorSyncMode.LOCAL_ONLY); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(node); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(node)); final ConnectorStatusDTO statusDto = createConnectorStatusDto(node); @@ -3824,13 +3823,13 @@ public ConnectorEntity discardConnectorUpdate(final Revision revision, final Str final RevisionUpdate snapshot = revisionManager.updateRevision(claim, user, () -> { connectorDAO.discardWorkingConfiguration(connectorId); - final ConnectorNode node = connectorDAO.getConnector(connectorId); + final ConnectorNode node = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); final ConnectorDTO dto = dtoFactory.createConnectorDto(node); final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); return new StandardRevisionUpdate<>(dto, lastMod); }); - final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId()); + final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId(), ConnectorSyncMode.LOCAL_ONLY); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(node); final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(node)); final ConnectorStatusDTO statusDto = createConnectorStatusDto(node); @@ -3839,7 +3838,7 @@ public ConnectorEntity discardConnectorUpdate(final Revision revision, final Str @Override public ProcessGroupFlowEntity getConnectorFlow(final String connectorId, final String processGroupId, final boolean uiOnly) { - final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId); + final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); final ProcessGroup managedProcessGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup(); final ProcessGroup targetProcessGroup = managedProcessGroup.findProcessGroup(processGroupId); if (targetProcessGroup == null) { @@ -3850,7 +3849,7 @@ public ProcessGroupFlowEntity getConnectorFlow(final String connectorId, final S @Override public ProcessGroupStatusEntity getConnectorProcessGroupStatus(final String id, final Boolean recursive) { - final ConnectorNode connectorNode = connectorDAO.getConnector(id); + final ConnectorNode connectorNode = connectorDAO.getConnector(id, ConnectorSyncMode.LOCAL_ONLY); final ProcessGroup managedProcessGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup(); final String processGroupId = managedProcessGroup.getIdentifier(); @@ -3873,7 +3872,7 @@ public ProcessGroupStatusEntity getConnectorProcessGroupStatus(final String id, @Override public Set getConnectorControllerServices(final String connectorId, final String processGroupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups, final boolean includeReferencingComponents) { - final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId); + final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); final ProcessGroup managedProcessGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup(); final ProcessGroup targetProcessGroup = managedProcessGroup.findProcessGroup(processGroupId); if (targetProcessGroup == null) { @@ -3908,7 +3907,7 @@ public List performConnectorConfigurationStepVerifi @Override public SearchResultsDTO searchConnector(final String connectorId, final String query) { - final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId); + final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); final ProcessGroup managedProcessGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup(); return controllerFacade.searchConnector(query, managedProcessGroup); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java index 70801f1dcdc9..acf2e7f53900 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java @@ -21,6 +21,7 @@ import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.DescribedValue; import org.apache.nifi.components.connector.ConnectorNode; +import org.apache.nifi.components.connector.ConnectorSyncMode; import org.apache.nifi.components.connector.ConnectorUpdateContext; import org.apache.nifi.web.api.dto.ConfigurationStepConfigurationDTO; import org.apache.nifi.web.api.dto.ConnectorDTO; @@ -38,6 +39,8 @@ public interface ConnectorDAO { ConnectorNode getConnector(String id); + ConnectorNode getConnector(String id, ConnectorSyncMode syncMode); + List getConnectors(); ConnectorNode createConnector(String type, String id, BundleCoordinate bundleCoordinate, boolean firstTimeAdded, boolean registerLogObserver); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java index 47aa7a14dc8b..5b4ab5c14205 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java @@ -24,6 +24,7 @@ import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.components.connector.ConnectorNode; +import org.apache.nifi.components.connector.ConnectorSyncMode; import org.apache.nifi.components.connector.FrameworkFlowContext; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; @@ -92,7 +93,7 @@ private Connection locateConnection(final String connectionId, final boolean inc // Optionally search Connector-managed ProcessGroups if (includeConnectorManaged) { - for (final ConnectorNode connector : flowController.getConnectorRepository().getConnectors()) { + for (final ConnectorNode connector : flowController.getConnectorRepository().getConnectors(ConnectorSyncMode.LOCAL_ONLY)) { final FrameworkFlowContext flowContext = connector.getActiveFlowContext(); if (flowContext != null) { final ProcessGroup managedGroup = flowContext.getManagedProcessGroup(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java index 697e8c5de6c7..9a507e16e5ce 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java @@ -23,6 +23,7 @@ import org.apache.nifi.components.connector.AssetReference; import org.apache.nifi.components.connector.ConnectorNode; import org.apache.nifi.components.connector.ConnectorRepository; +import org.apache.nifi.components.connector.ConnectorSyncMode; import org.apache.nifi.components.connector.ConnectorUpdateContext; import org.apache.nifi.components.connector.ConnectorValueReference; import org.apache.nifi.components.connector.ConnectorValueType; @@ -85,21 +86,24 @@ public void verifyCreate(final ConnectorDTO connectorDTO) { @Override public boolean hasConnector(final String id) { - return getConnectorRepository().getConnector(id) != null; + return getConnectorRepository().getConnector(id, ConnectorSyncMode.LOCAL_ONLY) != null; } @Override public ConnectorNode getConnector(final String id) { - final ConnectorNode connector = getConnectorRepository().getConnector(id); - if (connector == null) { - throw new ResourceNotFoundException("Could not find Connector with ID " + id); - } - return connector; + // Public read returned to clients; must reflect the latest configuration from the provider. + return requireConnector(id, ConnectorSyncMode.SYNC_WITH_PROVIDER); + } + + @Override + public ConnectorNode getConnector(final String id, final ConnectorSyncMode syncMode) { + return requireConnector(id, syncMode); } @Override public List getConnectors() { - return getConnectorRepository().getConnectors(); + // Public read returned to clients; must reflect the latest configuration from the provider. + return getConnectorRepository().getConnectors(ConnectorSyncMode.SYNC_WITH_PROVIDER); } @Override @@ -111,7 +115,7 @@ public ConnectorNode createConnector(final String type, final String id, final B @Override public void updateConnector(final ConnectorDTO connectorDTO) { - final ConnectorNode connector = getConnector(connectorDTO.getId()); + final ConnectorNode connector = requireConnector(connectorDTO.getId(), ConnectorSyncMode.LOCAL_ONLY); if (connectorDTO.getName() != null) { getConnectorRepository().updateConnector(connector, connectorDTO.getName()); } @@ -125,43 +129,43 @@ public void deleteConnector(final String id) { @Override public void startConnector(final String id) { - final ConnectorNode connector = getConnector(id); + final ConnectorNode connector = requireConnector(id, ConnectorSyncMode.LOCAL_ONLY); getConnectorRepository().startConnector(connector); } @Override public void stopConnector(final String id) { - final ConnectorNode connector = getConnector(id); + final ConnectorNode connector = requireConnector(id, ConnectorSyncMode.LOCAL_ONLY); getConnectorRepository().stopConnector(connector); } @Override public void drainFlowFiles(final String id) { - final ConnectorNode connector = getConnector(id); + final ConnectorNode connector = requireConnector(id, ConnectorSyncMode.LOCAL_ONLY); connector.drainFlowFiles(); } @Override public void cancelDrainFlowFiles(final String id) { - final ConnectorNode connector = getConnector(id); + final ConnectorNode connector = requireConnector(id, ConnectorSyncMode.LOCAL_ONLY); connector.cancelDrainFlowFiles(); } @Override public void verifyCancelDrainFlowFile(final String id) { - final ConnectorNode connector = getConnector(id); + final ConnectorNode connector = requireConnector(id, ConnectorSyncMode.LOCAL_ONLY); connector.verifyCancelDrainFlowFiles(); } @Override public void verifyPurgeFlowFiles(final String id) { - final ConnectorNode connector = getConnector(id); + final ConnectorNode connector = requireConnector(id, ConnectorSyncMode.LOCAL_ONLY); connector.verifyCanPurgeFlowFiles(); } @Override public void purgeFlowFiles(final String id, final String requestor) { - final ConnectorNode connector = getConnector(id); + final ConnectorNode connector = requireConnector(id, ConnectorSyncMode.LOCAL_ONLY); try { connector.purgeFlowFiles(requestor).get(); } catch (final InterruptedException e) { @@ -174,12 +178,12 @@ public void purgeFlowFiles(final String id, final String requestor) { @Override public void updateConnectorConfigurationStep(final String id, final String configurationStepName, final ConfigurationStepConfigurationDTO configurationStepDto) { - final ConnectorNode connector = getConnector(id); + // ConnectorRepository.configureConnector consults the provider directly when merging the step, + // so a sync at the lookup would be redundant. + final ConnectorNode connector = requireConnector(id, ConnectorSyncMode.LOCAL_ONLY); - // Convert DTO to domain object - flatten all property groups into a single StepConfiguration final StepConfiguration stepConfiguration = convertToStepConfiguration(configurationStepDto); - // Update the connector configuration through the repository try { getConnectorRepository().configureConnector(connector, configurationStepName, stepConfiguration); } catch (final Exception e) { @@ -187,6 +191,14 @@ public void updateConnectorConfigurationStep(final String id, final String confi } } + private ConnectorNode requireConnector(final String id, final ConnectorSyncMode syncMode) { + final ConnectorNode connector = getConnectorRepository().getConnector(id, syncMode); + if (connector == null) { + throw new ResourceNotFoundException("Could not find Connector with ID " + id); + } + return connector; + } + private StepConfiguration convertToStepConfiguration(final ConfigurationStepConfigurationDTO dto) { final Map propertyValues = new HashMap<>(); if (dto.getPropertyGroupConfigurations() != null) { @@ -222,7 +234,9 @@ private Set convertToAssetIdentifiers(final List asse @Override public void applyConnectorUpdate(final String id, final ConnectorUpdateContext updateContext) { - final ConnectorNode connector = getConnector(id); + // ConnectorRepository.applyUpdate calls syncAssetsFromProvider internally, which refreshes + // the connector from the provider; a sync at the lookup would be redundant. + final ConnectorNode connector = requireConnector(id, ConnectorSyncMode.LOCAL_ONLY); try { getConnectorRepository().applyUpdate(connector, updateContext); } catch (final Exception e) { @@ -232,19 +246,20 @@ public void applyConnectorUpdate(final String id, final ConnectorUpdateContext u @Override public void discardWorkingConfiguration(final String id) { - final ConnectorNode connector = getConnector(id); + // The working configuration is being thrown away; reading the latest from the provider first serves no purpose. + final ConnectorNode connector = requireConnector(id, ConnectorSyncMode.LOCAL_ONLY); getConnectorRepository().discardWorkingConfiguration(connector); } @Override public void verifyCanVerifyConfigurationStep(final String id, final String configurationStepName) { - // Verify that the connector exists - getConnector(id); + requireConnector(id, ConnectorSyncMode.LOCAL_ONLY); } @Override public List verifyConfigurationStep(final String id, final String configurationStepName, final ConfigurationStepConfigurationDTO configurationStepDto) { - final ConnectorNode connector = getConnector(id); + // syncAssetsFromProvider below performs the provider sync, so the lookup itself does not need to. + final ConnectorNode connector = requireConnector(id, ConnectorSyncMode.LOCAL_ONLY); getConnectorRepository().syncAssetsFromProvider(connector); final StepConfiguration stepConfiguration = convertToStepConfiguration(configurationStepDto); return connector.verifyConfigurationStep(configurationStepName, stepConfiguration); @@ -252,7 +267,7 @@ public List verifyConfigurationStep(final String id, f @Override public List fetchAllowableValues(final String id, final String stepName, final String propertyName, final String filter) { - final ConnectorNode connector = getConnector(id); + final ConnectorNode connector = requireConnector(id, ConnectorSyncMode.LOCAL_ONLY); if (filter == null || filter.isEmpty()) { return connector.fetchAllowableValues(stepName, propertyName); } else { @@ -262,7 +277,7 @@ public List fetchAllowableValues(final String id, final String s @Override public void verifyCreateAsset(final String id) { - getConnector(id); + requireConnector(id, ConnectorSyncMode.LOCAL_ONLY); } @Override diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java index 3de939f586ea..aba366850076 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java @@ -37,6 +37,7 @@ import org.apache.nifi.authorization.user.StandardNiFiUser.Builder; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.connector.ConnectorNode; +import org.apache.nifi.components.connector.ConnectorSyncMode; import org.apache.nifi.components.connector.FrameworkFlowContext; import org.apache.nifi.components.connector.Secret; import org.apache.nifi.components.connector.secrets.AuthorizableSecret; @@ -1576,7 +1577,7 @@ public void testSearchConnector() { final FrameworkFlowContext flowContext = mock(FrameworkFlowContext.class); final ProcessGroup managedProcessGroup = mock(ProcessGroup.class); - when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode); + when(connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode); when(connectorNode.getActiveFlowContext()).thenReturn(flowContext); when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup); when(managedProcessGroup.getIdentifier()).thenReturn(managedGroupId); @@ -1589,7 +1590,7 @@ public void testSearchConnector() { final SearchResultsDTO results = serviceFacade.searchConnector(connectorId, searchQuery); assertNotNull(results); - verify(connectorDAO).getConnector(connectorId); + verify(connectorDAO).getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); verify(connectorNode).getActiveFlowContext(); verify(flowContext).getManagedProcessGroup(); verify(controllerFacade).searchConnector(searchQuery, managedProcessGroup); @@ -1809,7 +1810,7 @@ public void testGetConnectorProcessorState() { final Processor processor = mock(Processor.class); final StateMap localStateMap = mock(StateMap.class); - when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode); + when(connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode); when(connectorNode.getActiveFlowContext()).thenReturn(flowContext); when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup); when(managedProcessGroup.findProcessor(processorId)).thenReturn(processorNode); @@ -1824,7 +1825,7 @@ public void testGetConnectorProcessorState() { assertNotNull(result); assertEquals(processorId, result.getComponentId()); - verify(connectorDAO).getConnector(connectorId); + verify(connectorDAO).getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); verify(managedProcessGroup).findProcessor(processorId); verify(componentStateDAO).getState(processorNode, Scope.LOCAL); } @@ -1841,7 +1842,7 @@ public void testGetConnectorProcessorStateNotFound() { final FrameworkFlowContext flowContext = mock(FrameworkFlowContext.class); final ProcessGroup managedProcessGroup = mock(ProcessGroup.class); - when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode); + when(connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode); when(connectorNode.getActiveFlowContext()).thenReturn(flowContext); when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup); when(managedProcessGroup.findProcessor(processorId)).thenReturn(null); @@ -1862,12 +1863,11 @@ public void testVerifyCanClearConnectorProcessorState() { final ProcessGroup managedProcessGroup = mock(ProcessGroup.class); final ProcessorNode processorNode = mock(ProcessorNode.class); - when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode); + when(connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode); when(connectorNode.getActiveFlowContext()).thenReturn(flowContext); when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup); when(managedProcessGroup.findProcessor(processorId)).thenReturn(processorNode); - // Should not throw serviceFacade.verifyCanClearConnectorProcessorState(connectorId, processorId); verify(processorNode).verifyCanClearState(); @@ -1892,7 +1892,7 @@ public void testClearConnectorProcessorState() { final Processor processor = mock(Processor.class); final StateMap localStateMap = mock(StateMap.class); - when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode); + when(connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode); when(connectorNode.getActiveFlowContext()).thenReturn(flowContext); when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup); when(managedProcessGroup.findProcessor(processorId)).thenReturn(processorNode); @@ -1929,7 +1929,7 @@ public void testGetConnectorControllerServiceState() { final ControllerService controllerService = mock(ControllerService.class); final StateMap localStateMap = mock(StateMap.class); - when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode); + when(connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode); when(connectorNode.getActiveFlowContext()).thenReturn(flowContext); when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup); when(managedProcessGroup.findControllerService(controllerServiceId, false, true)).thenReturn(controllerServiceNode); @@ -1944,7 +1944,7 @@ public void testGetConnectorControllerServiceState() { assertNotNull(result); assertEquals(controllerServiceId, result.getComponentId()); - verify(connectorDAO).getConnector(connectorId); + verify(connectorDAO).getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY); verify(managedProcessGroup).findControllerService(controllerServiceId, false, true); verify(componentStateDAO).getState(controllerServiceNode, Scope.LOCAL); } @@ -1961,7 +1961,7 @@ public void testGetConnectorControllerServiceStateNotFound() { final FrameworkFlowContext flowContext = mock(FrameworkFlowContext.class); final ProcessGroup managedProcessGroup = mock(ProcessGroup.class); - when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode); + when(connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode); when(connectorNode.getActiveFlowContext()).thenReturn(flowContext); when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup); when(managedProcessGroup.findControllerService(controllerServiceId, false, true)).thenReturn(null); @@ -1982,12 +1982,11 @@ public void testVerifyCanClearConnectorControllerServiceState() { final ProcessGroup managedProcessGroup = mock(ProcessGroup.class); final ControllerServiceNode controllerServiceNode = mock(ControllerServiceNode.class); - when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode); + when(connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode); when(connectorNode.getActiveFlowContext()).thenReturn(flowContext); when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup); when(managedProcessGroup.findControllerService(controllerServiceId, false, true)).thenReturn(controllerServiceNode); - // Should not throw serviceFacade.verifyCanClearConnectorControllerServiceState(connectorId, controllerServiceId); verify(controllerServiceNode).verifyCanClearState(); @@ -2012,7 +2011,7 @@ public void testClearConnectorControllerServiceState() { final ControllerService controllerService = mock(ControllerService.class); final StateMap localStateMap = mock(StateMap.class); - when(connectorDAO.getConnector(connectorId)).thenReturn(connectorNode); + when(connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode); when(connectorNode.getActiveFlowContext()).thenReturn(flowContext); when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup); when(managedProcessGroup.findControllerService(controllerServiceId, false, true)).thenReturn(controllerServiceNode); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectionDAOTest.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectionDAOTest.java index 7aa6dc19d224..db8a22f34e9c 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectionDAOTest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectionDAOTest.java @@ -18,6 +18,7 @@ import org.apache.nifi.components.connector.ConnectorNode; import org.apache.nifi.components.connector.ConnectorRepository; +import org.apache.nifi.components.connector.ConnectorSyncMode; import org.apache.nifi.components.connector.FrameworkFlowContext; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.FlowController; @@ -92,7 +93,7 @@ void setUp() { when(rootGroup.findConnection(NON_EXISTENT_ID)).thenReturn(null); // Setup connector managed group - when(connectorRepository.getConnectors()).thenReturn(List.of(connectorNode)); + when(connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY)).thenReturn(List.of(connectorNode)); when(connectorNode.getActiveFlowContext()).thenReturn(frameworkFlowContext); when(frameworkFlowContext.getManagedProcessGroup()).thenReturn(connectorManagedGroup); when(connectorManagedGroup.findConnection(CONNECTOR_CONNECTION_ID)).thenReturn(connectorConnection); @@ -184,7 +185,7 @@ void testGetConnectionWithMultipleConnectors() { final Connection connectionInSecondConnector = org.mockito.Mockito.mock(Connection.class); final String secondConnectorConnectionId = "second-connector-connection-id"; - when(connectorRepository.getConnectors()).thenReturn(List.of(connectorNode, connectorNode2)); + when(connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY)).thenReturn(List.of(connectorNode, connectorNode2)); when(connectorNode2.getActiveFlowContext()).thenReturn(flowContext2); when(flowContext2.getManagedProcessGroup()).thenReturn(managedGroup2); when(managedGroup2.findConnection(secondConnectorConnectionId)).thenReturn(connectionInSecondConnector); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectorDAOTest.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectorDAOTest.java index d1c589fd2296..4b2e9f6dd6d1 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectorDAOTest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectorDAOTest.java @@ -21,6 +21,7 @@ import org.apache.nifi.components.connector.ConnectorConfiguration; import org.apache.nifi.components.connector.ConnectorNode; import org.apache.nifi.components.connector.ConnectorRepository; +import org.apache.nifi.components.connector.ConnectorSyncMode; import org.apache.nifi.components.connector.ConnectorUpdateContext; import org.apache.nifi.components.connector.FlowUpdateException; import org.apache.nifi.components.connector.FrameworkFlowContext; @@ -98,30 +99,30 @@ void setUp() { @Test void testApplyConnectorUpdate() throws Exception { - when(connectorRepository.getConnector(CONNECTOR_ID)).thenReturn(connectorNode); + when(connectorRepository.getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode); connectorDAO.applyConnectorUpdate(CONNECTOR_ID, connectorUpdateContext); - verify(connectorRepository).getConnector(CONNECTOR_ID); + verify(connectorRepository).getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY); verify(connectorRepository).applyUpdate(connectorNode, connectorUpdateContext); } @Test void testApplyConnectorUpdateWithNonExistentConnector() throws Exception { - when(connectorRepository.getConnector(CONNECTOR_ID)).thenReturn(null); + when(connectorRepository.getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(null); final ResourceNotFoundException exception = assertThrows(ResourceNotFoundException.class, () -> connectorDAO.applyConnectorUpdate(CONNECTOR_ID, connectorUpdateContext) ); assertEquals("Could not find Connector with ID " + CONNECTOR_ID, exception.getMessage()); - verify(connectorRepository).getConnector(CONNECTOR_ID); + verify(connectorRepository).getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY); verify(connectorRepository, never()).applyUpdate(any(ConnectorNode.class), any(ConnectorUpdateContext.class)); } @Test void testApplyConnectorUpdateWithFlowUpdateException() throws Exception { - when(connectorRepository.getConnector(CONNECTOR_ID)).thenReturn(connectorNode); + when(connectorRepository.getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode); doThrow(new FlowUpdateException("Flow update failed")).when(connectorRepository).applyUpdate(connectorNode, connectorUpdateContext); final NiFiCoreException exception = assertThrows(NiFiCoreException.class, () -> @@ -129,13 +130,13 @@ void testApplyConnectorUpdateWithFlowUpdateException() throws Exception { ); assertEquals("Failed to apply connector update: org.apache.nifi.components.connector.FlowUpdateException: Flow update failed", exception.getMessage()); - verify(connectorRepository).getConnector(CONNECTOR_ID); + verify(connectorRepository).getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY); verify(connectorRepository).applyUpdate(connectorNode, connectorUpdateContext); } @Test void testApplyConnectorUpdateWithRuntimeException() throws Exception { - when(connectorRepository.getConnector(CONNECTOR_ID)).thenReturn(connectorNode); + when(connectorRepository.getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode); doThrow(new RuntimeException("Test exception")).when(connectorRepository).applyUpdate(connectorNode, connectorUpdateContext); final NiFiCoreException exception = assertThrows(NiFiCoreException.class, () -> @@ -143,13 +144,13 @@ void testApplyConnectorUpdateWithRuntimeException() throws Exception { ); assertEquals("Failed to apply connector update: java.lang.RuntimeException: Test exception", exception.getMessage()); - verify(connectorRepository).getConnector(CONNECTOR_ID); + verify(connectorRepository).getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY); verify(connectorRepository).applyUpdate(connectorNode, connectorUpdateContext); } @Test void testApplyConnectorUpdateWithNullException() throws Exception { - when(connectorRepository.getConnector(CONNECTOR_ID)).thenReturn(connectorNode); + when(connectorRepository.getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode); doThrow(new RuntimeException()).when(connectorRepository).applyUpdate(connectorNode, connectorUpdateContext); final NiFiCoreException exception = assertThrows(NiFiCoreException.class, () -> @@ -157,29 +158,51 @@ void testApplyConnectorUpdateWithNullException() throws Exception { ); assertEquals("Failed to apply connector update: java.lang.RuntimeException", exception.getMessage()); - verify(connectorRepository).getConnector(CONNECTOR_ID); + verify(connectorRepository).getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY); verify(connectorRepository).applyUpdate(connectorNode, connectorUpdateContext); } @Test void testGetConnectorWithNonExistentId() { - when(connectorRepository.getConnector(CONNECTOR_ID)).thenReturn(null); + when(connectorRepository.getConnector(CONNECTOR_ID, ConnectorSyncMode.SYNC_WITH_PROVIDER)).thenReturn(null); assertThrows(ResourceNotFoundException.class, () -> connectorDAO.getConnector(CONNECTOR_ID) ); - verify(connectorRepository).getConnector(CONNECTOR_ID); + verify(connectorRepository).getConnector(CONNECTOR_ID, ConnectorSyncMode.SYNC_WITH_PROVIDER); } @Test void testGetConnectorSuccess() { - when(connectorRepository.getConnector(CONNECTOR_ID)).thenReturn(connectorNode); + when(connectorRepository.getConnector(CONNECTOR_ID, ConnectorSyncMode.SYNC_WITH_PROVIDER)).thenReturn(connectorNode); final ConnectorNode result = connectorDAO.getConnector(CONNECTOR_ID); assertEquals(connectorNode, result); - verify(connectorRepository).getConnector(CONNECTOR_ID); + verify(connectorRepository).getConnector(CONNECTOR_ID, ConnectorSyncMode.SYNC_WITH_PROVIDER); + } + + @Test + void testGetConnectorLocalOnly() { + when(connectorRepository.getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode); + + final ConnectorNode result = connectorDAO.getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY); + + assertEquals(connectorNode, result); + verify(connectorRepository).getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY); + verify(connectorRepository, never()).getConnector(CONNECTOR_ID, ConnectorSyncMode.SYNC_WITH_PROVIDER); + } + + @Test + void testGetConnectorLocalOnlyWithNonExistentId() { + when(connectorRepository.getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(null); + + assertThrows(ResourceNotFoundException.class, () -> + connectorDAO.getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY) + ); + + verify(connectorRepository).getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY); } @Test @@ -188,7 +211,7 @@ void testFetchAllowableValuesWithoutFilter() { new AllowableValue("value1", "Value 1", "First value"), new AllowableValue("value2", "Value 2", "Second value") ); - when(connectorRepository.getConnector(CONNECTOR_ID)).thenReturn(connectorNode); + when(connectorRepository.getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode); when(connectorNode.fetchAllowableValues(STEP_NAME, PROPERTY_NAME)).thenReturn(expectedValues); final List result = connectorDAO.fetchAllowableValues(CONNECTOR_ID, STEP_NAME, PROPERTY_NAME, null); @@ -203,7 +226,7 @@ void testFetchAllowableValuesWithEmptyFilter() { final List expectedValues = List.of( new AllowableValue("value1", "Value 1", "First value") ); - when(connectorRepository.getConnector(CONNECTOR_ID)).thenReturn(connectorNode); + when(connectorRepository.getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode); when(connectorNode.fetchAllowableValues(STEP_NAME, PROPERTY_NAME)).thenReturn(expectedValues); final List result = connectorDAO.fetchAllowableValues(CONNECTOR_ID, STEP_NAME, PROPERTY_NAME, ""); @@ -219,7 +242,7 @@ void testFetchAllowableValuesWithFilter() { final List expectedValues = List.of( new AllowableValue("filtered-value", "Filtered Value", "Filtered result") ); - when(connectorRepository.getConnector(CONNECTOR_ID)).thenReturn(connectorNode); + when(connectorRepository.getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode); when(connectorNode.fetchAllowableValues(STEP_NAME, PROPERTY_NAME, filter)).thenReturn(expectedValues); final List result = connectorDAO.fetchAllowableValues(CONNECTOR_ID, STEP_NAME, PROPERTY_NAME, filter); @@ -231,18 +254,18 @@ void testFetchAllowableValuesWithFilter() { @Test void testFetchAllowableValuesWithNonExistentConnector() { - when(connectorRepository.getConnector(CONNECTOR_ID)).thenReturn(null); + when(connectorRepository.getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(null); assertThrows(ResourceNotFoundException.class, () -> connectorDAO.fetchAllowableValues(CONNECTOR_ID, STEP_NAME, PROPERTY_NAME, null) ); - verify(connectorRepository).getConnector(CONNECTOR_ID); + verify(connectorRepository).getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY); } @Test void testVerifyConfigurationStepSyncsAssetsBeforeVerification() { - when(connectorRepository.getConnector(CONNECTOR_ID)).thenReturn(connectorNode); + when(connectorRepository.getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode); final ConfigurationStepConfigurationDTO stepConfigDto = new ConfigurationStepConfigurationDTO(); connectorDAO.verifyConfigurationStep(CONNECTOR_ID, STEP_NAME, stepConfigDto);