From 08015eaee84ada2d6bb3a223e587c551aa2be766 Mon Sep 17 00:00:00 2001 From: "info@richards-tech.com" Date: Tue, 26 Apr 2016 11:35:54 -0400 Subject: [PATCH 1/2] Added mqtt --- .../nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml | 35 ++++ .../nifi-mqtt-processors/pom.xml | 91 ++++++++ .../apache/nifi/processors/mqtt/GetMQTT.java | 191 +++++++++++++++++ .../apache/nifi/processors/mqtt/PutMQTT.java | 194 ++++++++++++++++++ .../org.apache.nifi.processor.Processor | 16 ++ .../nifi/processors/mqtt/TestGetMQTT.java | 41 ++++ .../nifi/processors/mqtt/TestPutMQTT.java | 41 ++++ nifi-nar-bundles/nifi-mqtt-bundle/pom.xml | 37 ++++ nifi-nar-bundles/pom.xml | 1 + 9 files changed, 647 insertions(+) create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestGetMQTT.java create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/pom.xml diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml new file mode 100644 index 000000000000..158e80fd7ac0 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml @@ -0,0 +1,35 @@ + + + 4.0.0 + + org.apache.nifi + nifi-mqtt-bundle + 1.0.0-SNAPSHOT + + nifi-mqtt-nar + nar + NiFi NAR for interacting with MQTT brokers + + true + true + + + + org.apache.nifi + nifi-mqtt-processors + + + diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml new file mode 100644 index 000000000000..5bb84fab2513 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml @@ -0,0 +1,91 @@ + + + + org.apache.nifi + nifi-mqtt-bundle + 1.0.0-SNAPSHOT + + 4.0.0 + nifi-mqtt-processors + jar + + + Eclipse Paho Repo + https://repo.eclipse.org/content/repositories/paho-releases/ + + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-processor-utils + + + org.apache.nifi + nifi-mock + test + + + org.slf4j + slf4j-simple + test + + + junit + junit + 4.11 + test + + + org.apache.commons + commons-io + 1.3.2 + + + org.bytedeco + javacv + 1.1 + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.0.2 + + + org.json + json + 20151123 + + + javax.websocket + javax.websocket-api + 1.1 + + + org.glassfish.tyrus.bundles + tyrus-standalone-client-jdk + 1.12 + + + org.glassfish.tyrus + tyrus-container-grizzly-client + 1.12 + + + diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java new file mode 100644 index 000000000000..8a50a9034c93 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.*; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.io.OutputStreamCallback; + +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +import java.util.*; +import java.util.concurrent.LinkedBlockingQueue; +import java.io.OutputStream; +import java.io.IOException; + +@Tags({"GetMQTT"}) +@CapabilityDescription("Gets messages from an MQTT broker") +@SeeAlso({}) +@ReadsAttributes({@ReadsAttribute(attribute="", description="")}) +@WritesAttributes({@WritesAttribute(attribute="broker", description="MQTT broker that was the message source"), + @WritesAttribute(attribute="topic", description="MQTT topic on which message was received")}) +public class GetMQTT extends AbstractProcessor implements MqttCallback { + + String topic; + String broker; + String clientID; + double lastTime; + boolean firstTime = true; + + MemoryPersistence persistence = new MemoryPersistence(); + MqttClient mqttClient; + + LinkedBlockingQueue mqttQueue = new LinkedBlockingQueue(); + + public static final PropertyDescriptor PROPERTY_BROKER_ADDRESS = new PropertyDescriptor + .Builder().name("Broker address") + .description("MQTT broker address (tcp://:") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROPERTY_MQTT_TOPIC = new PropertyDescriptor + .Builder().name("MQTT topic") + .description("MQTT topic to subscribe to") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROPERTY_MQTT_CLIENTID = new PropertyDescriptor + .Builder().name("MQTT client ID") + .description("MQTT client ID to use") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final Relationship RELATIONSHIP_MQTTMESSAGE = new Relationship.Builder() + .name("MQTTMessage") + .description("MQTT message output") + .build(); + + private List descriptors; + + private Set relationships; + + @Override + public void connectionLost(Throwable t) { + getLogger().info("Connection to " + broker + " lost"); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + mqttQueue.add(new MQTTQueueMessage(topic, message.getPayload())); + } + + @Override + protected void init(final ProcessorInitializationContext context) { + final List descriptors = new ArrayList(); + descriptors.add(PROPERTY_BROKER_ADDRESS); + descriptors.add(PROPERTY_MQTT_TOPIC); + descriptors.add(PROPERTY_MQTT_CLIENTID); + + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet(); + relationships.add(RELATIONSHIP_MQTTMESSAGE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + try { + broker = context.getProperty(PROPERTY_BROKER_ADDRESS).getValue(); + topic = context.getProperty(PROPERTY_MQTT_TOPIC).getValue(); + clientID = context.getProperty(PROPERTY_MQTT_CLIENTID).getValue(); + mqttClient = new MqttClient(broker, clientID, persistence); + MqttConnectOptions connOpts = new MqttConnectOptions(); + mqttClient.setCallback(this); + connOpts.setCleanSession(true); + getLogger().info("Connecting to broker: " + broker); + mqttClient.connect(connOpts); + mqttClient.subscribe(topic, 0); + } catch(MqttException me) { + getLogger().error("msg "+me.getMessage()); + } + } + + @OnUnscheduled + public void onUnscheduled(final ProcessContext context) { + try { + mqttClient.disconnect(); + } catch(MqttException me) { + + } + getLogger().error("Disconnected"); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final List messageList = new LinkedList(); + + mqttQueue.drainTo(messageList); + if (messageList.isEmpty()) + return; + + Iterator iterator = messageList.iterator(); + while (iterator.hasNext()) { + FlowFile messageFlowfile = session.create(); + final MQTTQueueMessage m = (MQTTQueueMessage)iterator.next(); + + messageFlowfile = session.putAttribute(messageFlowfile, "broker", broker); + messageFlowfile = session.putAttribute(messageFlowfile, "topic", topic); + messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() { + + @Override + public void process(final OutputStream out) throws IOException { + out.write(m.message); + } + }); + session.transfer(messageFlowfile, RELATIONSHIP_MQTTMESSAGE); + session.commit(); + } + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java new file mode 100644 index 000000000000..f8bfe5fccbad --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.*; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.commons.io.IOUtils; + +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; +import java.io.InputStream; +import java.io.IOException; + +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"PutMQTT"}) +@CapabilityDescription("Publishes message to an MQTT topic") +@SeeAlso({}) +@ReadsAttributes({@ReadsAttribute(attribute="topic", description="Topic to publish message to")}) +@WritesAttributes({@WritesAttribute(attribute="", description="")}) +public class PutMQTT extends AbstractProcessor implements MqttCallback { + + String broker; + String clientID; + + MemoryPersistence persistence = new MemoryPersistence(); + MqttClient mqttClient; + + public static final PropertyDescriptor PROPERTY_BROKER_ADDRESS = new PropertyDescriptor + .Builder().name("Broker address") + .description("MQTT broker address (tcp://:") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROPERTY_MQTT_CLIENTID = new PropertyDescriptor + .Builder().name("MQTT client ID") + .description("MQTT client ID to use") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private List descriptors; + + private Set relationships; + + @Override + public void connectionLost(Throwable t) { + getLogger().info("Connection to " + broker + " lost"); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + } + + @Override + protected void init(final ProcessorInitializationContext context) { + final List descriptors = new ArrayList(); + descriptors.add(PROPERTY_BROKER_ADDRESS); + descriptors.add(PROPERTY_MQTT_CLIENTID); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet(); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + try { + broker = context.getProperty(PROPERTY_BROKER_ADDRESS).getValue(); + clientID = context.getProperty(PROPERTY_MQTT_CLIENTID).getValue(); + mqttClient = new MqttClient(broker, clientID, persistence); + MqttConnectOptions connOpts = new MqttConnectOptions(); + mqttClient.setCallback(this); + connOpts.setCleanSession(true); + getLogger().info("Connecting to broker: " + broker); + mqttClient.connect(connOpts); + } catch(MqttException me) { + getLogger().error("msg "+me.getMessage()); + } + } + + @OnUnscheduled + public void onUnscheduled(final ProcessContext context) { + try { + mqttClient.disconnect(); + } catch(MqttException me) { + + } + getLogger().error("Disconnected"); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final AtomicReference message = new AtomicReference<>(); + + FlowFile flowfile = session.get(); + message.set(""); + + // get the MQTT topic + + String topic = flowfile.getAttribute("destination"); + + if (topic == null) { + getLogger().error("No topic attribute on flowfile"); + session.remove(flowfile); + return; + } + + // do the read + + session.read(flowfile, new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + try{ + message.set(IOUtils.toString(in)); + }catch(Exception e){ + getLogger().error("Failed to read flowfile " + e.getMessage()); + } + } + }); + try { + session.remove(flowfile); + } catch (Exception e) { + getLogger().error("Failed to remove flowfile " + e.getMessage()); + return; + } + + String output = message.get(); + + if ((output == null) || output.isEmpty()) { + return; + } + + try { + mqttClient.publish(topic, output.getBytes(), 0, false); + } catch(MqttException me) { + getLogger().error("msg "+me.getMessage()); + } + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 000000000000..b5a30e990722 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.mqtt.GetMQTT +org.apache.nifi.processors.mqtt.PutMQTT diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestGetMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestGetMQTT.java new file mode 100644 index 000000000000..cd2273563604 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestGetMQTT.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt; + +import org.apache.nifi.processors.mqtt.GetMQTT; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + + +public class TestGetMQTT { + + private TestRunner testRunner; + + @Before + public void init() { + testRunner = TestRunners.newTestRunner(GetMQTT.class); + } + + @Test + public void testProcessor() { + + } + +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java new file mode 100644 index 000000000000..64a05489750c --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt; + +import org.apache.nifi.processors.mqtt.PutMQTT; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + + +public class TestPutMQTT { + + private TestRunner testRunner; + + @Before + public void init() { + testRunner = TestRunners.newTestRunner(PutMQTT.class); + } + + @Test + public void testProcessor() { + + } + +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml b/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml new file mode 100644 index 000000000000..456fae366ac0 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml @@ -0,0 +1,37 @@ + + + 4.0.0 + + org.apache.nifi + nifi-nar-bundles + 1.0.0-SNAPSHOT + + nifi-mqtt-bundle + pom + + nifi-mqtt-processors + nifi-mqtt-nar + + + + + org.apache.nifi + nifi-mqtt-processors + 1.0.0-SNAPSHOT + + + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 7ed2da821570..991204819653 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -59,6 +59,7 @@ nifi-jms-bundle nifi-cassandra-bundle nifi-spring-bundle + nifi-mqtt-bundle From 116a29ad84cdcd5aa8a4fb906ede6be7e73de866 Mon Sep 17 00:00:00 2001 From: "info@richards-tech.com" Date: Tue, 26 Apr 2016 12:48:32 -0400 Subject: [PATCH 2/2] Fixed attribute name in PutMQTT --- nifi-assembly/pom.xml | 5 ++++ .../nifi-mqtt-processors/pom.xml | 5 ---- .../apache/nifi/processors/mqtt/GetMQTT.java | 4 +-- .../processors/mqtt/MQTTQueueMessage.java | 29 +++++++++++++++++++ .../apache/nifi/processors/mqtt/PutMQTT.java | 5 ++-- pom.xml | 6 ++++ 6 files changed, 44 insertions(+), 10 deletions(-) create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/MQTTQueueMessage.java diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 7f92a2a0c133..12aa4dc8a216 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -307,6 +307,11 @@ language governing permissions and limitations under the License. --> nifi-spring-nar nar + + org.apache.nifi + nifi-mqtt-nar + nar + diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml index 5bb84fab2513..9f4e9563d26d 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml @@ -67,11 +67,6 @@ org.eclipse.paho.client.mqttv3 1.0.2 - - org.json - json - 20151123 - javax.websocket javax.websocket-api diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java index 8a50a9034c93..1391317b3d8b 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java @@ -64,11 +64,11 @@ public class GetMQTT extends AbstractProcessor implements MqttCallback { MemoryPersistence persistence = new MemoryPersistence(); MqttClient mqttClient; - LinkedBlockingQueue mqttQueue = new LinkedBlockingQueue(); + LinkedBlockingQueue mqttQueue = new LinkedBlockingQueue<>(); public static final PropertyDescriptor PROPERTY_BROKER_ADDRESS = new PropertyDescriptor .Builder().name("Broker address") - .description("MQTT broker address (tcp://:") + .description("MQTT broker address (e.g. tcp://localhost:1883)") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/MQTTQueueMessage.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/MQTTQueueMessage.java new file mode 100644 index 000000000000..0874b10e8b92 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/MQTTQueueMessage.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt; + +public class MQTTQueueMessage +{ + public String topic; + public byte[] message; + + public MQTTQueueMessage(String topic, byte[] message) { + this.topic = topic; + this.message = message; + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java index f8bfe5fccbad..29aeb1061760 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java @@ -33,7 +33,6 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.commons.io.IOUtils; @@ -67,7 +66,7 @@ public class PutMQTT extends AbstractProcessor implements MqttCallback { public static final PropertyDescriptor PROPERTY_BROKER_ADDRESS = new PropertyDescriptor .Builder().name("Broker address") - .description("MQTT broker address (tcp://:") + .description("MQTT broker address (e.g. tcp://localhost:1883)") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -152,7 +151,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session // get the MQTT topic - String topic = flowfile.getAttribute("destination"); + String topic = flowfile.getAttribute("topic"); if (topic == null) { getLogger().error("No topic attribute on flowfile"); diff --git a/pom.xml b/pom.xml index 9acd949ba458..ac0b5b15dbe5 100644 --- a/pom.xml +++ b/pom.xml @@ -1066,6 +1066,12 @@ language governing permissions and limitations under the License. --> 1.0.0-SNAPSHOT nar + + org.apache.nifi + nifi-mqtt-nar + 1.0.0-SNAPSHOT + nar + org.apache.nifi nifi-hbase_1_1_2-client-service-nar