[SPARK-17396][CORE] Share the task support between UnionRDD instances.
## What changes were proposed in this pull request?

Share the ForkJoinTaskSupport between UnionRDD instances to avoid creating a huge number of threads if lots of RDDs are created at the same time.
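
For context, a minimal standalone sketch of the pattern this change adopts: a single, lazily created ForkJoinTaskSupport backed by a small ForkJoinPool is assigned to every parallel collection, instead of each instance building its own pool. The `SharedListing` object and `listInParallel` method below are illustrative only and not part of the patch; the sketch assumes the Scala 2.11-era parallel collections API used by this version of Spark.

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

// Illustrative sketch only: one shared, lazily initialized task support,
// reused by every caller rather than recreated per instance.
object SharedListing {
  // Pool size of 8 mirrors the patch; created once on first use.
  private lazy val taskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))

  def listInParallel[A, B](items: Seq[A])(f: A => B): Seq[B] = {
    val par = items.par
    par.tasksupport = taskSupport  // all callers share the same 8-thread pool
    par.map(f).seq
  }
}

// Example use: SharedListing.listInParallel(Seq(1, 2, 3))(_ * 2)
```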

## How was this patch tested?

This uses existing UnionRDD tests.

Author: Ryan Blue <blue@apache.org>

Closes #14985 from rdblue/SPARK-17396-use-shared-pool.
rdblue authored and srowen committed Sep 10, 2016
1 parent bcdd259 commit 6ea5055
Showing 1 changed file with 7 additions and 5 deletions.
core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala (7 additions, 5 deletions)
@@ -20,7 +20,7 @@ package org.apache.spark.rdd
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.parallel.ForkJoinTaskSupport
+import scala.collection.parallel.{ForkJoinTaskSupport, ThreadPoolTaskSupport}
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.reflect.ClassTag
 
@@ -58,6 +58,11 @@ private[spark] class UnionPartition[T: ClassTag](
   }
 }
 
+object UnionRDD {
+  private[spark] lazy val partitionEvalTaskSupport =
+    new ForkJoinTaskSupport(new ForkJoinPool(8))
+}
+
 @DeveloperApi
 class UnionRDD[T: ClassTag](
     sc: SparkContext,
@@ -68,13 +73,10 @@ class UnionRDD[T: ClassTag](
   private[spark] val isPartitionListingParallel: Boolean =
     rdds.length > conf.getInt("spark.rdd.parallelListingThreshold", 10)
 
-  @transient private lazy val partitionEvalTaskSupport =
-    new ForkJoinTaskSupport(new ForkJoinPool(8))
-
   override def getPartitions: Array[Partition] = {
     val parRDDs = if (isPartitionListingParallel) {
      val parArray = rdds.par
-      parArray.tasksupport = partitionEvalTaskSupport
+      parArray.tasksupport = UnionRDD.partitionEvalTaskSupport
       parArray
     } else {
       rdds
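
As a hedged usage note (not part of the patch): the shared pool only comes into play when a UnionRDD has more parents than spark.rdd.parallelListingThreshold (default 10). Assuming an active SparkContext `sc`, something like the following would exercise the parallel, shared-pool listing path; the counts are illustrative.

```scala
// Assumes an existing SparkContext `sc`; numbers are illustrative.
val rdds = (1 to 100).map(i => sc.parallelize(Seq(i)))
val unioned = sc.union(rdds)  // builds a UnionRDD over 100 parents
// With 100 > spark.rdd.parallelListingThreshold (10), getPartitions lists
// the parents' partitions in parallel on the shared ForkJoinTaskSupport.
unioned.partitions.length
```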
