Skip to content

Commit

Permalink
Propagate TaskContext to writer threads.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Jul 29, 2015
1 parent 57c9b4e commit b4b1702
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ private[spark] class PythonRDD(

override def run(): Unit = Utils.logUncaughtExceptions {
try {
TaskContext.setTaskContext(context)
val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
val dataOut = new DataOutputStream(stream)
// Partition index
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/api/r/RRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,15 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
partition: Int): Unit = {

val env = SparkEnv.get
val taskContext = TaskContext.get()
val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
val stream = new BufferedOutputStream(output, bufferSize)

new Thread("writer for R") {
override def run(): Unit = {
try {
SparkEnv.set(env)
TaskContext.setTaskContext(taskContext)
val dataOut = new DataOutputStream(stream)
dataOut.writeInt(partition)

Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ private[spark] class PipedRDD[T: ClassTag](
// Start a thread to feed the process input from our parent's iterator
new Thread("stdin writer for " + command) {
override def run() {
TaskContext.setTaskContext(context)
val out = new PrintWriter(proc.getOutputStream)

// scalastyle:off println
Expand Down

0 comments on commit b4b1702

Please sign in to comment.