Skip to content

Commit

Permalink
address all comments
Browse files Browse the repository at this point in the history
  • Loading branch information
davies committed Oct 7, 2014
1 parent 6db00da commit bebeb4a
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 8 deletions.
8 changes: 4 additions & 4 deletions python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,15 @@ def takeAndPrint(time, rdd):
def mapValues(self, f):
    """
    Return a new DStream by applying a map function to the value of
    each key-value pair in this DStream without changing the key.

    `f` is called with the value only; the key is passed through
    untouched, and `preservesPartitioning=True` is forwarded to `map`.
    """
    # Unpack the pair explicitly instead of using a tuple parameter
    # (`lambda (k, v): ...`), which is a SyntaxError on Python 3 (PEP 3113).
    def map_values_fn(kv):
        k, v = kv
        return k, f(v)

    return self.map(map_values_fn, preservesPartitioning=True)

def flatMapValues(self, f):
    """
    Return a new DStream by applying a flatmap function to the value
    of each key-value pair in this DStream without changing the key.

    `f` is called with the value only and must return an iterable; each
    element it yields is paired with the original key.
    `preservesPartitioning=True` is forwarded to `flatMap`.
    """
    # Unpack the pair explicitly instead of using a tuple parameter
    # (`lambda (k, v): ...`), which is a SyntaxError on Python 3 (PEP 3113).
    def flat_map_fn(kv):
        k, v = kv
        return ((k, x) for x in f(v))

    return self.flatMap(flat_map_fn, preservesPartitioning=True)
Expand Down Expand Up @@ -276,7 +276,7 @@ def saveAsTextFile(t, rdd):
def transform(self, func):
"""
Return a new DStream in which each RDD is generated by applying a function
on each RDD of 'this' DStream.
on each RDD of this DStream.
`func` can have one argument of `rdd`, or have two arguments of
(`time`, `rdd`)
Expand All @@ -290,7 +290,7 @@ def transform(self, func):
def transformWith(self, func, other, keepSerializer=False):
"""
Return a new DStream in which each RDD is generated by applying a function
on each RDD of 'this' DStream and 'other' DStream.
on each RDD of this DStream and 'other' DStream.
`func` can have two arguments of (`rdd_a`, `rdd_b`) or have three
arguments of (`time`, `rdd_a`, `rdd_b`)
Expand Down
17 changes: 16 additions & 1 deletion python/pyspark/streaming/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@

class TransformFunction(object):
"""
This class is for py4j callback.
This class wraps a function RDD[X] -> RDD[Y] that was passed to
DStream.transform(), allowing it to be called from Java via Py4J's
callback server.
Java calls this function with a sequence of JavaRDDs and this function
returns a single JavaRDD pointer back to Java.
"""
_emptyRDD = None

Expand Down Expand Up @@ -63,6 +68,16 @@ class Java:


class TransformFunctionSerializer(object):
"""
This class implements a serializer for PythonTransformFunction Java
objects.
This is necessary because the Java PythonTransformFunction objects are
actually Py4J references to Python objects and thus are not directly
serializable. When Java needs to serialize a PythonTransformFunction,
it uses this class to invoke Python, which returns the serialized function
as a byte array.
"""
def __init__(self, ctx, serializer, gateway=None):
self.ctx = ctx
self.serializer = serializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ private[python] trait PythonTransformFunctionSerializer {
}

/**
* Wrapper for PythonTransformFunction
* Wraps a PythonTransformFunction (which is a Python object accessed through Py4J)
* so that it looks like a Scala function and can be transparently serialized and
* deserialized by Java.
*/
private[python] class TransformFunction(@transient var pfunc: PythonTransformFunction)
extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable {
extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] {

def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
Option(pfunc.call(time.milliseconds, List(rdd.map(JavaRDD.fromRDD(_)).orNull).asJava))
Expand Down Expand Up @@ -87,6 +89,9 @@ private[python] class TransformFunction(@transient var pfunc: PythonTransformFun

/**
* Helpers for PythonTransformFunctionSerializer
*
* PythonTransformFunctionSerializer is logically a singleton that happens to be
* implemented as a Python object.
*/
private[python] object PythonTransformFunctionSerializer {

Expand Down Expand Up @@ -119,7 +124,7 @@ private[python] object PythonTransformFunctionSerializer {
}

/**
* Helper functions
* Helper functions, which are called from Python via Py4J.
*/
private[python] object PythonDStream {

Expand Down

0 comments on commit bebeb4a

Please sign in to comment.