5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -21,13 +21,12 @@ import java.io.{IOException, ObjectOutputStream}

import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.ForkJoinTaskSupport
-import scala.concurrent.forkjoin.ForkJoinPool
import scala.reflect.ClassTag
Member commented:
Can't we use ThreadUtils.newForkJoinPool in this class instead of using ForkJoinPool directly?


import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}

/**
* Partition for UnionRDD.
@@ -61,7 +60,7 @@ private[spark] class UnionPartition[T: ClassTag](

object UnionRDD {
private[spark] lazy val partitionEvalTaskSupport =
-new ForkJoinTaskSupport(new ForkJoinPool(8))
+new ForkJoinTaskSupport(ThreadUtils.newForkJoinPool("partition-eval-task-support", 8))
}

@DeveloperApi
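For context, partitionEvalTaskSupport backs UnionRDD's parallel partition listing, which kicks in when the number of child RDDs exceeds the threshold the RDD_PARALLEL_LISTING_THRESHOLD import above refers to. Below is a minimal sketch of the same pattern on a plain Scala parallel collection; the pool name "demo-partition-eval" and the data are illustrative, not Spark code:

```scala
import scala.collection.parallel.ForkJoinTaskSupport

import org.apache.spark.util.ThreadUtils

// Illustrative stand-in for the Seq of child RDDs.
val children = (1 to 100).par

// Route evaluation through a named pool instead of the default
// global fork-join pool, mirroring what UnionRDD does with
// partitionEvalTaskSupport.
children.tasksupport = new ForkJoinTaskSupport(
  ThreadUtils.newForkJoinPool("demo-partition-eval", 8))

val results = children.map(_ * 2)
```

Since partitionEvalTaskSupport is a lazy val on a shared pool, naming its worker threads (this PR's change) makes them attributable in thread dumps.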
13 changes: 6 additions & 7 deletions core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -26,7 +26,6 @@ import scala.language.higherKinds
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.duration.{Duration, FiniteDuration}
-import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
import scala.util.control.NonFatal

import org.apache.spark.SparkException
@@ -181,17 +180,17 @@ private[spark] object ThreadUtils {
}

/**
-* Construct a new Scala ForkJoinPool with a specified max parallelism and name prefix.
+* Construct a new ForkJoinPool with a specified max parallelism and name prefix.
*/
-def newForkJoinPool(prefix: String, maxThreadNumber: Int): SForkJoinPool = {
+def newForkJoinPool(prefix: String, maxThreadNumber: Int): ForkJoinPool = {
@dongjoon-hyun (Member) commented on Mar 17, 2019:

Is this okay for Scala 2.11, too? We still need to support Scala 2.11, don't we?

According to SPARK-13398, this was a hack for Scala 2.11: we can't use Java's ForkJoinPool directly in Scala 2.11, since it uses an ExecutionContext that reports system parallelism.

Hi, @holdenk. Could you give us your opinion on this? Is the old issue resolved now? Are we safe to revert the old SPARK-13398?

Contributor (Author) replied:

Sorry, I missed that we still support Scala 2.11. It wouldn't even compile for Scala 2.11 because of the parameter type of ForkJoinTaskSupport (see the compatibility sketch after the diff). Thanks for pointing it out!

// Custom factory to set thread names
-val factory = new SForkJoinPool.ForkJoinWorkerThreadFactory {
-override def newThread(pool: SForkJoinPool) =
-new SForkJoinWorkerThread(pool) {
+val factory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
+override def newThread(pool: ForkJoinPool) =
+new ForkJoinWorkerThread(pool) {
setName(prefix + "-" + super.getName)
}
}
-new SForkJoinPool(maxThreadNumber, factory,
+new ForkJoinPool(maxThreadNumber, factory,
null, // handler
false // asyncMode
)
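To make the Scala 2.11 concern from the thread above concrete, here is a rough compatibility sketch; the constructor signatures in the comments are paraphrased from the Scala standard library and are not part of this PR:

```scala
import java.util.concurrent.ForkJoinPool

import scala.collection.parallel.ForkJoinTaskSupport

// Scala 2.11: ForkJoinTaskSupport's constructor takes a
// scala.concurrent.forkjoin.ForkJoinPool, a distinct class, so passing a
// java.util.concurrent.ForkJoinPool is a type error.
// Scala 2.12+: scala.concurrent.forkjoin.ForkJoinPool is a deprecated type
// alias for java.util.concurrent.ForkJoinPool, so the Java pool is accepted.
val support = new ForkJoinTaskSupport(new ForkJoinPool(8)) // compiles on 2.12+, not on 2.11
```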
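Finally, a quick sketch of what the naming factory buys you; the prefix "demo-prefix" is made up for illustration, and the exact printed name may vary by JVM:

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.util.ThreadUtils

val pool = ThreadUtils.newForkJoinPool("demo-prefix", 2)
pool.execute(new Runnable {
  // The factory in the diff renames each worker, so this prints
  // something like "demo-prefix-ForkJoinPool-1-worker-1", which makes
  // the pool's threads identifiable in thread dumps.
  override def run(): Unit = println(Thread.currentThread().getName)
})
pool.shutdown()
// Fork-join workers are daemon threads; wait so the task runs before exit.
pool.awaitTermination(5, TimeUnit.SECONDS)
```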