From 86164950acfc794c6c9b1db3663716ac4626c55b Mon Sep 17 00:00:00 2001 From: bilna Date: Tue, 30 Dec 2014 18:36:09 +0530 Subject: [PATCH 01/11] [SPARK-4631] unit test for MQTT --- .../streaming/mqtt/MQTTStreamSuite.scala | 59 +++++++++++++++---- 1 file changed, 49 insertions(+), 10 deletions(-) diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 84595acf45ccb..b27e9b5c31db1 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -17,31 +17,70 @@ package org.apache.spark.streaming.mqtt +import org.apache.spark.Logging import org.scalatest.FunSuite - +import org.scalatest.concurrent.Eventually +import scala.concurrent.duration._ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence -class MQTTStreamSuite extends FunSuite { +abstract class MQTTStreamSuiteBase extends FunSuite with Eventually with Logging { val batchDuration = Seconds(1) + val master: String = "local[2]" + val framework: String = this.getClass.getSimpleName + val brokerUrl = "tcp://localhost:1883" + val topic = "def" + + def publishData(sendMessage: String): Unit = { + + try { + + val persistence: MqttClientPersistence = new MqttDefaultFilePersistence("/tmp") + val client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence) + client.connect() + val msgTopic: MqttTopic = client.getTopic(topic) + + val message: MqttMessage = new MqttMessage(String.valueOf(sendMessage).getBytes("utf-8")) + message.setQos(1) + message.setRetained(true) + msgTopic.publish(message) + println("Published data \ntopic: " + msgTopic.getName() + "\nMessage: " + message) + + client.disconnect() - private val master: String = "local[2]" + } catch { + case e: MqttException => println("Exception Caught: " + e) + } + } +} - private val framework: String = this.getClass.getSimpleName +class MQTTStreamSuite extends MQTTStreamSuiteBase { test("mqtt input stream") { val ssc = new StreamingContext(master, framework, batchDuration) - val brokerUrl = "abc" - val topic = "def" + val sendMessage = "MQTT demo for spark streaming" + + publishData(sendMessage) // tests the API, does not actually test data receiving - val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic) - val test2: ReceiverInputDStream[String] = + + val receiveStream: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) - // TODO: Actually test receiving data + var result: String = "" + receiveStream.foreachRDD { rdd => + result = rdd.first + result + } + + ssc.start() + eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { + assert(sendMessage.equals(result)) + } ssc.stop() } -} +} \ No newline at end of file From 5ca66918455f04d748d41b98b9967218a1ab4d00 Mon Sep 17 00:00:00 2001 From: Bilna P Date: Tue, 30 Dec 2014 18:54:33 +0530 Subject: [PATCH 02/11] Update MQTTStreamSuite.scala --- .../spark/streaming/mqtt/MQTTStreamSuite.scala | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index b27e9b5c31db1..8a17ae591c080 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -34,24 +34,18 @@ abstract class MQTTStreamSuiteBase extends FunSuite with Eventually with Logging val framework: String = this.getClass.getSimpleName val brokerUrl = "tcp://localhost:1883" val topic = "def" - def publishData(sendMessage: String): Unit = { - try { - val persistence: MqttClientPersistence = new MqttDefaultFilePersistence("/tmp") val client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence) client.connect() val msgTopic: MqttTopic = client.getTopic(topic) - val message: MqttMessage = new MqttMessage(String.valueOf(sendMessage).getBytes("utf-8")) message.setQos(1) message.setRetained(true) msgTopic.publish(message) println("Published data \ntopic: " + msgTopic.getName() + "\nMessage: " + message) - client.disconnect() - } catch { case e: MqttException => println("Exception Caught: " + e) } @@ -59,28 +53,21 @@ abstract class MQTTStreamSuiteBase extends FunSuite with Eventually with Logging } class MQTTStreamSuite extends MQTTStreamSuiteBase { - test("mqtt input stream") { val ssc = new StreamingContext(master, framework, batchDuration) val sendMessage = "MQTT demo for spark streaming" - publishData(sendMessage) - - // tests the API, does not actually test data receiving - val receiveStream: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) - var result: String = "" receiveStream.foreachRDD { rdd => result = rdd.first result } - ssc.start() eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { assert(sendMessage.equals(result)) } ssc.stop() } -} \ No newline at end of file +} From e8b6623e5bd31fcb583fdeae5f1c954be672403d Mon Sep 17 00:00:00 2001 From: Bilna P Date: Tue, 30 Dec 2014 20:12:19 +0530 Subject: [PATCH 03/11] Update MQTTStreamSuite.scala --- .../scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 8a17ae591c080..001fe84aec4a9 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -34,6 +34,7 @@ abstract class MQTTStreamSuiteBase extends FunSuite with Eventually with Logging val framework: String = this.getClass.getSimpleName val brokerUrl = "tcp://localhost:1883" val topic = "def" + def publishData(sendMessage: String): Unit = { try { val persistence: MqttClientPersistence = new MqttDefaultFilePersistence("/tmp") From 5f6bfd2f4b11c08e76d02ccf5a5594151ccd9af5 Mon Sep 17 00:00:00 2001 From: bilna Date: Wed, 31 Dec 2014 12:07:24 +0530 Subject: [PATCH 04/11] Added BeforeAndAfter --- .../streaming/mqtt/MQTTStreamSuite.scala | 55 ++++++++++--------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 001fe84aec4a9..38bd19cabdf36 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -17,8 +17,7 @@ package org.apache.spark.streaming.mqtt -import org.apache.spark.Logging -import org.scalatest.FunSuite +import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.concurrent.Eventually import scala.concurrent.duration._ import org.apache.spark.streaming.{Seconds, StreamingContext} @@ -27,14 +26,40 @@ import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.eclipse.paho.client.mqttv3._ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence -abstract class MQTTStreamSuiteBase extends FunSuite with Eventually with Logging { - +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { val batchDuration = Seconds(1) val master: String = "local[2]" val framework: String = this.getClass.getSimpleName val brokerUrl = "tcp://localhost:1883" val topic = "def" - + + var ssc: StreamingContext = _ + + after { + if (ssc != null) { + ssc.stop() + ssc = null + } + } + + test("mqtt input stream") { + ssc = new StreamingContext(master, framework, batchDuration) + val sendMessage = "MQTT demo for spark streaming" + publishData(sendMessage) + val receiveStream: ReceiverInputDStream[String] = + MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) + var receiveMessage: String = "" + receiveStream.foreachRDD { rdd => + receiveMessage = rdd.first + receiveMessage + } + ssc.start() + eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { + assert(sendMessage.equals(receiveMessage)) + } + ssc.stop() + } + def publishData(sendMessage: String): Unit = { try { val persistence: MqttClientPersistence = new MqttDefaultFilePersistence("/tmp") @@ -52,23 +77,3 @@ abstract class MQTTStreamSuiteBase extends FunSuite with Eventually with Logging } } } - -class MQTTStreamSuite extends MQTTStreamSuiteBase { - test("mqtt input stream") { - val ssc = new StreamingContext(master, framework, batchDuration) - val sendMessage = "MQTT demo for spark streaming" - publishData(sendMessage) - val receiveStream: ReceiverInputDStream[String] = - MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) - var result: String = "" - receiveStream.foreachRDD { rdd => - result = rdd.first - result - } - ssc.start() - eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { - assert(sendMessage.equals(result)) - } - ssc.stop() - } -} From b1ac4ad62ff6d537f669699d5da49bc4ee1ab154 Mon Sep 17 00:00:00 2001 From: bilna Date: Wed, 31 Dec 2014 12:24:41 +0530 Subject: [PATCH 05/11] Added BeforeAndAfter --- .../spark/streaming/mqtt/MQTTStreamSuite.scala | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 38bd19cabdf36..5808d87587cfa 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -27,13 +27,17 @@ import org.eclipse.paho.client.mqttv3._ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { - val batchDuration = Seconds(1) - val master: String = "local[2]" - val framework: String = this.getClass.getSimpleName - val brokerUrl = "tcp://localhost:1883" - val topic = "def" - var ssc: StreamingContext = _ + private val batchDuration = Seconds(1) + private val master: String = "local[2]" + private val framework: String = this.getClass.getSimpleName + private val brokerUrl = "tcp://localhost:1883" + private val topic = "def" + private var ssc: StreamingContext = _ + + before { + ssc = new StreamingContext(master, framework, batchDuration) + } after { if (ssc != null) { @@ -43,7 +47,6 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { } test("mqtt input stream") { - ssc = new StreamingContext(master, framework, batchDuration) val sendMessage = "MQTT demo for spark streaming" publishData(sendMessage) val receiveStream: ReceiverInputDStream[String] = From 4b580943de5137e947d1a6cdadd054020932ed8e Mon Sep 17 00:00:00 2001 From: Bilna P Date: Wed, 31 Dec 2014 12:33:44 +0530 Subject: [PATCH 06/11] Update MQTTStreamSuite.scala --- .../scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 5808d87587cfa..03aafe36678f3 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -38,7 +38,6 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { before { ssc = new StreamingContext(master, framework, batchDuration) } - after { if (ssc != null) { ssc.stop() From 04503cfa7f8168038c17198b6e45b16b89591e74 Mon Sep 17 00:00:00 2001 From: bilna Date: Fri, 2 Jan 2015 17:10:42 +0530 Subject: [PATCH 07/11] Added embedded broker service for mqtt test --- external/mqtt/pom.xml | 124 +++++++++--------- .../streaming/mqtt/MQTTStreamSuite.scala | 87 +++++++++--- 2 files changed, 133 insertions(+), 78 deletions(-) diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index 9025915f4447e..a38a7c01c2090 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -16,65 +16,71 @@ ~ limitations under the License. --> - - 4.0.0 - - org.apache.spark - spark-parent - 1.3.0-SNAPSHOT - ../../pom.xml - + + 4.0.0 + + org.apache.spark + spark-parent + 1.3.0-SNAPSHOT + ../../pom.xml + - org.apache.spark - spark-streaming-mqtt_2.10 - - streaming-mqtt - - jar - Spark Project External MQTT - http://spark.apache.org/ + org.apache.spark + spark-streaming-mqtt_2.10 + + streaming-mqtt + + jar + Spark Project External MQTT + http://spark.apache.org/ - - - org.apache.spark - spark-streaming_${scala.binary.version} - ${project.version} - provided - - - org.eclipse.paho - org.eclipse.paho.client.mqttv3 - 1.0.1 - - - org.scalatest - scalatest_${scala.binary.version} - test - - - org.scalacheck - scalacheck_${scala.binary.version} - test - - - junit - junit - test - - - com.novocode - junit-interface - test - - - - target/scala-${scala.binary.version}/classes - target/scala-${scala.binary.version}/test-classes - - - org.scalatest - scalatest-maven-plugin - - - + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + provided + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.0.1 + + + org.apache.activemq + activemq-core + 5.7.0 + + + org.scalatest + scalatest_${scala.binary.version} + test + + + org.scalacheck + scalacheck_${scala.binary.version} + test + + + junit + junit + test + + + com.novocode + junit-interface + test + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.scalatest + scalatest-maven-plugin + + + diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 03aafe36678f3..70bff113f0fbd 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -17,10 +17,14 @@ package org.apache.spark.streaming.mqtt +import java.net.{URI, ServerSocket} + +import org.apache.activemq.broker.{TransportConnector, BrokerService} +import org.apache.spark.util.Utils import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.concurrent.Eventually import scala.concurrent.duration._ -import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.eclipse.paho.client.mqttv3._ @@ -28,54 +32,99 @@ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { - private val batchDuration = Seconds(1) + private val batchDuration = Milliseconds(500) private val master: String = "local[2]" private val framework: String = this.getClass.getSimpleName - private val brokerUrl = "tcp://localhost:1883" + private val freePort = findFreePort() + private val brokerUri = "//localhost:" + freePort private val topic = "def" private var ssc: StreamingContext = _ + private val persistenceDir = Utils.createTempDir() + private var broker: BrokerService = _ + private var connector: TransportConnector = _ before { ssc = new StreamingContext(master, framework, batchDuration) + startUp() } + after { if (ssc != null) { ssc.stop() ssc = null } + Utils.deleteRecursively(persistenceDir) + tearDownMQTT } test("mqtt input stream") { val sendMessage = "MQTT demo for spark streaming" - publishData(sendMessage) val receiveStream: ReceiverInputDStream[String] = - MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) - var receiveMessage: String = "" + MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY) + var receiveMessage: List[String] = List() receiveStream.foreachRDD { rdd => - receiveMessage = rdd.first - receiveMessage + if (rdd.collect.length > 0) { + receiveMessage = receiveMessage ::: List(rdd.first) + receiveMessage + } } ssc.start() + publishData(sendMessage) eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { - assert(sendMessage.equals(receiveMessage)) + assert(sendMessage.equals(receiveMessage(0))) } ssc.stop() } - def publishData(sendMessage: String): Unit = { + private def startUp() { + broker = new BrokerService() + connector = new TransportConnector() + connector.setName("mqtt") + connector.setUri(new URI("mqtt:" + brokerUri)) + broker.addConnector(connector) + broker.start() + } + + private def tearDownMQTT() { + if (broker != null) { + broker.stop() + broker = null + } + if (connector != null) { + connector.stop() + connector = null + } + } + + private def findFreePort(): Int = { + Utils.startServiceOnPort(23456, (trialPort: Int) => { + val socket = new ServerSocket(trialPort) + socket.close() + (null, trialPort) + })._2 + } + + def publishData(data: String): Unit = { + var client: MqttClient = null try { - val persistence: MqttClientPersistence = new MqttDefaultFilePersistence("/tmp") - val client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence) + val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath) + client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence) client.connect() - val msgTopic: MqttTopic = client.getTopic(topic) - val message: MqttMessage = new MqttMessage(String.valueOf(sendMessage).getBytes("utf-8")) - message.setQos(1) - message.setRetained(true) - msgTopic.publish(message) - println("Published data \ntopic: " + msgTopic.getName() + "\nMessage: " + message) - client.disconnect() + if (client.isConnected) { + val msgTopic: MqttTopic = client.getTopic(topic) + val message: MqttMessage = new MqttMessage(data.getBytes("utf-8")) + message.setQos(1) + message.setRetained(true) + for (i <- 0 to 10) + msgTopic.publish(message) + } } catch { case e: MqttException => println("Exception Caught: " + e) } + finally { + client.disconnect() + client.close() + client = null + } } } From 4b34ee784e7c9c489cf0c22d73311c160bc67c47 Mon Sep 17 00:00:00 2001 From: Bilna P Date: Fri, 2 Jan 2015 17:15:43 +0530 Subject: [PATCH 08/11] Update MQTTStreamSuite.scala --- .../org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 70bff113f0fbd..bfe5fed3335e4 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -45,7 +45,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { before { ssc = new StreamingContext(master, framework, batchDuration) - startUp() + setupMQTT } after { @@ -76,7 +76,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { ssc.stop() } - private def startUp() { + private def setupMQTT() { broker = new BrokerService() connector = new TransportConnector() connector.setName("mqtt") From fac3904a8e702722acca2a0e7217c5440ecda84a Mon Sep 17 00:00:00 2001 From: bilna Date: Sat, 3 Jan 2015 13:46:27 +0530 Subject: [PATCH 09/11] Correction in Indentation and coding style --- external/mqtt/pom.xml | 129 +++++++++--------- .../streaming/mqtt/MQTTStreamSuite.scala | 11 +- 2 files changed, 68 insertions(+), 72 deletions(-) diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index a38a7c01c2090..5a56b8ee45937 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -16,71 +16,70 @@ ~ limitations under the License. --> - - 4.0.0 - - org.apache.spark - spark-parent - 1.3.0-SNAPSHOT - ../../pom.xml - - + + 4.0.0 + org.apache.spark - spark-streaming-mqtt_2.10 - - streaming-mqtt - - jar - Spark Project External MQTT - http://spark.apache.org/ + spark-parent + 1.3.0-SNAPSHOT + ../../pom.xml + + + org.apache.spark + spark-streaming-mqtt_2.10 + + streaming-mqtt + + jar + Spark Project External MQTT + http://spark.apache.org/ - - - org.apache.spark - spark-streaming_${scala.binary.version} - ${project.version} - provided - - - org.eclipse.paho - org.eclipse.paho.client.mqttv3 - 1.0.1 - - - org.apache.activemq - activemq-core - 5.7.0 - - - org.scalatest - scalatest_${scala.binary.version} - test - - - org.scalacheck - scalacheck_${scala.binary.version} - test - - - junit - junit - test - - - com.novocode - junit-interface - test - - - - target/scala-${scala.binary.version}/classes - target/scala-${scala.binary.version}/test-classes - - - org.scalatest - scalatest-maven-plugin - - - + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + provided + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.0.1 + + + org.apache.activemq + activemq-core + 5.7.0 + + + org.scalatest + scalatest_${scala.binary.version} + test + + + org.scalacheck + scalacheck_${scala.binary.version} + test + + + junit + junit + test + + + com.novocode + junit-interface + test + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.scalatest + scalatest-maven-plugin + + + diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index bfe5fed3335e4..98fe6cb301f52 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -45,7 +45,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { before { ssc = new StreamingContext(master, framework, batchDuration) - setupMQTT + setupMQTT() } after { @@ -54,7 +54,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { ssc = null } Utils.deleteRecursively(persistenceDir) - tearDownMQTT + tearDownMQTT() } test("mqtt input stream") { @@ -115,13 +115,10 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { val message: MqttMessage = new MqttMessage(data.getBytes("utf-8")) message.setQos(1) message.setRetained(true) - for (i <- 0 to 10) + for (i <- 0 to 100) msgTopic.publish(message) } - } catch { - case e: MqttException => println("Exception Caught: " + e) - } - finally { + } finally { client.disconnect() client.close() client = null From acea3a31eba9d0853cb7484a16f8916219057be0 Mon Sep 17 00:00:00 2001 From: bilna Date: Sun, 4 Jan 2015 23:52:28 +0530 Subject: [PATCH 10/11] Adding dependency with scope test --- external/mqtt/pom.xml | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index 5a56b8ee45937..d478267b605ba 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -46,11 +46,6 @@ org.eclipse.paho.client.mqttv3 1.0.1 - - org.apache.activemq - activemq-core - 5.7.0 - org.scalatest scalatest_${scala.binary.version} @@ -71,6 +66,12 @@ junit-interface test + + org.apache.activemq + activemq-core + 5.7.0 + test + target/scala-${scala.binary.version}/classes From 5e76f04f978124bc49a335c49cb252cb787b2706 Mon Sep 17 00:00:00 2001 From: bilna Date: Fri, 9 Jan 2015 10:58:17 +0530 Subject: [PATCH 11/11] fix import order and other coding style --- .../spark/streaming/mqtt/MQTTStreamSuite.scala | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 98fe6cb301f52..39eb8b183488f 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -19,16 +19,19 @@ package org.apache.spark.streaming.mqtt import java.net.{URI, ServerSocket} +import scala.concurrent.duration._ + import org.apache.activemq.broker.{TransportConnector, BrokerService} -import org.apache.spark.util.Utils +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence + import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.concurrent.Eventually -import scala.concurrent.duration._ + import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream -import org.eclipse.paho.client.mqttv3._ -import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence +import org.apache.spark.util.Utils class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { @@ -38,8 +41,9 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { private val freePort = findFreePort() private val brokerUri = "//localhost:" + freePort private val topic = "def" - private var ssc: StreamingContext = _ private val persistenceDir = Utils.createTempDir() + + private var ssc: StreamingContext = _ private var broker: BrokerService = _ private var connector: TransportConnector = _ @@ -115,8 +119,9 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { val message: MqttMessage = new MqttMessage(data.getBytes("utf-8")) message.setQos(1) message.setRetained(true) - for (i <- 0 to 100) + for (i <- 0 to 100) { msgTopic.publish(message) + } } } finally { client.disconnect()