From b6f88c75170a3bbfc01502061f73412f76e255a4 Mon Sep 17 00:00:00 2001 From: "Lerch, Kay" Date: Sun, 10 Apr 2016 12:48:07 +0200 Subject: [PATCH 1/2] NIFI-1767 - Added 1st iteration of IOT-Processors for reading and writing AWS IOT shadow and MQTT topics --- .../nifi-aws-processors/pom.xml | 15 ++ .../aws/iot/AbstractAWSIoTProcessor.java | 232 ++++++++++++++++++ .../iot/AbstractAWSIoTShadowProcessor.java | 59 +++++ .../processors/aws/iot/ConsumeAWSIoTMqtt.java | 162 ++++++++++++ .../processors/aws/iot/GetAWSIoTShadow.java | 114 +++++++++ .../processors/aws/iot/PublishAWSIoTMqtt.java | 139 +++++++++++ .../processors/aws/iot/PutAWSIoTShadow.java | 107 ++++++++ .../processors/aws/iot/util/AWS4Signer.java | 121 +++++++++ .../processors/aws/iot/util/IoTMessage.java | 44 ++++ .../iot/util/MqttWebSocketAsyncClient.java | 117 +++++++++ .../aws/iot/util/WebSocketNetworkModule.java | 195 +++++++++++++++ .../org.apache.nifi.processor.Processor | 5 +- .../nifi/processors/aws/iot/ITGetIOTMqtt.java | 72 ++++++ .../processors/aws/iot/ITGetIOTShadow.java | 66 +++++ .../nifi/processors/aws/iot/ITPutIOTMqtt.java | 75 ++++++ .../processors/aws/iot/ITPutIOTShadow.java | 69 ++++++ pom.xml | 15 ++ 17 files changed, 1606 insertions(+), 1 deletion(-) create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTProcessor.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTShadowProcessor.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/ConsumeAWSIoTMqtt.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/GetAWSIoTShadow.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/PublishAWSIoTMqtt.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/PutAWSIoTShadow.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/AWS4Signer.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/IoTMessage.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/MqttWebSocketAsyncClient.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/WebSocketNetworkModule.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/iot/ITGetIOTMqtt.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/iot/ITGetIOTShadow.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/iot/ITPutIOTMqtt.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/iot/ITPutIOTShadow.java diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml index 3f0f84306dfd..b1a2d3fb6045 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml @@ -35,6 +35,21 @@ com.amazonaws aws-java-sdk + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.0.2 + + + org.eclipse.jetty.websocket + websocket-client + 9.2.15.v20160210 + + + javax.websocket + javax.websocket-api + 1.1 + org.apache.nifi nifi-mock diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTProcessor.java new file mode 100644 index 000000000000..1a4d4519c1bf --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTProcessor.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.iot; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.iot.AWSIotClient; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; +import org.apache.nifi.processors.aws.iot.util.AWS4Signer; +import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; + +import java.util.Date; +import java.util.concurrent.TimeUnit; + +public abstract class AbstractAWSIoTProcessor extends AbstractAWSCredentialsProviderProcessor { + static final String PROP_NAME_ENDPOINT = "aws.iot.endpoint"; + static final String PROP_NAME_CLIENT = "aws.iot.mqtt.client"; + static final String PROP_NAME_KEEPALIVE = "aws.iot.mqtt.keepalive"; + static final String PROP_NAME_TOPIC = "aws.iot.mqtt.topic"; + static final String PROP_NAME_QOS = "aws.iot.mqtt.qos"; + /** + * Amazon's current service limit on websocket connection duration + */ + static final Integer PROP_DEFAULT_KEEPALIVE = 60 * 60 * 24; + /** + * When to start indicating the need for connection renewal (in seconds before actual termination) + */ + static final Integer DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION = 20; + static final String PROP_DEFAULT_CLIENT = AbstractAWSIoTProcessor.class.getSimpleName(); + /** + * Default QoS level for message delivery + */ + static final Integer DEFAULT_QOS = 0; + String awsTopic; + int awsQos; + MqttWebSocketAsyncClient mqttClient; + String awsEndpoint; + String awsClientId; + + static final Integer mqttActionTimeout = -1; + + private String awsRegion; + private Integer awsKeepAliveSeconds; + private Date dtLastConnect; + + public static final PropertyDescriptor PROP_ENDPOINT = new PropertyDescriptor + .Builder().name(PROP_NAME_ENDPOINT) + .description("Your endpoint identifier in AWS IoT (e.g. A1B71MLXKNCXXX)") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_CLIENT = new PropertyDescriptor + .Builder().name(PROP_NAME_CLIENT) + .description("MQTT client ID to use. Under the cover your input will be extended by a random " + + "string to ensure a unique id among all connected clients.") + .required(false) + .defaultValue(PROP_DEFAULT_CLIENT) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_KEEPALIVE = new PropertyDescriptor + .Builder().name(PROP_NAME_KEEPALIVE) + .description("Seconds a WebSocket-connection remains open after automatically renewing it. " + + "This is neccessary due to Amazon's service limit on WebSocket connection duration. " + + "As soon as the limit is changed by Amazon you can adjust the value here. Never use " + + "a duration longer than supported by Amazon. This processor renews the connection " + + "" + DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION + " seconds before the " + + "actual expiration. If no value set the default will be " + PROP_DEFAULT_KEEPALIVE + ".") + .required(false) + .defaultValue(PROP_DEFAULT_KEEPALIVE.toString()) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_TOPIC = new PropertyDescriptor + .Builder().name(PROP_NAME_TOPIC) + .description("MQTT topic to work with. (pattern: $aws/things/mything/shadow/update).") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor + .Builder().name(PROP_NAME_QOS) + .description("Decide for at most once (0) or at least once (1) message-receiption. " + + "Currently AWS IoT does not support QoS-level 2. If no value set the default QoS " + + "is " + DEFAULT_QOS + ".") + .required(false) + .allowableValues("0", "1") + .defaultValue(DEFAULT_QOS.toString()) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + /** + * Create client using credentials provider. This is the preferred way for creating clients + */ + @Override + protected AWSIotClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) { + getLogger().info("Creating client using aws credentials provider "); + // Actually this client is not needed. However, it is initialized due to the pattern of + // AbstractAWSCredentialsProviderProcessor + return new AWSIotClient(credentialsProvider, config); + } + + /** + * Create client using AWSCredentails + * + * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead + */ + @Override + protected AWSIotClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { + getLogger().info("Creating client using aws credentials "); + // Actually this client is not needed. it is initialized due to the pattern of + // AbstractAWSProcessor + return new AWSIotClient(credentials, config); + } + + /** + * Gets ready an MQTT client by connecting to a AWS IoT WebSocket endpoint specific to the properties + * @param context processor context + */ + void init(final ProcessContext context) { + // read out properties + awsEndpoint = context.getProperty(PROP_ENDPOINT).getValue(); + awsRegion = context.getProperty(REGION).getValue(); + awsClientId = context.getProperty(PROP_CLIENT).isSet() ? context.getProperty(PROP_CLIENT).getValue() : PROP_DEFAULT_CLIENT; + awsKeepAliveSeconds = context.getProperty(PROP_KEEPALIVE).isSet() ? context.getProperty(PROP_KEEPALIVE).asInteger() : PROP_DEFAULT_KEEPALIVE; + awsTopic = context.getProperty(PROP_TOPIC).getValue(); + awsQos = context.getProperty(PROP_QOS).isSet() ? context.getProperty(PROP_QOS).asInteger() : DEFAULT_QOS; + // initialize and connect to mqtt endpoint + mqttClient = connect(context); + } + + @OnStopped + public void onStopped(final ProcessContext context) { + try { + mqttClient.disconnect(); + } catch (MqttException me) { + getLogger().warn("MQTT " + me.getMessage()); + } + getLogger().info("Disconnected"); + } + + /** + * Returns the lifetime-seconds of the established websocket-connection + * @return seconds + */ + long getConnectionDuration() { + return dtLastConnect != null + ? TimeUnit.MILLISECONDS.toSeconds(new Date().getTime() - dtLastConnect.getTime()) + : awsKeepAliveSeconds + 1; + } + + /** + * In seconds get the remaining lifetime of the connection. It is not the actual time to + * expiration but an advice to when it is worth renewing the connection. + * @return seconds + */ + long getRemainingConnectionLifetime() { + return awsKeepAliveSeconds - DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION; + } + + /** + * Indicates if WebSocket connection is about to expire. It gives the caller an advice + * to renew the connection some time before the actual expiration. + * @return Indication (if true caller should renew the connection) + */ + boolean isConnectionAboutToExpire() { + return getConnectionDuration() > getRemainingConnectionLifetime(); + } + + /** + * Connects to the websocket-endpoint over an MQTT client. + * @param context processcontext + * @return websocket connection client + */ + MqttWebSocketAsyncClient connect(ProcessContext context) { + getCredentialsProvider(context).refresh(); + AWSCredentials awsCredentials = getCredentialsProvider(context).getCredentials(); + MqttWebSocketAsyncClient _mqttClient = null; + + // generate mqtt endpoint-address with authentication details + String strEndpointAddress; + try { + strEndpointAddress = AWS4Signer.getAddress(awsRegion, awsEndpoint, awsCredentials); + } catch (Exception e) { + getLogger().error("Error while generating AWS endpoint-address caused by " + e.getMessage()); + return null; + } + // extend clientId with random string in order to ensure unique id per connection + String clientId = awsClientId + RandomStringUtils.random(12, true, false); + + final MqttConnectOptions options = new MqttConnectOptions(); + options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); + options.setKeepAliveInterval(0); + getLogger().info("Connecting to AWS IoT: " + awsEndpoint + " with " + clientId); + + // set up mqtt-Client with endpoint-address + try { + _mqttClient = new MqttWebSocketAsyncClient(strEndpointAddress, clientId, getLogger()); + // start connecting and wait for completion + _mqttClient.connect(options).waitForCompletion(mqttActionTimeout); + } catch (MqttException e) { + getLogger().error("Error while connecting to AWS websocket-endpoint caused by " + e.getMessage()); + } + // keep in mind last connection date to be aware of its expiration later on + dtLastConnect = new Date(); + getLogger().info("Connected to AWS IoT."); + return _mqttClient; + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTShadowProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTShadowProcessor.java new file mode 100644 index 000000000000..307ed170a4a8 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTShadowProcessor.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.iot; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.iotdata.AWSIotDataClient; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +public abstract class AbstractAWSIoTShadowProcessor extends AbstractAWSCredentialsProviderProcessor { + protected static final String PROP_NAME_THING = "aws.iot.thing"; + + public static final PropertyDescriptor PROP_THING = new PropertyDescriptor + .Builder().name(PROP_NAME_THING) + .description("Name your registered thing in AWS IoT.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + /** + * Create client using credentials provider. This is the preferred way for creating clients + */ + @Override + protected AWSIotDataClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) { + getLogger().info("Creating client using aws credentials provider "); + + return new AWSIotDataClient(credentialsProvider, config); + } + + /** + * Create client using AWSCredentails + * + * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead + */ + @Override + protected AWSIotDataClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { + getLogger().info("Creating client using aws credentials "); + + return new AWSIotDataClient(credentials, config); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/ConsumeAWSIoTMqtt.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/ConsumeAWSIoTMqtt.java new file mode 100644 index 000000000000..b69c78616402 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/ConsumeAWSIoTMqtt.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.iot; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processors.aws.iot.util.IoTMessage; +import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient; +import org.eclipse.paho.client.mqttv3.MqttException; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; + +@Tags({"Amazon", "AWS", "IOT", "MQTT", "Websockets", "Get", "Subscribe", "Receive"}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@CapabilityDescription("Subscribes to and receives messages from MQTT-topic(s) of AWS IoT." + + "The processor keeps open a WebSocket connection and will automatically renew the " + + "connection to overcome Amazon's service limit on maximum connection duration. Depending on " + + "your set up QoS the processor will miss some messages (QoS=0) or receives messages twice (QoS=1) " + + "while reconnecting to AWS IoT WebSocket endpoint. We strongly recommend you to make use of " + + "processor isolation as concurrent subscriptions to an MQTT topic result in multiple message receiptions.") +@SeeAlso({ GetAWSIoTShadow.class }) +@WritesAttributes({ + @WritesAttribute(attribute = "aws.iot.mqtt.endpoint", description = "AWS endpoint this message was received from."), + @WritesAttribute(attribute = "aws.iot.mqtt.topic", description = "MQTT topic this message was received from."), + @WritesAttribute(attribute = "aws.iot.mqtt.client", description = "MQTT client which received the message."), + @WritesAttribute(attribute = "aws.iot.mqtt.qos", description = "Underlying MQTT quality-of-service.") +}) +public class ConsumeAWSIoTMqtt extends AbstractAWSIoTProcessor { + + public static final List properties = Collections.unmodifiableList( + Arrays.asList( + PROP_QOS, + PROP_TOPIC, + PROP_ENDPOINT, + PROP_KEEPALIVE, + PROP_CLIENT, + AWS_CREDENTIALS_PROVIDER_SERVICE, + REGION)); + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set getRelationships() { + return Collections.singleton(REL_SUCCESS); + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + // init to build up mqtt connection over web-sockets + init(context); + if (mqttClient != null && mqttClient.isConnected()) { + try { + // subscribe to topic with configured qos in order to start receiving messages + mqttClient.subscribe(awsTopic, awsQos); + } catch (MqttException e) { + getLogger().error("Error while subscribing to topic " + awsTopic + " with client-id " + mqttClient.getClientId() + " caused by " + e.getMessage()); + } + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + // check if connection is about to terminate + if (isConnectionAboutToExpire()) { + MqttWebSocketAsyncClient _mqttClient = null; + try { + // before subscribing to the topic with new connection first unsubscribe + // old connection from same topic if subscription is set to QoS 0 + if (awsQos == 0) { + mqttClient.unsubscribe(awsTopic).waitForCompletion(mqttActionTimeout); + } + // establish a second connection + _mqttClient = connect(context); + // now subscribe to topic with new connection + _mqttClient.subscribe(awsTopic, awsQos).waitForCompletion(mqttActionTimeout); + // between re-subscription and disconnect from old connection + // QoS=0 subscription eventually lose some messages + // QoS=1 subscription eventually receive some messages twice + // now terminate old connection + mqttClient.disconnect().waitForCompletion(mqttActionTimeout); + } catch (MqttException e) { + getLogger().error("Error while renewing connection with client " + mqttClient.getClientId() + " caused by " + e.getMessage()); + } finally { + if (_mqttClient != null) { + // grab messages left over from old connection + _mqttClient.setAwsQueuedMqttMessages(mqttClient.getAwsQueuedMqttMessages()); + // now set the new connection as the default connection + mqttClient = _mqttClient; + } + } + } + processQueuedMessages(session); + } + + @OnStopped + public void onStopped(final ProcessContext context) { + + } + + private void processQueuedMessages(ProcessSession session) { + LinkedBlockingQueue messageQueue = mqttClient.getAwsQueuedMqttMessages(); + + while (!messageQueue.isEmpty()) { + FlowFile flowFile = session.create(); + final IoTMessage msg = messageQueue.peek(); + final Map attributes = new HashMap<>(); + + attributes.put(PROP_NAME_ENDPOINT, awsEndpoint); + attributes.put(PROP_NAME_TOPIC, msg.getTopic()); + attributes.put(PROP_NAME_CLIENT, awsClientId); + attributes.put(PROP_NAME_QOS, msg.getQos().toString()); + flowFile = session.putAllAttributes(flowFile, attributes); + + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(msg.getPayload()); + } + }); + session.transfer(flowFile, REL_SUCCESS); + session.commit(); + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/GetAWSIoTShadow.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/GetAWSIoTShadow.java new file mode 100644 index 000000000000..6cc349b3af50 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/GetAWSIoTShadow.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.iot; + +import com.amazonaws.services.iotdata.AWSIotDataClient; +import com.amazonaws.services.iotdata.model.GetThingShadowRequest; +import com.amazonaws.services.iotdata.model.GetThingShadowResult; +import org.apache.nifi.annotation.behavior.*; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.HashMap; +import java.util.Map; + +@EventDriven +@Tags({"Amazon", "AWS", "IOT", "Shadow", "Get"}) +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@CapabilityDescription("Gets last persisted state of a thing in AWS IoT by reading out the shadow. " + + "A shadow might change more often than you get triggered. In order to get every message send " + + "out by a thing you better use ConsumeAWSIoTMqtt processor. You can dynamically set a thing-name " + + "when overriding the processor-configuration with a message-attribute \"aws.iot.thing.override\".") +@SeeAlso({ ConsumeAWSIoTMqtt.class }) +@ReadsAttributes({ + @ReadsAttribute(attribute = "aws.iot.thing.override", description = "Overrides the processor configuration for topic."), +}) +@WritesAttributes({ + @WritesAttribute(attribute = "aws.iot.thing", description = "Thing name in AWS IoT"), +}) +public class GetAWSIoTShadow extends AbstractAWSIoTShadowProcessor { + public static final List properties = Collections.unmodifiableList( + Arrays.asList( + PROP_THING, + AWS_CREDENTIALS_PROVIDER_SERVICE, + TIMEOUT, + PROXY_HOST, + PROXY_HOST_PORT, + REGION)); + + private final static String ATTR_NAME_THING = PROP_NAME_THING + ".override"; + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set getRelationships() { + return Collections.singleton(REL_SUCCESS); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final AWSIotDataClient iotClient = this.getClient(); + + if (iotClient == null) { + getLogger().error("AWS-Client was not initialized. See logs to find reasons."); + return; + } + // get flowfile + FlowFile flowFile = session.get(); + // if provided override configured thing name with name from corresponding message attribute + String thingName = flowFile != null && flowFile.getAttributes().containsKey(ATTR_NAME_THING) + ? flowFile.getAttribute(ATTR_NAME_THING) + : context.getProperty(PROP_NAME_THING).getValue(); + + FlowFile flowFileOut = flowFile == null ? session.create() : flowFile; + + // ask shadow of the thing for last reported state by requesting the API of AWS + final GetThingShadowRequest iotRequest = new GetThingShadowRequest().withThingName(thingName); + final GetThingShadowResult iotResponse = iotClient.getThingShadow(iotRequest); + + //FlowFile flowFileOut = session.create(); + final Map attributes = new HashMap<>(); + attributes.put(PROP_NAME_THING, thingName); + flowFileOut = session.putAllAttributes(flowFileOut, attributes); + + flowFileOut = session.write(flowFileOut, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(iotResponse.getPayload().array()); + } + }); + session.transfer(flowFileOut, REL_SUCCESS); + session.commit(); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/PublishAWSIoTMqtt.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/PublishAWSIoTMqtt.java new file mode 100644 index 000000000000..2e72afa16320 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/PublishAWSIoTMqtt.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.iot; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.eclipse.paho.client.mqttv3.MqttException; + +import java.io.ByteArrayOutputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.Map; + +@Tags({"Amazon", "AWS", "IOT", "MQTT", "Websockets", "Put", "Publish", "Send"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Publishes messages to MQTT-topic(s) of AWS IoT. The processor keeps open a WebSocket connection and will automatically renew the " + + "connection to overcome Amazon's service limit on maximum connection duration. Most of the " + + "configuration can be overridden by values coming in as message attributes. This applies for " + + "the topic (corresponding message attribute is \"aws.iot.mqtt.topic.override\"), the qos-level " + + "(\"aws.iot.mqtt.qos.override\") and the retention (\"aws.iot.mqtt.retained.override\")") +@SeeAlso({ PutAWSIoTShadow.class }) +@ReadsAttributes({ + @ReadsAttribute(attribute = "aws.iot.mqtt.topic.override", description = "Overrides the processor configuration for topic."), + @ReadsAttribute(attribute = "aws.iot.mqtt.qos.override", description = "Overrides the processor configuration for quality of service."), + @ReadsAttribute(attribute = "aws.iot.mqtt.retained.override", description = "Overrides the processor configuration for retaining a published state in the AWS shadow.") +}) +@WritesAttributes({ + @WritesAttribute(attribute = "aws.iot.mqtt.exception", description = "Error details") +}) +public class PublishAWSIoTMqtt extends AbstractAWSIoTProcessor { + private final static String PROP_NAME_RETAINED = "aws.iot.mqtt.retained"; + private final static String ATTR_NAME_TOPIC = PROP_NAME_TOPIC + ".override"; + private final static String ATTR_NAME_QOS = PROP_NAME_QOS + ".override"; + private final static String ATTR_NAME_RETAINED = PROP_NAME_RETAINED + ".override"; + private final static String ATTR_NAME_EXCEPTION = "aws.iot.mqtt.exception"; + private final static Boolean PROP_DEFAULT_RETAINED = false; + private Boolean shouldRetain; + + public static final PropertyDescriptor PROP_RETAINED = new PropertyDescriptor + .Builder().name(PROP_NAME_RETAINED) + .description("For messages being published, a true setting indicates that the MQTT server " + + "should retain a copy of the message. The message will then be transmitted to new " + + "subscribers to a topic that matches the message topic. For subscribers registering " + + "a new subscription, the flag being true indicates that the received message is not " + + "a new one, but one that has been retained by the MQTT server.") + .required(true) + .defaultValue(PROP_DEFAULT_RETAINED.toString()) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + public static final List properties = Collections.unmodifiableList( + Arrays.asList( + PROP_QOS, + PROP_TOPIC, + PROP_RETAINED, + PROP_ENDPOINT, + PROP_KEEPALIVE, + PROP_CLIENT, + AWS_CREDENTIALS_PROVIDER_SERVICE, + TIMEOUT, + REGION)); + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set getRelationships() { + return Collections.singleton(REL_SUCCESS); + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + shouldRetain = context.getProperty(PROP_RETAINED).isSet() ? context.getProperty(PROP_RETAINED).asBoolean() : PROP_DEFAULT_RETAINED; + init(context); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + // check if MQTT-connection is about to expire + if (isConnectionAboutToExpire()) { + // renew connection + mqttClient = connect(context); + } + // get flowfile + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + Map attributes = flowFile.getAttributes(); + // if provided override MQTT configuration with values from the corresponding message attributes + String topic = attributes.containsKey(ATTR_NAME_TOPIC) ? attributes.get(ATTR_NAME_TOPIC) : awsTopic; + Integer qos = attributes.containsKey(ATTR_NAME_QOS) ? Integer.parseInt(attributes.get(ATTR_NAME_QOS)) : awsQos; + Boolean retained = attributes.containsKey(ATTR_NAME_RETAINED) ? Boolean.parseBoolean(attributes.get(ATTR_NAME_RETAINED)) : shouldRetain; + // get message content + final ByteArrayOutputStream fileContentStream = new ByteArrayOutputStream(); + session.exportTo(flowFile, fileContentStream); + + try { + // publish messages to mqtt-topic(s) + mqttClient.publish(topic, fileContentStream.toByteArray(), qos, retained).waitForCompletion(); + session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().send(flowFile, awsEndpoint + "(" + awsClientId + ")"); + } catch (MqttException e) { + getLogger().error("Error while publishing to topics with client " + mqttClient.getClientId() + " caused by " + e.getMessage()); + context.yield(); + } + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/PutAWSIoTShadow.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/PutAWSIoTShadow.java new file mode 100644 index 000000000000..0c7a6e73c881 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/PutAWSIoTShadow.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.iot; + +import com.amazonaws.services.iotdata.AWSIotDataClient; +import com.amazonaws.services.iotdata.model.UpdateThingShadowRequest; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +@Tags({"Amazon", "AWS", "IOT", "Shadow", "Put", "Update", "Write"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Sets state of a thing in AWS IoT by updating the shadow. You can dynamically set a thing-name " + + "when overriding the processor-configuration with a message-attribute \"aws.iot.thing.override\".") +@SeeAlso({ PublishAWSIoTMqtt.class }) +@ReadsAttributes({ + @ReadsAttribute(attribute = "aws.iot.thing.override", description = "Overrides the processor configuration for topic."), +}) +@WritesAttributes({ + @WritesAttribute(attribute = "aws.iot.thing", description = "Underlying MQTT quality-of-service.") +}) +public class PutAWSIoTShadow extends AbstractAWSIoTShadowProcessor { + public static final List properties = Collections.unmodifiableList( + Arrays.asList( + PROP_THING, + TIMEOUT, + AWS_CREDENTIALS_PROVIDER_SERVICE, + PROXY_HOST, + PROXY_HOST_PORT, + REGION)); + + private final static String ATTR_NAME_THING = PROP_NAME_THING + ".override"; + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set getRelationships() { + return Collections.singleton(REL_SUCCESS); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + // get flowfile + FlowFile flowFile = session.get(); + if (flowFile == null) return; + + // if provided override configured thing name with the name from the corresponding message attribute + String thingName = flowFile.getAttributes().containsKey(ATTR_NAME_THING) + ? flowFile.getAttribute(ATTR_NAME_THING) + : context.getProperty(PROP_NAME_THING).getValue(); + + // get message content and put it into the flowfile + final ByteArrayOutputStream fileContentStream = new ByteArrayOutputStream(); + session.exportTo(flowFile, fileContentStream); + + ByteBuffer buffer = ByteBuffer.wrap(fileContentStream.toByteArray()); + + final AWSIotDataClient iotClient = getClient(); + final UpdateThingShadowRequest iotRequest = new UpdateThingShadowRequest() + .withThingName(thingName) + .withPayload(buffer); + + try { + iotClient.updateThingShadow(iotRequest); + session.transfer(flowFile, REL_SUCCESS); + session.commit(); + } catch (final Exception e) { + getLogger().error("Error while updating thing shadow {}; routing to failure", new Object[]{e}); + session.penalize(flowFile); + } + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/AWS4Signer.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/AWS4Signer.java new file mode 100644 index 000000000000..c95dfffe2af7 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/AWS4Signer.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.iot.util; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSSessionCredentials; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import java.io.UnsupportedEncodingException; +import java.math.BigInteger; +import java.net.URLEncoder; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.TimeZone; + +public class AWS4Signer { + private static final String AWS_IOT_DATE_FORMAT = "yyyyMMdd'T'HHmmssXXX"; + private static final String AWS_IOT_SIGNED_HEADERS = "host"; + private static final String AWS_IOT_DATESTAMP_FORMAT = "yyyyMMdd"; + private static final String AWS_IOT_ALGORITHM = "AWS4-HMAC-SHA256"; + private static final String AWS_IOT_SERVICE = "iotdevicegateway"; + private static final String AWS_IOT_PROTOCOL = "wss"; + private static final String AWS_HTTP_METHOD = "GET"; + private static final String AWS_IOT_WS_PATH = "/mqtt"; + + public static String getAddress(String strRegion, String strEndpointId, AWSCredentials awsCredentials) throws Exception { + Date dtNow = new Date(); + + String strHost = strEndpointId + ".iot." + strRegion + ".amazonaws.com"; + String strAmzdate = GetUTCdatetimeAsString(dtNow, AWS_IOT_DATE_FORMAT); + String strDatestamp = GetUTCdatetimeAsString(dtNow, AWS_IOT_DATESTAMP_FORMAT); + String strHashedPayload = sha256(""); + + String strCredentialScope = + strDatestamp + "/" + + strRegion + "/" + + AWS_IOT_SERVICE + "/aws4_request"; + + String strAmzCredential = + awsCredentials.getAWSAccessKeyId() + "/" + strCredentialScope; + + String strCanonicalQuerystring = + "X-Amz-Algorithm=" + AWS_IOT_ALGORITHM + + "&X-Amz-Credential=" + URLEncoder.encode(strAmzCredential, "UTF-8") + + "&X-Amz-Date=" + strAmzdate + + "&X-Amz-SignedHeaders=" + AWS_IOT_SIGNED_HEADERS; + + String strCanonicalHeaders = "host:" + strHost + "\n"; + + String strCanonicalRequest = + AWS_HTTP_METHOD + "\n" + AWS_IOT_WS_PATH + "\n" + + strCanonicalQuerystring + "\n" + + strCanonicalHeaders + "\n" + AWS_IOT_SIGNED_HEADERS + "\n" + + strHashedPayload; + + String strToSign = AWS_IOT_ALGORITHM + "\n" + + strAmzdate + "\n" + + strCredentialScope + "\n" + + sha256(strCanonicalRequest); + + byte[] bSigningKey = getSignatureKey(awsCredentials.getAWSSecretKey(), strDatestamp, strRegion, AWS_IOT_SERVICE); + byte[] bSignature = HmacSHA256(strToSign, bSigningKey); + + strCanonicalQuerystring += "&X-Amz-Signature=" + bin2hex(bSignature); + + if (awsCredentials instanceof AWSSessionCredentials) { + String strSessionToken = ((AWSSessionCredentials) awsCredentials).getSessionToken(); + strCanonicalQuerystring += "&X-Amz-Security-Token=" + URLEncoder.encode(strSessionToken, "UTF-8"); + } + + return AWS_IOT_PROTOCOL + "://" + strHost + AWS_IOT_WS_PATH + "?" + strCanonicalQuerystring; + } + + private static String sha256(String data) throws UnsupportedEncodingException, NoSuchAlgorithmException { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + digest.reset(); + return bin2hex(digest.digest(data.getBytes("UTF-8"))); + } + + private static String bin2hex(byte[] data) { + return String.format("%0" + (data.length * 2) + "X", new BigInteger(1, data)).toLowerCase(); + } + + private static String GetUTCdatetimeAsString(Date dt, String strFormat) { + final SimpleDateFormat sdf = new SimpleDateFormat(strFormat); + sdf.setTimeZone(TimeZone.getTimeZone("UTC")); + return sdf.format(dt); + } + + private static byte[] HmacSHA256(String data, byte[] key) throws Exception { + String algorithm="HmacSHA256"; + Mac mac = Mac.getInstance(algorithm); + mac.init(new SecretKeySpec(key, algorithm)); + return mac.doFinal(data.getBytes("UTF8")); + } + + private static byte[] getSignatureKey(String key, String dateStamp, String regionName, String serviceName) throws Exception { + byte[] kSecret = ("AWS4" + key).getBytes("UTF8"); + byte[] kDate = HmacSHA256(dateStamp, kSecret); + byte[] kRegion = HmacSHA256(regionName, kDate); + byte[] kService = HmacSHA256(serviceName, kRegion); + return HmacSHA256("aws4_request", kService); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/IoTMessage.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/IoTMessage.java new file mode 100644 index 000000000000..ef1b15fb66a3 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/IoTMessage.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.iot.util; + +import org.eclipse.paho.client.mqttv3.MqttMessage; + +public class IoTMessage { + private final String topic; + private final byte[] payload; + private final Integer qos; + + public IoTMessage(MqttMessage message, String topic) { + this.topic = topic; + this.payload = message.getPayload(); + this.qos = message.getQos(); + } + + public String getTopic() { + return topic; + } + + public byte[] getPayload() { + return payload; + } + + public Integer getQos() { + return qos; + } +} + diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/MqttWebSocketAsyncClient.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/MqttWebSocketAsyncClient.java new file mode 100644 index 000000000000..1b39c7163f10 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/MqttWebSocketAsyncClient.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.iot.util; + +import java.net.URI; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.nifi.logging.ProcessorLog; +import org.eclipse.paho.client.mqttv3.MqttAsyncClient; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.TimerPingSender; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttSecurityException; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.internal.NetworkModule; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +public class MqttWebSocketAsyncClient extends MqttAsyncClient implements MqttCallback { + + protected volatile LinkedBlockingQueue awsQueuedMqttMessages = new LinkedBlockingQueue(); + protected final ProcessorLog logger; + protected final String serverURI; + + protected static String createDummyURI(String original) { + if (!original.startsWith("ws:") && !original.startsWith("wss:")) { + return original; + } + final URI uri = URI.create(original); + return "tcp://DUMMY-" + uri.getHost() + ":" + + (uri.getPort() > 0 ? uri.getPort() : 80); + } + + protected static boolean isDummyURI(String uri) { + return uri.startsWith("tcp://DUMMY-"); + } + + public MqttWebSocketAsyncClient(String serverURI, String clientId, + ProcessorLog logger) throws MqttException { + super(createDummyURI(serverURI), clientId, new MemoryPersistence(), new TimerPingSender()); + this.serverURI = serverURI; + this.logger = logger; + this.setCallback(this); + } + + @Override + protected NetworkModule[] createNetworkModules(String address, + MqttConnectOptions options) throws MqttException{ + String[] serverURIs = options.getServerURIs(); + String[] array = serverURIs == null ? new String[] { address } : + serverURIs.length == 0 ? new String[] { address }: serverURIs; + + NetworkModule[] networkModules = new NetworkModule[array.length]; + for (int i = 0; i < array.length; i++) { + networkModules[i] = createNetworkModule(array[i], options); + } + return networkModules; + } + + protected NetworkModule createNetworkModule(String input, + MqttConnectOptions options) throws MqttException, + MqttSecurityException { + final String address = isDummyURI(input) ? this.serverURI : input; + if (!address.startsWith("ws:") && !address.startsWith("wss:")) { + return super.createNetworkModules(address, options)[0]; + } + + final String subProtocol = (options.getMqttVersion() == MqttConnectOptions.MQTT_VERSION_3_1) ? "mqttv3.1" : "mqtt"; + return newWebSocketNetworkModule(URI.create(address), subProtocol, options); + } + + protected NetworkModule newWebSocketNetworkModule(URI uri, + String subProtocol, MqttConnectOptions options) { + final WebSocketNetworkModule netModule = new WebSocketNetworkModule( + uri, subProtocol, getClientId()); + netModule.setConnectTimeout(options.getConnectionTimeout()); + return netModule; + } + + public LinkedBlockingQueue getAwsQueuedMqttMessages() { + return awsQueuedMqttMessages; + } + + public void setAwsQueuedMqttMessages(LinkedBlockingQueue queue) { + awsQueuedMqttMessages = queue; + } + + @Override + public void connectionLost(Throwable t) { + logger.error("Connection to " + this.getServerURI() + " lost with cause: " + t.getMessage()); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + logger.debug("Message arrived from topic: " + topic); + awsQueuedMqttMessages.add(new IoTMessage(message, topic)); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/WebSocketNetworkModule.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/WebSocketNetworkModule.java new file mode 100644 index 000000000000..3d26f195aeb1 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/WebSocketNetworkModule.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.iot.util; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.net.ConnectException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.concurrent.Future; + +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.internal.NetworkModule; +import org.eclipse.paho.client.mqttv3.logging.Logger; +import org.eclipse.paho.client.mqttv3.logging.LoggerFactory; + +public class WebSocketNetworkModule extends WebSocketAdapter implements NetworkModule { + private static final String CLASS_NAME = WebSocketNetworkModule.class.getName(); + private static final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME); + private final URI uri; + private final String subProtocol; + private final PipedOutputStream receiverStream = new PipedOutputStream(); + private final PipedInputStream inputStream; + private WebSocketClient client; + private int conTimeout; + + private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream() { + @Override + public void flush() throws IOException { + final ByteBuffer byteBuffer; + synchronized (this) { + byteBuffer = ByteBuffer.wrap(toByteArray()); + reset(); + } + // Asynchronous call + getRemote().sendBytes(byteBuffer); + getRemote().flush(); + } + }; + + public WebSocketNetworkModule(URI uri, String subProtocol, + String resourceContext) { + log.setResourceName(resourceContext); + this.uri = uri; + this.subProtocol = subProtocol; + try { + this.inputStream = new PipedInputStream(receiverStream); + } catch (IOException unexpected) { + throw new IllegalStateException(unexpected); + } + } + + protected ClientUpgradeRequest createClientUpgradeRequest() { + final ClientUpgradeRequest request = new ClientUpgradeRequest(); + // you can manipulate the request by overriding this method. + return request; + } + + protected WebSocketClient createWebSocketClient() { + final WebSocketClient client = new WebSocketClient( + createSslContextFactory()); + // you can manipulate the client by overriding this method. + return client; + } + + protected SslContextFactory createSslContextFactory() { + return new SslContextFactory(); + } + + @Override + public void start() throws IOException, MqttException { + final String methodName = "start"; + try { + // @TRACE 252=connect to host {0} port {1} timeout {2} + if (log.isLoggable(Logger.FINE)) { + log.fine( + CLASS_NAME, + methodName, + "252", + new Object[] { uri.toString(), + Integer.valueOf(uri.getPort()), + Long.valueOf(conTimeout * 1000) }); + } + client = createWebSocketClient(); + client.setConnectTimeout(conTimeout * 1000); + if (client.isStarted() == false) { + client.start(); + } + + final ClientUpgradeRequest request = createClientUpgradeRequest(); + request.setSubProtocols(subProtocol); + final Future future = client.connect(this, uri, request); + // Replays the same behavior as Socket.connect(). + // blocks until the connection is established or some error occurs. + future.get(); + + } catch (ConnectException ex) { + // @TRACE 250=Failed to create TCP socket + log.fine(CLASS_NAME, methodName, "250", null, ex); + throw new MqttException( + MqttException.REASON_CODE_SERVER_CONNECT_ERROR, ex); + + } catch (Exception ex) { + // @TRACE 250=Failed to create TCP socket + log.fine(CLASS_NAME, methodName, "250", null, ex); + throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR, + ex); + } + } + + @Override + public InputStream getInputStream() throws IOException { + return inputStream; + } + + @Override + public OutputStream getOutputStream() throws IOException { + return outputStream; + } + + @Override + public void stop() throws IOException { + try { + client.stop(); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + public void setConnectTimeout(int timeout) { + this.conTimeout = timeout; + } + + @Override + public void onWebSocketBinary(byte[] payload, int offset, int len) { + try { + this.receiverStream.write(payload, offset, len); + this.receiverStream.flush(); + } catch (IOException e) { + log.fine(CLASS_NAME, "onWebSocketError", "401", null, e); + throw new IllegalStateException(e); + } + } + + @Override + public void onWebSocketError(Throwable cause) { + if (log.isLoggable(Logger.FINE)) { + log.fine(CLASS_NAME, "onWebSocketError", "401", null, cause); + } + } + + @Override + public void onWebSocketConnect(Session sess) { + super.onWebSocketConnect(sess); + if (log.isLoggable(Logger.FINE)) { + log.fine(CLASS_NAME, "onWebSocketConnect", "116", + new Object[] { uri.toString() + ", WebSocket CONNECTED." }); + } + } + + @Override + public void onWebSocketClose(int statusCode, String reason) { + super.onWebSocketClose(statusCode, reason); + if (log.isLoggable(Logger.FINE)) { + log.fine(CLASS_NAME, "onWebSocketConnect", "116", + new Object[] { uri.toString() + ", WebSocket CLOSED." }); + } + } + +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 27e39f050570..4456c06b6387 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -27,4 +27,7 @@ org.apache.nifi.processors.aws.dynamodb.PutDynamoDB org.apache.nifi.processors.aws.dynamodb.DeleteDynamoDB org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream org.apache.nifi.processors.aws.cloudwatch.PutCloudWatchMetric - +org.apache.nifi.processors.aws.iot.ConsumeAWSIoTMqtt +org.apache.nifi.processors.aws.iot.PublishAWSIoTMqtt +org.apache.nifi.processors.aws.iot.GetAWSIoTShadow +org.apache.nifi.processors.aws.iot.PutAWSIoTShadow diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/iot/ITGetIOTMqtt.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/iot/ITGetIOTMqtt.java new file mode 100644 index 000000000000..ce810e30d915 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/iot/ITGetIOTMqtt.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.iot; + +import com.amazonaws.regions.Regions; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.List; + +public class ITGetIOTMqtt { + private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + + @Ignore + @Test + public void testSimpleGetUsingCredentialsProviderService() throws Throwable { + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + final TestRunner runner = TestRunners.newTestRunner(new ConsumeAWSIoTMqtt()); + final String clientId = ConsumeAWSIoTMqtt.class.getSimpleName(); + final String endpoint = "A1B71MLXKNXXXX"; + final String topic = "$aws/things/nifiConsumer/shadow/update"; + final String qos = "0"; + final Regions region = Regions.US_EAST_1; + + runner.addControllerService("awsCredentialsProvider", serviceImpl); + + runner.setProperty(ConsumeAWSIoTMqtt.PROP_CLIENT, clientId); + runner.setProperty(ConsumeAWSIoTMqtt.PROP_ENDPOINT, endpoint); + runner.setProperty(ConsumeAWSIoTMqtt.PROP_TOPIC, topic); + runner.setProperty(ConsumeAWSIoTMqtt.PROP_QOS, qos); + runner.setProperty(ConsumeAWSIoTMqtt.REGION, region.getName()); + runner.setProperty(ConsumeAWSIoTMqtt.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); + + runner.setProperty(serviceImpl, AbstractAWSCredentialsProviderProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE); + + // ensure that the Controller Service is configured accordingly + runner.assertValid(serviceImpl); + + // If the Controller Service is not valid, this method will throw an IllegalStateException. Otherwise, the service is now ready to use. + runner.enableControllerService(serviceImpl); + + // trigger once + runner.run(1); + + final List flowFiles = runner.getFlowFilesForRelationship(ConsumeAWSIoTMqtt.REL_SUCCESS); + for (final MockFlowFile mff : flowFiles) { + mff.assertAttributeEquals(ConsumeAWSIoTMqtt.PROP_NAME_ENDPOINT, endpoint); + mff.assertAttributeEquals(ConsumeAWSIoTMqtt.PROP_NAME_CLIENT, clientId); + mff.assertAttributeEquals(ConsumeAWSIoTMqtt.PROP_NAME_TOPIC, topic); + mff.assertAttributeEquals(ConsumeAWSIoTMqtt.PROP_NAME_QOS, qos); + } + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/iot/ITGetIOTShadow.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/iot/ITGetIOTShadow.java new file mode 100644 index 000000000000..6715e9babcb8 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/iot/ITGetIOTShadow.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.iot; + +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.List; + +public class ITGetIOTShadow { + private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + + @Ignore + @Test + public void testSimpleGetUsingCredentialsProviderService() throws Throwable { + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + final TestRunner runner = TestRunners.newTestRunner(new GetAWSIoTShadow()); + final String thing = "thingName"; + final Region region = Regions.getCurrentRegion(); + + runner.addControllerService("awsCredentialsProvider", serviceImpl); + + runner.setProperty(GetAWSIoTShadow.PROP_THING, thing); + runner.setProperty(GetAWSIoTShadow.REGION, region.getName()); + runner.setProperty(GetAWSIoTShadow.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); + + runner.setProperty(serviceImpl, AbstractAWSCredentialsProviderProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE); + + // ensure that the Controller Service is configured accordingly + runner.assertValid(serviceImpl); + + // If the Controller Service is not valid, this method will throw an IllegalStateException. Otherwise, the service is now ready to use. + runner.enableControllerService(serviceImpl); + + runner.run(1); + + // validate that the FlowFiles went where they were expected to go + runner.assertAllFlowFilesTransferred(GetAWSIoTShadow.REL_SUCCESS, 1); + + final List flowFiles = runner.getFlowFilesForRelationship(GetAWSIoTShadow.REL_SUCCESS); + for (final MockFlowFile mff : flowFiles) { + mff.assertAttributeEquals(GetAWSIoTShadow.PROP_NAME_THING, thing); + } + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/iot/ITPutIOTMqtt.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/iot/ITPutIOTMqtt.java new file mode 100644 index 000000000000..0bd956e39fa1 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/iot/ITPutIOTMqtt.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.iot; + +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.List; + +public class ITPutIOTMqtt { + private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + + @Ignore + @Test + public void testSimplePutUsingCredentialsProviderService() throws Throwable { + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + final TestRunner runner = TestRunners.newTestRunner(new PublishAWSIoTMqtt()); + final String clientId = PublishAWSIoTMqtt.class.getSimpleName(); + final String endpoint = "A1B71MLXKNXXXX"; + final String topic = "$aws/things/nifiConsumer/shadow/update"; + final String qos = "0"; + final byte[] message = "{\"state\":{\"desired\":{\"key\":\"value\"}}}".getBytes(); + final Region region = Regions.getCurrentRegion(); + + runner.addControllerService("awsCredentialsProvider", serviceImpl); + + runner.setProperty(PublishAWSIoTMqtt.PROP_CLIENT, clientId); + runner.setProperty(PublishAWSIoTMqtt.PROP_ENDPOINT, endpoint); + runner.setProperty(PublishAWSIoTMqtt.PROP_TOPIC, topic); + runner.setProperty(PublishAWSIoTMqtt.PROP_QOS, qos); + runner.setProperty(PublishAWSIoTMqtt.REGION, region.getName()); + runner.setProperty(PublishAWSIoTMqtt.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); + + runner.setProperty(serviceImpl, AbstractAWSCredentialsProviderProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE); + + // ensure that the Controller Service is configured accordingly + runner.assertValid(serviceImpl); + + // If the Controller Service is not valid, this method will throw an IllegalStateException. Otherwise, the service is now ready to use. + runner.enableControllerService(serviceImpl); + + runner.enqueue(message); + + runner.run(1); + + // validate that the FlowFiles went where they were expected to go + runner.assertAllFlowFilesTransferred(PublishAWSIoTMqtt.REL_SUCCESS, 1); + + final List flowFiles = runner.getFlowFilesForRelationship(PublishAWSIoTMqtt.REL_SUCCESS); + for (final MockFlowFile mff : flowFiles) { + mff.assertContentEquals(message); + } + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/iot/ITPutIOTShadow.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/iot/ITPutIOTShadow.java new file mode 100644 index 000000000000..5a37fab8f880 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/iot/ITPutIOTShadow.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.iot; + +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.List; + +public class ITPutIOTShadow { + private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + + @Ignore + @Test + public void testSimplePutUsingCredentialsProviderService() throws Throwable { + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + final TestRunner runner = TestRunners.newTestRunner(new PutAWSIoTShadow()); + final String thing = "thingName"; + final byte[] message = "{\"state\":{\"desired\":{\"key\":\"value\"}}}".getBytes(); + final Region region = Regions.getCurrentRegion(); + + runner.addControllerService("awsCredentialsProvider", serviceImpl); + + runner.setProperty(PutAWSIoTShadow.PROP_THING, thing); + runner.setProperty(PutAWSIoTShadow.REGION, region.getName()); + runner.setProperty(PutAWSIoTShadow.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); + + runner.setProperty(serviceImpl, AbstractAWSCredentialsProviderProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE); + + // ensure that the Controller Service is configured accordingly + runner.assertValid(serviceImpl); + + // If the Controller Service is not valid, this method will throw an IllegalStateException. Otherwise, the service is now ready to use. + runner.enableControllerService(serviceImpl); + + runner.enqueue(message); + + runner.run(1); + + // validate that the FlowFiles went where they were expected to go + runner.assertAllFlowFilesTransferred(PutAWSIoTShadow.REL_SUCCESS, 1); + + final List flowFiles = runner.getFlowFilesForRelationship(PutAWSIoTShadow.REL_SUCCESS); + for (final MockFlowFile mff : flowFiles) { + mff.assertContentEquals(message); + } + } +} diff --git a/pom.xml b/pom.xml index a07e7ff0c5e1..4a7fa869b6ab 100644 --- a/pom.xml +++ b/pom.xml @@ -1465,6 +1465,21 @@ language governing permissions and limitations under the License. --> org.codehaus.groovy groovy-all + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.0.2 + + + org.eclipse.jetty.websocket + websocket-client + 9.2.15.v20160210 + + + javax.websocket + javax.websocket-api + 1.1 + From 0c5b029551203afdc870f09f9235c27faa474049 Mon Sep 17 00:00:00 2001 From: Andre F de Miranda Date: Sun, 19 Feb 2017 23:53:04 +1100 Subject: [PATCH 2/2] NIFI-1767 - Addresses peer review feedback --- .../aws/iot/util/MqttWebSocketAsyncClient.java | 6 +++--- pom.xml | 15 --------------- 2 files changed, 3 insertions(+), 18 deletions(-) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/MqttWebSocketAsyncClient.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/MqttWebSocketAsyncClient.java index 1b39c7163f10..4c818f412fcd 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/MqttWebSocketAsyncClient.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/util/MqttWebSocketAsyncClient.java @@ -19,7 +19,7 @@ import java.net.URI; import java.util.concurrent.LinkedBlockingQueue; -import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.logging.ComponentLog; import org.eclipse.paho.client.mqttv3.MqttAsyncClient; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttException; @@ -34,7 +34,7 @@ public class MqttWebSocketAsyncClient extends MqttAsyncClient implements MqttCallback { protected volatile LinkedBlockingQueue awsQueuedMqttMessages = new LinkedBlockingQueue(); - protected final ProcessorLog logger; + protected final ComponentLog logger; protected final String serverURI; protected static String createDummyURI(String original) { @@ -51,7 +51,7 @@ protected static boolean isDummyURI(String uri) { } public MqttWebSocketAsyncClient(String serverURI, String clientId, - ProcessorLog logger) throws MqttException { + ComponentLog logger) throws MqttException { super(createDummyURI(serverURI), clientId, new MemoryPersistence(), new TimerPingSender()); this.serverURI = serverURI; this.logger = logger; diff --git a/pom.xml b/pom.xml index 4a7fa869b6ab..a07e7ff0c5e1 100644 --- a/pom.xml +++ b/pom.xml @@ -1465,21 +1465,6 @@ language governing permissions and limitations under the License. --> org.codehaus.groovy groovy-all - - org.eclipse.paho - org.eclipse.paho.client.mqttv3 - 1.0.2 - - - org.eclipse.jetty.websocket - websocket-client - 9.2.15.v20160210 - - - javax.websocket - javax.websocket-api - 1.1 -