Skip to content

Commit

Permalink
reduceByKey is working
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken Takagiwa authored and giwa committed Sep 20, 2014
1 parent c455c8d commit 6f98e50
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 83 deletions.
Binary file added examples/src/main/python/streaming/wordcount.pyc
Binary file not shown.
6 changes: 2 additions & 4 deletions python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,9 @@ def add_shuffle_key(split, iterator):
keyed = PipelinedDStream(self, add_shuffle_key)
keyed._bypass_serializer = True
with _JavaStackTrace(self.ctx) as st:
#JavaDStream
pairDStream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairDStream()
partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
id(partitionFunc))
jdstream = pairDStream.partitionBy(partitioner).values()
id(partitionFunc))
jdstream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream(), partitioner).asJavaDStream()
dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer))
# This is required so that id(partitionFunc) remains unique, even if
# partitionFunc is a lambda:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ class PythonDStream[T: ClassTag](
case None => None
}
}
<<<<<<< HEAD

val asJavaDStream = JavaDStream.fromDStream(this)

Expand Down Expand Up @@ -134,87 +133,31 @@ DStream[(Long, Array[Byte])](prev.ssc){
}
val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
}
=======
val asJavaDStream = JavaDStream.fromDStream(this)

/**
* Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output
* operator, so this PythonDStream will be registered as an output stream and there materialized.
* Since serialized Python object is readable by Python, pyprint writes out binary data to
* temporary file and run python script to deserialized and print the first ten elements
*/
private[streaming] def ppyprint() {
def foreachFunc = (rdd: RDD[Array[Byte]], time: Time) => {
val iter = rdd.take(11).iterator

// make a temporary file
val prefix = "spark"
val suffix = ".tmp"
val tempFile = File.createTempFile(prefix, suffix)
val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath))
//write out serialized python object
PythonRDD.writeIteratorToStream(iter, tempFileStream)
tempFileStream.close()

// This value has to be passed from python
//val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
//val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why this fails to compile???
//absolute path to the python script is needed to change because we do not use pysparkstreaming
val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pysparkstreaming/streaming/pyprint.py", tempFile.getAbsolutePath)
val workerEnv = pb.environment()

//envVars also need to be pass
//workerEnv.putAll(envVars)
val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
workerEnv.put("PYTHONPATH", pythonPath)
val worker = pb.start()
val is = worker.getInputStream()
val isr = new InputStreamReader(is)
val br = new BufferedReader(isr)

println ("-------------------------------------------")
println ("Time: " + time)
println ("-------------------------------------------")

//print value from python std out
var line = ""
breakable {
while (true) {
line = br.readLine()
if (line == null) break()
println(line)
}
}
//delete temporary file
tempFile.delete()
println()

}
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}
}


private class PairwiseDStream(prev:DStream[Array[Byte]]) extends
DStream[(Long, Array[Byte])](prev.ssc){
private class PairwiseDStream(prev:DStream[Array[Byte]], partitioner: Partitioner) extends
DStream[Array[Byte]](prev.ssc){
override def dependencies = List(prev)

override def slideDuration: Duration = prev.slideDuration

override def compute(validTime:Time):Option[RDD[(Long, Array[Byte])]]={
override def compute(validTime:Time):Option[RDD[Array[Byte]]]={
prev.getOrCompute(validTime) match{
case Some(rdd)=>Some(rdd)
val pairwiseRDD = new PairwiseRDD(rdd)
Some(pairwiseRDD.asJavaPairRDD.rdd)
/*
* This is equivalent to following python code
* with _JavaStackTrace(self.context) as st:
* pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
* partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
* id(partitionFunc))
* jrdd = pairRDD.partitionBy(partitioner).values()
* rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
*/
Some(pairwiseRDD.asJavaPairRDD.partitionBy(partitioner).values().rdd)
case None => None
}
}
val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
val asJavaDStream = JavaDStream.fromDStream(this)
//val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
}





>>>>>>> added reducedByKey not working yet
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/*
package org.apache.spark.streaming.api.python
import org.apache.spark.Accumulator
Expand All @@ -10,11 +12,8 @@ import org.apache.spark.streaming.dstream.DStream
import scala.reflect.ClassTag
/**
* Created by ken on 7/15/14.
*/
class PythonTransformedDStream[T: ClassTag](
parents: Seq[DStream[T]],
parent: DStream[T],
command: Array[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
Expand All @@ -30,8 +29,14 @@ class PythonTransformedDStream[T: ClassTag](
//pythonDStream compute
override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
Some()
// val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
// parents.map(_.getOrCompute(validTime).orNull).to
// parent = parents.head.asInstanceOf[RDD]
// Some()
}
val asJavaDStream = JavaDStream.fromDStream(this)
val asJavaDStream = JavaDStream.fromDStream(this)
}
*/

0 comments on commit 6f98e50

Please sign in to comment.