Date: Mon, 1 Aug 2016 19:59:31 +0900
Subject: [PATCH 3/3] NIFI-2078, NIFI-2442: Support full EL functionality.
- Added ExternalStateManager.validateExternalStateAccess method to
support
full functionality of Expression Language, including Variable
registory.
- Changed how to capture property values from onPropertyModified to
validateExternalStateAccess
- Removed Expression Language dependency
- Changed error handling to display validation errors appropriately
---
.../state/ExternalStateManager.java | 15 ++++
.../src/main/asciidoc/developer-guide.adoc | 10 ++-
.../apache/nifi/util/MockProcessContext.java | 5 ++
.../util/StandardProcessorTestRunner.java | 11 ++-
.../java/org/apache/nifi/util/TestRunner.java | 7 ++
.../ComponentStateEndpointMerger.java | 22 +++++
.../nifi/web/StandardNiFiServiceFacade.java | 57 +++++++++---
.../nifi/web/controller/ControllerFacade.java | 11 +++
.../nifi/web/dao/ComponentStateDAO.java | 6 +-
.../dao/impl/StandardComponentStateDAO.java | 37 +++-----
.../nifi/processors/kafka/GetKafka.java | 62 +++++++------
.../nifi/processors/kafka/TestGetKafka.java | 63 ++++++-------
.../nifi-kafka-pubsub-processors/pom.xml | 4 -
.../processors/kafka/pubsub/ConsumeKafka.java | 88 +++++++++----------
.../kafka/pubsub/ConsumeKafkaTest.java | 55 +++++++++---
15 files changed, 283 insertions(+), 170 deletions(-)
diff --git a/nifi-api/src/main/java/org/apache/nifi/components/state/ExternalStateManager.java b/nifi-api/src/main/java/org/apache/nifi/components/state/ExternalStateManager.java
index 896c3e349010..59392284e659 100644
--- a/nifi-api/src/main/java/org/apache/nifi/components/state/ExternalStateManager.java
+++ b/nifi-api/src/main/java/org/apache/nifi/components/state/ExternalStateManager.java
@@ -18,8 +18,11 @@
package org.apache.nifi.components.state;
import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import java.io.IOException;
+import java.util.Collection;
/**
*
@@ -47,6 +50,18 @@
*/
public interface ExternalStateManager {
+ /**
+ * To access an external system from those method implementations, configured property values of the component will be needed,
+ * such as Database connection URL or Kafka broker address.
+ * NiFi framework will call {@link #validateExternalStateAccess} method, to determine if required properties are set
+ * to access an external system, before {@link #getExternalState} or {@link #clearExternalState} is called.
+ * In this method, implementations has to validate configured properties,
+ * and also capture configured property values to use at getExternalState and clearExternalState.
+ * @param context validation context
+ * @return validation error result
+ */
+ Collection validateExternalStateAccess(final ValidationContext context);
+
/**
* Returns the current state for the component. This return value may be null.
*
diff --git a/nifi-docs/src/main/asciidoc/developer-guide.adoc b/nifi-docs/src/main/asciidoc/developer-guide.adoc
index 84605fdfb54b..32c6f31afd3d 100644
--- a/nifi-docs/src/main/asciidoc/developer-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/developer-guide.adoc
@@ -645,12 +645,14 @@ External State Manager is a mechanism to provide DFMs such interactions with ext
If a Processor, Controller Service or Reporting Task interact with an external system that manages NiFi component state,
it's advisable for those components to implement `ExternalStateManager` interface, and annotate the class with `@Stateful` using `Scope.EXTERNAL`.
-By doing so, NiFi UI lets a DFM to access component states by calling corresponding `getExternalState` or `clearExternalState` method.
+By doing so, NiFi UI lets a DFM access component states by calling corresponding `getExternalState` or `clearExternalState` method.
-To access an external system from those method implementations, you will need configured property values of the component,
+To access an external system from those method implementations, configured property values of the component will be needed,
such as Database connection URL or Kafka broker address.
-Since `ExternalStateManager` methods can be called before component is fully configured or scheduled,
-it's important to use `onPropertyModified` to capture those required configuration values for an external system connection.
+NiFi framework will call `validateExternalStateAccess` method, to determine if required properties are set to access an external system,
+before `getExternalState` or `clearExternalState` is called.
+At `valiadteExternalStateAccess` method, implementations has to validate configured properties,
+and also capture configured property values to use at `getExternalState` and `clearExternalState`.
`ExternalStateManager` also has the `getExternalStateScope` method which return either `ExternalStateScope.NODE` or `ExternalStateScope.CLUSTER`
to indicate how a NiFi cluster should access the target external system.
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
index 63a5c8571ef3..6c15b1974ffa 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
@@ -32,6 +32,7 @@
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ControllerService;
@@ -394,4 +395,8 @@ public void setPrimaryNode(boolean primaryNode) {
}
isPrimaryNode = primaryNode;
}
+
+ public ValidationContext newValidationContext() {
+ return new MockValidationContext(this, stateManager, variableRegistry);
+ }
}
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 6607e85498e5..1845fe7db32d 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -606,7 +606,7 @@ public void assertNotValid(final ControllerService service) {
throw new IllegalStateException("Controller Service has not been added to this TestRunner via the #addControllerService method");
}
- final ValidationContext validationContext = new MockValidationContext(context, serviceStateManager, variableRegistry).getControllerServiceValidationContext(service);
+ final ValidationContext validationContext = newValidationContext().getControllerServiceValidationContext(service);
final Collection results = context.getControllerService(service.getIdentifier()).validate(validationContext);
for (final ValidationResult result : results) {
@@ -625,7 +625,7 @@ public void assertValid(final ControllerService service) {
throw new IllegalStateException("Controller Service has not been added to this TestRunner via the #addControllerService method");
}
- final ValidationContext validationContext = new MockValidationContext(context, serviceStateManager, variableRegistry).getControllerServiceValidationContext(service);
+ final ValidationContext validationContext = newValidationContext().getControllerServiceValidationContext(service);
final Collection results = context.getControllerService(service.getIdentifier()).validate(validationContext);
for (final ValidationResult result : results) {
@@ -740,7 +740,7 @@ public ValidationResult setProperty(final ControllerService service, final Prope
final Map curProps = configuration.getProperties();
final Map updatedProps = new HashMap<>(curProps);
- final ValidationContext validationContext = new MockValidationContext(context, serviceStateManager, variableRegistry).getControllerServiceValidationContext(service);
+ final ValidationContext validationContext = newValidationContext().getControllerServiceValidationContext(service);
final ValidationResult validationResult = property.validate(value, validationContext);
updatedProps.put(property, value);
@@ -824,4 +824,9 @@ public void setClustered(boolean clustered) {
public void setPrimaryNode(boolean primaryNode) {
context.setPrimaryNode(primaryNode);
}
+
+ @Override
+ public ValidationContext newValidationContext() {
+ return context.newValidationContext();
+ }
}
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 023ef643594a..8d3ca63d9ed8 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -24,6 +24,7 @@
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.queue.QueueSize;
@@ -901,4 +902,10 @@ public interface TestRunner {
* @param primaryNode Specify if this test emulates running as a primary node
*/
void setPrimaryNode(boolean primaryNode);
+
+ /**
+ * Returns the {@link MockValidationContext} instance that enables custom validation test cases.
+ * @return newly created validation context
+ */
+ ValidationContext newValidationContext();
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java
index 84e6d706bf39..c8474d99ae9a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java
@@ -17,6 +17,7 @@
package org.apache.nifi.cluster.coordination.http.endpoints;
+import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
@@ -30,12 +31,18 @@
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.state.SortedStateUtils;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.StateEntryDTO;
import org.apache.nifi.web.api.dto.StateMapDTO;
import org.apache.nifi.web.api.entity.ComponentStateEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.StreamingOutput;
public class ComponentStateEndpointMerger extends AbstractSingleDTOEndpoint {
+ private static final Logger logger = LoggerFactory.getLogger(ComponentStateEndpointMerger.class);
public static final Pattern PROCESSOR_STATE_URI_PATTERN = Pattern.compile("/nifi-api/processors/[a-f0-9\\-]{36}/state");
public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller-services/[a-f0-9\\-]{36}/state");
public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = Pattern.compile("/nifi-api/reporting-tasks/[a-f0-9\\-]{36}/state");
@@ -65,6 +72,21 @@ protected ComponentStateDTO getDto(ComponentStateEntity entity) {
public void mergeResponses(ComponentStateDTO clientDto, Map dtoMap,
Set successfulResponses, Set problematicResponses) {
+ // If there's a problematic response, pick it as the final response.
+ if (problematicResponses != null && problematicResponses.size() > 0) {
+ final NodeResponse problematicResponse = problematicResponses.iterator().next();
+ if (problematicResponse.getResponse().getEntity() instanceof StreamingOutput) {
+ try (
+ final ByteArrayOutputStream errResponse = new ByteArrayOutputStream();
+ ) {
+ ((StreamingOutput)problematicResponse.getResponse().getEntity()).write(errResponse);
+ throw new IllegalStateException(errResponse.toString("utf-8"));
+ } catch (IOException e) {
+ logger.error("Failed to read problematic response due to {}", e, e);
+ }
+ }
+ }
+
// If there're more than 1 node returning external state, then it's a per node external state.
final boolean externalPerNode = dtoMap.values().stream()
.filter(component -> (component.getExternalState() != null && component.getExternalState().getState() != null))
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 46bc9a16335a..d50949b01fef 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -203,6 +203,7 @@
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -820,13 +821,19 @@ public CounterDTO updateCounter(final String counterId) {
return dtoFactory.createCounterDto(controllerFacade.resetCounter(counterId));
}
- private ClearComponentStateResultEntity clearComponentState(final ConfigurableComponent component) {
+ private ClearComponentStateResultEntity clearComponentState(final ConfiguredComponent configuredComponent, final ConfigurableComponent component) {
logger.debug("clearComponentState: component={}, shouldManageClusterState={}, shouldManageExternalState={}",
component, shouldManageClusterState(), shouldManageExternalState(component));
final ClearComponentStateResultEntity entity = new ClearComponentStateResultEntity();
- componentStateDAO.clearState(component, Scope.LOCAL);
+ try {
+ componentStateDAO.clearState(component, Scope.LOCAL);
+ } catch (IOException e) {
+ final String msg = String.format("Failed to clear state for %s due to %s", component, e);
+ logger.error(msg, e);
+ throw new IllegalStateException(msg, e);
+ }
try {
if (shouldManageClusterState()) {
@@ -834,6 +841,10 @@ private ClearComponentStateResultEntity clearComponentState(final ConfigurableCo
}
if (shouldManageExternalState(component)) {
+ final Collection validationResults = controllerFacade.validateExternalStateAccess(configuredComponent, (ExternalStateManager) component);
+ if (!validationResults.isEmpty()) {
+ throw new IllegalStateException("Invalid configuration to access external state: " + validationResults);
+ }
componentStateDAO.clearState(component, Scope.EXTERNAL);
}
} catch (Exception e) {
@@ -843,7 +854,9 @@ private ClearComponentStateResultEntity clearComponentState(final ConfigurableCo
// See: NodeClusterCoordinator.afterRequest()
// Removing node doesn't help since it's related to external system, and other node will have the same issue.
// So instead, return entity with message.
- logger.warn("Failed to clear state for {} due to {}", component, e, e);
+ if (!(e instanceof IllegalStateException)) {
+ logger.error("Failed to clear state for {} due to {}", component, e, e);
+ }
entity.setCleared(false);
entity.setMessage(e.getMessage());
}
@@ -857,7 +870,8 @@ public void verifyCanClearProcessorState(final String processorId) {
@Override
public ClearComponentStateResultEntity clearProcessorState(final String processorId) {
- return clearComponentState(processorDAO.getProcessor(processorId).getProcessor());
+ final ProcessorNode processor = processorDAO.getProcessor(processorId);
+ return clearComponentState(processor, processor.getProcessor());
}
@Override
@@ -867,7 +881,8 @@ public void verifyCanClearControllerServiceState(final String controllerServiceI
@Override
public ClearComponentStateResultEntity clearControllerServiceState(final String controllerServiceId) {
- return clearComponentState(controllerServiceDAO.getControllerService(controllerServiceId).getControllerServiceImplementation());
+ final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId);
+ return clearComponentState(controllerService, controllerService.getControllerServiceImplementation());
}
@Override
@@ -877,7 +892,8 @@ public void verifyCanClearReportingTaskState(final String reportingTaskId) {
@Override
public ClearComponentStateResultEntity clearReportingTaskState(final String reportingTaskId) {
- return clearComponentState(reportingTaskDAO.getReportingTask(reportingTaskId).getReportingTask());
+ final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId);
+ return clearComponentState(reportingTask, reportingTask.getReportingTask());
}
@Override
@@ -1948,7 +1964,7 @@ private boolean shouldManageExternalState(final ConfigurableComponent component)
|| controllerFacade.isPrimary());
}
- private ComponentStateDTO getComponentState(final ConfigurableComponent component) {
+ private ComponentStateDTO getComponentState(final ConfiguredComponent configuredComponent, final ConfigurableComponent component) {
logger.debug("getComponentState: component={}, shouldManageClusterState={}, shouldManageExternalState={}",
component, shouldManageClusterState(), shouldManageExternalState(component));
@@ -1958,29 +1974,42 @@ private ComponentStateDTO getComponentState(final ConfigurableComponent componen
logger.debug("clusterState={}", clusterState);
final StateMap localState = componentStateDAO.getState(component, Scope.LOCAL);
logger.debug("localState={}", localState);
- final StateMap externalState = shouldManageExternalState(component) ? componentStateDAO.getState(component, Scope.EXTERNAL) : null;
+ StateMap externalState = null;
+ if (shouldManageExternalState(component)) {
+ final Collection validationResults = controllerFacade.validateExternalStateAccess(configuredComponent, (ExternalStateManager) component);
+ if (!validationResults.isEmpty()) {
+ throw new IllegalStateException("Invalid configuration to access external state: " + validationResults);
+ } else {
+ externalState = componentStateDAO.getState(component, Scope.EXTERNAL);
+ }
+ }
logger.debug("externalState={}", externalState);
return dtoFactory.createComponentStateDTO(component, localState, clusterState, externalState);
- } catch (Exception e) {
- logger.error("Failed to retrieve state for {} due to {}", component, e, e);
- throw e;
+ } catch (IOException e) {
+ final String msg = String.format("Failed to clear state for %s due to %s", component, e);
+ logger.error(msg, e);
+ throw new IllegalStateException(msg, e);
}
+
}
@Override
public ComponentStateDTO getProcessorState(final String processorId) {
- return getComponentState(processorDAO.getProcessor(processorId).getProcessor());
+ final ProcessorNode processor = processorDAO.getProcessor(processorId);
+ return getComponentState(processor, processor.getProcessor());
}
@Override
public ComponentStateDTO getControllerServiceState(final String controllerServiceId) {
- return getComponentState(controllerServiceDAO.getControllerService(controllerServiceId).getControllerServiceImplementation());
+ final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId);
+ return getComponentState(controllerService, controllerService.getControllerServiceImplementation());
}
@Override
public ComponentStateDTO getReportingTaskState(final String reportingTaskId) {
- return getComponentState(reportingTaskDAO.getReportingTask(reportingTaskId).getReportingTask());
+ final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId);
+ return getComponentState(reportingTask, reportingTask.getReportingTask());
}
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index afc2659f47a8..c1bd2f78b2eb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -33,10 +33,13 @@
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.ExternalStateManager;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ContentAvailability;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Counter;
@@ -69,6 +72,7 @@
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.StandardValidationContext;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.provenance.SearchableFields;
@@ -1757,6 +1761,12 @@ private void addIfAppropriate(final String searchStr, final String value, final
}
}
+ public Collection validateExternalStateAccess(final ConfiguredComponent configuredComponent, final ExternalStateManager stateManager) {
+ final StandardValidationContext validationContext = new StandardValidationContext(flowController, configuredComponent.getProperties(),
+ configuredComponent.getAnnotationData(), null, configuredComponent.getIdentifier(), variableRegistry);
+ return stateManager.validateExternalStateAccess(validationContext);
+ }
+
/*
* setters
*/
@@ -1791,4 +1801,5 @@ public void setBulletinRepository(BulletinRepository bulletinRepository) {
public void setVariableRegistry(VariableRegistry variableRegistry) {
this.variableRegistry = variableRegistry;
}
+
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
index b244e04c05e2..1940b5696e2c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
@@ -20,6 +20,8 @@
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
+import java.io.IOException;
+
public interface ComponentStateDAO {
/**
@@ -29,7 +31,7 @@ public interface ComponentStateDAO {
* @param scope scope
* @return state map
*/
- StateMap getState(ConfigurableComponent component, Scope scope);
+ StateMap getState(ConfigurableComponent component, Scope scope) throws IOException;
/**
* Clears the state for the specified component.
@@ -37,6 +39,6 @@ public interface ComponentStateDAO {
* @param component component
* @param scope scope
*/
- void clearState(ConfigurableComponent component, Scope scope);
+ void clearState(ConfigurableComponent component, Scope scope) throws IOException;
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
index a2ebd1a2fbd3..4ee0ff59c59d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
@@ -32,7 +32,7 @@ public class StandardComponentStateDAO implements ComponentStateDAO {
private StateManagerProvider stateManagerProvider;
@Override
- public StateMap getState(final ConfigurableComponent component, final Scope scope) {
+ public StateMap getState(final ConfigurableComponent component, final Scope scope) throws IOException {
switch (scope) {
case EXTERNAL:
return getExternalState(component);
@@ -54,20 +54,15 @@ private StateMap getState(final String componentId, final Scope scope) {
}
}
- private StateMap getExternalState(final ConfigurableComponent component) {
+ private StateMap getExternalState(final ConfigurableComponent component) throws IOException {
if (component instanceof ExternalStateManager) {
- try {
- return ((ExternalStateManager)component).getExternalState();
- } catch (final IOException ioe) {
- throw new IllegalStateException(String.format("Unable to get the external state for the specified component %s: %s",
- component.getIdentifier(), ioe), ioe);
- }
+ return ((ExternalStateManager)component).getExternalState();
}
return null;
}
@Override
- public void clearState(final ConfigurableComponent component, final Scope scope) {
+ public void clearState(final ConfigurableComponent component, final Scope scope) throws IOException {
switch (scope) {
case EXTERNAL:
clearExternalState(component);
@@ -78,27 +73,17 @@ public void clearState(final ConfigurableComponent component, final Scope scope)
}
}
- private void clearState(final String componentId, final Scope scope) {
- try {
- final StateManager manager = stateManagerProvider.getStateManager(componentId);
- if (manager == null) {
- throw new ResourceNotFoundException(String.format("State for the specified component %s could not be found.", componentId));
- }
- manager.clear(scope);
-
- } catch (final IOException ioe) {
- throw new IllegalStateException(String.format("Unable to clear the state for the specified component %s: %s", componentId, ioe), ioe);
+ private void clearState(final String componentId, final Scope scope) throws IOException {
+ final StateManager manager = stateManagerProvider.getStateManager(componentId);
+ if (manager == null) {
+ throw new ResourceNotFoundException(String.format("State for the specified component %s could not be found.", componentId));
}
+ manager.clear(scope);
}
- private void clearExternalState(final ConfigurableComponent component) {
+ private void clearExternalState(final ConfigurableComponent component) throws IOException {
if (component instanceof ExternalStateManager) {
- try {
- ((ExternalStateManager)component).clearExternalState();
- } catch (final IOException ioe) {
- throw new IllegalStateException(String.format("Unable to clear the external state for the specified component %s: %s",
- component.getIdentifier(), ioe), ioe);
- }
+ ((ExternalStateManager)component).clearExternalState();
}
}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 1b88dc5df349..f3d969665230 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -20,6 +20,7 @@
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -51,6 +52,8 @@
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.ExternalStateManager;
import org.apache.nifi.components.state.Scope;
@@ -237,6 +240,10 @@ public Set getRelationships() {
public void createConsumers(final ProcessContext context) {
+ zookeeperConnectionString = context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue();
+ groupId = context.getProperty(GROUP_ID).getValue();
+ topic = context.getProperty(TOPIC).getValue();
+
final Properties props = new Properties();
props.setProperty("zookeeper.connect", zookeeperConnectionString);
props.setProperty("group.id", groupId);
@@ -503,53 +510,44 @@ public ExternalStateScope getExternalStateScope() {
return ExternalStateScope.CLUSTER;
}
+ @Override
+ public Collection validateExternalStateAccess(ValidationContext context) {
+ final Collection validationResults = validate(context);
+ if (validationResults.isEmpty()) {
+
+ // Capture settings.
+ context.getProperties().entrySet().forEach(kv -> {
+ final PropertyDescriptor descriptor = kv.getKey();
+ // If value hasn't been modified by user from the default value, the value is null.
+ final String v = StringUtils.isBlank(kv.getValue()) ? descriptor.getDefaultValue() : kv.getValue();
+ if (ZOOKEEPER_CONNECTION_STRING.equals(descriptor)) {
+ zookeeperConnectionString = v;
+ } else if (TOPIC.equals(descriptor)) {
+ topic = v;
+ } else if (GROUP_ID.equals(descriptor)) {
+ groupId = v;
+ }
+ });
+ }
+
+ return validationResults;
+ }
+
@Override
public StateMap getExternalState() throws IOException {
// We don't have to synchronize with onTrigger here,
// since it merely retrieves state from Zk using different channel, it doesn't affect consuming.
- if (!isReadyToAccessState()) {
- return null;
- }
final Map partitionOffsets = KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
return new StandardStateMap(partitionOffsets, System.currentTimeMillis());
}
- private boolean isReadyToAccessState() {
- return !StringUtils.isEmpty(zookeeperConnectionString)
- && !StringUtils.isEmpty(topic)
- && !StringUtils.isEmpty(groupId);
- }
-
@Override
public void clearExternalState() throws IOException {
- if (!isReadyToAccessState()) {
- return;
- }
// Block onTrigger starts creating new consumer until clear offset finishes.
synchronized (this.consumerStreamsReady) {
KafkaUtils.clearPartitionOffsets(zookeeperConnectionString, topic, groupId);
}
}
- /**
- * GetKafka overrides this method in order to capture processor's property values required when it retrieves
- * its state managed externally at Kafka. Since view/clear state operation can be executed before onTrigger() is called,
- * we need to capture these values as it's modified. This method is also called when NiFi restarts and loads configs,
- * so users can access external states right after restart of NiFi.
- * @param descriptor of the modified property
- * @param oldValue non-null property value (previous)
- * @param newValue the new property value or if null indicates the property
- */
- @Override
- public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
- if (ZOOKEEPER_CONNECTION_STRING.equals(descriptor)) {
- zookeeperConnectionString = newValue;
- } else if (TOPIC.equals(descriptor)) {
- topic = newValue;
- } else if (GROUP_ID.equals(descriptor)) {
- groupId = newValue;
- }
- }
-
}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
index fda896db13f1..014176d75384 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
@@ -16,14 +16,16 @@
*/
package org.apache.nifi.processors.kafka;
-import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.apache.log4j.BasicConfigurator;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -38,8 +40,9 @@
import kafka.consumer.ConsumerIterator;
import kafka.message.MessageAndMetadata;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
public class TestGetKafka {
@@ -171,43 +174,43 @@ public byte[] answer(InvocationOnMock invocation) throws Throwable {
}
@Test
- public void testGetState() throws Exception {
+ public void testValidateExternalStateAccess() throws Exception {
final GetKafka processor = new GetKafka();
final TestRunner runner = TestRunners.newTestRunner(processor);
- assertNull("State should be null when required properties are not specified.", processor.getExternalState());
-
- runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "0.0.0.0:invalid-port");
- runner.setProperty(GetKafka.GROUP_ID, "consumer-group-id");
-
- assertNull("State should be null when required properties are not specified.", processor.getExternalState());
+ Collection validationResults = processor.validateExternalStateAccess(runner.newValidationContext());
+ // Group Id has default value. These two properties are required.
+ assertEquals(2, validationResults.size());
+ final List explanations = validationResults.stream().map(r -> r.getExplanation()).collect(Collectors.toList());
+ assertTrue(explanations.contains("Topic Name is required"));
+ assertTrue(explanations.contains("ZooKeeper Connection String is required"));
+ // Set required properties, validation passes, values are set
+ runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "0.0.0.0:9092");
runner.setProperty(GetKafka.TOPIC, "testX");
- try {
- processor.getExternalState();
- fail("The processor should try to access Zookeeper and should fail since it can not connect.");
- } catch (IOException e) {
- }
- }
-
- @Test
- public void testClearState() throws Exception {
- final GetKafka processor = new GetKafka();
- final TestRunner runner = TestRunners.newTestRunner(processor);
+ validationResults = processor.validateExternalStateAccess(runner.newValidationContext());
+ assertEquals(0, validationResults.size());
- // Clear doesn't do anything until required properties are set.
- processor.clearExternalState();
+ assertEquals("testX", getPrivateFieldValue(processor, "topic"));
+ assertEquals("0.0.0.0:9092", getPrivateFieldValue(processor, "zookeeperConnectionString"));
+ assertNotNull("Default groupId should be used", getPrivateFieldValue(processor, "groupId"));
- runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "0.0.0.0:invalid-port");
- runner.setProperty(GetKafka.TOPIC, "testX");
+ // Set groupId
runner.setProperty(GetKafka.GROUP_ID, "consumer-group-id");
- try {
- processor.clearExternalState();
- fail("The processor should try to access Zookeeper and should fail since it can not connect Zookeeper.");
- } catch (IOException e) {
- }
+ validationResults = processor.validateExternalStateAccess(runner.newValidationContext());
+ assertEquals(0, validationResults.size());
+
+ assertEquals("testX", getPrivateFieldValue(processor, "topic"));
+ assertEquals("0.0.0.0:9092", getPrivateFieldValue(processor, "zookeeperConnectionString"));
+ assertNotNull("consumer-group-id", getPrivateFieldValue(processor, "groupId"));
+ }
+
+ private Object getPrivateFieldValue(GetKafka processor, String fieldName) throws NoSuchFieldException, IllegalAccessException {
+ final Field field = processor.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return field.get(processor);
}
}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml
index 2b3e5246dac4..53e309a920bd 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml
@@ -34,10 +34,6 @@
org.apache.nifi
nifi-utils