From 13275d388e5d630ebff0e84c57799b36e303c3c2 Mon Sep 17 00:00:00 2001
From: Jakob Odersky
Date: Thu, 4 Feb 2016 17:59:04 -0800
Subject: [PATCH 1/3] Replace future calls with Future

---
 .../spark/deploy/FaultToleranceTest.scala          |  8 ++++----
 .../apache/spark/deploy/worker/Worker.scala        |  2 +-
 .../apache/spark/JobCancellationSuite.scala        | 18 +++++++++---------
 .../ShuffleBlockFetcherIteratorSuite.scala         |  6 +++---
 .../expressions/CodeGenerationSuite.scala          |  2 +-
 .../execution/joins/BroadcastHashJoin.scala        |  2 +-
 .../joins/BroadcastHashOuterJoin.scala             |  2 +-
 .../thriftserver/HiveThriftServer2Suites.scala     |  4 ++--
 8 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index c0ede4b7c8734..0a84ac019d093 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -22,7 +22,7 @@ import java.net.URL
 import java.util.concurrent.TimeoutException
 
 import scala.collection.mutable.ListBuffer
-import scala.concurrent.{future, promise, Await}
+import scala.concurrent.{Future, Promise, Await}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -249,7 +249,7 @@ private object FaultToleranceTest extends App with Logging {
 
   /** This includes Client retry logic, so it may take a while if the cluster is recovering. */
   private def assertUsable() = {
-    val f = future {
+    val f = Future {
       try {
         val res = sc.parallelize(0 until 10).collect()
         assertTrue(res.toList == (0 until 10))
@@ -283,7 +283,7 @@ private object FaultToleranceTest extends App with Logging {
       numAlive == 1 && numStandby == masters.size - 1 && numLiveApps >= 1
     }
 
-    val f = future {
+    val f = Future {
       try {
         while (!stateValid()) {
           Thread.sleep(1000)
@@ -405,7 +405,7 @@ private object SparkDocker {
   }
 
   private def startNode(dockerCmd: ProcessBuilder) : (String, DockerId, File) = {
-    val ipPromise = promise[String]()
+    val ipPromise = Promise[String]()
     val outFile = File.createTempFile("fault-tolerance-test", "", Utils.createTempDir())
     val outStream: FileWriter = new FileWriter(outFile)
     def findIpAndLog(line: String): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 179d3b9f20b1f..df3c286a0a66f 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -394,7 +394,7 @@ private[deploy] class Worker(
       // rpcEndpoint.
       // Copy ids so that it can be used in the cleanup thread.
       val appIds = executors.values.map(_.appId).toSet
-      val cleanupFuture = concurrent.future {
+      val cleanupFuture = concurrent.Future {
         val appDirs = workDir.listFiles()
         if (appDirs == null) {
           throw new IOException("ERROR: Failed to list files in " + appDirs)
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index e13a442463e8d..c347ab8dc8020 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.Semaphore
 import scala.concurrent.Await
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
-import scala.concurrent.future
+import scala.concurrent.Future
 
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
@@ -103,7 +103,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
 
     val rdd1 = rdd.map(x => x)
 
-    future {
+    Future {
       taskStartedSemaphore.acquire()
       sc.cancelAllJobs()
       taskCancelledSemaphore.release(100000)
@@ -126,7 +126,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     })
 
     // jobA is the one to be cancelled.
-    val jobA = future {
+    val jobA = Future {
       sc.setJobGroup("jobA", "this is a job to be cancelled")
       sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
     }
@@ -191,7 +191,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     })
 
     // jobA is the one to be cancelled.
-    val jobA = future {
+    val jobA = Future {
       sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
       sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(100000); i }.count()
     }
@@ -231,7 +231,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     val f2 = rdd.countAsync()
 
     // Kill one of the action.
-    future {
+    Future {
       sem1.acquire()
       f1.cancel()
       JobCancellationSuite.twoJobsSharingStageSemaphore.release(10)
@@ -247,7 +247,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     // Cancel before launching any tasks
     {
      val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
-      future { f.cancel() }
+      Future { f.cancel() }
       val e = intercept[SparkException] { f.get() }
       assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
     }
@@ -263,7 +263,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
       })
 
      val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
-      future {
+      Future {
         // Wait until some tasks were launched before we cancel the job.
         sem.acquire()
         f.cancel()
@@ -277,7 +277,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     // Cancel before launching any tasks
     {
       val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
-      future { f.cancel() }
+      Future { f.cancel() }
       val e = intercept[SparkException] { f.get() }
       assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
     }
@@ -292,7 +292,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         }
       })
       val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
-      future {
+      Future {
         sem.acquire()
         f.cancel()
       }
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 828153bdbfc44..c9c2fb2691d70 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -21,7 +21,7 @@ import java.io.InputStream
 import java.util.concurrent.Semaphore
 
 import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.future
+import scala.concurrent.Future
 
 import org.mockito.Matchers.{any, eq => meq}
 import org.mockito.Mockito._
@@ -149,7 +149,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
       override def answer(invocation: InvocationOnMock): Unit = {
         val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
-        future {
+        Future {
           // Return the first two blocks, and wait till task completion before returning the 3rd one
           listener.onBlockFetchSuccess(
             ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
@@ -211,7 +211,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
       override def answer(invocation: InvocationOnMock): Unit = {
         val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
-        future {
+        Future {
           // Return the first block, and then fail.
           listener.onBlockFetchSuccess(
             ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index 0c42e2fc7c5e5..b5413fbe2bbcc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -36,7 +36,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
     import scala.concurrent.duration._
 
     val futures = (1 to 20).map { _ =>
-      future {
+      Future {
         GeneratePredicate.generate(EqualTo(Literal(1), Literal(1)))
         GenerateMutableProjection.generate(EqualTo(Literal(1), Literal(1)) :: Nil)
         GenerateOrdering.generate(Add(Literal(1), Literal(1)).asc :: Nil)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index 8b275e886c46c..943ad31c0cef5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -77,7 +77,7 @@ case class BroadcastHashJoin(
 
     // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    future {
+    Future {
       // This will run in another thread. Set the execution id so that we can connect these jobs
       // with the correct execution.
       SQLExecution.withExecutionId(sparkContext, executionId) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index db8edd169dcfa..f48fc3b84864d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -76,7 +76,7 @@ case class BroadcastHashOuterJoin(
 
     // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    future {
+    Future {
       // This will run in another thread. Set the execution id so that we can connect these jobs
       // with the correct execution.
       SQLExecution.withExecutionId(sparkContext, executionId) {
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index ba3b26e1b7d49..29a5ab03e9f06 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -362,7 +362,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       try {
         // Start a very-long-running query that will take hours to finish, then cancel it in order
         // to demonstrate that cancellation works.
-        val f = future {
+        val f = Future {
           statement.executeQuery(
             "SELECT COUNT(*) FROM test_map " +
               List.fill(10)("join test_map").mkString(" "))
@@ -380,7 +380,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       // Cancellation is a no-op if spark.sql.hive.thriftServer.async=false
       statement.executeQuery("SET spark.sql.hive.thriftServer.async=false")
       try {
-        val sf = future {
+        val sf = Future {
           statement.executeQuery(
             "SELECT COUNT(*) FROM test_map " +
               List.fill(4)("join test_map").mkString(" ")

From 055d9c9b1db92fa16c925b13c0e8e451e27d3e8c Mon Sep 17 00:00:00 2001
From: Jakob Odersky
Date: Thu, 4 Feb 2016 19:23:46 -0800
Subject: [PATCH 2/3] Reorder imports to pass stylecheck

---
 .../main/scala/org/apache/spark/deploy/FaultToleranceTest.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 0a84ac019d093..15d220d01b461 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -22,7 +22,7 @@ import java.net.URL
 import java.util.concurrent.TimeoutException
 
 import scala.collection.mutable.ListBuffer
-import scala.concurrent.{Future, Promise, Await}
+import scala.concurrent.{Await, Future, Promise}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
 import scala.language.postfixOps

From 4078df044a563d2897153ab6ea801de7cc05f055 Mon Sep 17 00:00:00 2001
From: Jakob Odersky
Date: Fri, 5 Feb 2016 16:38:19 -0800
Subject: [PATCH 3/3] Replace missing futures

---
 .../spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 29a5ab03e9f06..865197e24caf8 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -23,7 +23,7 @@ import java.sql.{Date, DriverManager, SQLException, Statement}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{future, Await, ExecutionContext, Promise}
+import scala.concurrent.{Await, ExecutionContext, Future, Promise}
 import scala.concurrent.duration._
 import scala.io.Source
 import scala.util.{Random, Try}
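
Note on the API being replaced: the lowercase `future { ... }` and `promise[T]()` helpers in the `scala.concurrent` package object have been deprecated since Scala 2.11.0 in favor of the `Future` and `Promise` companion-object `apply` methods, which is the one-for-one substitution this series performs; PATCH 2/3 then alphabetizes the import selectors to satisfy Spark's style check, per its commit message. For illustration only, a minimal, self-contained sketch of the replacement spelling (the object name and values are invented for the example, not taken from the patch):

  import scala.concurrent.{Await, Future, Promise}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  object FutureApiSketch extends App {
    // Deprecated spelling: val f = future { 1 + 1 }
    // Replacement: Future.apply on the companion object; the body still
    // runs asynchronously on the implicit ExecutionContext in scope.
    val f: Future[Int] = Future { 1 + 1 }

    // Deprecated spelling: val p = promise[String]()
    // Replacement: Promise.apply on the companion object.
    val p: Promise[String] = Promise[String]()
    p.success("done")

    println(Await.result(f, 1.second))        // prints 2
    println(Await.result(p.future, 1.second)) // prints done
  }

Because the deprecated `future(body)` was defined as a forwarder to `Future(body)` (and `promise()` to `Promise()`), the substitution is behavior-preserving; only the deprecation warnings go away.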