Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.nifi.components.connector.ConnectorNode;
import org.apache.nifi.components.connector.ConnectorRepository;
import org.apache.nifi.components.connector.ConnectorSyncMode;
import org.apache.nifi.components.connector.components.FlowContext;
import org.apache.nifi.web.NiFiConnectorWebContext;

Expand All @@ -38,7 +39,8 @@ public MockNiFiConnectorWebContext(final ConnectorRepository connectorRepository
@Override
@SuppressWarnings("unchecked")
public <T> ConnectorWebContext<T> getConnectorWebContext(final String connectorId) throws IllegalArgumentException {
final ConnectorNode connectorNode = connectorRepository.getConnector(connectorId);
// Test runner: there is no real ConnectorConfigurationProvider in this context, so syncing would be wasted work.
final ConnectorNode connectorNode = connectorRepository.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY);
if (connectorNode == null) {
throw new IllegalArgumentException("Unable to find connector with id: " + connectorId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.nifi.components.connector.Connector;
import org.apache.nifi.components.connector.ConnectorNode;
import org.apache.nifi.components.connector.ConnectorRepository;
import org.apache.nifi.components.connector.ConnectorSyncMode;
import org.apache.nifi.components.connector.FrameworkFlowContext;
import org.apache.nifi.web.NiFiConnectorWebContext;
import org.apache.nifi.web.NiFiConnectorWebContext.ConnectorWebContext;
Expand Down Expand Up @@ -55,7 +56,7 @@ class MockNiFiConnectorWebContextTest {

@Test
void testGetConnectorWebContextReturnsConnectorAndFlowContexts() {
when(connectorRepository.getConnector(CONNECTOR_ID)).thenReturn(connectorNode);
when(connectorRepository.getConnector(CONNECTOR_ID, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode);
when(connectorNode.getConnector()).thenReturn(connector);
when(connectorNode.getWorkingFlowContext()).thenReturn(workingFlowContext);
when(connectorNode.getActiveFlowContext()).thenReturn(activeFlowContext);
Expand All @@ -71,7 +72,7 @@ void testGetConnectorWebContextReturnsConnectorAndFlowContexts() {

@Test
void testGetConnectorWebContextThrowsForUnknownConnector() {
when(connectorRepository.getConnector("unknown-id")).thenReturn(null);
when(connectorRepository.getConnector("unknown-id", ConnectorSyncMode.LOCAL_ONLY)).thenReturn(null);

final NiFiConnectorWebContext context = new MockNiFiConnectorWebContext(connectorRepository);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,26 @@ public interface ConnectorRepository {
void removeConnector(String connectorId);

/**
* Gets the Connector with the given identifier
* Gets the Connector with the given identifier.
*
* @param identifier the identifier of the Connector to get
* @param syncMode whether to consult the {@link ConnectorConfigurationProvider} (if configured)
* to refresh local state from the external store before returning, or whether
* to return the in-memory copy as-is. See {@link ConnectorSyncMode}.
* @return the Connector with the given identifier, or null if no such Connector exists
*/
ConnectorNode getConnector(String identifier);
ConnectorNode getConnector(String identifier, ConnectorSyncMode syncMode);
Comment thread
mcgilman marked this conversation as resolved.

/**
* Returns all Connectors in the Repository.
*
* @param syncMode whether to consult the {@link ConnectorConfigurationProvider} (if configured)
* to refresh each connector's local state from the external store before
* returning, or whether to return the in-memory copies as-is. See
* {@link ConnectorSyncMode}.
* @return all Connectors in the Repository
*/
List<ConnectorNode> getConnectors();
List<ConnectorNode> getConnectors(ConnectorSyncMode syncMode);
Comment thread
mcgilman marked this conversation as resolved.

/**
* Starts the given Connector, managing any appropriate lifecycle events.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.components.connector;

/**
* Indicates whether retrieval of {@link ConnectorNode} instances from the
* {@link ConnectorRepository} should consult the configured
* {@link ConnectorConfigurationProvider} to refresh the local state from the
* external store, or whether the in-memory copy should be returned as-is.
*
* <p>Choosing {@link #SYNC_WITH_PROVIDER} causes the repository to call into
* the configuration provider (which may perform network I/O, secret lookups,
* and other potentially expensive or failure-prone operations) before returning
* the connector. This is appropriate for user-facing reads that must reflect
* the latest external state, such as REST API responses. Callers operating on
* critical paths that must not block on or fail because of the external store
* (for example, periodic flow serialization) should use {@link #LOCAL_ONLY}
* instead.</p>
*/
public enum ConnectorSyncMode {

/**
* Consult the {@link ConnectorConfigurationProvider} (when configured) to
* refresh the local connector state from the external store before
* returning. This may perform network I/O and may throw if the external
* store is unavailable.
*/
SYNC_WITH_PROVIDER,

/**
* Return the in-memory connector state without consulting the
* {@link ConnectorConfigurationProvider}. No external I/O is performed.
*/
LOCAL_ONLY
Comment thread
mcgilman marked this conversation as resolved.
}
Original file line number Diff line number Diff line change
Expand Up @@ -459,19 +459,23 @@ private void purgeConnectorAndAwait(final ConnectorNode connector) {
}

@Override
public ConnectorNode getConnector(final String identifier) {
public ConnectorNode getConnector(final String identifier, final ConnectorSyncMode syncMode) {
Objects.requireNonNull(syncMode, "syncMode is required");
final ConnectorNode connector = connectors.get(identifier);
if (connector != null) {
if (connector != null && syncMode == ConnectorSyncMode.SYNC_WITH_PROVIDER) {
syncFromProvider(connector);
}
return connector;
}

@Override
public List<ConnectorNode> getConnectors() {
public List<ConnectorNode> getConnectors(final ConnectorSyncMode syncMode) {
Objects.requireNonNull(syncMode, "syncMode is required");
final List<ConnectorNode> connectorList = List.copyOf(connectors.values());
for (final ConnectorNode connector : connectorList) {
syncFromProvider(connector);
if (syncMode == ConnectorSyncMode.SYNC_WITH_PROVIDER) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumes that a null value for syncMode will default to LOCAL_ONLY. It might be preferable to require that arg is not null.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call — added Objects.requireNonNull(syncMode, "syncMode is required") to both getConnector(String, ConnectorSyncMode) and getConnectors(ConnectorSyncMode) so a missing mode now fails fast instead of silently behaving as LOCAL_ONLY.

for (final ConnectorNode connector : connectorList) {
syncFromProvider(connector);
}
}
return connectorList;
}
Expand Down Expand Up @@ -665,7 +669,9 @@ private void collectReferencedAssetIds(final FrameworkFlowContext flowContext, f
@Override
public void updateConnector(final ConnectorNode connector, final String name) {
if (configurationProvider != null) {
final ConnectorWorkingConfiguration workingConfiguration = buildWorkingConfiguration(connector);
// Load the latest provider state so that other in-flight working changes are not overwritten by a rename.
final Optional<ConnectorWorkingConfiguration> externalConfig = configurationProvider.load(connector.getIdentifier());
final ConnectorWorkingConfiguration workingConfiguration = externalConfig.orElseGet(() -> buildWorkingConfiguration(connector));
workingConfiguration.setName(name);
configurationProvider.save(connector.getIdentifier(), workingConfiguration);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.nifi.components.connector.ConnectorRepository;
import org.apache.nifi.components.connector.ConnectorRepositoryInitializationContext;
import org.apache.nifi.components.connector.ConnectorRequestReplicator;
import org.apache.nifi.components.connector.ConnectorSyncMode;
import org.apache.nifi.components.connector.ConnectorValidationTrigger;
import org.apache.nifi.components.connector.FrameworkFlowContext;
import org.apache.nifi.components.connector.StandardConnectorConfigurationProviderInitializationContext;
Expand Down Expand Up @@ -1050,7 +1051,7 @@ public Connection findConnectionIncludingConnectorManaged(final String connectio
return connection;
}

for (final ConnectorNode connector : connectorRepository.getConnectors()) {
for (final ConnectorNode connector : connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY)) {
final FrameworkFlowContext flowContext = connector.getActiveFlowContext();
if (flowContext != null) {
final ProcessGroup managedGroup = flowContext.getManagedProcessGroup();
Expand All @@ -1077,7 +1078,7 @@ public RemoteGroupPort findRemoteGroupPortIncludingConnectorManaged(final String
return remoteGroupPort;
}

for (final ConnectorNode connector : connectorRepository.getConnectors()) {
for (final ConnectorNode connector : connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY)) {
final FrameworkFlowContext flowContext = connector.getActiveFlowContext();
if (flowContext != null) {
final ProcessGroup managedGroup = flowContext.getManagedProcessGroup();
Expand Down Expand Up @@ -1507,7 +1508,7 @@ public void trigger(final ComponentNode component) {
LOG.info("Starting {} Connectors", startConnectorsAfterInitialization.size());
for (final ConnectorNode connectorNode : startConnectorsAfterInitialization) {
try {
final ConnectorNode existingConnector = connectorRepository.getConnector(connectorNode.getIdentifier());
final ConnectorNode existingConnector = connectorRepository.getConnector(connectorNode.getIdentifier(), ConnectorSyncMode.LOCAL_ONLY);
if (existingConnector == null) {
LOG.debug("Will not start {} because it no longer exists", connectorNode);
continue;
Expand Down Expand Up @@ -1539,7 +1540,7 @@ public void trigger(final ComponentNode component) {
// Explicitly stop Connectors so that their state is properly transitioned from UPDATED to STOPPED.
for (final ConnectorNode connectorNode : startConnectorsAfterInitialization) {
try {
final ConnectorNode existingConnector = connectorRepository.getConnector(connectorNode.getIdentifier());
final ConnectorNode existingConnector = connectorRepository.getConnector(connectorNode.getIdentifier(), ConnectorSyncMode.LOCAL_ONLY);
if (existingConnector != null) {
connectorRepository.stopConnector(connectorNode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.nifi.components.connector.ConnectorNode;
import org.apache.nifi.components.connector.ConnectorRepository;
import org.apache.nifi.components.connector.ConnectorStateTransition;
import org.apache.nifi.components.connector.ConnectorSyncMode;
import org.apache.nifi.components.connector.FlowContextFactory;
import org.apache.nifi.components.connector.ProcessGroupFactory;
import org.apache.nifi.components.connector.StandardComponentBundleLookup;
Expand Down Expand Up @@ -853,12 +854,12 @@ private void gatherParameterContexts(final ProcessGroup sourceGroup, final Map<S

@Override
public List<ConnectorNode> getAllConnectors() {
return flowController.getConnectorRepository().getConnectors();
return flowController.getConnectorRepository().getConnectors(ConnectorSyncMode.LOCAL_ONLY);
}

@Override
public ConnectorNode getConnector(final String id) {
return flowController.getConnectorRepository().getConnector(id);
return flowController.getConnectorRepository().getConnector(id, ConnectorSyncMode.LOCAL_ONLY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.nifi.controller.serialization;

import org.apache.nifi.components.connector.ConnectorNode;
import org.apache.nifi.components.connector.ConnectorSyncMode;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.FlowAnalysisRuleNode;
import org.apache.nifi.controller.FlowController;
Expand Down Expand Up @@ -97,7 +98,7 @@ public VersionedDataflow createMapping() {
private List<VersionedConnector> mapConnectors() {
final List<VersionedConnector> connectors = new ArrayList<>();

for (final ConnectorNode connectorNode : flowController.getConnectorRepository().getConnectors()) {
for (final ConnectorNode connectorNode : flowController.getConnectorRepository().getConnectors(ConnectorSyncMode.LOCAL_ONLY)) {
final VersionedConnector versionedConnector = flowMapper.mapConnector(connectorNode);
if (flowController.isStartAfterInitialization(connectorNode)) {
versionedConnector.setScheduledState(ScheduledState.RUNNING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.components.connector.ConnectorNode;
import org.apache.nifi.components.connector.ConnectorRepository;
import org.apache.nifi.components.connector.ConnectorSyncMode;
import org.apache.nifi.components.connector.ConnectorSyncResult;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.connectable.Connectable;
Expand Down Expand Up @@ -1058,7 +1059,7 @@ private void inheritConnectors(final FlowController flowController, final Versio
}
}

for (final ConnectorNode existingConnector : connectorRepository.getConnectors()) {
for (final ConnectorNode existingConnector : connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY)) {
if (!proposedConnectorIds.contains(existingConnector.getIdentifier())) {
logger.info("Connector [{}] (state={}) is no longer part of the proposed flow. Stopping and removing.",
existingConnector.getIdentifier(), existingConnector.getCurrentState());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.nifi.components.connector.ConnectorNode;
import org.apache.nifi.components.connector.ConnectorRepository;
import org.apache.nifi.components.connector.ConnectorSyncMode;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.queue.FlowFileQueue;
Expand Down Expand Up @@ -60,7 +61,7 @@ public DiagnosticsDumpElement captureDump(final boolean verbose) {

details.add("");
final ConnectorRepository connectorRepository = flowController.getConnectorRepository();
final List<ConnectorNode> connectors = connectorRepository != null ? connectorRepository.getConnectors() : List.of();
final List<ConnectorNode> connectors = connectorRepository != null ? connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY) : List.of();
if (connectors.isEmpty()) {
details.add("This instance has no Connectors.");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ private ConnectorNode initializeDynamicFlowConnector() {
final DynamicFlowConnector flowConnector = (DynamicFlowConnector) connector;
assertTrue(flowConnector.isInitialized());

assertEquals(List.of(connectorNode), connectorRepository.getConnectors());
assertEquals(List.of(connectorNode), connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY));

final ProcessGroup rootGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup();
assertEquals(3, rootGroup.getProcessGroups().size());
Expand All @@ -282,7 +282,7 @@ private ConnectorNode initializeDynamicFlowConnector() {
private ConnectorNode initializeParameterConnector() {
final ConnectorNode connectorNode = flowManager.createConnector(ParameterConnector.class.getName(), "parameter-connector", SystemBundle.SYSTEM_BUNDLE_COORDINATE, true, true);
assertNotNull(connectorNode);
assertEquals(List.of(connectorNode), connectorRepository.getConnectors());
assertEquals(List.of(connectorNode), connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY));

final ProcessGroup rootGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup();
assertEquals(3, rootGroup.getProcessors().size());
Expand Down
Loading
Loading