Skip to content

Commit

Permalink
Changed the test cases to focus on the operation under test
Browse files Browse the repository at this point in the history
  • Loading branch information
giwa committed Aug 19, 2014
1 parent 199e37f commit 58150f5
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 40 deletions.
3 changes: 1 addition & 2 deletions python/pyspark/java_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,9 @@ def run(self):
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "org.apache.spark.streaming.*") # do we need this?
java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*")
java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*")
java_import(gateway.jvm, "org.apache.spark.streaming.dstream.*") # do we need this?
java_import(gateway.jvm, "org.apache.spark.streaming.*") # for Duration and Time
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
Expand Down
9 changes: 4 additions & 5 deletions python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from pyspark.rdd import _JavaStackTrace
from pyspark.storagelevel import StorageLevel
from pyspark.resultiterable import ResultIterable
from pyspark.streaming.utils import rddToFileName
from pyspark.streaming.utils import rddToFileName, RDDFunction


from py4j.java_collections import ListConverter, MapConverter
Expand Down Expand Up @@ -227,7 +227,6 @@ def foreachRDD(self, func):
This is an output operator, so this DStream will be registered as an output
stream and therefore materialized.
"""
from utils import RDDFunction
wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
self.ctx._jvm.PythonForeachDStream(self._jdstream.dstream(), wrapped_func)

Expand Down Expand Up @@ -386,18 +385,18 @@ def saveAsTextFile(rdd, time):

return self.foreachRDD(saveAsTextFile)

def saveAsPickledFiles(self, prefix, suffix=None):
def saveAsPickleFiles(self, prefix, suffix=None):
"""
Save this DStream as a SequenceFile of serialized objects. The serializer
used is L{pyspark.serializers.PickleSerializer}, default batch size
is 10.
"""

def saveAsTextFile(rdd, time):
def saveAsPickleFile(rdd, time):
path = rddToFileName(prefix, suffix, time)
rdd.saveAsPickleFile(path)

return self.foreachRDD(saveAsTextFile)
return self.foreachRDD(saveAsPickleFile)


# TODO: implement updateStateByKey
Expand Down
79 changes: 46 additions & 33 deletions python/pyspark/streaming_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class PySparkStreamingTestCase(unittest.TestCase):
def setUp(self):
class_name = self.__class__.__name__
self.ssc = StreamingContext(appName=class_name, duration=Seconds(1))
time.sleep(1)

def tearDown(self):
# Do not call pyspark.streaming.context.StreamingContext.stop directly because
Expand Down Expand Up @@ -186,68 +187,73 @@ def test_func(dstream):

def test_reduceByKey_batch(self):
"""Basic operation test for DStream.reduceByKey with batch deserializer"""
test_input = [["a", "a", "b", "b"], ["", "", "", ""]]
test_input = [[("a", 1), ("a", 1), ("b", 1), ("b", 1)],
[("", 1),("", 1), ("", 1), ("", 1)],
[(1, 1), (1, 1), (2, 1), (2, 1), (3, 1)]]

def test_func(dstream):
return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add)
expected_output = [[("a", 2), ("b", 2)], [("", 4)]]
return dstream.reduceByKey(operator.add)
expected_output = [[("a", 2), ("b", 2)], [("", 4)], [(1, 2), (2, 2), (3 ,1)]]
output = self._run_stream(test_input, test_func, expected_output)
for result in (output, expected_output):
self._sort_result_based_on_key(result)
self.assertEqual(expected_output, output)

def test_reduceByKey_unbatch(self):
"""Basic operation test for DStream.reduceByKey with unbatch deserializer"""
test_input = [["a", "a", "b"], ["", ""], []]
test_input = [[("a", 1), ("a", 1), ("b", 1)], [("", 1), ("", 1)], []]

def test_func(dstream):
return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add)
return dstream.reduceByKey(operator.add)
expected_output = [[("a", 2), ("b", 1)], [("", 2)], []]
output = self._run_stream(test_input, test_func, expected_output)
for result in (output, expected_output):
self._sort_result_based_on_key(result)
self.assertEqual(expected_output, output)

def test_mapValues_batch(self):
"""Basic operation test for DStream.mapValues with batch deserializer"""
test_input = [["a", "a", "b", "b"], ["", "", "", ""]]
test_input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)],
[("", 4), (1, 1), (2, 2), (3, 3)]]

def test_func(dstream):
return dstream.map(lambda x: (x, 1))\
.reduceByKey(operator.add)\
.mapValues(lambda x: x + 10)
expected_output = [[("a", 12), ("b", 12)], [("", 14)]]
return dstream.mapValues(lambda x: x + 10)
expected_output = [[("a", 12), ("b", 12), ("c", 11), ("d", 11)],
[("", 14), (1, 11), (2, 12), (3, 13)]]
output = self._run_stream(test_input, test_func, expected_output)
for result in (output, expected_output):
self._sort_result_based_on_key(result)
self.assertEqual(expected_output, output)

def test_mapValues_unbatch(self):
"""Basic operation test for DStream.mapValues with unbatch deserializer"""
test_input = [["a", "a", "b"], ["", ""], []]
test_input = [[("a", 2), ("b", 1)], [("", 2)], []]

def test_func(dstream):
return dstream.map(lambda x: (x, 1))\
.reduceByKey(operator.add)\
.mapValues(lambda x: x + 10)
return dstream.mapValues(lambda x: x + 10)
expected_output = [[("a", 12), ("b", 11)], [("", 12)], []]
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_flatMapValues_batch(self):
"""Basic operation test for DStream.flatMapValues with batch deserializer"""
test_input = [["a", "a", "b", "b"], ["", "", "", ""]]
test_input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)], [("", 4), (1, 1), (2, 1), (3, 1)]]

def test_func(dstream):
return dstream.map(lambda x: (x, 1))\
.reduceByKey(operator.add)\
.flatMapValues(lambda x: (x, x + 10))
expected_output = [[("a", 2), ("a", 12), ("b", 2), ("b", 12)], [("", 4), ("", 14)]]
return dstream.flatMapValues(lambda x: (x, x + 10))
expected_output = [[("a", 2), ("a", 12), ("b", 2), ("b", 12),
("c", 1), ("c", 11), ("d", 1), ("d", 11)],
[("", 4), ("", 14), (1, 1), (1, 11),
(2, 1), (2, 11), (3, 1), (3, 11)]]
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_flatMapValues_unbatch(self):
"""Basic operation test for DStream.flatMapValues with unbatch deserializer"""
test_input = [["a", "a", "b"], ["", ""], []]
test_input = [[("a", 2), ("b", 1)], [("", 2)], []]

def test_func(dstream):
return dstream.map(lambda x: (x, 1))\
.reduceByKey(operator.add)\
.flatMapValues(lambda x: (x, x + 10))
return dstream.flatMapValues(lambda x: (x, x + 10))
expected_output = [[("a", 2), ("a", 12), ("b", 1), ("b", 11)], [("", 2), ("", 12)], []]
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)
Expand Down Expand Up @@ -302,7 +308,7 @@ def f(iterator):

def test_countByValue_batch(self):
"""Basic operation test for DStream.countByValue with batch deserializer."""
test_input = [range(1, 5) + range(1,5), range(5, 7) + range(5, 9), ["a", "a", "b", ""]]
test_input = [range(1, 5) * 2, range(5, 7) + range(5, 9), ["a", "a", "b", ""]]

def test_func(dstream):
return dstream.countByValue()
Expand Down Expand Up @@ -330,9 +336,12 @@ def test_func(dstream):

def test_groupByKey_batch(self):
"""Basic operation test for DStream.groupByKey with batch deserializer."""
test_input = [range(1, 5), [1, 1, 1, 2, 2, 3], ["a", "a", "b", "", "", ""]]
test_input = [[(1, 1), (2, 1), (3, 1), (4, 1)],
[(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)],
[("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1), ("", 1)]]

def test_func(dstream):
return dstream.map(lambda x: (x, 1)).groupByKey()
return dstream.groupByKey()
expected_output = [[(1, [1]), (2, [1]), (3, [1]), (4, [1])],
[(1, [1, 1, 1]), (2, [1, 1]), (3, [1])],
[("a", [1, 1]), ("b", [1]), ("", [1, 1, 1])]]
Expand All @@ -344,10 +353,12 @@ def test_func(dstream):

def test_groupByKey_unbatch(self):
"""Basic operation test for DStream.groupByKey with unbatch deserializer."""
test_input = [range(1, 4), [1, 1, ""], ["a", "a", "b"]]
test_input = [[(1, 1), (2, 1), (3, 1)],
[(1, 1), (1, 1), ("", 1)],
[("a", 1), ("a", 1), ("b", 1)]]

def test_func(dstream):
return dstream.map(lambda x: (x, 1)).groupByKey()
return dstream.groupByKey()
expected_output = [[(1, [1]), (2, [1]), (3, [1])],
[(1, [1, 1]), ("", [1])],
[("a", [1, 1]), ("b", [1])]]
Expand All @@ -359,11 +370,13 @@ def test_func(dstream):

def test_combineByKey_batch(self):
"""Basic operation test for DStream.combineByKey with batch deserializer."""
test_input = [range(1, 5), [1, 1, 1, 2, 2, 3], ["a", "a", "b", "", "", ""]]
test_input = [[(1, 1), (2, 1), (3, 1), (4, 1)],
[(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)],
[("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1), ("", 1)]]

def test_func(dstream):
def add(a, b): return a + str(b)
return dstream.map(lambda x: (x, 1)).combineByKey(str, add, add)
return dstream.combineByKey(str, add, add)
expected_output = [[(1, "1"), (2, "1"), (3, "1"), (4, "1")],
[(1, "111"), (2, "11"), (3, "1")],
[("a", "11"), ("b", "1"), ("", "111")]]
Expand All @@ -374,11 +387,11 @@ def add(a, b): return a + str(b)

def test_combineByKey_unbatch(self):
"""Basic operation test for DStream.combineByKey with unbatch deserializer."""
test_input = [range(1, 4), [1, 1, ""], ["a", "a", "b"]]
test_input = [[(1, 1), (2, 1), (3 ,1)], [(1, 1), (1, 1), ("", 1)], [("a", 1), ("a", 1), ("b", 1)]]

def test_func(dstream):
def add(a, b): return a + str(b)
return dstream.map(lambda x: (x, 1)).combineByKey(str, add, add)
return dstream.combineByKey(str, add, add)
expected_output = [[(1, "1"), (2, "1"), (3, "1")],
[(1, "11"), ("", "1")],
[("a", "11"), ("b", "1")]]
Expand Down Expand Up @@ -446,4 +459,4 @@ def tearDownClass(cls):


if __name__ == "__main__":
unittest.main()
unittest.main(verbosity=2)

0 comments on commit 58150f5

Please sign in to comment.