Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions docs/apis/streaming/connectors/nifi.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ Example:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();

SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data for Flink")
Expand All @@ -77,6 +79,8 @@ SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()

val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data for Flink")
Expand All @@ -89,40 +93,51 @@ val nifiSource = new NiFiSource(clientConfig)
</div>

Here data is read from the Apache NiFi Output Port called "Data for Flink" which is part of Apache NiFi
Site-to_site protocol configuration.
Site-to-site protocol configuration.

#### Apache NiFi Sink

The connector provides a Sink for writing data from Apache NiFi to Apache Flink.
The connector provides a Sink for writing data from Apache Flink to Apache NiFi.

The class `NiFiSink(…)` provides a constructor for instantiating a `NiFiSink`. `NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder<T>)` constructs a `NiFiSink(…)` given the client's `SiteToSiteConfig` and a `NiFiDataPacketBuilder` that converts data from Flink to `NiFiDataPacket` to be ingested by NiFi.
The class `NiFiSink(…)` provides a constructor for instantiating a `NiFiSink`.

- `NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder<T>)` constructs a `NiFiSink(…)` given the client's `SiteToSiteConfig` and a `NiFiDataPacketBuilder` that converts data from Flink to `NiFiDataPacket` to be ingested by NiFi.

Example:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();

SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data from Flink")
.requestBatchCount(5)
.buildConfig();

SinkFunction<NiFiDataPacket> nifiSink = new NiFiSink<>(clientConfig);
SinkFunction<NiFiDataPacket> nifiSink = new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<T>() {...});

streamExec
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that line can be removed

{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()

val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data from Flink")
.requestBatchCount(5)
.buildConfig()

val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig)
val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder<T>() {...})

streamExecEnv.addSink(nifiSink)
{% endhighlight %}
</div>
</div>

More information about Apache NiFi Site-to-Site Protocol can be found [here](https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#site-to-site)

More information about Apache NiFi can be found [here](https://nifi.apache.org).