diff --git a/flink-streaming-connectors/flink-connector-nifi/pom.xml b/flink-streaming-connectors/flink-connector-nifi/pom.xml index d93bce79f029b..a18d7b927fde5 100644 --- a/flink-streaming-connectors/flink-connector-nifi/pom.xml +++ b/flink-streaming-connectors/flink-connector-nifi/pom.xml @@ -37,7 +37,7 @@ under the License. - 0.3.0 + 0.6.1 diff --git a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java index a213bb4b2a872..00b6921883a25 100644 --- a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java +++ b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java @@ -17,6 +17,7 @@ */ package org.apache.flink.streaming.connectors.nifi; +import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.nifi.remote.Transaction; @@ -37,7 +38,7 @@ * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client. This source * produces NiFiDataPackets which encapsulate the content and attributes of a NiFi FlowFile. */ -public class NiFiSource extends RichParallelSourceFunction { +public class NiFiSource extends RichParallelSourceFunction implements StoppableFunction{ private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class); @@ -46,7 +47,7 @@ public class NiFiSource extends RichParallelSourceFunction { private long waitTimeMs; private SiteToSiteClient client; private SiteToSiteClientConfig clientConfig; - private transient volatile boolean running; + private volatile boolean isRunning = true; /** * Constructs a new NiFiSource using the given client config and the default wait time of 1000 ms. @@ -72,19 +73,19 @@ public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs) { public void open(Configuration parameters) throws Exception { super.open(parameters); client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build(); - running = true; + isRunning = true; } @Override public void run(SourceContext ctx) throws Exception { try { - while (running) { + while (isRunning) { final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); if (transaction == null) { LOG.warn("A transaction could not be created, waiting and will try again..."); try { Thread.sleep(waitTimeMs); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { } continue; @@ -98,7 +99,7 @@ public void run(SourceContext ctx) throws Exception { LOG.debug("No data available to pull, waiting and will try again..."); try { Thread.sleep(waitTimeMs); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { } continue; @@ -134,7 +135,7 @@ public void run(SourceContext ctx) throws Exception { @Override public void cancel() { - running = false; + isRunning = false; } @Override @@ -143,4 +144,13 @@ public void close() throws Exception { client.close(); } + /** + * {@inheritDoc} + *

+ * Sets the {@link #isRunning} flag to {@code false}. + */ + @Override + public void stop() { + this.isRunning = false; + } }