Commit

Use the RedirectThread instead
chenghao-intel committed Jun 29, 2015
1 parent 1de771d commit 47e0970
Showing 3 changed files with 38 additions and 37 deletions.
31 changes: 31 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2333,3 +2333,34 @@ private[spark] class RedirectThread(
     }
   }
 }
+
+/**
+ * Circular buffer, which consumes all of the data written to it.
+ */
+private[spark] class CircularBuffer extends java.io.OutputStream {
+  var pos: Int = 0
+  var buffer = new Array[Int](10240)
+
+  def write(i: Int): Unit = {
+    buffer(pos) = i
+    pos = (pos + 1) % buffer.size
+  }
+
+  override def toString: String = {
+    val (end, start) = buffer.splitAt(pos)
+    val input = new java.io.InputStream {
+      val iterator = (start ++ end).iterator
+
+      def read(): Int = if (iterator.hasNext) iterator.next() else -1
+    }
+    val reader = new BufferedReader(new InputStreamReader(input))
+    val stringBuilder = new StringBuilder
+    var line = reader.readLine()
+    while (line != null) {
+      stringBuilder.append(line)
+      stringBuilder.append("\n")
+      line = reader.readLine()
+    }
+    stringBuilder.toString()
+  }
+}
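
For intuition, a brief usage sketch (illustrative only, not part of the commit): write stores each value into a fixed 10240-slot array and wraps around, so only the most recent output survives, and toString reassembles it starting from the current write position.

    import org.apache.spark.util.CircularBuffer

    val buf = new CircularBuffer
    // OutputStream.write(Array[Byte]) funnels each byte through write(i: Int),
    // so anything past 10240 bytes overwrites the oldest entries.
    buf.write(("x" * 20000 + "recent output\n").getBytes("UTF-8"))
    println(buf.toString) // prints only the most recent ~10 KB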
29 changes: 3 additions & 26 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -22,6 +22,8 @@ import java.net.URI
 import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet}
 import javax.annotation.concurrent.GuardedBy
 
+import org.apache.spark.util.CircularBuffer
+
 import scala.collection.JavaConversions._
 import scala.language.reflectiveCalls

@@ -66,32 +68,7 @@ private[hive] class ClientWrapper(
   with Logging {
 
   // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
-  private val outputBuffer = new java.io.OutputStream {
-    var pos: Int = 0
-    var buffer = new Array[Int](10240)
-    def write(i: Int): Unit = {
-      buffer(pos) = i
-      pos = (pos + 1) % buffer.size
-    }
-
-    override def toString: String = {
-      val (end, start) = buffer.splitAt(pos)
-      val input = new java.io.InputStream {
-        val iterator = (start ++ end).iterator
-
-        def read(): Int = if (iterator.hasNext) iterator.next() else -1
-      }
-      val reader = new BufferedReader(new InputStreamReader(input))
-      val stringBuilder = new StringBuilder
-      var line = reader.readLine()
-      while(line != null) {
-        stringBuilder.append(line)
-        stringBuilder.append("\n")
-        line = reader.readLine()
-      }
-      stringBuilder.toString()
-    }
-  }
+  private val outputBuffer = new CircularBuffer()
 
   private val shim = version match {
     case hive.v12 => new Shim_v0_12()
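As a sketch of how this buffer is consumed (assumed usage; the wiring lives elsewhere in ClientWrapper and is not part of this hunk): streams capturing Hive's console output can be built on top of it with a PrintStream, and the buffer's tail is attached to the error message when a command fails.

    import java.io.PrintStream
    import org.apache.spark.util.CircularBuffer

    val outputBuffer = new CircularBuffer()
    // Stand-in for Hive's stdout/stderr; autoflush so bytes land in the buffer immediately.
    val hiveConsole = new PrintStream(outputBuffer, true, "UTF-8")
    hiveConsole.println("FAILED: example error message")
    // Only surfaced on failure, e.g. in the exception message:
    println(s"Hive command failed, recent output:\n$outputBuffer")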
15 changes: 4 additions & 11 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.HiveShim._
 import org.apache.spark.sql.hive.{HiveContext, HiveInspectors}
 import org.apache.spark.sql.types.DataType
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils}
 
 /**
  * Transforms the input by forking and running the specified script.
@@ -176,16 +176,9 @@ case class ScriptTransformation(
 
       // Consume the error stream from the pipeline, otherwise it will be blocked if
       // the pipeline is full.
-      new Thread(new Runnable() {
-        override def run(): Unit = {
-          var value = -1
-          do {
-            value = errorStream.read() // consume the error message stream.
-          } while (value != -1)
-
-          errorStream.close()
-        }
-      }, "Thread-ScriptTransformation-STDERR-Consumer").start()
+      new RedirectThread(errorStream, // input stream from the pipeline
+        new CircularBuffer(), // output to a circular buffer
+        "Thread-ScriptTransformation-STDERR-Consumer").start()
 
       iterator
     }
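For reference, RedirectThread (defined in Utils.scala, above) is a daemon thread that copies bytes from an input stream to an output stream, so pairing it with a CircularBuffer both drains stderr and retains its tail. A simplified sketch of the copy loop, assuming the standard implementation (the real class also handles EOF propagation and stream cleanup):

    import java.io.{InputStream, OutputStream}

    // Hypothetical stand-in for RedirectThread, reduced to its copy loop.
    class CopyLoopThread(in: InputStream, out: OutputStream, name: String)
      extends Thread(name) {
      setDaemon(true) // must not keep the JVM alive after the task finishes
      override def run(): Unit = {
        val buf = new Array[Byte](1024)
        var len = in.read(buf)
        while (len != -1) {
          out.write(buf, 0, len) // drain the pipe so the child never blocks on stderr
          out.flush()
          len = in.read(buf)
        }
      }
    }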
