address comments
davies committed Sep 30, 2014
1 parent 8466916 commit a13ff34
Showing 5 changed files with 69 additions and 42 deletions.
32 changes: 30 additions & 2 deletions examples/src/main/python/streaming/hdfs_wordcount.py
@@ -1,14 +1,42 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Counts words in new text files created in the given directory
Usage: hdfs_wordcount.py <directory>
<directory> is the directory that Spark Streaming will use to find and read new text files.
To run this on your local machine on directory `localdir`, run this example
$ bin/spark-submit examples/src/main/python/streaming/hdfs_wordcount.py localdir
Then create a text file in `localdir` and the words in the file will get counted.
"""

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 2:
-        print >> sys.stderr, "Usage: wordcount <directory>"
+        print >> sys.stderr, "Usage: hdfs_wordcount.py <directory>"
        exit(-1)

-    sc = SparkContext(appName="PythonStreamingWordCount")
+    sc = SparkContext(appName="PythonStreamingHDFSWordCount")
    ssc = StreamingContext(sc, 1)

    lines = ssc.textFileStream(sys.argv[1])
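The diff for this example is cut off after the textFileStream call. For context, a minimal sketch of how the rest of a word-count example like this typically proceeds inside the __main__ block; the flatMap/map/reduceByKey pipeline and the pprint output are assumptions, not part of the diff shown above.

    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()  # print a sample of each batch's counts to stdout

    ssc.start()              # start the streaming computation
    ssc.awaitTermination()   # block until the job is stopped
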
30 changes: 29 additions & 1 deletion examples/src/main/python/streaming/network_wordcount.py
@@ -1,11 +1,39 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
Usage: network_wordcount.py <hostname> <port>
<hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
To run this on your local machine, you need to first run a Netcat server
`$ nc -lk 9999`
and then run the example
`$ bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999`
"""

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
-        print >> sys.stderr, "Usage: wordcount <hostname> <port>"
+        print >> sys.stderr, "Usage: network_wordcount.py <hostname> <port>"
        exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)
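This diff is also truncated. A hedged sketch of a plausible continuation, mirroring the HDFS sketch above but reading lines from the given TCP socket (again an assumption rather than part of the shown diff):

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
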
10 changes: 5 additions & 5 deletions python/pyspark/streaming/context.py
@@ -234,11 +234,11 @@ def transform(self, dstreams, transformFunc):
        jdstreams = ListConverter().convert([d._jdstream for d in dstreams],
                                            SparkContext._gateway._gateway_client)
        # change the final serializer to sc.serializer
-        jfunc = RDDFunction(self._sc,
-                            lambda t, *rdds: transformFunc(rdds).map(lambda x: x),
-                            *[d._jrdd_deserializer for d in dstreams])
-
-        jdstream = self._jvm.PythonDStream.callTransform(self._jssc, jdstreams, jfunc)
+        func = RDDFunction(self._sc,
+                           lambda t, *rdds: transformFunc(rdds).map(lambda x: x),
+                           *[d._jrdd_deserializer for d in dstreams])
+        jfunc = self._jvm.RDDFunction(func)
+        jdstream = self._jssc.transform(jdstreams, jfunc)
        return DStream(jdstream, self, self._sc.serializer)

    def union(self, *dstreams):
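Since the change above reroutes StreamingContext.transform() through a JVM-side RDDFunction wrapper and a direct jssc.transform() call, a brief usage sketch may help. The stream names below are hypothetical; the callback receives the batch's RDDs in the same order as the DStreams passed in, per the lambda shown in the diff.

    # stream1 and stream2 are assumed, already-created DStreams.
    unioned = ssc.transform([stream1, stream2],
                            lambda rdds: rdds[0].union(rdds[1]))
    unioned.pprint()
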
3 changes: 0 additions & 3 deletions python/pyspark/streaming/dstream.py
@@ -150,9 +150,6 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash):
"""
return self.transform(lambda rdd: rdd.partitionBy(numPartitions, partitionFunc))

# def foreach(self, func):
# return self.foreachRDD(lambda _, rdd: rdd.foreach(func))

def foreachRDD(self, func):
"""
Apply a function to each RDD in this DStream.
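The commented-out foreach() stub is removed here in favour of foreachRDD(). A minimal usage sketch follows; the DStream name counts is hypothetical, and the two-argument (time, rdd) callback shape is inferred from the removed comment's lambda _, rdd pattern rather than stated in the diff.

    def process(time, rdd):
        # Runs on the driver once per batch; take() keeps the sample small.
        print "Batch at %s: %s" % (time, rdd.take(10))

    counts.foreachRDD(process)
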
36 changes: 5 additions & 31 deletions streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -47,12 +47,12 @@ private[python] class RDDFunction(@transient var pfunc: PythonRDDFunction)
  extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable {

  def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
-    PythonDStream.some(pfunc.call(time.milliseconds, List(PythonDStream.wrapRDD(rdd)).asJava))
+    Option(pfunc.call(time.milliseconds, List(rdd.map(JavaRDD.fromRDD(_)).orNull).asJava)).map(_.rdd)
  }

  def apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
-    val rdds = List(PythonDStream.wrapRDD(rdd), PythonDStream.wrapRDD(rdd2)).asJava
-    PythonDStream.some(pfunc.call(time.milliseconds, rdds))
+    val rdds = List(rdd.map(JavaRDD.fromRDD(_)).orNull, rdd2.map(JavaRDD.fromRDD(_)).orNull).asJava
+    Option(pfunc.call(time.milliseconds, rdds)).map(_.rdd)
  }

  // for function.Function2
@@ -115,39 +115,13 @@ private[python] object PythonDStream {
    serializer = new RDDFunctionSerializer(ser)
  }

-  // convert Option[RDD[_]] to JavaRDD, handle null gracefully
-  def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = {
-    if (rdd.isDefined) {
-      JavaRDD.fromRDD(rdd.get)
-    } else {
-      null
-    }
-  }
-
-  // convert JavaRDD to Option[RDD[Array[Byte]]] to , handle null gracefully
-  def some(jrdd: JavaRDD[Array[Byte]]): Option[RDD[Array[Byte]]] = {
-    if (jrdd != null) {
-      Some(jrdd.rdd)
-    } else {
-      None
-    }
-  }
-
  // helper function for DStream.foreachRDD(),
  // cannot be `foreachRDD`, it will confusing py4j
-  def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pfunc: PythonRDDFunction){
+  def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pfunc: PythonRDDFunction) {
    val func = new RDDFunction((pfunc))
    jdstream.dstream.foreachRDD((rdd, time) => func(Some(rdd), time))
  }

-  // helper function for ssc.transform()
-  def callTransform(ssc: JavaStreamingContext, jdsteams: JList[JavaDStream[_]],
-                    pyfunc: PythonRDDFunction)
-    :JavaDStream[Array[Byte]] = {
-    val func = new RDDFunction(pyfunc)
-    ssc.transform(jdsteams, func)
-  }
-
  // convert list of RDD into queue of RDDs, for ssc.queueStream()
  def toRDDQueue(rdds: JArrayList[JavaRDD[Array[Byte]]]): java.util.Queue[JavaRDD[Array[Byte]]] = {
    val queue = new java.util.LinkedList[JavaRDD[Array[Byte]]]
@@ -232,7 +206,7 @@ class PythonTransformed2DStream(parent: DStream[_], parent2: DStream[_],
    func(parent.getOrCompute(validTime), parent2.getOrCompute(validTime), validTime)
  }

-  val asJavaDStream = JavaDStream.fromDStream(this)
+  val asJavaDStream = JavaDStream.fromDStream(this)
}

/**
