From d9677e1af2dba9551f72f17d9bfc64891f235ae2 Mon Sep 17 00:00:00 2001 From: "zhichao.li" Date: Wed, 27 May 2015 09:33:20 +0800 Subject: [PATCH 1/2] redirect the error destination to be the same as the current process --- .../hive/execution/ScriptTransformation.scala | 2 ++ .../sql/hive/execution/SQLQuerySuite.scala | 24 +++++++++++++++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index bfd26e0170c70..898395c2296ed 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.execution import java.io.{BufferedReader, DataInputStream, DataOutputStream, EOFException, InputStreamReader} +import java.lang.ProcessBuilder.Redirect import java.util.Properties import scala.collection.JavaConversions._ @@ -58,6 +59,7 @@ case class ScriptTransformation( child.execute().mapPartitions { iter => val cmd = List("/bin/bash", "-c", script) val builder = new ProcessBuilder(cmd) + builder.redirectError(Redirect.INHERIT) val proc = builder.start() val inputStream = proc.getInputStream val outputStream = proc.getOutputStream diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index b707f5e68489b..ee3117ab96090 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -17,6 +17,10 @@ package org.apache.spark.sql.hive.execution +import org.scalatest.concurrent.Timeouts._ +import org.scalatest.time.Span +import org.scalatest.time.Millis + import 
org.apache.spark.sql.catalyst.DefaultParserDialect import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.errors.DialectException @@ -629,12 +633,24 @@ class SQLQuerySuite extends QueryTest { .queryExecution.analyzed } - test("test script transform") { + test("test script transform for stdout") { val data = (1 to 100000).map { i => (i, i, i) } data.toDF("d1", "d2", "d3").registerTempTable("script_trans") - assert(100000 === - sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat' AS (a,b,c) FROM script_trans") - .queryExecution.toRdd.count()) + failAfter(Span(20000, Millis)) { + assert(100000 === + sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat' AS (a,b,c) FROM script_trans") + .queryExecution.toRdd.count()) + } + } + + test("test script transform for stderr") { + val data = (1 to 100000).map { i => (i, i, i) } + data.toDF("d1", "d2", "d3").registerTempTable("script_trans") + failAfter(Span(20000, Millis)) { + assert(0 === + sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat 1>&2' AS (a,b,c) FROM script_trans") + .queryExecution.toRdd.count()) + } } test("window function: udaf with aggregate expressin") { From 8418c97397f21528000ccf61170a47048ad864a1 Mon Sep 17 00:00:00 2001 From: "zhichao.li" Date: Fri, 12 Jun 2015 10:32:45 +0800 Subject: [PATCH 2/2] add comments and remove useless failAfter logic --- .../hive/execution/ScriptTransformation.scala | 5 +++++ .../sql/hive/execution/SQLQuerySuite.scala | 20 ++++++------------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 898395c2296ed..8f3e2ff7470fc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -59,6 +59,11 @@ case class 
ScriptTransformation( child.execute().mapPartitions { iter => val cmd = List("/bin/bash", "-c", script) val builder = new ProcessBuilder(cmd) + // redirectError(Redirect.INHERIT) would consume the error output from buffer and + // then print it to stderr (inherit the target from the current Scala process). + // Without this there would be 2 issues: + // 1) The error msg generated by the script process would be hidden. + // 2) If the error msg is big enough to fill up the buffer, the input logic would hang builder.redirectError(Redirect.INHERIT) val proc = builder.start() val inputStream = proc.getInputStream val outputStream = proc.getOutputStream diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index ee3117ab96090..37dfd8b0f52c3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -17,10 +17,6 @@ package org.apache.spark.sql.hive.execution -import org.scalatest.concurrent.Timeouts._ -import org.scalatest.time.Span -import org.scalatest.time.Millis - import org.apache.spark.sql.catalyst.DefaultParserDialect import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.errors.DialectException @@ -636,21 +632,17 @@ class SQLQuerySuite extends QueryTest { test("test script transform for stdout") { val data = (1 to 100000).map { i => (i, i, i) } data.toDF("d1", "d2", "d3").registerTempTable("script_trans") - failAfter(Span(20000, Millis)) { - assert(100000 === - sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat' AS (a,b,c) FROM script_trans") - .queryExecution.toRdd.count()) - } + assert(100000 === + sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat' AS (a,b,c) FROM script_trans") + .queryExecution.toRdd.count()) } test("test script transform for stderr") { val data = (1 to 100000).map { i => (i, i, i) } data.toDF("d1", 
"d2", "d3").registerTempTable("script_trans") - failAfter(Span(20000, Millis)) { - assert(0 === - sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat 1>&2' AS (a,b,c) FROM script_trans") - .queryExecution.toRdd.count()) - } + assert(0 === + sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat 1>&2' AS (a,b,c) FROM script_trans") + .queryExecution.toRdd.count()) } test("window function: udaf with aggregate expressin") {