Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
prabeesh committed Jul 12, 2015
1 parent 1f0cfe9 commit 80474d1
Showing 1 changed file with 5 additions and 6 deletions.
11 changes: 5 additions & 6 deletions python/pyspark/streaming/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -835,9 +835,9 @@ class MQTTStreamTests(PySparkStreamingTestCase):
def setUp(self):
super(MQTTStreamTests, self).setUp()

utilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
MQTTTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
.loadClass("org.apache.spark.streaming.mqtt.MQTTTestUtils")
self._utils = utilsClz.newInstance()
self._MQTTTestUtils = MQTTTestUtilsClz.newInstance()
self._MQTTTestUtils.setup()

def tearDown(self):
Expand All @@ -850,7 +850,7 @@ def tearDown(self):
def _randomTopic(self):
return "topic-%d" % random.randint(0, 10000)

def _validateStreamResult(self, sendData, stream):
def _validateStreamResult(self, sendData, dstream):
result = []

def get_output(_, rdd):
Expand All @@ -862,16 +862,15 @@ def get_output(_, rdd):
self.assertEqual(sendData, receiveData)

def test_mqtt_stream(self):
"""Test the Python Kafka stream API."""
"""Test the Python MQTT stream API."""
topic = self._randomTopic()
sendData = "MQTT demo for spark streaming"
ssc = self.ssc

self._MQTTTestUtils.createTopic(topic)
self._MQTTTestUtils.waitForReceiverToStart(ssc)
self._MQTTTestUtils.publishData(topic, sendData)

stream = MQTTUtils.createStream(ssc, "tcp://" + MQTTTestUtils.brokerUri, topic)
stream = MQTTUtils.createStream(ssc, "tcp://" + self._MQTTTestUtils.brokerUri(), topic)
self._validateStreamResult(sendData, stream)


Expand Down

0 comments on commit 80474d1

Please sign in to comment.