From 490e338374bef5265796332f7b0a5defe6839754 Mon Sep 17 00:00:00 2001
From: Ken Takagiwa
Date: Wed, 16 Jul 2014 12:15:06 -0700
Subject: [PATCH] sorted the import following Spark coding convention

---
 .../streaming/api/python/PythonDStream.scala | 120 ++----------------
 1 file changed, 13 insertions(+), 107 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 719dd0a6a53c2..9d4eebaadc4c7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -19,42 +19,28 @@ package org.apache.spark.streaming.api.python
 
 import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
-import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
-import org.apache.spark.broadcast.Broadcast
+import scala.reflect.ClassTag
+
 import org.apache.spark._
-import org.apache.spark.util.Utils
-import java.io._
-import scala.Some
-import org.apache.spark.streaming.Duration
-import scala.util.control.Breaks._
-import org.apache.spark.broadcast.Broadcast
-import scala.Some
-import org.apache.spark.streaming.Duration
 import org.apache.spark.rdd.RDD
-import org.apache.spark.api.python.PythonRDD
-
-
+import org.apache.spark.api.python._
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.api.java._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.api.python._
-import org.apache.spark.api.python.PairwiseRDD
-
-import scala.reflect.ClassTag
 
 
 class PythonDStream[T: ClassTag](
-  parent: DStream[T],
-  command: Array[Byte],
-  envVars: JMap[String, String],
-  pythonIncludes: JList[String],
-  preservePartitoning: Boolean,
-  pythonExec: String,
-  broadcastVars: JList[Broadcast[Array[Byte]]],
-  accumulator: Accumulator[JList[Array[Byte]]]
-  ) extends DStream[Array[Byte]](parent.ssc) {
+    parent: DStream[T],
+    command: Array[Byte],
+    envVars: JMap[String, String],
+    pythonIncludes: JList[String],
+    preservePartitoning: Boolean,
+    pythonExec: String,
+    broadcastVars: JList[Broadcast[Array[Byte]]],
+    accumulator: Accumulator[JList[Array[Byte]]])
+  extends DStream[Array[Byte]](parent.ssc) {
 
   override def dependencies = List(parent)
 
 
@@ -70,84 +56,4 @@ class PythonDStream[T: ClassTag](
     }
   }
   val asJavaDStream = JavaDStream.fromDStream(this)
-
-  /**
-   * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output
-   * operator, so this PythonDStream will be registered as an output stream and there materialized.
-   * Since serialized Python object is readable by Python, pyprint writes out binary data to
-   * temporary file and run python script to deserialized and print the first ten elements
-   */
-  private[streaming] def ppyprint() {
-    def foreachFunc = (rdd: RDD[Array[Byte]], time: Time) => {
-      val iter = rdd.take(11).iterator
-
-      // make a temporary file
-      val prefix = "spark"
-      val suffix = ".tmp"
-      val tempFile = File.createTempFile(prefix, suffix)
-      val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath))
-      //write out serialized python object
-      PythonRDD.writeIteratorToStream(iter, tempFileStream)
-      tempFileStream.close()
-
-      // This value has to be passed from python
-      //val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
-      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-      //val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why this fails to compile???
-      //absolute path to the python script is needed to change because we do not use pysparkstreaming
-      val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pysparkstreaming/streaming/pyprint.py", tempFile.getAbsolutePath)
-      val workerEnv = pb.environment()
-
-      //envVars also need to be pass
-      //workerEnv.putAll(envVars)
-      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-      workerEnv.put("PYTHONPATH", pythonPath)
-      val worker = pb.start()
-      val is = worker.getInputStream()
-      val isr = new InputStreamReader(is)
-      val br = new BufferedReader(isr)
-
-      println ("-------------------------------------------")
-      println ("Time: " + time)
-      println ("-------------------------------------------")
-
-      //print value from python std out
-      var line = ""
-      breakable {
-        while (true) {
-          line = br.readLine()
-          if (line == null) break()
-          println(line)
-        }
-      }
-      //delete temporary file
-      tempFile.delete()
-      println()
-
-    }
-    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
-  }
 }
-
-
-private class PairwiseDStream(prev:DStream[Array[Byte]]) extends
-DStream[(Long, Array[Byte])](prev.ssc){
-  override def dependencies = List(prev)
-
-  override def slideDuration: Duration = prev.slideDuration
-
-  override def compute(validTime:Time):Option[RDD[(Long, Array[Byte])]]={
-    prev.getOrCompute(validTime) match{
-      case Some(rdd)=>Some(rdd)
-        val pairwiseRDD = new PairwiseRDD(rdd)
-        Some(pairwiseRDD.asJavaPairRDD.rdd)
-      case None => None
-    }
-  }
-  val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
-}
-
-
-
-
-
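Note on the PairwiseDStream removed above (an illustrative sketch follows; it is not part of the patch): its compute() arm reads "case Some(rdd) => Some(rdd)" before building the PairwiseRDD, so the first Some(rdd) is a stray discarded expression. A cleaned-up version of that class, kept only for reference and assuming the same PairwiseRDD helper from org.apache.spark.api.python that the removed import referenced, could look like this:

package org.apache.spark.streaming.api.python

import org.apache.spark.api.python.PairwiseRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.streaming.api.java.JavaPairDStream
import org.apache.spark.streaming.dstream.DStream

// Sketch of the deleted helper with the stray expression dropped; otherwise it mirrors
// the code removed by the patch.
private class PairwiseDStream(prev: DStream[Array[Byte]])
  extends DStream[(Long, Array[Byte])](prev.ssc) {

  override def dependencies = List(prev)

  override def slideDuration: Duration = prev.slideDuration

  override def compute(validTime: Time): Option[RDD[(Long, Array[Byte])]] = {
    prev.getOrCompute(validTime) match {
      case Some(rdd) =>
        // expose the serialized records as (Long, Array[Byte]) pairs for the Python side
        val pairwiseRDD = new PairwiseRDD(rdd)
        Some(pairwiseRDD.asJavaPairRDD.rdd)
      case None => None
    }
  }

  val asJavaPairDStream: JavaPairDStream[Long, Array[Byte]] =
    JavaPairDStream.fromJavaDStream(this)
}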