From efde0e9b778d5dff6602a2e18d47c06108136157 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Fri, 27 Apr 2018 12:39:35 +0530 Subject: [PATCH 01/10] Porting some changes from spark build, to fix my local bahir build --- pom.xml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index dc54de19..41ae9085 100644 --- a/pom.xml +++ b/pom.xml @@ -407,7 +407,7 @@ org.apache.maven.plugins maven-enforcer-plugin - 1.4.1 + 3.0.0-M1 enforce-versions @@ -433,6 +433,7 @@ --> org.jboss.netty org.codehaus.groovy + *:*_2.10 true @@ -482,7 +483,8 @@ net.alchim31.maven scala-maven-plugin - 3.3.1 + + 3.2.2 eclipse-add-source From 37487c131f602c2879daf5760396f40cab306f11 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Wed, 9 May 2018 12:54:17 +0530 Subject: [PATCH 02/10] Migrating Mqtt spark structured streaming connector to DatasourceV2 API. --- pom.xml | 10 +- .../akka/AkkaStreamSourceSuite.scala | 2 +- .../streaming/mqtt/MQTTStreamWordCount.scala | 2 +- .../sql/streaming/mqtt/MQTTStreamSource.scala | 236 +++++++++++------- .../sql/streaming/mqtt/MessageStore.scala | 90 ++++--- .../mqtt/LocalMessageStoreSuite.scala | 6 +- .../mqtt/MQTTStreamSourceSuite.scala | 52 ++-- .../sql/streaming/mqtt/MQTTTestUtils.scala | 11 +- 8 files changed, 253 insertions(+), 156 deletions(-) diff --git a/pom.xml b/pom.xml index 41ae9085..e00d9be8 100644 --- a/pom.xml +++ b/pom.xml @@ -99,7 +99,7 @@ 1.2.17 - 2.2.0 + 2.3.0 1.1.0 @@ -348,13 +348,13 @@ org.scalatest scalatest_${scala.binary.version} - 2.2.6 + 3.0.3 test org.scalacheck scalacheck_${scala.binary.version} - 1.12.5 + 1.13.5 test @@ -559,7 +559,7 @@ org.apache.maven.plugins maven-surefire-plugin - 2.19.1 + 2.20.1 @@ -569,7 +569,7 @@ **/*Suite.java ${project.build.directory}/surefire-reports - -Xmx3g -Xss4096k -XX:ReservedCodeCacheSize=${CodeCacheSize} + -ea -Xmx3g -Xss4m -XX:ReservedCodeCacheSize=${CodeCacheSize} streaming-mqtt sql-streaming-mqtt streaming-twitter diff --git 
a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala index cd03acb3..5482c32b 100644 --- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala +++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala @@ -176,8 +176,7 @@ class StressTestMQTTSource extends MQTTStreamSourceSuite { val (sqlContext: SQLContext, dataFrame: DataFrame) = createStreamingDataframe() val query = writeStreamResults(sqlContext, dataFrame) - mqttTestUtils.publishData("test", sendMessage, noOfMsgs / 2) - mqttTestUtils.publishData("test", sendMessage, noOfMsgs / 2) + mqttTestUtils.publishData("test", sendMessage, noOfMsgs ) query.processAllAvailable() query.awaitTermination(25000) From 7adc253c44f117f827fe995fd61ddbbe61436c52 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Fri, 11 May 2018 12:16:59 +0530 Subject: [PATCH 06/10] BAHIR-83, does not seem to bother us anymore. --- bin/test-BAHIR-83.sh | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100755 bin/test-BAHIR-83.sh diff --git a/bin/test-BAHIR-83.sh b/bin/test-BAHIR-83.sh new file mode 100755 index 00000000..7a1ffdc9 --- /dev/null +++ b/bin/test-BAHIR-83.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +set -o pipefail + +for i in `seq 100` ; do + mvn scalatest:test -pl sql-streaming-mqtt -q \ + -Dsuites='*.BasicMQTTSourceSuite @ Recovering offset from the last processed offset.' | \ + grep -q "TEST FAILED" && echo "$i: failed" +done From 3cdb10236272298cd35d80bdd467520756df14ed Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Fri, 11 May 2018 13:35:45 +0530 Subject: [PATCH 07/10] Updated documents to reflect the change is usage of the connector. Also added a best practices guide. --- sql-streaming-mqtt/README.md | 45 ++++++++++++++++++- .../mqtt/JavaMQTTStreamWordCount.java | 2 +- .../mqtt/MQTTStreamSourceSuite.scala | 3 +- 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/sql-streaming-mqtt/README.md b/sql-streaming-mqtt/README.md index 2cfbe0f8..d3645849 100644 --- a/sql-streaming-mqtt/README.md +++ b/sql-streaming-mqtt/README.md @@ -68,7 +68,7 @@ An example, for scala API to count words from incoming message stream. val lines = spark.readStream .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") .option("topic", topic) - .load(brokerUrl).as[(String, Timestamp)] + .load(brokerUrl).selectExpr("CAST(payload AS STRING)").as[String] // Split the lines into words val words = lines.map(_._1).flatMap(_.split(" ")) @@ -95,7 +95,8 @@ An example, for Java API to count words from incoming message stream. 
.readStream() .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") .option("topic", topic) - .load(brokerUrl).select("value").as(Encoders.STRING()); + .load(brokerUrl) + .selectExpr("CAST(payload AS STRING)").as(Encoders.STRING()); // Split the lines into words Dataset words = lines.flatMap(new FlatMapFunction() { @@ -118,3 +119,43 @@ An example, for Java API to count words from incoming message stream. Please see `JavaMQTTStreamWordCount.java` for full example. +## Best Practices. + +1. > *MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport.* + +The design of Mqtt and the purpose it serves goes well together, but often in an application it is of outmost value to have reliablity. Since mqtt is not a distributed message queue and thus does not offer the highest level of reliability features. It should be redirected via a kafka message queue to take advantage of a distributed message queue. Infact, using a kafka message queue offers a lot of possiblities including a single kafka topic subscribed to several mqtt sources and even a single mqtt stream publishing to multiple kafka topics. Kafka is a reliable and scalable message queue. + +2. Often the message payload is not of the default character encoding or contains binary that needs to be parsed using a particular parser. In such cases, spark mqtt payload should be processed using the external parser. 
For example: + + * Scala API example: +```scala + // Create DataFrame representing the stream of input lines from connection to mqtt server + val lines = spark.readStream + .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") + .option("topic", topic) + .load(brokerUrl).select("payload").as[Array[Byte]].map(externalParser(_)) +``` + + * Java API example +```java + // Create DataFrame representing the stream of input lines from connection to mqtt server + Dataset lines = spark + .readStream() + .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") + .option("topic", topic) + .load(brokerUrl).selectExpr("CAST(payload AS BINARY)").as(Encoders.BINARY()); + + // Split the lines into words + Dataset words = lines.map(new MapFunction() { + @Override + public String call(byte[] bytes) throws Exception { + return new String(bytes); // Plug in external parser here. + } + }, Encoders.STRING()).flatMap(new FlatMapFunction() { + @Override + public Iterator call(String x) { + return Arrays.asList(x.split(" ")).iterator(); + } + }, Encoders.STRING()); + +``` diff --git a/sql-streaming-mqtt/examples/src/main/java/org/apache/bahir/examples/sql/streaming/mqtt/JavaMQTTStreamWordCount.java b/sql-streaming-mqtt/examples/src/main/java/org/apache/bahir/examples/sql/streaming/mqtt/JavaMQTTStreamWordCount.java index 519d9a03..4e87c990 100644 --- a/sql-streaming-mqtt/examples/src/main/java/org/apache/bahir/examples/sql/streaming/mqtt/JavaMQTTStreamWordCount.java +++ b/sql-streaming-mqtt/examples/src/main/java/org/apache/bahir/examples/sql/streaming/mqtt/JavaMQTTStreamWordCount.java @@ -71,7 +71,7 @@ public static void main(String[] args) throws Exception { .readStream() .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") .option("topic", topic) - .load(brokerUrl).select("value").as(Encoders.STRING()); + .load(brokerUrl).selectExpr("CAST(payload AS STRING)").as(Encoders.STRING()); // Split the lines into words Dataset words = 
lines.flatMap(new FlatMapFunction() { diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala index 5482c32b..61ce63d3 100644 --- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala +++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala @@ -55,8 +55,7 @@ class MQTTStreamSourceSuite extends SparkFunSuite with SharedSparkContext with B protected val tmpDir: String = tempDir.getAbsolutePath - protected def writeStreamResults(sqlContext: SQLContext, - dataFrame: DataFrame): StreamingQuery = { + protected def writeStreamResults(sqlContext: SQLContext, dataFrame: DataFrame): StreamingQuery = { import sqlContext.implicits._ val query: StreamingQuery = dataFrame.selectExpr("CAST(payload AS STRING)").as[String] .writeStream.format("parquet").start(s"$tmpDir/t.parquet") From 201ff5a37f8e40f9a7f70c60189f91374ff91555 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Fri, 11 May 2018 17:01:48 +0530 Subject: [PATCH 08/10] Spell checked, and added another best practice scenario. --- sql-streaming-mqtt/README.md | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/sql-streaming-mqtt/README.md b/sql-streaming-mqtt/README.md index d3645849..0b9d7c72 100644 --- a/sql-streaming-mqtt/README.md +++ b/sql-streaming-mqtt/README.md @@ -121,15 +121,17 @@ Please see `JavaMQTTStreamWordCount.java` for full example. ## Best Practices. -1. > *MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport.* +1. Turn Mqtt into a more reliable messaging service. -The design of Mqtt and the purpose it serves goes well together, but often in an application it is of outmost value to have reliablity. 
Since mqtt is not a distributed message queue and thus does not offer the highest level of reliability features. It should be redirected via a kafka message queue to take advantage of a distributed message queue. Infact, using a kafka message queue offers a lot of possiblities including a single kafka topic subscribed to several mqtt sources and even a single mqtt stream publishing to multiple kafka topics. Kafka is a reliable and scalable message queue. +> *MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport.* + +The design of Mqtt and the purpose it serves go well together, but often in an application it is of utmost value to have reliability. Since mqtt is not a distributed message queue, it does not offer the highest level of reliability features; it should be redirected via a kafka message queue to take advantage of a distributed message queue. In fact, using a kafka message queue offers a lot of possibilities including a single kafka topic subscribed to several mqtt sources and even a single mqtt stream publishing to multiple kafka topics. Kafka is a reliable and scalable message queue. 2. Often the message payload is not of the default character encoding or contains binary that needs to be parsed using a particular parser. In such cases, spark mqtt payload should be processed using the external parser. 
For example: * Scala API example: ```scala - // Create DataFrame representing the stream of input lines from connection to mqtt server + // Create DataFrame representing the stream of binary messages val lines = spark.readStream .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") .option("topic", topic) @@ -138,7 +140,7 @@ The design of Mqtt and the purpose it serves goes well together, but often in an * Java API example ```java - // Create DataFrame representing the stream of input lines from connection to mqtt server + // Create DataFrame representing the stream of binary messages Dataset lines = spark .readStream() .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") @@ -159,3 +161,10 @@ The design of Mqtt and the purpose it serves goes well together, but often in an }, Encoders.STRING()); ``` + +3. What is the solution for a situation when there are a large number of varied mqtt sources, each with different schema and throughput characteristics. + +This is an anti-pattern for spark structured streaming, which is designed to process a single schema, high volume streaming feed. Generally, one would create a lot of streaming pipelines to solve this problem. This would either require a very sophisticated scheduling setup or will waste a lot of resources, as it is not certain which stream is using more amount of data. + +The general solution is both less optimum and is more cumbersome to operate, with multiple moving parts incurs a high maintenance overall. As an alternative, in this situation, one can setup a single topic kafka-spark stream, where message from each of the varied stream contains a unique tag separating one from other streams. This way at the processing end, one can distinguish the message from one another and apply the right kind of decoding and processing. Similarly while storing, each message can be distinguished from others by a tag that distinguishes. 
+ From b284635e0043706fd346b02f232670fbfb9e56c3 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Fri, 11 May 2018 17:02:07 +0530 Subject: [PATCH 09/10] fixed error reporting. --- .../org/apache/bahir/sql/streaming/mqtt/MessageStore.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MessageStore.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MessageStore.scala index 54b09cd7..d7d26572 100644 --- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MessageStore.scala +++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MessageStore.scala @@ -63,14 +63,14 @@ trait Serializer { def serialize[T](x: T): Array[Byte] } -class JavaSerializer extends Serializer { +class JavaSerializer extends Serializer with Logging { override def deserialize[T](x: Array[Byte]): T = { val bis = new ByteArrayInputStream(x) val in = new ObjectInputStream(bis) val obj = if (in != null) { val o = in.readObject() - Try(in.close()) + Try(in.close()).recover { case t: Throwable => log.warn("failed to close stream", t) } o } else { null @@ -85,7 +85,7 @@ class JavaSerializer extends Serializer { out.flush() if (bos != null) { val bytes: Array[Byte] = bos.toByteArray - Try(bos.close()) + Try(bos.close()).recover { case t: Throwable => log.warn("failed to close stream", t) } bytes } else { null From d9a9335bb0c46b48f77ccb173c8a5b9ad23c1ad3 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Mon, 14 May 2018 16:35:10 +0530 Subject: [PATCH 10/10] Testing BAHIR-83 again. 
--- bin/test-BAHIR-83.sh | 3 +-- sql-streaming-mqtt/README.md | 6 ++++-- .../sql/streaming/mqtt/MQTTStreamSource.scala | 9 +++++---- .../sql/streaming/mqtt/MQTTStreamSourceSuite.scala | 14 ++++++++------ .../bahir/sql/streaming/mqtt/MQTTTestUtils.scala | 2 +- 5 files changed, 19 insertions(+), 15 deletions(-) diff --git a/bin/test-BAHIR-83.sh b/bin/test-BAHIR-83.sh index 7a1ffdc9..659dd8c8 100755 --- a/bin/test-BAHIR-83.sh +++ b/bin/test-BAHIR-83.sh @@ -19,7 +19,6 @@ set -o pipefail for i in `seq 100` ; do - mvn scalatest:test -pl sql-streaming-mqtt -q \ - -Dsuites='*.BasicMQTTSourceSuite @ Recovering offset from the last processed offset.' | \ + mvn scalatest:test -pl sql-streaming-mqtt -q -Dsuites='*.BasicMQTTSourceSuite' | \ grep -q "TEST FAILED" && echo "$i: failed" done diff --git a/sql-streaming-mqtt/README.md b/sql-streaming-mqtt/README.md index 0b9d7c72..b7f06021 100644 --- a/sql-streaming-mqtt/README.md +++ b/sql-streaming-mqtt/README.md @@ -59,7 +59,9 @@ This source uses [Eclipse Paho Java Client](https://eclipse.org/paho/clients/jav * `connectionTimeout` Sets the connection timeout, a value of 0 is interpretted as wait until client connects. See `MqttConnectOptions.setConnectionTimeout` for more information. * `keepAlive` Same as `MqttConnectOptions.setKeepAliveInterval`. * `mqttVersion` Same as `MqttConnectOptions.setMqttVersion`. - + * `maxInflight` Same as `MqttConnectOptions.setMaxInflight` + * `autoReconnect` Same as `MqttConnectOptions.setAutomaticReconnect` + ### Scala API An example, for scala API to count words from incoming message stream. @@ -164,7 +166,7 @@ The design of Mqtt and the purpose it serves goes well together, but often in an 3. What is the solution for a situation when there are a large number of varied mqtt sources, each with different schema and throughput characteristics. -This is an anti-pattern for spark structured streaming, which is designed to process a single schema, high volume streaming feed. 
Generally, one would create a lot of streaming pipelines to solve this problem. This would either require a very sophisticated scheduling setup or will waste a lot of resources, as it is not certain which stream is using more amount of data. +Generally, one would create a lot of streaming pipelines to solve this problem. This would either require a very sophisticated scheduling setup or waste a lot of resources, as it is not certain which stream carries the larger volume of data. The general solution is both less optimum and is more cumbersome to operate, with multiple moving parts incurs a high maintenance overall. As an alternative, in this situation, one can setup a single topic kafka-spark stream, where message from each of the varied stream contains a unique tag separating one from other streams. This way at the processing end, one can distinguish the message from one another and apply the right kind of decoding and processing. Similarly while storing, each message can be distinguished from others by a tag that distinguishes. 
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala index e1e26076..2f75ee22 100644 --- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala +++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala @@ -27,7 +27,6 @@ import scala.collection.JavaConverters._ import scala.collection.concurrent.TrieMap import scala.collection.immutable.IndexedSeq import scala.collection.mutable.ListBuffer -import scala.util.{Failure, Success, Try} import org.eclipse.paho.client.mqttv3._ import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence} @@ -280,15 +279,17 @@ class MQTTStreamSourceProvider extends DataSourceV2 .KEEP_ALIVE_INTERVAL_DEFAULT.toString).toInt val mqttVersion: Int = parameters.get("mqttVersion").orElse(MqttConnectOptions .MQTT_VERSION_DEFAULT.toString).toInt - val cleanSession: Boolean = parameters.get("cleanSession").orElse("false").toBoolean + val cleanSession: Boolean = parameters.get("cleanSession").orElse("true").toBoolean val qos: Int = parameters.get("QoS").orElse("1").toInt - + val autoReconnect: Boolean = parameters.get("autoReconnect").orElse("false").toBoolean + val maxInflight: Int = parameters.get("maxInflight").orElse("60").toInt val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions() - mqttConnectOptions.setAutomaticReconnect(true) + mqttConnectOptions.setAutomaticReconnect(autoReconnect) mqttConnectOptions.setCleanSession(cleanSession) mqttConnectOptions.setConnectionTimeout(connectionTimeout) mqttConnectOptions.setKeepAliveInterval(keepAlive) mqttConnectOptions.setMqttVersion(mqttVersion) + mqttConnectOptions.setMaxInflight(maxInflight) (username, password) match { case (u: String, p: String) if u != null && p != null => mqttConnectOptions.setUserName(u) diff --git 
a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala index 61ce63d3..2ce72da9 100644 --- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala +++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala @@ -59,7 +59,9 @@ class MQTTStreamSourceSuite extends SparkFunSuite with SharedSparkContext with B import sqlContext.implicits._ val query: StreamingQuery = dataFrame.selectExpr("CAST(payload AS STRING)").as[String] .writeStream.format("parquet").start(s"$tmpDir/t.parquet") - while (!query.isActive) {} + while (!query.status.isTriggerActive) { + Thread.sleep(20) + } query } @@ -81,8 +83,9 @@ class MQTTStreamSourceSuite extends SparkFunSuite with SharedSparkContext with B val ds: DataStreamReader = sqlContext.readStream.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") - .option("topic", "test").option("clientId", "clientId") - .option("QoS", "2") + .option("topic", "test").option("clientId", "clientId").option("connectionTimeout", "120") + .option("keepAlive", "1200").option("maxInflight", "120").option("autoReconnect", "false") + .option("cleanSession", "true").option("QoS", "2") val dataFrame = if (!filePersistence) { ds.option("persistence", "memory").load("tcp://" + mqttTestUtils.brokerUri) @@ -101,13 +104,12 @@ class BasicMQTTSourceSuite extends MQTTStreamSourceSuite { val sendMessage = "MQTT is a message queue." 
- val (sqlContext: SQLContext, dataFrame: DataFrame) = createStreamingDataframe() val query = writeStreamResults(sqlContext, dataFrame) mqttTestUtils.publishData("test", sendMessage) query.processAllAvailable() - query.awaitTermination(5000) + query.awaitTermination(10000) val resultBuffer: mutable.Buffer[String] = readBackStreamingResults(sqlContext) @@ -160,7 +162,7 @@ class BasicMQTTSourceSuite extends MQTTStreamSourceSuite { class StressTestMQTTSource extends MQTTStreamSourceSuite { // Run with -Xmx1024m - ignore("Send and receive messages of size 100MB.") { + test("Send and receive messages of size 100MB.") { val freeMemory: Long = Runtime.getRuntime.freeMemory() diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTTestUtils.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTTestUtils.scala index f105a631..817ec9a4 100644 --- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTTestUtils.scala +++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTTestUtils.scala @@ -80,7 +80,7 @@ class MQTTTestUtils(tempDir: File, port: Int = 0) extends Logging { val msgTopic = client.getTopic(topic) for (i <- 0 until N) { try { - Thread.sleep(10) + Thread.sleep(20) val message = new MqttMessage(data.getBytes()) message.setQos(2) // message.setId(i) setting id has no effect.