From a8f0c2e5b8078416c12ea7179e48bc5bddb8cdb8 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Mon, 30 Oct 2017 10:19:07 +0900 Subject: [PATCH] NIFI-4546: Make ReportingTask aware of node type in a cluster --- .../nifi/reporting/AbstractReportingTask.java | 14 ++++++++++++-- .../reporting/ReportingInitializationContext.java | 9 +++++++++ .../util/MockReportingInitializationContext.java | 6 ++++++ .../org/apache/nifi/controller/FlowController.java | 2 +- .../nifi/controller/StandardFlowSynchronizer.java | 2 +- .../StandardReportingInitializationContext.java | 11 ++++++++++- .../scheduling/TestStandardProcessScheduler.java | 2 +- .../mock/MockReportingInitializationContext.java | 6 ++++++ 8 files changed, 46 insertions(+), 6 deletions(-) diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/AbstractReportingTask.java b/nifi-api/src/main/java/org/apache/nifi/reporting/AbstractReportingTask.java index b5afe17a9fb7..93d23b22f85a 100644 --- a/nifi-api/src/main/java/org/apache/nifi/reporting/AbstractReportingTask.java +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/AbstractReportingTask.java @@ -20,8 +20,8 @@ import org.apache.nifi.components.AbstractConfigurableComponent; import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.controller.NodeTypeProvider; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessorInitializationContext; public abstract class AbstractReportingTask extends AbstractConfigurableComponent implements ReportingTask { @@ -30,6 +30,7 @@ public abstract class AbstractReportingTask extends AbstractConfigurableComponen private long schedulingNanos; private ControllerServiceLookup serviceLookup; private ComponentLog logger; + private NodeTypeProvider nodeTypeProvider; @Override public final void initialize(final ReportingInitializationContext config) throws InitializationException { @@ -38,18 +39,27 @@ public final void initialize(final ReportingInitializationContext config) throws name = config.getName(); schedulingNanos = config.getSchedulingPeriod(TimeUnit.NANOSECONDS); serviceLookup = config.getControllerServiceLookup(); + nodeTypeProvider = config.getNodeTypeProvider(); init(config); } /** * @return the {@link ControllerServiceLookup} that was passed to the - * {@link #init(ProcessorInitializationContext)} method + * {@link #initialize(ReportingInitializationContext)} method */ protected final ControllerServiceLookup getControllerServiceLookup() { return serviceLookup; } + /** + * @return the {@link NodeTypeProvider} that was passed to the + * {@link #initialize(ReportingInitializationContext)} method + */ + protected final NodeTypeProvider getNodeTypeProvider() { + return nodeTypeProvider; + } + /** * @return the identifier of this Reporting Task */ diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java index df64e03493a7..0bf49d3976ba 100644 --- a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java @@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit; import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.controller.NodeTypeProvider; import org.apache.nifi.kerberos.KerberosContext; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.scheduling.SchedulingStrategy; @@ -75,4 +76,12 @@ public interface ReportingInitializationContext extends KerberosContext { * way and generate bulletins when appropriate */ ComponentLog getLogger(); + + /** + * @return the {@link NodeTypeProvider} which can be used to detect the node + * type of this NiFi instance. + * @since Apache NiFi 1.5.0 + */ + NodeTypeProvider getNodeTypeProvider(); + } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java index 454b74293656..d1b8e5c84b06 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java @@ -23,6 +23,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.controller.NodeTypeProvider; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.scheduling.SchedulingStrategy; @@ -73,6 +74,11 @@ public ControllerServiceLookup getControllerServiceLookup() { return this; } + @Override + public NodeTypeProvider getNodeTypeProvider() { + return null; + } + @Override public String getSchedulingPeriod() { return "0 sec"; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index acc3102aef71..99d8ed0801d1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -3150,7 +3150,7 @@ public ReportingTaskNode createReportingTask(final String type, final String id, if (firstTimeAdded) { final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(), - SchedulingStrategy.TIMER_DRIVEN, "1 min", taskNode.getLogger(), this, nifiProperties); + SchedulingStrategy.TIMER_DRIVEN, "1 min", taskNode.getLogger(), this, nifiProperties, this); try { taskNode.getReportingTask().initialize(config); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 3af270cc503e..3d07456e1ff0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -620,7 +620,7 @@ private ReportingTaskNode getOrCreateReportingTask(final FlowController controll final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask()); final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(), - SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller, nifiProperties); + SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller, nifiProperties, controller); try { reportingTask.getReportingTask().initialize(config); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java index d96d2b70009b..ebe774bdcb84 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java @@ -22,6 +22,7 @@ import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.controller.NodeTypeProvider; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.reporting.ReportingInitializationContext; @@ -38,11 +39,13 @@ public class StandardReportingInitializationContext implements ReportingInitiali private final ControllerServiceProvider serviceProvider; private final ComponentLog logger; private final NiFiProperties nifiProperties; + private final NodeTypeProvider nodeTypeProvider; public StandardReportingInitializationContext( final String id, final String name, final SchedulingStrategy schedulingStrategy, final String schedulingPeriod, final ComponentLog logger, - final ControllerServiceProvider serviceProvider, final NiFiProperties nifiProperties) { + final ControllerServiceProvider serviceProvider, final NiFiProperties nifiProperties, + final NodeTypeProvider nodeTypeProvider) { this.id = id; this.name = name; this.schedulingPeriod = schedulingPeriod; @@ -50,6 +53,7 @@ public StandardReportingInitializationContext( this.schedulingStrategy = schedulingStrategy; this.logger = logger; this.nifiProperties = nifiProperties; + this.nodeTypeProvider = nodeTypeProvider; } @Override @@ -134,4 +138,9 @@ public File getKerberosServiceKeytab() { public File getKerberosConfigurationFile() { return nifiProperties.getKerberosConfigurationFile(); } + + @Override + public NodeTypeProvider getNodeTypeProvider() { + return nodeTypeProvider; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java index 2c5996423850..0c4acd80a24a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -116,7 +116,7 @@ public void setup() throws InitializationException { reportingTask = new TestReportingTask(); final ReportingInitializationContext config = new StandardReportingInitializationContext(UUID.randomUUID().toString(), "Test", SchedulingStrategy.TIMER_DRIVEN, "5 secs", - Mockito.mock(ComponentLog.class), null, nifiProperties); + Mockito.mock(ComponentLog.class), null, nifiProperties, null); reportingTask.initialize(config); final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(null, variableRegistry); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockReportingInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockReportingInitializationContext.java index 630c657824ff..379a56e918f2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockReportingInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockReportingInitializationContext.java @@ -17,6 +17,7 @@ package org.apache.nifi.mock; import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.controller.NodeTypeProvider; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.scheduling.SchedulingStrategy; @@ -80,4 +81,9 @@ public File getKerberosServiceKeytab() { public File getKerberosConfigurationFile() { return null; } + + @Override + public NodeTypeProvider getNodeTypeProvider() { + return null; + } }