From 47e09704ffbbd0115c02e98ec47f548ee8979812 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Fri, 19 Jun 2015 23:18:21 +0800 Subject: [PATCH] Use the RedirectThread instead --- .../scala/org/apache/spark/util/Utils.scala | 31 +++++++++++++++++++ .../spark/sql/hive/client/ClientWrapper.scala | 29 ++--------------- .../hive/execution/ScriptTransformation.scala | 15 +++------ 3 files changed, 38 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 19157af5b6f4d..6080522c4939c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2333,3 +2333,34 @@ private[spark] class RedirectThread( } } } + +/** + * Circular buffer, which consume all of the data write to it. + */ +private[spark] class CircularBuffer extends java.io.OutputStream { + var pos: Int = 0 + var buffer = new Array[Int](10240) + + def write(i: Int): Unit = { + buffer(pos) = i + pos = (pos + 1) % buffer.size + } + + override def toString: String = { + val (end, start) = buffer.splitAt(pos) + val input = new java.io.InputStream { + val iterator = (start ++ end).iterator + + def read(): Int = if (iterator.hasNext) iterator.next() else -1 + } + val reader = new BufferedReader(new InputStreamReader(input)) + val stringBuilder = new StringBuilder + var line = reader.readLine() + while (line != null) { + stringBuilder.append(line) + stringBuilder.append("\n") + line = reader.readLine() + } + stringBuilder.toString() + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala index 2f771d76793e5..256b725bb7833 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala @@ -22,6 +22,8 @@ import java.net.URI import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet} import javax.annotation.concurrent.GuardedBy +import org.apache.spark.util.CircularBuffer + import scala.collection.JavaConversions._ import scala.language.reflectiveCalls @@ -66,32 +68,7 @@ private[hive] class ClientWrapper( with Logging { // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur. - private val outputBuffer = new java.io.OutputStream { - var pos: Int = 0 - var buffer = new Array[Int](10240) - def write(i: Int): Unit = { - buffer(pos) = i - pos = (pos + 1) % buffer.size - } - - override def toString: String = { - val (end, start) = buffer.splitAt(pos) - val input = new java.io.InputStream { - val iterator = (start ++ end).iterator - - def read(): Int = if (iterator.hasNext) iterator.next() else -1 - } - val reader = new BufferedReader(new InputStreamReader(input)) - val stringBuilder = new StringBuilder - var line = reader.readLine() - while(line != null) { - stringBuilder.append(line) - stringBuilder.append("\n") - line = reader.readLine() - } - stringBuilder.toString() - } - } + private val outputBuffer = new CircularBuffer() private val shim = version match { case hive.v12 => new Shim_v0_12() diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 6401c0610e146..b372fff3aa015 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive.HiveShim._ import org.apache.spark.sql.hive.{HiveContext, HiveInspectors} import org.apache.spark.sql.types.DataType -import org.apache.spark.util.Utils +import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils} /** * Transforms the input by forking and running the specified script. @@ -176,16 +176,9 @@ case class ScriptTransformation( // Consume the error stream from the pipeline, otherwise it will be blocked if // the pipeline is full. - new Thread(new Runnable() { - override def run(): Unit = { - var value = -1 - do { - value = errorStream.read() // consume the error message stream. - } while (value != -1) - - errorStream.close() - } - }, "Thread-ScriptTransformation-STDERR-Consumer").start() + new RedirectThread(errorStream, // input stream from the pipeline + new CircularBuffer(), // output to a circular buffer + "Thread-ScriptTransformation-STDERR-Consumer").start() iterator }