From 3d588a4e6eb36761f3d708e9fd3bce66cfd23a98 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Wed, 30 Sep 2015 09:15:44 -0400 Subject: [PATCH 1/3] FLINK-2740 Adding flink-connector-nifi module with NiFiSource and NiFiSink --- .../flink-connector-nifi/pom.xml | 94 ++++++++++++++ .../connectors/nifi/NiFiDataPacket.java | 39 ++++++ .../nifi/NiFiDataPacketBuilder.java | 34 +++++ .../streaming/connectors/nifi/NiFiSink.java | 74 +++++++++++ .../connectors/nifi/NiFiSinkTopology.java | 51 ++++++++ .../streaming/connectors/nifi/NiFiSource.java | 119 ++++++++++++++++++ .../connectors/nifi/NiFiSourceTopology.java | 55 ++++++++ .../nifi/StandardNiFiDataPacket.java | 46 +++++++ .../src/test/resources/NiFi_Flink.xml | 1 + .../flink-streaming-connectors/pom.xml | 1 + 10 files changed, 514 insertions(+) create mode 100644 flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml create mode 100644 flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java create mode 100644 flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java create mode 100644 flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java create mode 100644 flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSinkTopology.java create mode 100644 flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java create mode 100644 flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSourceTopology.java create mode 100644 
flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java create mode 100644 flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml new file mode 100644 index 0000000000000..4e1ebb88dfb48 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml @@ -0,0 +1,94 @@ + + + + + 4.0.0 + + + org.apache.flink + flink-streaming-connectors-parent + 0.10-SNAPSHOT + .. + + + flink-connector-nifi + flink-connector-nifi + + jar + + + + 0.3.0 + + + + + org.apache.nifi + nifi-site-to-site-client + ${nifi.version} + + + org.apache.flink + flink-streaming-core + ${project.version} + + + org.apache.flink + flink-streaming-core + ${project.version} + test + test-jar + + + org.apache.flink + flink-tests + ${project.version} + test + + + org.apache.flink + flink-test-utils + ${project.version} + test + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + 3 + + + + org.apache.maven.plugins + maven-failsafe-plugin + + 3 + + + + + + diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java new file mode 100644 index 0000000000000..41495d78dd511 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import java.util.Map;
+
+/**
+ * <p>
+ * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both
+ * a FlowFile's content and its attributes so that they can be processed by Flink.
+ * </p>
+ */
+public interface NiFiDataPacket {
+
+    /**
+     * @return the contents of a NiFi FlowFile
+     */
+    byte[] getContent();
+
+    /**
+     * @return a Map of attributes (attribute name to attribute value) that are
+     *         associated with the NiFi FlowFile
+     */
+    Map<String, String> getAttributes();
+
+}
\ No newline at end of file
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
new file mode 100644
index 0000000000000..9bb521b29d115
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.io.Serializable;
+
+/**
+ * A function that can create a NiFiDataPacket from an incoming instance of the given type.
+ *
+ * @param <T> the type of the incoming records that are turned into NiFiDataPackets
+ */
+public interface NiFiDataPacketBuilder<T> extends Function, Serializable {
+
+    /**
+     * Creates a NiFiDataPacket from the given record.
+     *
+     * @param t the incoming record to convert
+     * @param ctx the RuntimeContext passed in by the caller
+     * @return the NiFiDataPacket built from {@code t}
+     */
+    NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
+
+}
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
new file mode 100644
index 0000000000000..c6436cfa3fae8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to Apache NiFi using the NiFi Site-to-Site client. The sink requires
+ * a NiFiDataPacketBuilder which can create instances of NiFiDataPacket from the incoming data.
+ *
+ * @param <T> the type of the records consumed by this sink
+ */
+public class NiFiSink<T> extends RichSinkFunction<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NiFiSink.class);
+
+    // Built in open(); transient so it is never part of the serialized function.
+    private transient SiteToSiteClient client;
+    private final SiteToSiteClientConfig clientConfig;
+    private final NiFiDataPacketBuilder<T> builder;
+
+    /**
+     * @param clientConfig the Site-to-Site configuration used to build the client in {@link #open}
+     * @param builder converts each incoming record into the NiFiDataPacket that is sent
+     */
+    public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder) {
+        this.clientConfig = clientConfig;
+        this.builder = builder;
+    }
+
+    /** Builds the Site-to-Site client from the configuration given to the constructor. */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        this.client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+    }
+
+    /**
+     * Converts the record into a NiFiDataPacket and sends it in its own SEND transaction.
+     *
+     * @param value the record to deliver
+     * @throws IllegalStateException if the client could not create a transaction
+     */
+    @Override
+    public void invoke(T value) throws Exception {
+        final NiFiDataPacket niFiDataPacket = builder.createNiFiDataPacket(value, getRuntimeContext());
+
+        final Transaction transaction = client.createTransaction(TransferDirection.SEND);
+        if (transaction == null) {
+            // The NiFi client documents a null return when no remote node can take the
+            // transaction; fail with a clear message instead of a NullPointerException.
+            throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
+        }
+
+        transaction.send(niFiDataPacket.getContent(), niFiDataPacket.getAttributes());
+        transaction.confirm();
+        transaction.complete();
+    }
+
+    /** Closes the Site-to-Site client; a failure to close is logged and not rethrown. */
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (client == null) {
+            // open() did not get as far as building the client
+            return;
+        }
+        try {
+            client.close();
+        } catch (final IOException ioe) {
+            LOG.error("Unable to close SiteToSiteClient: " + ioe.getMessage(), ioe);
+        }
+    }
+
+}
diff --git
a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSinkTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSinkTopology.java new file mode 100644 index 0000000000000..b70ee01fff2c7 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSinkTopology.java @@ -0,0 +1,51 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.flink.streaming.connectors.nifi; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; + +import java.util.HashMap; + +/** + * An example topology that sends data to a NiFi input port named "Data from Flink". 
+ */
+public class NiFiSinkTopology {
+
+    /**
+     * Builds and runs a job that sends a fixed set of strings to the NiFi instance at
+     * http://localhost:8080/nifi through the port "Data from Flink".
+     */
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+                .url("http://localhost:8080/nifi")
+                .portName("Data from Flink")
+                .buildConfig();
+
+        env.fromElements("one", "two", "three", "four", "five", "q")
+                .addSink(new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<String>() {
+                    @Override
+                    public NiFiDataPacket createNiFiDataPacket(String s, RuntimeContext ctx) {
+                        // NOTE: getBytes() encodes with the platform default charset
+                        return new StandardNiFiDataPacket(s.getBytes(), new HashMap<String, String>());
+                    }
+                }));
+
+        env.execute();
+    }
+
+}
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
new file mode 100644
index 0000000000000..bf87790eae250
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client. This source
+ * produces NiFiDataPackets which encapsulate the content and attributes of a NiFi FlowFile.
+ */
+public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class);
+
+    /** Default pause, in milliseconds, before asking NiFi for data again. */
+    private static final long DEFAULT_WAIT_TIME_MS = 1000L;
+
+    // Built in open(); transient so it is never part of the serialized function.
+    private transient SiteToSiteClient client;
+    private final SiteToSiteClientConfig clientConfig;
+    private final long waitTimeMs;
+    // Set in open(), cleared by cancel(); volatile so the loop in run() sees the change.
+    private transient volatile boolean running;
+
+    /**
+     * Creates a source that waits one second between polls when NiFi has no data.
+     *
+     * @param clientConfig the Site-to-Site configuration used to build the client in {@link #open}
+     */
+    public NiFiSource(SiteToSiteClientConfig clientConfig) {
+        this(clientConfig, DEFAULT_WAIT_TIME_MS);
+    }
+
+    /**
+     * @param clientConfig the Site-to-Site configuration used to build the client in {@link #open}
+     * @param waitTimeMs how long to sleep, in milliseconds, before polling again when no data
+     *                   (or no transaction) was available
+     */
+    public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs) {
+        this.clientConfig = clientConfig;
+        this.waitTimeMs = waitTimeMs;
+    }
+
+    /** Builds the Site-to-Site client and marks the source as running. */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+        running = true;
+    }
+
+    /**
+     * Repeatedly opens a RECEIVE transaction, buffers every packet it offers, confirms the
+     * transaction, emits the buffered packets and then completes the transaction. Returns
+     * when {@link #cancel()} has been called or the thread is interrupted while waiting.
+     */
+    @Override
+    public void run(SourceContext<NiFiDataPacket> ctx) throws Exception {
+        while (running) {
+            final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
+            if (transaction == null) {
+                // The NiFi client documents a null return when no remote node can take the
+                // transaction; wait and retry instead of failing with a NullPointerException.
+                LOG.warn("A transaction could not be created, waiting and will try again...");
+                if (!pause()) {
+                    return;
+                }
+                continue;
+            }
+
+            DataPacket dataPacket = transaction.receive();
+            if (dataPacket == null) {
+                transaction.confirm();
+                transaction.complete();
+
+                // no data available. Wait a bit and try again
+                if (!pause()) {
+                    return;
+                }
+                continue;
+            }
+
+            final List<NiFiDataPacket> niFiDataPackets = new ArrayList<>();
+            do {
+                niFiDataPackets.add(readPacket(dataPacket));
+                dataPacket = transaction.receive();
+            } while (dataPacket != null);
+
+            // Confirm transaction to verify the data
+            transaction.confirm();
+
+            for (NiFiDataPacket dp : niFiDataPackets) {
+                ctx.collect(dp);
+            }
+
+            transaction.complete();
+        }
+    }
+
+    /**
+     * Sleeps for the configured wait time.
+     *
+     * @return true if the sleep finished, false if the thread was interrupted (the interrupt
+     *         flag is restored so the caller's caller can observe it)
+     */
+    private boolean pause() {
+        try {
+            Thread.sleep(waitTimeMs);
+            return true;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
+
+    /** Reads the packet's data into a byte array and wraps it with the packet's attributes. */
+    private static NiFiDataPacket readPacket(DataPacket dataPacket) throws IOException {
+        final long size = dataPacket.getSize();
+        if (size > Integer.MAX_VALUE) {
+            // A plain (int) cast would silently truncate and allocate a wrong-sized buffer.
+            throw new IOException("NiFi data packet of " + size
+                    + " bytes is too large to buffer in a byte array");
+        }
+
+        final InputStream inStream = dataPacket.getData();
+        final byte[] data = new byte[(int) size];
+        StreamUtils.fillBuffer(inStream, data);
+
+        final Map<String, String> attributes = dataPacket.getAttributes();
+        return new StandardNiFiDataPacket(data, attributes);
+    }
+
+    /** Asks the loop in {@link #run} to stop after its current iteration. */
+    @Override
+    public void cancel() {
+        running = false;
+    }
+
+    /** Closes the Site-to-Site client; a failure to close is logged and not rethrown. */
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (client == null) {
+            // open() did not get as far as building the client
+            return;
+        }
+        try {
+            client.close();
+        } catch (final IOException ioe) {
+            LOG.error("Unable to close SiteToSiteClient: " + ioe.getMessage(), ioe);
+        }
+    }
+
+}
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSourceTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSourceTopology.java
new file mode 100644
index 0000000000000..470f8faec3d60
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSourceTopology.java
@@ -0,0 +1,55 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.flink.streaming.connectors.nifi; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; + +import java.nio.charset.Charset; + +/** + * An example topology that pulls data from a NiFi output port named "Data for Flink". 
+ */
+public class NiFiSourceTopology {
+
+    /**
+     * Builds and runs a job that pulls packets from the NiFi instance at
+     * http://localhost:8080/nifi through the port "Data for Flink", decodes each packet's
+     * content with the platform default charset and prints the resulting strings.
+     */
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+                .url("http://localhost:8080/nifi")
+                .portName("Data for Flink")
+                .buildConfig();
+
+        SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
+        DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource);
+
+        DataStream<String> dataStream = streamSource.map(new MapFunction<NiFiDataPacket, String>() {
+            @Override
+            public String map(NiFiDataPacket value) throws Exception {
+                return new String(value.getContent(), Charset.defaultCharset());
+            }
+        });
+
+        dataStream.print();
+        env.execute();
+    }
+
+}
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
new file mode 100644
index 0000000000000..5ad4bae1cca83
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * An implementation of NiFiDataPacket that simply holds the content array and the
+ * attribute map it was constructed with (no copies are made).
+ */
+public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
+    private static final long serialVersionUID = 6364005260220243322L;
+
+    private final byte[] content;
+    private final Map<String, String> attributes;
+
+    /**
+     * @param content the FlowFile content; stored by reference
+     * @param attributes the FlowFile attributes; stored by reference
+     */
+    public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes) {
+        this.content = content;
+        this.attributes = attributes;
+    }
+
+    /**
+     * @return the content array passed to the constructor
+     */
+    @Override
+    public byte[] getContent() {
+        return content;
+    }
+
+    /**
+     * @return the attribute map passed to the constructor
+     */
+    @Override
+    public Map<String, String> getAttributes() {
+        return attributes;
+    }
+
+}
\ No newline at end of file
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
new file mode 100644
index 0000000000000..9f08b9233d277
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
@@ -0,0 +1 @@
+