diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 459cee31f2e8e..998fa24eba91b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -174,7 +174,7 @@ class PythonDStream[T: ClassTag](
   }
 }
 
-
+/*
 private class PairwiseDStream(prev:DStream[Array[Byte]]) extends
 DStream[(Long, Array[Byte])](prev.ssc){
   override def dependencies = List(prev)
@@ -277,6 +277,7 @@ class PythonTransformedDStream(
 
 =======
   val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream(this)
 }
+*/
 
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index f6dcd2fb88f45..5377dfa52d461 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -619,10 +619,7 @@ abstract class DStream[T: ClassTag] (
     new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
   }
 
-
-
-
-
+//TODO move pyprint to PythonDStream
 /**
  * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output
  * operator, so this PythonDStream will be registered as an output stream and there materialized.
@@ -643,6 +640,7 @@ abstract class DStream[T: ClassTag] (
       tempFileStream.close()
 
       // This value has to be passed from python
+      // Python currently does not do cluster deployment. But what happened
       val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
       val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
       //val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why this fails to compile???
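
A note on the `// why this fails to compile???` question in the last hunk: `java.lang.ProcessBuilder` has exactly two constructors, `ProcessBuilder(command: java.util.List[String])` and the varargs `ProcessBuilder(command: String*)`. A Scala `Seq[String]` matches neither overload, so `new ProcessBuilder(Seq(...))` is rejected by the compiler. The sketch below (object and variable names are hypothetical; the paths are placeholders for `tempFile.getAbsolutePath()` and friends) shows the two usual fixes: splatting the `Seq` into the Java varargs constructor, or converting it to a `java.util.List`.

```scala
import scala.collection.JavaConverters._

object PyprintProcessBuilderNote {
  def main(args: Array[String]): Unit = {
    // Hypothetical stand-ins for the values used in pyprint above.
    val pythonExec   = sys.env.getOrElse("PYSPARK_PYTHON", "python")
    val sparkHome    = sys.env.getOrElse("SPARK_HOME", ".")
    val tempFilePath = "/tmp/pyprint-dump" // placeholder for tempFile.getAbsolutePath()

    val command = Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFilePath)

    // new ProcessBuilder(command) would not compile: ProcessBuilder only
    // accepts java.util.List[String] or String varargs, not a Scala Seq.

    // Fix 1: splat the Seq into the Java varargs constructor.
    val pb1 = new ProcessBuilder(command: _*)

    // Fix 2: convert the Seq to a java.util.List.
    val pb2 = new ProcessBuilder(command.asJava)

    println(pb1.command()) // same command list either way
    println(pb2.command())
  }
}
```

Relatedly, `sys.env` (or `sys.env.get("PYSPARK_PYTHON")`) is the more idiomatic Scala way to read an environment variable than spinning up `new ProcessBuilder().environment().get(...)` as the hunk does; both see the same values.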