Commit 1319cd3

Address comments

xuanyuanking committed Sep 9, 2018
1 parent 4782afe commit 1319cd3
Showing 3 changed files with 50 additions and 38 deletions.
@@ -21,6 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
+import java.util.concurrent.Callable
 
 import scala.collection.JavaConverters._
 import scala.collection.immutable.Map
@@ -40,7 +41,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * :: DeveloperApi ::
@@ -266,6 +267,41 @@ class SparkHadoopUtil extends Logging {
     }
   }
 
+  /**
+   * Return all paths represented by the wildcard string. By default this runs on the main
+   * thread; when the config `spark.sql.sources.parallelGetGlobbedPath.numThreads` is > 0, a
+   * local thread pool expands the globbed paths instead. The main difference from
+   * `InMemoryFileIndex.listLeafFiles` is that here we first expand the wildcard into concrete
+   * paths, and then use the local thread pool to glob each expanded path in parallel.
+   */
+  def getGlobbedPaths(
+      parallelGetGlobbedPathEnabled: Boolean,
+      parallelGetGlobbedPathThreshold: Int,
+      parallelGetGlobbedPathNumThreads: Int,
+      fs: FileSystem,
+      hadoopConf: Configuration,
+      qualified: Path): Seq[Path] = {
+    if (!parallelGetGlobbedPathEnabled) {
+      globPathIfNecessary(fs, qualified)
+    } else {
+      val paths = expandGlobPath(fs, qualified, parallelGetGlobbedPathThreshold)
+      val numThreads =
+        Math.min(paths.size, parallelGetGlobbedPathNumThreads)
+      val threadPool = ThreadUtils.newDaemonCachedThreadPool(
+        "parallel-get-globbed-paths-thread-pool", numThreads)
+      val result = paths.map { path =>
+        threadPool.submit(new Callable[Seq[Path]] {
+          override def call(): Seq[Path] = {
+            val fs = path.getFileSystem(hadoopConf)
+            globPathIfNecessary(fs, path)
+          }
+        })
+      }.flatMap(_.get)
+      threadPool.shutdownNow()
+      result
+    }
+  }
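
For context, here is a minimal usage sketch of the new helper (not part of the commit). The HDFS URI and the threshold/thread-count literals are illustrative placeholders; in Spark they are wired up from SQLConf, as the DataSource hunk below shows.

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.spark.deploy.SparkHadoopUtil

  val hadoopConf = new Configuration()
  // A wildcard path to expand; the URI is a made-up example.
  val qualified = new Path("hdfs://nn:8020/logs/2018-*/part-*")
  val fs = qualified.getFileSystem(hadoopConf)
  val paths: Seq[Path] = SparkHadoopUtil.get.getGlobbedPaths(
    parallelGetGlobbedPathEnabled = true,
    parallelGetGlobbedPathThreshold = 32, // illustrative value
    parallelGetGlobbedPathNumThreads = 8, // illustrative value
    fs = fs,
    hadoopConf = hadoopConf,
    qualified = qualified)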

   /**
    * Lists all the files in a directory with the specified prefix that do not end with the
    * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
@@ -1832,9 +1832,6 @@ class SQLConf extends Serializable with Logging {
   def parallelGetGlobbedPathNumThreads: Int =
     getConf(SQLConf.PARALLEL_GET_GLOBBED_PATH_NUM_THREADS)
 
-  def parallelGetGlobbedPathEnabled: Boolean =
-    parallelGetGlobbedPathNumThreads > 0
-
   def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)
 
   def bucketingMaxBuckets: Int = getConf(SQLConf.BUCKETING_MAX_BUCKETS)
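
With the `parallelGetGlobbedPathEnabled` shortcut removed from SQLConf, enabling the feature is now purely a function of the thread-count setting: any value > 0 turns the parallel mode on (the equivalent check moves into DataSource, below). A hedged example of setting it on a session; the session wiring itself is illustrative, only the config key comes from this patch.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("parallel-glob-example")
    // Any value > 0 enables parallel glob expansion, per the numThreads > 0 check.
    .config("spark.sql.sources.parallelGetGlobbedPath.numThreads", "8")
    .getOrCreate()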
@@ -97,6 +97,12 @@ case class DataSource(
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
 
+  private val parallelGetGlobbedPathNumThreads =
+    sparkSession.sessionState.conf.parallelGetGlobbedPathNumThreads
+  private val parallelGetGlobbedPathEnabled = parallelGetGlobbedPathNumThreads > 0
+  private val parallelGetGlobbedPathThreshold =
+    sparkSession.sessionState.conf.parallelGetGlobbedPathThreshold
+
   bucketSpec.map { bucket =>
     SchemaUtils.checkColumnNameDuplication(
       bucket.bucketColumnNames, "in the bucket definition", equality)
@@ -544,7 +550,13 @@ case class DataSource(
     val hdfsPath = new Path(path)
     val fs = hdfsPath.getFileSystem(hadoopConf)
     val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-    val globPath = DataSource.getGlobbedPaths(sparkSession, fs, hadoopConf, qualified)
+    val globPath = SparkHadoopUtil.get.getGlobbedPaths(
+      parallelGetGlobbedPathEnabled,
+      parallelGetGlobbedPathThreshold,
+      parallelGetGlobbedPathNumThreads,
+      fs,
+      hadoopConf,
+      qualified)
 
     if (checkEmptyGlobPath && globPath.isEmpty) {
       throw new AnalysisException(s"Path does not exist: $qualified")
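
For a sense of when this call site fires: any file-based read whose load path contains a wildcard goes through the glob resolution above. A sketch, reusing the illustrative `spark` session from the SQLConf note; the URI is again a placeholder.

  val df = spark.read
    .format("json")
    .load("hdfs://nn:8020/logs/2018-*/part-*") // wildcard triggers getGlobbedPaths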
@@ -735,37 +747,4 @@ object DataSource extends Logging {
         """.stripMargin)
     }
   }

-  /**
-   * Return all paths represented by the wildcard string.
-   * This will be done in main thread by default while the value of config
-   * `spark.sql.sources.parallelGetGlobbedPath.numThreads` > 0, a local thread
-   * pool will expand the globbed paths.
-   */
-  private def getGlobbedPaths(
-      sparkSession: SparkSession,
-      fs: FileSystem,
-      hadoopConf: Configuration,
-      qualified: Path): Seq[Path] = {
-    if (!sparkSession.sessionState.conf.parallelGetGlobbedPathEnabled) {
-      SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
-    } else {
-      val getGlobbedPathThreshold = sparkSession.sessionState.conf.parallelGetGlobbedPathThreshold
-      val paths = SparkHadoopUtil.get.expandGlobPath(fs, qualified, getGlobbedPathThreshold)
-      val numThreads =
-        Math.min(paths.size, sparkSession.sessionState.conf.parallelGetGlobbedPathNumThreads)
-      val threadPool = ThreadUtils.newDaemonCachedThreadPool(
-        "parallel-get-globbed-paths-thread-pool", numThreads)
-      val result = paths.map { path =>
-        threadPool.submit(new Callable[Seq[Path]] {
-          override def call(): Seq[Path] = {
-            val fs = path.getFileSystem(hadoopConf)
-            SparkHadoopUtil.get.globPathIfNecessary(fs, path)
-          }
-        })
-      }.flatMap(_.get)
-      threadPool.shutdownNow()
-      result
-    }
-  }
 }
