From a70569cc0a0f89433e61224732a3fe09aa2669f7 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 2 Nov 2014 18:00:47 +0800 Subject: [PATCH] Fixes race condition in CliSuite --- .../sql/hive/thriftserver/CliSuite.scala | 35 ++++++++----------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 8a72e9d2aef57..e8ffbc5b954d4 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -18,19 +18,17 @@ package org.apache.spark.sql.hive.thriftserver +import java.io._ + import scala.collection.mutable.ArrayBuffer -import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ -import scala.concurrent.{Await, Future, Promise} +import scala.concurrent.{Await, Promise} import scala.sys.process.{Process, ProcessLogger} -import java.io._ -import java.util.concurrent.atomic.AtomicInteger - import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.scalatest.{BeforeAndAfterAll, FunSuite} -import org.apache.spark.{SparkException, Logging} +import org.apache.spark.Logging import org.apache.spark.sql.catalyst.util.getTempFilePath class CliSuite extends FunSuite with BeforeAndAfterAll with Logging { @@ -53,23 +51,20 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging { """.stripMargin.split("\\s+").toSeq ++ extraArgs } - // AtomicInteger is needed because stderr and stdout of the forked process are handled in - // different threads. - val next = new AtomicInteger(0) + var next = 0 val foundAllExpectedAnswers = Promise.apply[Unit]() val queryStream = new ByteArrayInputStream(queries.mkString("\n").getBytes) val buffer = new ArrayBuffer[String]() + val lock = new Object - def captureOutput(source: String)(line: String) { + def captureOutput(source: String)(line: String): Unit = lock.synchronized { buffer += s"$source> $line" - // If we haven't found all expected answers... - if (next.get() < expectedAnswers.size) { - // If another expected answer is found... - if (line.startsWith(expectedAnswers(next.get()))) { - // If all expected answers have been found... - if (next.incrementAndGet() == expectedAnswers.size) { - foundAllExpectedAnswers.trySuccess(()) - } + // If we haven't found all expected answers and another expected answer comes up... + if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) { + next += 1 + // If all expected answers have been found... + if (next == expectedAnswers.size) { + foundAllExpectedAnswers.trySuccess(()) } } } @@ -88,8 +83,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging { |======================= |Spark SQL CLI command line: ${command.mkString(" ")} | - |Executed query ${next.get()} "${queries(next.get())}", - |But failed to capture expected output "${expectedAnswers(next.get())}" within $timeout. + |Executed query $next "${queries(next)}", + |But failed to capture expected output "${expectedAnswers(next)}" within $timeout. | |${buffer.mkString("\n")} |===========================