From 694431c1a4c580c32dfdb6472e9173c5358272a0 Mon Sep 17 00:00:00 2001 From: smarthi Date: Sun, 10 Jul 2016 01:20:01 -0400 Subject: [PATCH] {hotfix} fix errors and formatting in NiFi documentation --- docs/apis/streaming/connectors/nifi.md | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/docs/apis/streaming/connectors/nifi.md b/docs/apis/streaming/connectors/nifi.md index fe7d2d3ee3076..98cac5e946553 100644 --- a/docs/apis/streaming/connectors/nifi.md +++ b/docs/apis/streaming/connectors/nifi.md @@ -66,6 +66,8 @@ Example:
{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder() .url("http://localhost:8080/nifi") .portName("Data for Flink") @@ -77,6 +79,8 @@ SourceFunction nifiSource = new NiFiSource(clientConfig);
{% highlight scala %} +val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment() + val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder() .url("http://localhost:8080/nifi") .portName("Data for Flink") @@ -89,40 +93,51 @@ val nifiSource = new NiFiSource(clientConfig)
Here data is read from the Apache NiFi Output Port called "Data for Flink" which is part of Apache NiFi -Site-to_site protocol configuration. +Site-to-site protocol configuration. #### Apache NiFi Sink -The connector provides a Sink for writing data from Apache NiFi to Apache Flink. +The connector provides a Sink for writing data from Apache Flink to Apache NiFi. -The class `NiFiSink(…)` provides a constructor for instantiating a `NiFiSink`. `NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder)` constructs a `NiFiSink(…)` given the client's `SiteToSiteConfig` and a `NiFiDataPacketBuilder` that converts data from Flink to `NiFiDataPacket` to be ingested by NiFi. +The class `NiFiSink(…)` provides a constructor for instantiating a `NiFiSink`. + +- `NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder)` constructs a `NiFiSink(…)` given the client's `SiteToSiteConfig` and a `NiFiDataPacketBuilder` that converts data from Flink to `NiFiDataPacket` to be ingested by NiFi. Example:
{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder() .url("http://localhost:8080/nifi") .portName("Data from Flink") .requestBatchCount(5) .buildConfig(); -SinkFunction nifiSink = new NiFiSink<>(clientConfig); +SinkFunction nifiSink = new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder() {...}); + +streamExec {% endhighlight %}
{% highlight scala %} +val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment() + val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder() .url("http://localhost:8080/nifi") .portName("Data from Flink") .requestBatchCount(5) .buildConfig() -val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig) +val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder() {...}) + +streamExecEnv.addSink(nifiSink) {% endhighlight %}
More information about Apache NiFi Site-to-Site Protocol can be found [here](https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#site-to-site) + More information about Apache NiFi can be found [here](https://nifi.apache.org). \ No newline at end of file