fix print and docs
davies committed Sep 27, 2014
1 parent b32774c commit 74df565
Showing 1 changed file with 23 additions and 33 deletions.
56 changes: 23 additions & 33 deletions python/pyspark/streaming/dstream.py
@@ -17,6 +17,7 @@

from itertools import chain, ifilter, imap
import operator
from datetime import datetime

from pyspark import RDD
from pyspark.storagelevel import StorageLevel
@@ -54,17 +55,6 @@ def sum(self):
"""
return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
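For orientation, a tiny sketch of what sum() produces per batch (the `stream` name and the input values are invented for illustration, not part of this diff):

    # assume `stream` is a DStream of numbers, e.g. batch [1, 2, 3] then batch [4, 5]
    stream.sum().pprint()   # would print 6 for the first batch, then 9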

def print_(self, label=None):
"""
Since print is a reserved name in Python, we cannot define a "print" method.
This function prints the serialized data of the RDDs in this DStream, because Scala and
Java cannot deserialize pickled Python objects. Please use DStream.pyprint() to print
readable results; this function calls DStream.print() and will print the raw byte arrays in the DStream.
"""
# a hack to call print function in DStream
getattr(self._jdstream, "print")(label)

def filter(self, f):
"""
Return a new DStream containing only the elements that satisfy predicate.
@@ -154,19 +144,15 @@ def foreachRDD(self, func):
jfunc = RDDFunction(self.ctx, func, self._jrdd_deserializer)
self.ctx._jvm.PythonForeachDStream(self._jdstream.dstream(), jfunc)

def pyprint(self):
def pprint(self):
"""
Print the first ten elements of each RDD generated in this DStream. This is an output
operator, so this DStream will be registered as an output stream and there materialized.
"""
def takeAndPrint(rdd, time):
"""
Closure to take element from RDD and print first 10 elements.
This closure is called by py4j callback server.
"""
taken = rdd.take(11)
print "-------------------------------------------"
print "Time: %s" % (str(time))
print "Time: %s" % datetime.fromtimestamp(time / 1000.0)
print "-------------------------------------------"
for record in taken[:10]:
print record
@@ -176,6 +162,20 @@ def takeAndPrint(rdd, time):

self.foreachRDD(takeAndPrint)
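As a hedged usage sketch of the renamed pprint() (the `stream` DStream, its contents, and the word-count pipeline are assumptions for illustration, not part of this diff):

    # assume `stream` is an existing DStream of words from some input source
    counts = stream.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
    counts.pprint()   # per batch: a separator, a "Time: ..." header, then up to 10 records
    ssc.start()       # assumes the usual StreamingContext start call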

def collect(self):
"""
Collect each RDD into the returned list.
:return: list, which will have the collected items.
"""
result = []

def get_output(rdd, time):
r = rdd.collect()
result.append(r)
self.foreachRDD(get_output)
return result
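A minimal sketch of how this collect() is intended to be used, e.g. in tests (the StreamingContext calls are assumptions about the surrounding API, not shown here):

    result = stream.collect()   # empty list now; one sub-list is appended per batch
    ssc.start()
    # ... let a few batches run, then stop the context ...
    ssc.stop()
    print result                # e.g. [[1, 2, 3], [4, 5]]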

def mapValues(self, f):
"""
Pass each value in the key-value pair RDD through a map function
@@ -196,9 +196,9 @@ def flatMapValues(self, f):

def glom(self):
"""
Return a new DStream in which each RDD is generated by applying glom() to an RDD of
this DStream. Applying glom() to an RDD coalesces all elements within each partition into
a list.
Return a new DStream in which each RDD is generated by applying glom()
to an RDD of this DStream. Applying glom() to an RDD coalesces all
elements within each partition into a list.
"""
def func(iterator):
yield list(iterator)
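For illustration, a sketch of what glom() yields (the partitioning and values are made up):

    # a DStream whose batch RDD is [1, 2, 3, 4] split over 2 partitions
    stream.glom().pprint()
    # would print one list per partition, e.g.:
    #   [1, 2]
    #   [3, 4]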
@@ -228,11 +228,11 @@ def checkpoint(self, interval):
Mark this DStream for checkpointing. It will be saved to a file inside the
checkpoint directory set with L{SparkContext.setCheckpointDir()}
@param interval: Time interval after which generated RDD will be checkpointed
interval has to be pyspark.streaming.duration.Duration
@param interval: time in seconds, after which generated RDD will
be checkpointed
"""
self.is_checkpointed = True
self._jdstream.checkpoint(interval._jduration)
self._jdstream.checkpoint(self._ssc._jduration(interval))
return self
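A short sketch of the new seconds-based interval; the directory setup uses SparkContext.setCheckpointDir() as mentioned in the docstring above, and the path is only an example:

    sc.setCheckpointDir("/tmp/checkpoint")   # checkpoint directory, per the docstring above
    stream.checkpoint(10)                    # checkpoint generated RDDs every 10 seconds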

def groupByKey(self, numPartitions=None):
@@ -245,7 +245,6 @@ def groupByKey(self, numPartitions=None):
Note: If you are grouping in order to perform an aggregation (such as a
sum or average) over each key, using reduceByKey will provide much
better performance.
"""
return self.transform(lambda rdd: rdd.groupByKey(numPartitions))
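A small usage sketch (input values invented; mapValues(list) just makes the grouped iterables printable):

    pairs = stream.map(lambda x: (x % 2, x))
    pairs.groupByKey().mapValues(list).pprint()   # e.g. (0, [2, 4]) and (1, [1, 3])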

@@ -288,15 +287,6 @@ def saveAsPickleFile(rdd, time):

return self.foreachRDD(saveAsPickleFile)

def collect(self):
result = []

def get_output(rdd, time):
r = rdd.collect()
result.append(r)
self.foreachRDD(get_output)
return result

def transform(self, func):
return TransformedDStream(self, lambda a, t: func(a), True)
