added mapValues and flatMapValues; WIP for glom and mapPartitions tests
giwa committed Aug 11, 2014
1 parent a65f302 commit 90a6484
Showing 3 changed files with 101 additions and 18 deletions.
2 changes: 2 additions & 0 deletions python/pyspark/streaming/context.py
@@ -140,6 +140,8 @@ def _testInputStream(self, test_inputs, numSlices=None):
"""
        Generate multiple files to make a "stream" on the Scala side for tests.
        Scala chooses one of the files and generates an RDD using PythonRDD.readRDDFromFile.
        QueueStream may be a better way to implement this function.
"""
numSlices = numSlices or self._sc.defaultParallelism
# Calling the Java parallelize() method with an ArrayList is too slow,
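
A queue-based test input, as the docstring above suggests, could avoid the temp-file round trip entirely. A minimal sketch under that assumption, using a hypothetical queueStream-style helper that this WIP does not provide:

    # Hypothetical sketch: feed test batches as a queue of RDDs instead of
    # generating files for the Scala side. queueStream here is assumed, not
    # part of this WIP.
    def make_test_stream(ssc, test_inputs, numSlices=None):
        numSlices = numSlices or ssc._sc.defaultParallelism
        rdd_queue = [ssc._sc.parallelize(batch, numSlices) for batch in test_inputs]
        return ssc.queueStream(rdd_queue)
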
69 changes: 53 additions & 16 deletions python/pyspark/streaming/dstream.py
@@ -35,25 +35,31 @@ def __init__(self, jdstream, ssc, jrdd_deserializer):
self.ctx = ssc._sc
self._jrdd_deserializer = jrdd_deserializer

def context(self):
"""
        Return the StreamingContext associated with this DStream.
"""
return self._ssc

def count(self):
"""
        Return a new DStream in which each RDD contains the number of
        elements in the corresponding RDD of this DStream.
"""
-        return self._mapPartitions(lambda i: [sum(1 for _ in i)])._sum()
+        return self.mapPartitions(lambda i: [sum(1 for _ in i)])._sum()

def _sum(self):
"""
        Add up the elements in each RDD of this DStream.
"""
-        return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
+        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
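
The count/_sum pair above counts per partition first and then reduces the partial results. A pure-Python sketch of those two steps, with the partition contents assumed for illustration:

    import operator

    # Step 1: count each partition locally; step 2: add the partial counts.
    partitions = [iter([1, 2, 3]), iter([4, 5])]   # assumed two partitions
    partial_counts = [sum(1 for _ in it) for it in partitions]  # [3, 2]
    total = reduce(operator.add, partial_counts)                # 5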

def print_(self, label=None):
"""
        Since print is a reserved name in Python, we cannot define a "print" method.
        This function prints the serialized data in each RDD of this DStream, because Scala and Java cannot
-        deserialized pickled python object. Please use DStream.pyprint() instead to print results.
+        deserialize pickled Python objects. Please use DStream.pyprint() to print results.
-        Call DStream.print().
+        Call DStream.print(), and this function will print the byte arrays in the DStream.
"""
# a hack to call print function in DStream
getattr(self._jdstream, "print")(label)
Expand All @@ -63,29 +69,32 @@ def filter(self, f):
        Return a new DStream containing only the elements that satisfy the predicate f.
"""
def func(iterator): return ifilter(f, iterator)
-        return self._mapPartitions(func)
+        return self.mapPartitions(func)

def flatMap(self, f, preservesPartitioning=False):
"""
        Return a new DStream by applying a function to each element of this
        DStream and then flattening the results.
"""
-        def func(s, iterator): return chain.from_iterable(imap(f, iterator))
+        def func(s, iterator):
+            return chain.from_iterable(imap(f, iterator))
return self._mapPartitionsWithIndex(func, preservesPartitioning)

-    def map(self, f):
+    def map(self, f, preservesPartitioning=False):
"""
        Return a new DStream by applying a function to each element of this DStream.
"""
-        def func(iterator): return imap(f, iterator)
-        return self._mapPartitions(func)
+        def func(iterator):
+            return imap(f, iterator)
+        return self.mapPartitions(func, preservesPartitioning)

-    def _mapPartitions(self, f):
+    def mapPartitions(self, f, preservesPartitioning=False):
"""
Return a new DStream by applying a function to each partition of this DStream.
"""
-        def func(s, iterator): return f(iterator)
-        return self._mapPartitionsWithIndex(func)
+        def func(s, iterator):
+            return f(iterator)
+        return self._mapPartitionsWithIndex(func, preservesPartitioning)
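
The element-wise operations above (map, filter, flatMap) all reduce to mapPartitions by wrapping an element function in an iterator transform. A standalone sketch of that wrapping, using the same itertools primitives:

    from itertools import imap

    def as_partition_op(f):
        # Turn an element function into a per-partition iterator transform,
        # mirroring what map() does above.
        def func(iterator):
            return imap(f, iterator)
        return func

    doubled = as_partition_op(lambda x: x * 2)
    list(doubled(iter([1, 2, 3])))  # [2, 4, 6]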

def _mapPartitionsWithIndex(self, f, preservesPartitioning=False):
"""
@@ -131,7 +140,7 @@ def combineLocally(iterator):
else:
combiners[k] = mergeValue(combiners[k], v)
return combiners.iteritems()
-        locally_combined = self._mapPartitions(combineLocally)
+        locally_combined = self.mapPartitions(combineLocally)
shuffled = locally_combined.partitionBy(numPartitions)

def _mergeCombiners(iterator):
@@ -143,7 +152,7 @@ def _mergeCombiners(iterator):
combiners[k] = mergeCombiners(combiners[k], v)
return combiners.iteritems()

-        return shuffled._mapPartitions(_mergeCombiners)
+        return shuffled.mapPartitions(_mergeCombiners)
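
combineByKey above aggregates in two phases: combine within each partition, shuffle so that equal keys land in the same partition, then merge the partial results. A plain-Python sketch of the two phases with dict-based combiners (sample data and reducers assumed):

    def combine_locally(pairs, createCombiner, mergeValue):
        # Phase 1: per-partition combine, as combineLocally does above.
        combiners = {}
        for k, v in pairs:
            if k not in combiners:
                combiners[k] = createCombiner(v)
            else:
                combiners[k] = mergeValue(combiners[k], v)
        return combiners

    def merge_combiners(partials, mergeCombiners):
        # Phase 2: merge the per-partition dicts, as _mergeCombiners does above.
        merged = {}
        for combiners in partials:
            for k, c in combiners.iteritems():
                merged[k] = c if k not in merged else mergeCombiners(merged[k], c)
        return merged

    combine_locally([("a", 1), ("a", 1), ("b", 1)], lambda v: v, lambda c, v: c + v)
    # {"a": 2, "b": 1}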

def partitionBy(self, numPartitions, partitionFunc=None):
"""
@@ -233,6 +242,34 @@ def takeAndPrint(rdd, time):

self.foreachRDD(takeAndPrint)

def mapValues(self, f):
"""
        Pass each value in the key-value pair DStream through a map function
        without changing the keys; this also retains the original
        partitioning.
"""
map_values_fn = lambda (k, v): (k, f(v))
return self.map(map_values_fn, preservesPartitioning=True)

def flatMapValues(self, f):
"""
        Pass each value in the key-value pair DStream through a flatMap function
        without changing the keys; this also retains the original
        partitioning.
"""
flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
return self.flatMap(flat_map_fn, preservesPartitioning=True)
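
Note that the lambda (k, v) form used by mapValues and flatMapValues relies on Python 2 tuple parameter unpacking, which Python 3 removed (PEP 3113). A version-agnostic sketch of the same lambdas, with stand-ins for the user function:

    f_map = lambda v: v + 10          # stand-in for the mapValues function
    f_flat = lambda v: (v, v + 10)    # stand-in for the flatMapValues function
    map_values_fn = lambda kv: (kv[0], f_map(kv[1]))
    flat_map_fn = lambda kv: ((kv[0], x) for x in f_flat(kv[1]))
    map_values_fn(("a", 2))        # ("a", 12)
    list(flat_map_fn(("a", 2)))    # [("a", 2), ("a", 12)]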

def glom(self):
"""
        Return a new DStream in which each RDD is generated by applying
        glom() to the corresponding RDD of this DStream. glom() coalesces all
        elements within each partition into a list.
"""
def func(iterator):
yield list(iterator)
return self.mapPartitions(func)
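
In plain terms, glom materializes each partition into a single list element. The per-partition function above behaves like this standalone sketch:

    def glom_partition(iterator):
        # One partition in, one list element out.
        yield list(iterator)

    list(glom_partition(iter([1, 2, 3])))  # [[1, 2, 3]]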

#def transform(self, func): - TD
# from utils import RDDFunction
# wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
@@ -242,7 +279,7 @@ def takeAndPrint(rdd, time):
def _test_output(self, result):
"""
        This function is only for test cases.
-        Store data in a DStream to result to verify the result in tese case
+        Store the data of a DStream into result so the test case can verify it.
"""
def get_output(rdd, time):
taken = rdd.collect()
@@ -305,4 +342,4 @@ def _jdstream(self):
return self._jdstream_val

def _is_pipelinable(self):
-        return not (self.is_cached)
+        return not self.is_cached
48 changes: 46 additions & 2 deletions python/pyspark/streaming_tests.py
@@ -142,10 +142,54 @@ def test_func(dstream):
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

-    def _run_stream(self, test_input, test_func, expected_output):
def test_mapValues(self):
"""Basic operation test for DStream.mapValues"""
test_input = [["a", "a", "b"], ["", ""], []]

def test_func(dstream):
return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add).mapValues(lambda x: x + 10)
expected_output = [[("a", 12), ("b", 11)], [("", 12)], []]
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)
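
Tracing the first batch by hand shows where the expected pairs come from:

    # ["a", "a", "b"] --map-->               [("a", 1), ("a", 1), ("b", 1)]
    #                 --reduceByKey(add)-->  [("a", 2), ("b", 1)]
    #                 --mapValues(+10)-->    [("a", 12), ("b", 11)]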

def test_flatMapValues(self):
"""Basic operation test for DStream.flatMapValues"""
test_input = [["a", "a", "b"], ["", ""], []]

def test_func(dstream):
return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add).flatMapValues(lambda x: (x, x + 10))
expected_output = [[("a", 2), ("a", 12), ("b", 1), ("b", 11)], [("", 2), ("", 12)], []]
output = self._run_stream(test_input, test_func, expected_output)
self.assertEqual(expected_output, output)

def test_glom(self):
"""Basic operation test for DStream.glom"""
test_input = [range(1, 5), range(5, 9), range(9, 13)]
numSlices = 2

def test_func(dstream):
dstream.pyprint()
return dstream.glom()
        expected_output = [[[1, 2], [3, 4]], [[5, 6], [7, 8]], [[9, 10], [11, 12]]]
output = self._run_stream(test_input, test_func, expected_output, numSlices)
self.assertEqual(expected_output, output)

def test_mapPartitions(self):
"""Basic operation test for DStream.mapPartitions"""
test_input = [range(1, 5), range(5, 9), range(9, 13)]
numSlices = 2

def test_func(dstream):
dstream.pyprint()
            return dstream.mapPartitions(lambda x: [reduce(operator.add, x)])
        expected_output = [[3, 7], [11, 15], [19, 23]]
output = self._run_stream(test_input, test_func, expected_output, numSlices)
self.assertEqual(expected_output, output)
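
The function passed to mapPartitions must return an iterable per partition, which is why the reduce result is wrapped in a list above; each partition then contributes exactly one element, its sum. A quick standalone check on an assumed partition:

    import operator
    f = lambda x: [reduce(operator.add, x)]
    f(iter([1, 2]))  # [3]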

+    def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
"""Start stream and return the output"""
# Generate input stream with user-defined input
-        test_input_stream = self.ssc._testInputStream(test_input)
+        test_input_stream = self.ssc._testInputStream(test_input, numSlices)
# Apply test function to stream
test_stream = test_func(test_input_stream)
# Add job to get output from stream
