From 5857989426db9cc51e34bf09942101750fff60ea Mon Sep 17 00:00:00 2001 From: prabs Date: Fri, 23 Jan 2015 15:21:00 +0530 Subject: [PATCH 01/10] modified to adhere to accepted coding standards --- .../examples/streaming/MQTTWordCount.scala | 38 ++++++++++--------- .../streaming/mqtt/MQTTInputDStream.scala | 32 +++++++--------- .../spark/streaming/mqtt/MQTTUtils.scala | 5 ++- .../streaming/mqtt/MQTTStreamSuite.scala | 12 +++--- 4 files changed, 42 insertions(+), 45 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala index 6ff0c47793a25..77fbf5c8d5e9b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala @@ -17,7 +17,7 @@ package org.apache.spark.examples.streaming -import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic} +import org.eclipse.paho.client.mqttv3._ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence import org.apache.spark.storage.StorageLevel @@ -31,8 +31,6 @@ import org.apache.spark.SparkConf */ object MQTTPublisher { - var client: MqttClient = _ - def main(args: Array[String]) { if (args.length < 2) { System.err.println("Usage: MQTTPublisher ") @@ -42,25 +40,29 @@ object MQTTPublisher { StreamingExamples.setStreamingLogLevels() val Seq(brokerUrl, topic) = args.toSeq + + var client: MqttClient = null try { - var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp") - client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance) - } catch { - case e: MqttException => println("Exception Caught: " + e) - } + val persistence = new MqttDefaultFilePersistence("/tmp") + client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence) - client.connect() + client.connect() - val 
msgtopic: MqttTopic = client.getTopic(topic) - val msg: String = "hello mqtt demo for spark streaming" + val msgtopic = client.getTopic(topic) + val msgContent = "hello mqtt demo for spark streaming" + val message = new MqttMessage(msgContent.getBytes("utf-8")) - while (true) { - val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes("utf-8")) - msgtopic.publish(message) - println("Published data. topic: " + msgtopic.getName() + " Message: " + message) + while (true) { + msgtopic.publish(message) + println("Published data. topic: " + msgtopic.getName() + " Message: " + message) + } + + } catch { + case e: MqttException => println("Exception Caught: " + e) + } finally { + client.disconnect() } - client.disconnect() } } @@ -96,9 +98,9 @@ object MQTTWordCount { val sparkConf = new SparkConf().setAppName("MQTTWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2)) val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2) - - val words = lines.flatMap(x => x.toString.split(" ")) + val words = lines.flatMap(x => x.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.print() ssc.start() ssc.awaitTermination() diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala index 77661f71ada21..b1a8eda10ec4b 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala @@ -17,28 +17,22 @@ package org.apache.spark.streaming.mqtt +import java.io.IOException +import java.util.concurrent.Executors +import java.util.Properties + +import scala.collection.JavaConversions._ import scala.collection.Map import scala.collection.mutable.HashMap -import scala.collection.JavaConversions._ import scala.reflect.ClassTag -import java.util.Properties -import 
java.util.concurrent.Executors -import java.io.IOException - -import org.eclipse.paho.client.mqttv3.MqttCallback -import org.eclipse.paho.client.mqttv3.MqttClient -import org.eclipse.paho.client.mqttv3.MqttClientPersistence +import org.eclipse.paho.client.mqttv3._ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken -import org.eclipse.paho.client.mqttv3.MqttException -import org.eclipse.paho.client.mqttv3.MqttMessage -import org.eclipse.paho.client.mqttv3.MqttTopic import org.apache.spark.Logging +import org.apache.spark.streaming.dstream._ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver.Receiver /** @@ -88,18 +82,18 @@ class MQTTReceiver( client.subscribe(topic) // Callback automatically triggers as and when new message arrives on specified topic - val callback: MqttCallback = new MqttCallback() { + val callback = new MqttCallback() { // Handles Mqtt message - override def messageArrived(arg0: String, arg1: MqttMessage) { - store(new String(arg1.getPayload(),"utf-8")) + override def messageArrived(topic: String, message: MqttMessage) { + store(new String(message.getPayload(),"utf-8")) } - override def deliveryComplete(arg0: IMqttDeliveryToken) { + override def deliveryComplete(token: IMqttDeliveryToken) { } - override def connectionLost(arg0: Throwable) { - restart("Connection lost ", arg0) + override def connectionLost(cause: Throwable) { + restart("Connection lost ", cause) } } diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index c5ffe51f9986c..7fb04ef6bb39b 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -17,11 
+17,12 @@ package org.apache.spark.streaming.mqtt +import scala.reflect.ClassTag + import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream} -import scala.reflect.ClassTag import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream} +import org.apache.spark.streaming.StreamingContext object MQTTUtils { /** diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index fe53a29cba0c9..40ec4b6aa6d1d 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -38,8 +38,8 @@ import org.apache.spark.util.Utils class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { private val batchDuration = Milliseconds(500) - private val master: String = "local[2]" - private val framework: String = this.getClass.getSimpleName + private val master = "local[2]" + private val framework = this.getClass.getSimpleName private val freePort = findFreePort() private val brokerUri = "//localhost:" + freePort private val topic = "def" @@ -65,7 +65,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { test("mqtt input stream") { val sendMessage = "MQTT demo for spark streaming" - val receiveStream: ReceiverInputDStream[String] = + val receiveStream = MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY) var receiveMessage: List[String] = List() receiveStream.foreachRDD { rdd => @@ -113,12 +113,12 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { def publishData(data: String): Unit = { var client: MqttClient = null try { - val persistence: MqttClientPersistence = new 
MqttDefaultFilePersistence(persistenceDir.getAbsolutePath) + val persistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath) client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence) client.connect() if (client.isConnected) { - val msgTopic: MqttTopic = client.getTopic(topic) - val message: MqttMessage = new MqttMessage(data.getBytes("utf-8")) + val msgTopic = client.getTopic(topic) + val message = new MqttMessage(data.getBytes("utf-8")) message.setQos(1) message.setRetained(true) for (i <- 0 to 100) { From 66919a34ab1838f0f0dbc2ee76903532fa5117b8 Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Fri, 23 Jan 2015 22:55:42 +0530 Subject: [PATCH 02/10] changed MqttDefaultFilePersistence to MemoryPersistence the hard coded '\tmp' may create garbage. --- .../apache/spark/examples/streaming/MQTTWordCount.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala index 77fbf5c8d5e9b..cc7f8dc28b583 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala @@ -18,7 +18,7 @@ package org.apache.spark.examples.streaming import org.eclipse.paho.client.mqttv3._ -import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} @@ -44,7 +44,7 @@ object MQTTPublisher { var client: MqttClient = null try { - val persistence = new MqttDefaultFilePersistence("/tmp") + val persistence = new MemoryPersistence() client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence) client.connect() @@ -54,8 +54,9 @@ object MQTTPublisher { val message = new 
MqttMessage(msgContent.getBytes("utf-8")) while (true) { + Thread.sleep(100) msgtopic.publish(message) - println("Published data. topic: " + msgtopic.getName() + " Message: " + message) + println("Published data. topic: %s; Message: %s".format(msgtopic.getName(), message)) } } catch { From cd57029ff66c5a741581facd4e08db8c9744edb4 Mon Sep 17 00:00:00 2001 From: prabs Date: Tue, 3 Feb 2015 23:08:35 +0530 Subject: [PATCH 03/10] address the comments --- .../spark/examples/streaming/MQTTWordCount.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala index cc7f8dc28b583..e56c9327e4fe4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala @@ -54,11 +54,14 @@ object MQTTPublisher { val message = new MqttMessage(msgContent.getBytes("utf-8")) while (true) { - Thread.sleep(100) - msgtopic.publish(message) - println("Published data. topic: %s; Message: %s".format(msgtopic.getName(), message)) - } - + try { + msgtopic.publish(message) + println("Published data. 
topic: %s; Message: %s".format(msgtopic.getName(), message)) + } catch { + case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT => + Thread.sleep(10) // wait for Spark streaming to consume something from the message queue + } + } } catch { case e: MqttException => println("Exception Caught: " + e) } finally { From 838c38e9d4ab4733395aecd460221dcac5d5b2bd Mon Sep 17 00:00:00 2001 From: prabs Date: Tue, 10 Feb 2015 00:03:38 +0530 Subject: [PATCH 04/10] address the comment --- .../apache/spark/streaming/mqtt/MQTTInputDStream.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala index b1a8eda10ec4b..8c15219c54e5b 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala @@ -26,7 +26,13 @@ import scala.collection.Map import scala.collection.mutable.HashMap import scala.reflect.ClassTag -import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken +import org.eclipse.paho.client.mqttv3.MqttCallback +import org.eclipse.paho.client.mqttv3.MqttClient +import org.eclipse.paho.client.mqttv3.MqttClientPersistence +import org.eclipse.paho.client.mqttv3.MqttException +import org.eclipse.paho.client.mqttv3.MqttMessage +import org.eclipse.paho.client.mqttv3.MqttTopic import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence import org.apache.spark.Logging From 0cc67bd106c5cf1c3cf8ce1270efa8ad1e658e5d Mon Sep 17 00:00:00 2001 From: prabs Date: Tue, 10 Feb 2015 00:05:09 +0530 Subject: [PATCH 05/10] address the comment --- .../org/apache/spark/streaming/mqtt/MQTTInputDStream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala index 8c15219c54e5b..ec6fe9a180b5f 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala @@ -36,9 +36,9 @@ import org.eclipse.paho.client.mqttv3.MqttTopic import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence import org.apache.spark.Logging -import org.apache.spark.streaming.dstream._ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver.Receiver /** From 22dd7f7f34edf3a6cd5585a6b0f54a24718271e4 Mon Sep 17 00:00:00 2001 From: prabs Date: Tue, 10 Feb 2015 00:36:56 +0530 Subject: [PATCH 06/10] address the comments --- .../org/apache/spark/examples/streaming/MQTTWordCount.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala index e56c9327e4fe4..de8f999aeab55 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala @@ -56,7 +56,7 @@ object MQTTPublisher { while (true) { try { msgtopic.publish(message) - println("Published data. topic: %s; Message: %s".format(msgtopic.getName(), message)) + println(s"Published data. 
topic: ${msgtopic.getName()}; Message: ${message}") } catch { case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT => Thread.sleep(10) // wait for Spark streaming to consume something from the message queue From c035bdccd93db5f1b32d955a72acf5dd0ebadcc1 Mon Sep 17 00:00:00 2001 From: prabs Date: Tue, 10 Feb 2015 22:57:57 +0530 Subject: [PATCH 07/10] address the comment --- .../spark/examples/streaming/MQTTWordCount.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala index de8f999aeab55..4e7ee0a749035 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala @@ -65,7 +65,9 @@ object MQTTPublisher { } catch { case e: MqttException => println("Exception Caught: " + e) } finally { - client.disconnect() + if (client != null) { + client.disconnect() + } } } } From 46f9619a0b850a0abe334883e2f3a7d6d246dc63 Mon Sep 17 00:00:00 2001 From: prabs Date: Tue, 10 Feb 2015 22:58:24 +0530 Subject: [PATCH 08/10] address the comment --- .../org/apache/spark/examples/streaming/MQTTWordCount.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala index 4e7ee0a749035..ca6b87b26193a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala @@ -59,7 +59,7 @@ object MQTTPublisher { println(s"Published data. 
topic: ${msgtopic.getName()}; Message: ${message}") } catch { case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT => - Thread.sleep(10) // wait for Spark streaming to consume something from the message queue + Thread.sleep(10) // wait for Spark streaming to consume something from the message queue } } } catch { From ccc0765a1e80d074ef9a5622b13e342fa2c84a21 Mon Sep 17 00:00:00 2001 From: prabs Date: Tue, 10 Feb 2015 23:25:45 +0530 Subject: [PATCH 09/10] address the comment --- .../main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index 7fb04ef6bb39b..1142d0f56ba34 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -20,9 +20,9 @@ package org.apache.spark.streaming.mqtt import scala.reflect.ClassTag import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream} import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream} -import org.apache.spark.streaming.StreamingContext object MQTTUtils { /** From bd2cb49f84f0dd849439aeb8919025610c0c6403 Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Wed, 25 Feb 2015 00:37:50 +0530 Subject: [PATCH 10/10] address the comment --- .../org/apache/spark/examples/streaming/MQTTWordCount.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala index ca6b87b26193a..f40caad322f59 100644 --- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala @@ -59,7 +59,8 @@ object MQTTPublisher { println(s"Published data. topic: ${msgtopic.getName()}; Message: ${message}") } catch { case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT => - Thread.sleep(10) // wait for Spark streaming to consume something from the message queue + Thread.sleep(10) + println("Queue is full, waiting to consume data from the message queue") } } } catch {