Skip to content

Commit

Permalink
[FLINK-3405] [nifi] Extend NiFiSource with interface StoppableFunction
Browse files Browse the repository at this point in the history
This closes #2047
  • Loading branch information
smarthi authored and StephanEwen committed Jun 8, 2016
1 parent 6afb2b0 commit 38362c4
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 8 deletions.
2 changes: 1 addition & 1 deletion flink-streaming-connectors/flink-connector-nifi/pom.xml
Expand Up @@ -37,7 +37,7 @@ under the License.


<!-- Allow users to pass custom connector versions --> <!-- Allow users to pass custom connector versions -->
<properties> <properties>
<nifi.version>0.3.0</nifi.version> <nifi.version>0.6.1</nifi.version>
</properties> </properties>


<dependencies> <dependencies>
Expand Down
Expand Up @@ -17,6 +17,7 @@
*/ */
package org.apache.flink.streaming.connectors.nifi; package org.apache.flink.streaming.connectors.nifi;


import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.Transaction;
Expand All @@ -37,7 +38,7 @@
* A source that pulls data from Apache NiFi using the NiFi Site-to-Site client. This source * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client. This source
* produces NiFiDataPackets which encapsulate the content and attributes of a NiFi FlowFile. * produces NiFiDataPackets which encapsulate the content and attributes of a NiFi FlowFile.
*/ */
public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> { public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> implements StoppableFunction{


private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class); private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class);


Expand All @@ -46,7 +47,7 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
private long waitTimeMs; private long waitTimeMs;
private SiteToSiteClient client; private SiteToSiteClient client;
private SiteToSiteClientConfig clientConfig; private SiteToSiteClientConfig clientConfig;
private transient volatile boolean running; private volatile boolean isRunning = true;


/** /**
* Constructs a new NiFiSource using the given client config and the default wait time of 1000 ms. * Constructs a new NiFiSource using the given client config and the default wait time of 1000 ms.
Expand All @@ -72,19 +73,19 @@ public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs) {
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
super.open(parameters); super.open(parameters);
client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build(); client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
running = true; isRunning = true;
} }


@Override @Override
public void run(SourceContext<NiFiDataPacket> ctx) throws Exception { public void run(SourceContext<NiFiDataPacket> ctx) throws Exception {
try { try {
while (running) { while (isRunning) {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
if (transaction == null) { if (transaction == null) {
LOG.warn("A transaction could not be created, waiting and will try again..."); LOG.warn("A transaction could not be created, waiting and will try again...");
try { try {
Thread.sleep(waitTimeMs); Thread.sleep(waitTimeMs);
} catch (InterruptedException e) { } catch (InterruptedException ignored) {


} }
continue; continue;
Expand All @@ -98,7 +99,7 @@ public void run(SourceContext<NiFiDataPacket> ctx) throws Exception {
LOG.debug("No data available to pull, waiting and will try again..."); LOG.debug("No data available to pull, waiting and will try again...");
try { try {
Thread.sleep(waitTimeMs); Thread.sleep(waitTimeMs);
} catch (InterruptedException e) { } catch (InterruptedException ignored) {


} }
continue; continue;
Expand Down Expand Up @@ -134,7 +135,7 @@ public void run(SourceContext<NiFiDataPacket> ctx) throws Exception {


@Override @Override
public void cancel() { public void cancel() {
running = false; isRunning = false;
} }


@Override @Override
Expand All @@ -143,4 +144,13 @@ public void close() throws Exception {
client.close(); client.close();
} }


/**
* {@inheritDoc}
* <p>
* Sets the {@link #isRunning} flag to {@code false}.
*/
@Override
public void stop() {
this.isRunning = false;
}
} }

0 comments on commit 38362c4

Please sign in to comment.