From a344a29b9ce4bd71259452e7a9cb5b3284335ae2 Mon Sep 17 00:00:00 2001 From: Karl-Philipp Richter Date: Mon, 19 Mar 2018 02:10:34 +0100 Subject: [PATCH] [STORM-3007] storm-mqtt-examples: fixed all checkstyle warnings --- examples/storm-mqtt-examples/pom.xml | 2 +- .../mqtt/examples/CustomMessageMapper.java | 50 +++++++++++--- .../mqtt/examples/MqttBrokerPublisher.java | 65 ++++++++++++++----- .../storm/mqtt/examples/package-info.java | 20 ++++++ 4 files changed, 111 insertions(+), 26 deletions(-) create mode 100644 examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/package-info.java diff --git a/examples/storm-mqtt-examples/pom.xml b/examples/storm-mqtt-examples/pom.xml index ca7c41882ac..e078637358d 100644 --- a/examples/storm-mqtt-examples/pom.xml +++ b/examples/storm-mqtt-examples/pom.xml @@ -128,7 +128,7 @@ maven-checkstyle-plugin - 16 + 0 diff --git a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java index ec5645c7bda..fa83bf9a97a 100644 --- a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java +++ b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java @@ -15,35 +15,65 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.mqtt.examples; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; import org.apache.storm.mqtt.MqttMessage; import org.apache.storm.mqtt.MqttMessageMapper; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Given a topic name: "users/{user}/{location}/{deviceId}" * and a payload of "{temperature}/{humidity}" - * emits a tuple containing user(String), deviceId(String), location(String), temperature(float), humidity(float) - * + * emits a tuple containing + * {@code user(String), deviceId(String), location(String), temperature(float), + * humidity(float)}. */ -public class CustomMessageMapper implements MqttMessageMapper { - private static final Logger LOG = LoggerFactory.getLogger(CustomMessageMapper.class); - +public final class CustomMessageMapper implements MqttMessageMapper { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger( + CustomMessageMapper.class); + private static final int TOPIC_INDEX_1 = 2; + private static final int TOPIC_INDEX_2 = 4; + private static final int TOPIC_INDEX_3 = 3; - public Values toValues(MqttMessage message) { + /** + * Converts MQTT message to an instance of {@code Values}. + * @param message the message to convert + * @return the converted values + */ + @Override + public Values toValues(final MqttMessage message) { String topic = message.getTopic(); String[] topicElements = topic.split("/"); String[] payloadElements = new String(message.getMessage()).split("/"); - return new Values(topicElements[2], topicElements[4], topicElements[3], Float.parseFloat(payloadElements[0]), + return new Values(topicElements[TOPIC_INDEX_1], + topicElements[TOPIC_INDEX_2], + topicElements[TOPIC_INDEX_3], + Float.parseFloat(payloadElements[0]), Float.parseFloat(payloadElements[1])); } + /** + * Gets the output fields. + * @return the output fields + */ + @Override public Fields outputFields() { - return new Fields("user", "deviceId", "location", "temperature", "humidity"); + return new Fields("user", + "deviceId", + "location", + "temperature", + "humidity"); + } + + /** + * Utility constructor. + */ + private CustomMessageMapper() { } } diff --git a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/MqttBrokerPublisher.java b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/MqttBrokerPublisher.java index fa8389df83b..4232eb007fb 100644 --- a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/MqttBrokerPublisher.java +++ b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/MqttBrokerPublisher.java @@ -15,27 +15,39 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.mqtt.examples; +import java.util.Random; import org.apache.activemq.broker.BrokerService; import org.apache.storm.mqtt.MqttLogger; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.QoS; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Random; - -public class MqttBrokerPublisher { - private static final Logger LOG = LoggerFactory.getLogger(MqttBrokerPublisher.class); - +/** + * A MQTT example using a Storm topology. + */ +public final class MqttBrokerPublisher { + private static final Logger LOG = LoggerFactory.getLogger( + MqttBrokerPublisher.class); private static BrokerService broker; - private static BlockingConnection connection; + private static final int TEMPERATURE_MAX = 100; + private static final int HUMIDITY_MAX = 100; + /** + * The default wait in milliseconds. + */ + private static final int WAIT_MILLIS_DEFAULT = 500; - + /** + * Initializes {@code broker} and starts it. + * @throws Exception if an exception during adding a connector occurs + */ public static void startBroker() throws Exception { LOG.info("Starting broker..."); broker = new BrokerService(); @@ -43,7 +55,7 @@ public static void startBroker() throws Exception { broker.setDataDirectory("target"); broker.start(); LOG.info("MQTT broker started"); - Runtime.getRuntime().addShutdownHook(new Thread(){ + Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { try { @@ -56,6 +68,10 @@ public void run() { }); } + /** + * Initializes {@code connection}. + * @throws Exception if an exception during connecting to connector occurs + */ public static void startPublisher() throws Exception { MQTT client = new MQTT(); client.setTracer(new MqttLogger()); @@ -63,7 +79,7 @@ public static void startPublisher() throws Exception { client.setClientId("MqttBrokerPublisher"); connection = client.blockingConnection(); - Runtime.getRuntime().addShutdownHook(new Thread(){ + Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { try { @@ -78,25 +94,44 @@ public void run() { connection.connect(); } + /** + * Publishes topics on connection. + * @throws Exception if an exception during publishing occurs + */ public static void publish() throws Exception { String topic = "/users/tgoetz/office/1234"; Random rand = new Random(); LOG.info("Publishing to topic {}", topic); LOG.info("Cntrl+C to exit."); - while(true) { - int temp = rand.nextInt(100); - int hum = rand.nextInt(100); + while (true) { + int temp = rand.nextInt(TEMPERATURE_MAX); + int hum = rand.nextInt(HUMIDITY_MAX); String payload = temp + "/" + hum; - connection.publish(topic, payload.getBytes(), QoS.AT_LEAST_ONCE, false); - Thread.sleep(500); + connection.publish(topic, + payload.getBytes(), + QoS.AT_LEAST_ONCE, + false); + Thread.sleep(WAIT_MILLIS_DEFAULT); } } - public static void main(String[] args) throws Exception{ + /** + * The main method. + * @param args the command line arguments + * @throws Exception if an exception during connections or transmission + * occurs + */ + public static void main(final String[] args) throws Exception { startBroker(); startPublisher(); publish(); } + + /** + * Utility constructor to prevent initialization. + */ + private MqttBrokerPublisher() { + } } diff --git a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/package-info.java b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/package-info.java new file mode 100644 index 00000000000..52c9270b0bf --- /dev/null +++ b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2018 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * MQTT examples. + */ +package org.apache.storm.mqtt.examples;