Commit

fix map function
Ken Takagiwa authored and Ken Takagiwa committed Jul 20, 2014
1 parent d01a125 commit bd20e17
Showing 2 changed files with 15 additions and 19 deletions.
2 changes: 2 additions & 0 deletions examples/src/main/python/streaming/network_wordcount.py
@@ -19,10 +19,12 @@
filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
mapped_lines = fm_lines.map(lambda x: (x, 1))
reduced_lines = mapped_lines.reduce(add)
counted_lines = reduced_lines.count()

fm_lines.pyprint()
filtered_lines.pyprint()
mapped_lines.pyprint()
reduced_lines.pyprint()
counted_lines.pyprint()
ssc.start()
ssc.awaitTermination()
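
For context, here is a minimal self-contained sketch (not part of the commit) of how this example presumably fits together. Only the transformation chain is copied from the diff above; the context setup, the socket source, and the flatMap step are assumptions about this 2014 prototype API and may not match the real example file.

from operator import add                                   # assumed import
from pyspark import SparkContext                            # assumed setup
from pyspark.streaming.context import StreamingContext      # assumed module path

sc = SparkContext(appName="NetworkWordCount")                # assumed setup
ssc = StreamingContext(sc, 1)                                # assumed 1-second batches
lines = ssc.socketTextStream("localhost", 9999)              # assumed source
fm_lines = lines.flatMap(lambda line: line.split(" "))       # assumed flatMap step

filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
mapped_lines = fm_lines.map(lambda x: (x, 1))
reduced_lines = mapped_lines.reduce(add)
counted_lines = reduced_lines.count()        # presumably one of the two added lines

fm_lines.pyprint()
filtered_lines.pyprint()
mapped_lines.pyprint()
reduced_lines.pyprint()
counted_lines.pyprint()                      # presumably the other added line
ssc.start()
ssc.awaitTermination()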
32 changes: 13 additions & 19 deletions python/pyspark/streaming/dstream.py
@@ -20,21 +20,14 @@ def __init__(self, jdstream, ssc, jrdd_deserializer):
self.ctx = ssc._sc
self._jrdd_deserializer = jrdd_deserializer

def generatedRDDs(self):
"""
// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
"""
pass

def count(self):
"""
"""
#TODO make sure count implementation, this is different from what pyspark does
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum().map(lambda x: x[1])
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))

def sum(self):
def _sum(self):
"""
"""
return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
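
As a plain-Python illustration (no Spark required, names invented for the sketch) of the per-partition counting idea used by count() above: each partition iterator collapses to a one-element list holding its size, and the per-partition sizes are then combined.

partitions = [iter(["a", "b", "c"]), iter(["d", "e"])]                # two mock partitions

# mapPartitions(lambda i: [sum(1 for _ in i)]) leaves one count per partition
per_partition_counts = [[sum(1 for _ in it)] for it in partitions]    # [[3], [2]]

# combining the per-partition counts yields the total element count
total = sum(c for counts in per_partition_counts for c in counts)     # 5
print(per_partition_counts, total)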
@@ -65,15 +58,22 @@ def func(s, iterator): return chain.from_iterable(imap(f, iterator))
def map(self, f, preservesPartitioning=False):
"""
"""
def func(split, iterator): return imap(f, iterator)
return PipelinedDStream(self, func, preservesPartitioning)
def func(iterator): return imap(f, iterator)
return self.mapPartitions(func)
#return PipelinedDStream(self, func, preservesPartitioning)

def mapPartitions(self, f):
"""
"""
def func(s, iterator): return f(iterator)
return self.mapPartitionsWithIndex(func)

def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
"""
"""
return PipelinedDStream(self, f, preservesPartitioning)

def reduce(self, func, numPartitions=None):
"""
@@ -92,8 +92,8 @@ def combineLocally(iterator):

#TODO for count operation make sure count implementation
# This is different from what pyspark does
if isinstance(x, int):
x = ("", x)
#if isinstance(x, int):
# x = ("", x)

(k, v) = x
if k not in combiners:
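
A plain-Python sketch of the local combine step shown above: with the isinstance() coercion commented out, every element is expected to already be a (key, value) pair, and values for the same key are folded together with the combine function. The helper name is made up for illustration.

import operator

def combine_locally(pairs, func=operator.add):
    combiners = {}
    for k, v in pairs:                      # each element must already be a (k, v) pair
        if k not in combiners:
            combiners[k] = v
        else:
            combiners[k] = func(combiners[k], v)
    return combiners

print(combine_locally([("spark", 1), ("streaming", 1), ("spark", 1)]))
# {'spark': 2, 'streaming': 1}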
@@ -166,12 +166,6 @@ def _defaultReducePartitions(self):

return self._jdstream.partitions().size()

def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
"""
"""
return PipelinedDStream(self, f, preservesPartitioning)

def _defaultReducePartitions(self):
"""
