Skip to content

Commit

Permalink
merge with remote branch
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken Takagiwa authored and Ken Takagiwa committed Jul 18, 2014
2 parents ae464e0 + 69e9cd3 commit 7d05109
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 1 deletion.
18 changes: 18 additions & 0 deletions python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,24 @@ def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
"""
return PipelinedDStream(self, f, preservesPartitioning)

def _defaultReducePartitions(self):
"""
"""
# hard code to avoid the error
if self.ctx._conf.contains("spark.default.parallelism"):
return self.ctx.defaultParallelism
else:
return self.getNumPartitions()

return self._jdstream.partitions().size()

def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
"""
"""
return PipelinedDStream(self, f, preservesPartitioning)

def _defaultReducePartitions(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,4 @@ DStream[Array[Byte]](prev.ssc){
}
}
val asJavaDStream = JavaDStream.fromDStream(this)
//val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
<<<<<<< HEAD
/*
=======
>>>>>>> 69e9cd33a58b880f96cc9c3e5e62eaa415c49843
package org.apache.spark.streaming.api.python
import org.apache.spark.Accumulator
Expand All @@ -12,8 +15,16 @@ import org.apache.spark.streaming.dstream.DStream
import scala.reflect.ClassTag
<<<<<<< HEAD
class PythonTransformedDStream[T: ClassTag](
parent: DStream[T],
=======
/**
* Created by ken on 7/15/14.
*/
class PythonTransformedDStream[T: ClassTag](
parents: Seq[DStream[T]],
>>>>>>> 69e9cd33a58b880f96cc9c3e5e62eaa415c49843
command: Array[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
Expand All @@ -29,6 +40,7 @@ class PythonTransformedDStream[T: ClassTag](
//pythonDStream compute
override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
<<<<<<< HEAD
// val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
// parents.map(_.getOrCompute(validTime).orNull).to
Expand Down

0 comments on commit 7d05109

Please sign in to comment.