reimplement by using local thread pool
xuanyuanking committed Jan 23, 2018
1 parent: b8a28eb · commit: dc373ae
Showing 2 changed files with 35 additions and 19 deletions.
SQLConf.scala
@@ -596,6 +596,15 @@ object SQLConf {
       .intConf
       .createWithDefault(10000)
 
+  val PARALLEL_GET_GLOBBED_PATH_PARALLELISM =
+    buildConf("spark.sql.sources.parallelGetGlobbedPath.parallelism")
+      .doc("The number of threads to get a collection of path in parallel. Set the " +
+        "number to avoid generating too many threads.")
+      .intConf
+      .checkValue(parallel => parallel >= 0, "The maximum number of threads allowed for getting " +
+        "globbed paths at driver side must not be negative")
+      .createWithDefault(32)
+
   // Whether to automatically resolve ambiguity in join conditions for self-joins.
   // See SPARK-6231.
   val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
@@ -1351,6 +1360,9 @@ class SQLConf extends Serializable with Logging {
   def parallelPartitionDiscoveryParallelism: Int =
     getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM)
 
+  def parallelGetGlobbedPathParallelism: Int =
+    getConf(SQLConf.PARALLEL_GET_GLOBBED_PATH_PARALLELISM)
+
   def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)
 
   def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
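The new entry behaves like any other SQL configuration once the accessor above is in place. A minimal usage sketch, assuming an already running SparkSession named spark and an illustrative value of 64 (not part of the commit):

// Hypothetical usage sketch.
spark.conf.set("spark.sql.sources.parallelGetGlobbedPath.parallelism", "64")
println(spark.conf.get("spark.sql.sources.parallelGetGlobbedPath.parallelism"))  // 64
// A negative value should be rejected by the checkValue guard added above.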
DataSource.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources
 
 import java.util.{Locale, ServiceConfigurationError, ServiceLoader}
+import java.util.concurrent.Callable
 
 import scala.collection.JavaConverters._
 import scala.language.{existentials, implicitConversions}
@@ -44,7 +45,7 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
 import org.apache.spark.sql.util.SchemaUtils
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * The main class responsible for representing a pluggable Data Source in Spark SQL. In addition to
@@ -154,8 +155,7 @@ case class DataSource(
         val hdfsPath = new Path(path)
         val fs = hdfsPath.getFileSystem(hadoopConf)
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        val serializableConfiguration = new SerializableConfiguration(hadoopConf)
-        DataSource.getGlobbedPaths(sparkSession, fs, serializableConfiguration, qualified)
+        DataSource.getGlobbedPaths(sparkSession, fs, hadoopConf, qualified)
       }.toArray
       new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
     }
@@ -675,8 +675,7 @@ object DataSource extends Logging {
       val hdfsPath = new Path(path)
       val fs = hdfsPath.getFileSystem(hadoopConf)
       val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      val serializableConfiguration = new SerializableConfiguration(hadoopConf)
-      val globPath = getGlobbedPaths(sparkSession, fs, serializableConfiguration, qualified)
+      val globPath = getGlobbedPaths(sparkSession, fs, hadoopConf, qualified)
 
       if (globPath.isEmpty) {
         throw new AnalysisException(s"Path does not exist: $qualified")
@@ -691,28 +690,33 @@
 
   /**
    * Return all paths represented by the wildcard string.
-   * Follow [[InMemoryFileIndex]].bulkListLeafFile and reuse the conf.
+   * Use a local thread pool to do this while there's too many paths.
    */
   private def getGlobbedPaths(
       sparkSession: SparkSession,
       fs: FileSystem,
-      hadoopConf: SerializableConfiguration,
+      hadoopConf: Configuration,
       qualified: Path): Seq[Path] = {
     val paths = SparkHadoopUtil.get.expandGlobPath(fs, qualified)
-    if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+    if (paths.size < sparkSession.sessionState.conf.parallelGetGlobbedPathParallelism) {
       SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
     } else {
-      val parallelPartitionDiscoveryParallelism =
-        sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
-      val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)
-      val expanded = sparkSession.sparkContext
-        .parallelize(paths, numParallelism)
-        .map { pathString =>
-          val path = new Path(pathString)
-          val fs = path.getFileSystem(hadoopConf.value)
-          SparkHadoopUtil.get.globPathIfNecessary(fs, path).map(_.toString)
-        }.collect()
-      expanded.flatMap(paths => paths.map(new Path(_))).toSeq
+      val parallelGetGlobbedPathParallelism =
+        sparkSession.sessionState.conf.parallelGetGlobbedPathParallelism
+      val numParallelism = Math.min(paths.size, parallelGetGlobbedPathParallelism * 2)
+      val threadPool = ThreadUtils.newDaemonCachedThreadPool(
+        "parallel-get-globbed-paths-thread-pool", numParallelism)
+      val result = paths.map { pathStr =>
+        threadPool.submit(new Callable[Seq[Path]] {
+          override def call(): Seq[Path] = {
+            val path = new Path(pathStr)
+            val fs = path.getFileSystem(hadoopConf)
+            SparkHadoopUtil.get.globPathIfNecessary(fs, path)
+          }
+        })
+      }.flatMap(_.get)
+      threadPool.shutdownNow()
+      result
     }
   }
 }
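The committed getGlobbedPaths relies on Spark-internal helpers (ThreadUtils, SparkHadoopUtil) that are not usable from application code. Below is a self-contained sketch of the same submit-then-block thread-pool globbing pattern, written against plain java.util.concurrent and the public Hadoop FileSystem API; the object name, helper signature, and example paths are illustrative assumptions, not part of the commit.

import java.util.concurrent.{Callable, Executors, TimeUnit}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object LocalPoolGlobSketch {
  // Expands each glob pattern on its own pool thread and then blocks on the futures,
  // mirroring the submit-then-get structure of getGlobbedPaths above.
  def globInParallel(patterns: Seq[String], conf: Configuration, parallelism: Int): Seq[Path] = {
    val pool = Executors.newFixedThreadPool(math.max(1, math.min(patterns.size, parallelism)))
    try {
      val futures = patterns.map { pattern =>
        pool.submit(new Callable[Seq[Path]] {
          override def call(): Seq[Path] = {
            val path = new Path(pattern)
            val fs = path.getFileSystem(conf)
            // globStatus returns null for a non-glob path that does not exist.
            Option(fs.globStatus(path)).map(_.toSeq.map(_.getPath)).getOrElse(Seq(path))
          }
        })
      }
      futures.flatMap(_.get())
    } finally {
      pool.shutdown()
      pool.awaitTermination(1, TimeUnit.MINUTES)
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Illustrative local paths only.
    val globs = Seq("file:///tmp/data/part-*", "file:///tmp/logs/2018-01-*")
    globInParallel(globs, conf, parallelism = 8).foreach(println)
  }
}

Compared with the removed sparkContext.parallelize version, the thread-pool approach keeps glob expansion on the driver and avoids launching a Spark job just to expand input paths.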
