From 68df46a1093132ad3d5eef3e7a3d669105083236 Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Sat, 20 Feb 2016 23:08:45 -0800
Subject: [PATCH 1/4] Change FileBasedWriteAheadLog to not depend on
 deprecated ThreadPoolTaskSupport

---
 .../spark/streaming/util/FileBasedWriteAheadLog.scala | 10 +++++-----
 .../spark/streaming/util/WriteAheadLogSuite.scala     |  5 ++++-
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 314263f26ee60..bae9d13d62319 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.{RejectedExecutionException, ThreadPoolExecutor}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.parallel.ThreadPoolTaskSupport
+import scala.collection.parallel.ExecutionContextTaskSupport
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.language.postfixOps
 
@@ -144,7 +144,7 @@
     } else {
       // For performance gains, it makes sense to parallelize the recovery if
       // closeFileAfterWrite = true
-      seqToParIterator(threadpool, logFilesToRead, readFile).asJava
+      seqToParIterator(executionContext, logFilesToRead, readFile).asJava
     }
   }
 
@@ -288,11 +288,11 @@
    * open up `k` streams altogether where `k` is the size of the Seq that we want to parallelize.
    */
   def seqToParIterator[I, O](
-      tpool: ThreadPoolExecutor,
+      executionContext: ExecutionContext,
       source: Seq[I],
       handler: I => Iterator[O]): Iterator[O] = {
-    val taskSupport = new ThreadPoolTaskSupport(tpool)
-    val groupSize = tpool.getMaximumPoolSize.max(8)
+    val taskSupport = new ExecutionContextTaskSupport(executionContext)
+    val groupSize = taskSupport.parallelismLevel.max(8)
     source.grouped(groupSize).flatMap { group =>
       val parallelCollection = group.par
       parallelCollection.tasksupport = taskSupport
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 734dd93cda471..05242feb65679 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -229,6 +229,8 @@ class FileBasedWriteAheadLogSuite
    */
  val numThreads = 8
  val tpool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "wal-test-thread-pool")
+  val executionContext = ExecutionContext.fromExecutorService(tpool)
+
  class GetMaxCounter {
    private val value = new AtomicInteger()
    @volatile private var max: Int = 0
@@ -258,7 +260,8 @@ class FileBasedWriteAheadLogSuite
      val t = new Thread() {
        override def run() {
          // run the calculation on a separate thread so that we can release the latch
-          val iterator = FileBasedWriteAheadLog.seqToParIterator[Int, Int](tpool, testSeq, handle)
+          val iterator = FileBasedWriteAheadLog.seqToParIterator[Int, Int](executionContext,
+            testSeq, handle)
          collected = iterator.toSeq
        }
      }
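The core of the first patch is in seqToParIterator: the deprecated ThreadPoolTaskSupport gives way to ExecutionContextTaskSupport, and the batch size is now derived from taskSupport.parallelismLevel rather than the executor's maximum pool size. As a rough standalone sketch of the same pattern — the object name and the main method are invented for illustration, and it assumes Scala 2.11-era parallel collections, where ExecutionContextTaskSupport ships in the standard library:

    import java.util.concurrent.Executors

    import scala.collection.parallel.ExecutionContextTaskSupport
    import scala.concurrent.ExecutionContext

    object ParIteratorSketch {
      // Consume `source` in lazily-pulled batches; each batch is mapped in
      // parallel on `ec`, so at most one batch of handler results is
      // materialized at any given time.
      def seqToParIterator[I, O](
          ec: ExecutionContext,
          source: Seq[I],
          handler: I => Iterator[O]): Iterator[O] = {
        val taskSupport = new ExecutionContextTaskSupport(ec)
        val groupSize = taskSupport.parallelismLevel.max(8)
        source.grouped(groupSize).flatMap { group =>
          val parallelCollection = group.par
          parallelCollection.tasksupport = taskSupport
          parallelCollection.map(handler).seq.flatten
        }
      }

      def main(args: Array[String]): Unit = {
        val pool = Executors.newFixedThreadPool(4)
        val ec = ExecutionContext.fromExecutorService(pool)
        try {
          val squares = seqToParIterator[Int, Int](ec, 1 to 20, i => Iterator(i * i))
          println(squares.take(5).toList) // List(1, 4, 9, 16, 25)
        } finally {
          pool.shutdown()
        }
      }
    }

The grouped(...) call is what keeps the result an incremental Iterator rather than a fully materialized collection: later batches are not touched until the caller pulls past the current one.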
From 9991ebce07b6fabe122719ba1c4690e733297b18 Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Sat, 20 Feb 2016 23:13:40 -0800
Subject: [PATCH 2/4] Remove now unused import

---
 .../apache/spark/streaming/util/FileBasedWriteAheadLog.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index bae9d13d62319..b0186b8382a88 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -18,7 +18,7 @@ package org.apache.spark.streaming.util
 
 import java.nio.ByteBuffer
 import java.util.{Iterator => JIterator}
-import java.util.concurrent.{RejectedExecutionException, ThreadPoolExecutor}
+import java.util.concurrent.RejectedExecutionException
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer

From f2e6a25ae2581e9b6cc43950c1302c26a8602c2a Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Sat, 20 Feb 2016 23:19:27 -0800
Subject: [PATCH 3/4] Update the comment to point out that if the thread pool
 is smaller than 8, then 8 is the number of objects kept in memory.

---
 .../spark/streaming/util/FileBasedWriteAheadLog.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index b0186b8382a88..9c2ccb330a771 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -283,9 +283,10 @@ private[streaming] object FileBasedWriteAheadLog {
 
   /**
    * This creates an iterator from a parallel collection, by keeping at most `n` objects in memory
-   * at any given time, where `n` is the size of the thread pool. This is crucial for use cases
-   * where we create `FileBasedWriteAheadLogReader`s during parallel recovery. We don't want to
-   * open up `k` streams altogether where `k` is the size of the Seq that we want to parallelize.
+   * at any given time, where `n` is at most the max of the size of the thread pool or 8. This is
+   * crucial for use cases where we create `FileBasedWriteAheadLogReader`s during parallel recovery.
+   * We don't want to open up `k` streams altogether where `k` is the size of the Seq that we want
+   * to parallelize.
    */
   def seqToParIterator[I, O](
       executionContext: ExecutionContext,
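The reworded scaladoc above pins down the bound: with groupSize = max(parallelismLevel, 8), a pull on the resulting iterator forces at most one batch of that size, so recovery over `k` log files never runs more than groupSize handlers at once, even when `k` is large. A standalone illustration of that laziness — the AtomicInteger bookkeeping below is invented for the demo (the suite's GetMaxCounter plays a similar role), and the same Scala 2.11-era assumptions apply:

    import java.util.concurrent.Executors
    import java.util.concurrent.atomic.AtomicInteger

    import scala.collection.parallel.ExecutionContextTaskSupport
    import scala.concurrent.ExecutionContext

    object BatchBoundSketch {
      def main(args: Array[String]): Unit = {
        val pool = Executors.newFixedThreadPool(4)
        val ec = ExecutionContext.fromExecutorService(pool)
        val taskSupport = new ExecutionContextTaskSupport(ec)
        val groupSize = taskSupport.parallelismLevel.max(8)

        val started = new AtomicInteger(0) // counts "readers" ever opened

        val handler = (i: Int) => { started.incrementAndGet(); Iterator(i) }

        val iterator = (1 to 1000).grouped(groupSize).flatMap { group =>
          val parallelCollection = group.par
          parallelCollection.tasksupport = taskSupport
          parallelCollection.map(handler).seq.flatten
        }

        iterator.next() // pulling one element forces only the first batch
        println(s"handlers started: ${started.get()} (groupSize $groupSize, not 1000)")

        pool.shutdown()
      }
    }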
From ed9ff855871bfef69b39b1b9b791d1ce487093bd Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Sun, 21 Feb 2016 17:07:56 -0800
Subject: [PATCH 4/4] Use a ForkJoinPool as the base

---
 .../src/main/scala/org/apache/spark/util/ThreadUtils.scala | 7 +++++++
 .../spark/streaming/util/FileBasedWriteAheadLog.scala      | 7 ++-----
 .../apache/spark/streaming/util/WriteAheadLogSuite.scala   | 6 +++---
 3 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index f9fbe2ff858ce..d6ee183cb76ef 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -156,4 +156,11 @@ private[spark] object ThreadUtils {
       result
     }
   }
+
+  /**
+   * Construct a new ForkJoinPool with a specified max parallelism.
+   */
+  def newForkJoinPool(maxThreadNumber: Int): scala.concurrent.forkjoin.ForkJoinPool = {
+    new scala.concurrent.forkjoin.ForkJoinPool(maxThreadNumber)
+  }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 9c2ccb330a771..b35ef43984abe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -59,11 +59,8 @@ private[streaming] class FileBasedWriteAheadLog(
   private val pastLogs = new ArrayBuffer[LogInfo]
   private val callerName = getCallerName
 
-  private val threadpoolName = {
-    "WriteAheadLogManager" + callerName.map(c => s" for $c").getOrElse("")
-  }
-  private val threadpool = ThreadUtils.newDaemonCachedThreadPool(threadpoolName, 20)
-  private val executionContext = ExecutionContext.fromExecutorService(threadpool)
+  private val forkJoinPool = ThreadUtils.newForkJoinPool(20)
+  private val executionContext = ExecutionContext.fromExecutorService(forkJoinPool)
 
   override protected def logName = {
     getClass.getName.stripSuffix("$") +
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 05242feb65679..9c7de63667e4c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -228,8 +228,8 @@ class FileBasedWriteAheadLogSuite
      the list of files.
    */
  val numThreads = 8
-  val tpool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "wal-test-thread-pool")
-  val executionContext = ExecutionContext.fromExecutorService(tpool)
+  val fpool = ThreadUtils.newForkJoinPool(numThreads)
+  val executionContext = ExecutionContext.fromExecutorService(fpool)
 
  class GetMaxCounter {
    private val value = new AtomicInteger()
@@ -276,7 +276,7 @@
        // make sure we didn't open too many Iterators
        assert(counter.getMax() <= numThreads)
      } finally {
-        tpool.shutdownNow()
+        fpool.shutdownNow()
      }
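A closing note on the final patch: scala.concurrent.forkjoin.ForkJoinPool is the copy of the JDK fork/join classes bundled with the Scala 2.11 standard library (later Scala versions deprecate it in favor of java.util.concurrent.ForkJoinPool). The practical effect of switching the WAL's executor to it is that ExecutionContextTaskSupport can recognize the fork/join pool behind the ExecutionContext and report its actual parallelism, which is what now sizes the recovery batches. A minimal sketch under those assumptions — the object name is invented:

    import scala.collection.parallel.ExecutionContextTaskSupport
    import scala.concurrent.ExecutionContext

    object ForkJoinBackedSketch {
      def main(args: Array[String]): Unit = {
        // Equivalent to ThreadUtils.newForkJoinPool(8) from the patch.
        val fjPool = new scala.concurrent.forkjoin.ForkJoinPool(8)
        val ec = ExecutionContext.fromExecutorService(fjPool)
        try {
          val taskSupport = new ExecutionContextTaskSupport(ec)
          // Backed by a ForkJoinPool, this reflects the pool's parallelism
          // (8 here) rather than a generic processor-count fallback.
          println(taskSupport.parallelismLevel)
        } finally {
          fjPool.shutdown()
        }
      }
    }

One trade-off visible in the diff itself: the named daemon thread pool (threadpoolName) is gone, so the pool's threads no longer carry the per-caller WriteAheadLogManager name.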