[BAHIR-24] fix MQTT Python code, examples, add tests
Changes in this PR:

- remove unnecessary files from streaming-mqtt/python
- update all *.py files to match the modified
  project structure pyspark.streaming.mqtt --> mqtt
  (see the import sketch below)
- add test cases that were left out of the initial code import and
  add a shell script to run them:
    - streaming-mqtt/python-tests/run-python-tests.sh
    - streaming-mqtt/python-tests/tests.py
- modify MQTTTestUtils.scala to limit the required disk storage space
- modify the bin/run-example script to set up PYTHONPATH for running Python examples
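
For example, the module rename changes the MQTT import in every example and test from:

    from pyspark.streaming.mqtt import MQTTUtils

to:

    from mqtt import MQTTUtils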

Closes #10
ckadner authored and lresende committed Jul 23, 2016
1 parent 48e91fc commit 12f130846ef7523138e98e79bfd823f61acab3b3
Showing 10 changed files with 247 additions and 693 deletions.
@@ -15,6 +15,9 @@ target/
*.class
*.log

# Python
*.pyc

# Others
.checkstyle
.fbExcludeFilterFile
@@ -16,6 +16,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
set -o pipefail

# make sure Spark home is set and valid
if [ -z "${SPARK_HOME}" ]; then
@@ -61,8 +62,8 @@ DESCRIPTION
USAGE EXAMPLES
EOF
grep -R "bin/run-example org.apache" --no-filename --include="*.scala" --include="*.java" "${project_dir}" | tr '`' ' ' | sed 's/^ *\* * / /g' ; \
grep -R -A1 "bin/run-example \\\\" --no-filename --include="*.scala" --include="*.java" "${project_dir}" | tr '`' ' ' | sed 's/^ *\* * / /g' | sed '/^--$/d' | sed 'N;s/\\\n *//g'
grep -R "bin/run-example org.apache" --no-filename --include=*.{scala,java,py} "${project_dir}" | tr '`' ' ' | sed 's/^ *\* * / /g' ; \
grep -R -A1 "bin/run-example \\\\" --no-filename --include=*.{scala,java,py} "${project_dir}" | tr '`' ' ' | sed 's/^ *\* * / /g' | sed '/^--$/d' | sed 'N;s/\\\n *//g'
exit 1
}
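
(Note: the unquoted `--include=*.{scala,java,py}` relies on shell brace expansion, which turns the single token into `--include=*.scala --include=*.java --include=*.py`, extending the usage scan to the new Python examples.)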

@@ -122,10 +123,22 @@ examples_jar="${module_tests_jar_path}"
# streaming-akka/target/spark-streaming-akka_2.11-2.0.0-SNAPSHOT-tests.jar \
# localhost 9999

-# capture the full command line and echo it for transparency and debug purposes
-cmd="${SPARK_HOME}/bin/spark-submit \
-    --packages ${spark_package} \
-    --class ${example_class} ${examples_jar} ${example_args}"
+# for Python examples add all of the Bahir project's Python sources to PYTHONPATH, which in local
+# mode is easier than creating a zip file to be used with the --py-files option (TODO: BAHIR-35)
+# Note that --py-files with individual *.py files does not work if those modules are imported at
+# the top of the example script; instead imports must be pushed down below SparkContext initialization
+if [[ "$example_class" == *.py ]]; then
+    export PYTHONPATH="$( find "$project_dir" -maxdepth 5 -type d -path '*/python' | tr '\n' ':' )$PYTHONPATH"
+    cmd="${SPARK_HOME}/bin/spark-submit \
+        --packages ${spark_package} \
+        ${example_class} \
+        ${example_args}"
+else
+    cmd="${SPARK_HOME}/bin/spark-submit \
+        --packages ${spark_package} \
+        --class ${example_class} ${examples_jar} \
+        ${example_args}"
+fi

echo "---"
echo "Spark-Submit command: $cmd"
@@ -95,7 +95,7 @@
<log4j.version>1.2.17</log4j.version>

<!-- Spark version -->
-<spark.version>2.0.0-SNAPSHOT</spark.version>
+<spark.version>2.0.1-SNAPSHOT</spark.version>

<!-- Streaming Akka connector -->
<akka.group>com.typesafe.akka</akka.group>
@@ -20,23 +20,35 @@
Usage: mqtt_wordcount.py <broker url> <topic>
To run this in your local machine, you need to setup a MQTT broker and publisher first,
-Mosquitto is one of the open source MQTT Brokers, see
-http://mosquitto.org/
-Eclipse paho project provides number of clients and utilities for working with MQTT, see
-http://www.eclipse.org/paho/#getting-started
-and then run the example
-`$ bin/spark-submit --jars \
-    external/mqtt-assembly/target/scala-*/spark-streaming-mqtt-assembly-*.jar \
-    examples/src/main/python/streaming/mqtt_wordcount.py \
-    tcp://localhost:1883 foo`
+like Mosquitto (http://mosquitto.org/), an easy to install and use open source MQTT broker.
+On Mac OS, Mosquitto can be installed with Homebrew: `$ brew install mosquitto`.
+On Ubuntu, mosquitto can be installed with the command `$ sudo apt-get install mosquitto`.
+Alternatively, the Eclipse Paho project provides a number of clients and utilities for
+working with MQTT, see http://www.eclipse.org/paho/#getting-started
+How to run this example locally:
+(1) Start an MQTT message broker/server, e.g. Mosquitto:
+    `$ mosquitto -p 1883`
+(2) Run the publisher:
+    `$ bin/run-example \
+      org.apache.spark.examples.streaming.mqtt.MQTTPublisher tcp://localhost:1883 foo`
+(3) Run the example:
+    `$ bin/run-example \
+      streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py tcp://localhost:1883 foo`
"""

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
-from pyspark.streaming.mqtt import MQTTUtils
+from mqtt import MQTTUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
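
For context, the rest of the script (truncated in this hunk) continues roughly as in Spark's original mqtt_wordcount.py; a sketch, not the verbatim file (under Python 2 it also needs a print_function future import):

    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: mqtt_wordcount.py <broker url> <topic>", file=sys.stderr)
            exit(-1)

        sc = SparkContext(appName="PythonStreamingMQTTWordCount")
        ssc = StreamingContext(sc, 1)

        lines = MQTTUtils.createStream(ssc, sys.argv[1], sys.argv[2])
        counts = lines.flatMap(lambda line: line.split(" ")) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)
        counts.pprint()

        ssc.start()
        ssc.awaitTermination()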
@@ -0,0 +1,79 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
set -o pipefail

# make sure Spark home is set and valid
if [ -z "${SPARK_HOME}" ]; then
echo "SPARK_HOME is not set" >&2
exit 1
elif [ ! -d "${SPARK_HOME}" ]; then
echo "SPARK_HOME does not point to a valid directory" >&2
exit 1
fi

# pinpoint the module folder and project root folder
bin_dir=$( dirname "$0" )
module_dir=$( cd "${bin_dir}/.." && pwd -P )
project_dir=$( cd "${module_dir}/.." && pwd -P )
stdout_log="${module_dir}/target/python-tests-python-output.log"
stderr_log="${module_dir}/target/python-tests-java-output.log"

# use the module name to find the tests jar file that contains the example to run
module_name=${module_dir#"${project_dir}"/}
module_tests_jar_path=$( find "${module_dir}/target" -maxdepth 1 -name "*${module_name}*-tests.jar" | head -1 )

if [ -z "${module_tests_jar_path}" ] || [ ! -e "${module_tests_jar_path}" ]; then
echo "Could not find module tests jar file in ${module_dir}/target/" >&2
echo "Run \"mvn clean install\" and retry running this example" >&2
exit 1
fi

# use maven-help-plugin to determine project version and Scala version
module_version=$( cd "${module_dir}" && mvn org.apache.maven.plugins:maven-help-plugin:2.2:evaluate -Dexpression=project.version | grep -v "INFO\|WARNING\|ERROR\|Downloading" | tail -1 )
scala_version=$( cd "${module_dir}" && mvn org.apache.maven.plugins:maven-help-plugin:2.2:evaluate -Dexpression=scala.binary.version | grep -v "INFO\|WARNING\|ERROR\|Downloading" | tail -1 )
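# (help:evaluate prints the resolved value on a line of its own, e.g. "2.0.0-SNAPSHOT" for
# project.version and "2.11" for scala.binary.version, both illustrative here; the grep/tail
# pipeline strips Maven's log lines around it)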

# we are using spark-submit with --packages to run the tests and all necessary dependencies are
# resolved by maven which requires running "mvn" or "mvn install" first
spark_packages="org.apache.bahir:spark-${module_name}_${scala_version}:${module_version}"

# find additional test-scoped dependencies and add them to the --packages list
test_dependencies=$( cd "${project_dir}" && mvn dependency:tree -Dscope=test -Dtokens=standard -pl ${module_name} | grep "\[INFO\] +- [a-z].*:test" | grep -ivE "spark|bahir|scala|junit" | sed 's/\[INFO\] +- //; s/:jar//; s/:test//' )
for td in ${test_dependencies}; do
    spark_packages="${spark_packages},${td}"
done
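# the resulting --packages list then looks something like this (coordinates illustrative only):
#   org.apache.bahir:spark-streaming-mqtt_2.11:2.0.0-SNAPSHOT,org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2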

# since we are running locally, we can use PYTHONPATH instead of --py-files (TODO: BAHIR-35)
export PYTHONPATH="${module_dir}/python:${PYTHONPATH}"

# run the tests via spark-submit and capture the output in two separate log files (stdout=Python,
# stderr=Java) while only printing stdout to console
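# ('1> >( tee ... )' is bash process substitution: stdout is duplicated into the log file and
# filtered with grep so that only substantive test-result lines reach the console)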
"${SPARK_HOME}"/bin/spark-submit \
--master local[*] \
--driver-memory 512m \
--packages "${spark_packages}" \
--jars "${module_tests_jar_path}" \
"${module_dir}/python-tests/tests.py" \
1> >( tee "${stdout_log}" | grep -w '[[:alpha:]=-]\{2,\}' ) \
2> "${stderr_log}"

# if the Python code doesn't get executed due to errors in SparkSubmit, the stdout log file will
# be empty and nothing has been logged to the console; in that case print the stderr log (Java output)
if [ ! -s "${stdout_log}" ]; then
    cat "${stderr_log}"
    echo "Error during test execution"
fi
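
Assuming a local Spark installation and a prior `mvn clean install`, the runner is invoked as `$ SPARK_HOME=/path/to/spark streaming-mqtt/python-tests/run-python-tests.sh`.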
@@ -0,0 +1,99 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys
import time
import random

if sys.version_info[:2] <= (2, 6):
    try:
        import unittest2 as unittest
    except ImportError:
        sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier\n')
        sys.exit(1)
else:
    import unittest

from pyspark.context import SparkConf, SparkContext, RDD
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.tests import PySparkStreamingTestCase
from mqtt import MQTTUtils

class MQTTStreamTests(PySparkStreamingTestCase):

    timeout = 20  # seconds
    duration = 1

    def setUp(self):
        super(MQTTStreamTests, self).setUp()

        MQTTTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
            .loadClass("org.apache.spark.streaming.mqtt.MQTTTestUtils")
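        # (MQTTTestUtils is the Scala helper from this module's tests jar, passed to
        # spark-submit via --jars, hence the reflective lookup through the JVM's context
        # class loader over the py4j gateway)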
        self._MQTTTestUtils = MQTTTestUtilsClz.newInstance()
        self._MQTTTestUtils.setup()

    def tearDown(self):
        if self._MQTTTestUtils is not None:
            self._MQTTTestUtils.teardown()
            self._MQTTTestUtils = None

        super(MQTTStreamTests, self).tearDown()

    def _randomTopic(self):
        return "topic-%d" % random.randint(0, 10000)

    def _startContext(self, topic):
        # Start the StreamingContext and also collect the result
        stream = MQTTUtils.createStream(self.ssc, "tcp://" + self._MQTTTestUtils.brokerUri(), topic)
        result = []

        def getOutput(_, rdd):
            for data in rdd.collect():
                result.append(data)

        stream.foreachRDD(getOutput)
        self.ssc.start()
        return result

    def test_mqtt_stream(self):
        """Test the Python MQTT stream API."""
        sendData = "MQTT demo for spark streaming"
        topic = self._randomTopic()
        result = self._startContext(topic)

        def retry():
            self._MQTTTestUtils.publishData(topic, sendData)
            # Because "publishData" sends duplicate messages, here we should use > 0
            self.assertTrue(len(result) > 0)
            self.assertEqual(sendData, result[0])

        # Retry it because we don't know when the receiver will start.
        self._retry_or_timeout(retry)

    def _retry_or_timeout(self, test_func):
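        # poll test_func until it passes; if self.timeout elapses first, re-raise its failure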
        start_time = time.time()
        while True:
            try:
                test_func()
                break
            except:
                if time.time() - start_time > self.timeout:
                    raise
                time.sleep(0.01)


if __name__ == "__main__":
    unittest.main()
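
These tests are meant to be launched through run-python-tests.sh above (or an equivalent spark-submit invocation with --packages and --jars), not with a bare `python tests.py`, since they need the Bahir MQTT package and the tests jar on the driver classpath.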
