added reducedByKey not working yet
Ken Takagiwa authored and Ken Takagiwa committed Jul 17, 2014
1 parent 5720979 commit 571d52d
Showing 3 changed files with 119 additions and 2 deletions.
10 changes: 9 additions & 1 deletion examples/src/main/python/streaming/wordcount.py
@@ -1,22 +1,30 @@
import sys
from operator import add

from pyspark.conf import SparkConf
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import *

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: wordcount <directory>"
        exit(-1)
    ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
    conf = SparkConf()
    conf.setAppName("PythonStreamingWordCount")
    conf.set("spark.default.parallelism", 1)

    # ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
    ssc = StreamingContext(conf=conf, duration=Seconds(1))

    lines = ssc.textFileStream(sys.argv[1])
    fm_lines = lines.flatMap(lambda x: x.split(" "))
    filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
    mapped_lines = fm_lines.map(lambda x: (x, 1))
    reduced_lines = mapped_lines.reduce(add)

    fm_lines.pyprint()
    filtered_lines.pyprint()
    mapped_lines.pyprint()
    reduced_lines.pyprint()
    ssc.start()
    ssc.awaitTermination()
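
The example above still collapses the whole stream with reduce(add) rather than counting per word; the commit title says the reduceByKey path is not working yet. A minimal sketch of where the example is headed once a per-key reduction exists on DStream (the reduceByKey method below is an assumption mirroring the RDD API, not code from this commit):

import sys
from operator import add

from pyspark.conf import SparkConf
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import *

if __name__ == "__main__":
    conf = SparkConf().setAppName("PythonStreamingWordCount")
    ssc = StreamingContext(conf=conf, duration=Seconds(1))

    lines = ssc.textFileStream(sys.argv[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(add)  # assumed per-key reduction, analogous to RDD.reduceByKey
    counts.pyprint()

    ssc.start()
    ssc.awaitTermination()
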
26 changes: 25 additions & 1 deletion python/pyspark/streaming/dstream.py
@@ -9,6 +9,7 @@

__all__ = ["DStream"]


class DStream(object):
    def __init__(self, jdstream, ssc, jrdd_deserializer):
        self._jdstream = jdstream
@@ -69,7 +70,7 @@ def _combineByKey(self, createCombiner, mergeValue, mergeCombiners,
"""
"""
if numPartitions is None:
numPartitions = self.ctx._defaultParallelism()
numPartitions = self._defaultReducePartitions()
def combineLocally(iterator):
combiners = {}
for x in iterator:
@@ -130,8 +131,31 @@ def add_shuffle_key(split, iterator):
        return dstream

    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        """
        Return a new DStream by applying a function to each partition of this
        DStream, while tracking the index of the original partition.
        """
        return PipelinedDStream(self, f, preservesPartitioning)

    def _defaultReducePartitions(self):
        """
        Return the default number of partitions to use during reduce tasks
        (e.g. combineByKey).
        """
        # hard-coded to avoid the error for now; the branch below is unreachable
        return 2
        if self.ctx._conf.contains("spark.default.parallelism"):
            return self.ctx.defaultParallelism
        else:
            return self.getNumPartitions()

    def getNumPartitions(self):
        """
        Return the number of partitions in the RDD.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> rdd.getNumPartitions()
        2
        """
        return self._jdstream.partitions().size()


class PipelinedDStream(DStream):
    def __init__(self, prev, func, preservesPartitioning=False):
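
The _combineByKey and add_shuffle_key hunks above only show fragments of the shuffle path. The overall shape is the standard two-phase combine that reduceByKey builds on: combine values locally within each partition, redistribute the partial combiners by key hash, then merge combiners that meet in the same reduce partition. A self-contained sketch of that data flow on plain Python lists (no Spark involved; the function and variable names are illustrative only):

from operator import add

def combine_by_key(partitions, create_combiner, merge_value, merge_combiners,
                   num_partitions=2):
    """Two-phase combine over a list of partitions, each a list of (key, value) pairs."""
    # Phase 1: combine locally within each input partition.
    locally_combined = []
    for part in partitions:
        combiners = {}
        for key, value in part:
            if key not in combiners:
                combiners[key] = create_combiner(value)
            else:
                combiners[key] = merge_value(combiners[key], value)
        locally_combined.append(combiners)

    # Phase 2: "shuffle" the partial combiners to reduce partitions by key hash,
    # then merge combiners that land in the same partition.
    reduce_partitions = [dict() for _ in range(num_partitions)]
    for combiners in locally_combined:
        for key, combiner in combiners.items():
            target = reduce_partitions[hash(key) % num_partitions]
            target[key] = merge_combiners(target[key], combiner) if key in target else combiner
    return reduce_partitions

# Word count is combineByKey with an identity createCombiner and add for both merge
# steps, which is what a working reduceByKey(add) would delegate to.
batch = [[("spark", 1), ("streaming", 1)], [("spark", 1)]]
print(combine_by_key(batch, lambda v: v, add, add))
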
85 changes: 85 additions & 0 deletions PythonDStream.scala
@@ -55,6 +55,91 @@ class PythonDStream[T: ClassTag](
      case None => None
    }
  }
<<<<<<< HEAD

  val asJavaDStream = JavaDStream.fromDStream(this)
}
=======
  val asJavaDStream = JavaDStream.fromDStream(this)

  /**
   * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an
   * output operator, so this PythonDStream will be registered as an output stream and there
   * materialized. Since the serialized Python objects are only readable by Python, pyprint writes
   * the binary data to a temporary file and runs a Python script to deserialize and print the
   * first ten elements.
   */
  private[streaming] def ppyprint() {
    def foreachFunc = (rdd: RDD[Array[Byte]], time: Time) => {
      val iter = rdd.take(11).iterator

      // make a temporary file
      val prefix = "spark"
      val suffix = ".tmp"
      val tempFile = File.createTempFile(prefix, suffix)
      val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath))
      // write out the serialized Python objects
      PythonRDD.writeIteratorToStream(iter, tempFileStream)
      tempFileStream.close()

      // This value has to be passed from Python; read it from the environment for now
      val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
      // val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why does this fail to compile???
      // the absolute path to the Python script has to change because we use the separate
      // pysparkstreaming package instead of pyspark
      val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pysparkstreaming/streaming/pyprint.py", tempFile.getAbsolutePath)
      val workerEnv = pb.environment()

      // envVars also needs to be passed
      // workerEnv.putAll(envVars)
      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
      workerEnv.put("PYTHONPATH", pythonPath)
      val worker = pb.start()
      val is = worker.getInputStream()
      val isr = new InputStreamReader(is)
      val br = new BufferedReader(isr)

      println("-------------------------------------------")
      println("Time: " + time)
      println("-------------------------------------------")

      // print the values from the Python process's stdout
      var line = ""
      breakable {
        while (true) {
          line = br.readLine()
          if (line == null) break()
          println(line)
        }
      }
      // delete the temporary file
      tempFile.delete()
      println()
    }
    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
  }
}


private class PairwiseDStream(prev: DStream[Array[Byte]])
  extends DStream[(Long, Array[Byte])](prev.ssc) {
  override def dependencies = List(prev)

  override def slideDuration: Duration = prev.slideDuration

  override def compute(validTime: Time): Option[RDD[(Long, Array[Byte])]] = {
    prev.getOrCompute(validTime) match {
      case Some(rdd) =>
        val pairwiseRDD = new PairwiseRDD(rdd)
        Some(pairwiseRDD.asJavaPairRDD.rdd)
      case None => None
    }
  }
  val asJavaPairDStream: JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
}





>>>>>>> added reducedByKey not working yet
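
For reference, a rough sketch of the deserialize-and-print step that ppyprint shells out to. This is not the committed pyprint.py; it assumes the temp file uses PySpark's usual framing of a 4-byte big-endian length followed by that many bytes of pickled data per record, and the helper name read_records is illustrative only:

import pickle
import struct
import sys

def read_records(path, limit=10):
    # Yield up to `limit` unpickled records from a length-framed dump file.
    with open(path, "rb") as f:
        for _ in range(limit):
            header = f.read(4)
            if len(header) < 4:
                return                      # stream exhausted
            (length,) = struct.unpack(">i", header)
            if length < 0:
                return                      # defensive: treat a negative length as end-of-data
            yield pickle.loads(f.read(length))

if __name__ == "__main__":
    for record in read_records(sys.argv[1]):
        print(record)
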
