[SPARK-20408][SQL] Get the glob path in parallel to reduce resolve relation time #17702

Closed
wants to merge 6 commits
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -252,6 +252,18 @@ class SparkHadoopUtil extends Logging {
if (isGlobPath(pattern)) globPath(fs, pattern) else Seq(pattern)
}

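/**
 * Partially expand a glob pattern. If the path is not a glob, or the glob characters
 * appear only in the last segment, the pattern is returned unchanged. Otherwise the
 * prefix up to and including the first glob segment is resolved via `fs.globStatus`,
 * and the remaining segments are re-appended to every match, so each returned
 * pattern can later be globbed independently (and in parallel).
 */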
def expandGlobPath(fs: FileSystem, pattern: Path): Seq[String] = {
Member:

Please add unit tests for this method.

Member Author:

Add UT in SparkHadoopUtilSuite.scala

val arr = pattern.toString.split("/")
Contributor:

We should not parse the path string ourselves; it's too risky, and we may miss special cases such as Windows paths, escape characters, etc. Let's take a look at org.apache.hadoop.fs.Globber and see if we can reuse some of its parsing API.

Member Author:

Thanks for your reply; I agree with you.
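
For illustration (a hypothetical pattern, not taken from this PR), an escaped glob character in a non-final segment is one of the special cases that manual splitting can get wrong:

import org.apache.hadoop.fs.Path

// Hadoop's glob matcher treats "\[" and "\]" as escaped, literal brackets, but the
// split("/") plus character-set check in this method still picks "log\[archive\]"
// as the first glob segment, because the glob character set includes the backslash.
val risky = new Path("/data/log\\[archive\\]/2018-*/part-*")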

// Index of the first path segment that contains a glob character.
val firstIdx = arr.indexWhere(_.exists("{}[]*?\\".toSet.contains))
if (isGlobPath(pattern) && firstIdx != arr.length - 1) {
// Glob the prefix up to and including the first glob segment, then re-append the
// remaining segments to every match so they can be resolved independently later.
val parentPath = arr.slice(0, firstIdx + 1).mkString("/")
Option(fs.globStatus(new Path(parentPath))).map { statuses =>
statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
+ "/" + arr.slice(firstIdx + 1, arr.length).mkString("/")).toSeq
}.getOrElse(Seq.empty[String])
} else Seq(pattern.toString)
}

/**
* Lists all the files in a directory with the specified prefix, and does not end with the
* given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
@@ -17,18 +17,21 @@

package org.apache.spark.deploy

import java.io.File
import java.security.PrivilegedExceptionAction

import scala.util.Random

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
import org.apache.hadoop.security.UserGroupInformation
import org.scalatest.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.{LocalSparkContext, SparkFunSuite}
import org.apache.spark.util.Utils

class SparkHadoopUtilSuite extends SparkFunSuite with Matchers {
class SparkHadoopUtilSuite extends SparkFunSuite with Matchers with LocalSparkContext {
test("check file permission") {
import FsAction._
val testUser = s"user-${Random.nextInt(100)}"
@@ -77,6 +80,48 @@ class SparkHadoopUtilSuite extends SparkFunSuite with Matchers {
})
}

test("test expanding glob path") {
val tmpDir = Utils.createTempDir()
val rootDir = tmpDir.getCanonicalPath
try {
// Prepare a nested directory layout: ${rootDir}/dir-${dirIndex}/part-${fileIndex}
for (i <- 1 to 10) {
val dirStr = new StringFormat(i.toString)
val curDir = rootDir + dirStr.formatted("/dir-%4s").replaceAll(" ", "0")
for (j <- 1 to 10) {
val fileStr = new StringFormat(j.toString)
val file = new File(curDir, fileStr.formatted("/part-%4s").replaceAll(" ", "0"))

if (!file.getParentFile.exists()) file.getParentFile.mkdirs()
file.createNewFile()
}
}
val sparkHadoopUtil = new SparkHadoopUtil
val fs = FileSystem.getLocal(new Configuration())

// test partial match
sparkHadoopUtil.expandGlobPath(fs, new Path(s"${rootDir}/dir-000[1-5]/*"))
.sortWith(_.compareTo(_) < 0) should be(Seq(
s"file:${rootDir}/dir-0001/*",
s"file:${rootDir}/dir-0002/*",
s"file:${rootDir}/dir-0003/*",
s"file:${rootDir}/dir-0004/*",
s"file:${rootDir}/dir-0005/*"))

// test wildcard on the leaf files
sparkHadoopUtil.expandGlobPath(fs, new Path(s"${rootDir}/dir-0001/*")) should be(Seq(
s"${rootDir}/dir-0001/*"))

// test a path that is not a glob path
sparkHadoopUtil.expandGlobPath(fs, new Path(s"${rootDir}/dir-0001/part-0001")) should be(Seq(
s"${rootDir}/dir-0001/part-0001"))

// test a glob whose parent directories match nothing
sparkHadoopUtil.expandGlobPath(fs, new Path(s"${rootDir}/000[1-5]/*")) should be(
Seq.empty[String])
} finally Utils.deleteRecursively(tmpDir)
}

private def fileStatus(
owner: String,
group: String,
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -596,6 +596,15 @@ object SQLConf {
.intConf
.createWithDefault(10000)

val PARALLEL_GET_GLOBBED_PATH_PARALLELISM =
buildConf("spark.sql.sources.parallelGetGlobbedPath.parallelism")
.doc("The number of threads used to resolve a collection of globbed paths in " +
"parallel. Set this number to avoid generating too many threads.")
.intConf
.checkValue(parallel => parallel >= 0, "The maximum number of threads allowed for getting " +
"globbed paths at driver side must not be negative")
.createWithDefault(32)
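
As a usage sketch (hypothetical SparkSession, value, and path), the new knob is set like any other SQL conf:

// Allow up to 64 threads when a read expands to many globbed directories.
spark.conf.set("spark.sql.sources.parallelGetGlobbedPath.parallelism", 64)
spark.read.text("/logs/2018-*/host-*/part-*")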

// Whether to automatically resolve ambiguity in join conditions for self-joins.
// See SPARK-6231.
val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
@@ -1351,6 +1360,9 @@ class SQLConf extends Serializable with Logging {
def parallelPartitionDiscoveryParallelism: Int =
getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM)

def parallelGetGlobbedPathParallelism: Int =
getConf(SQLConf.PARALLEL_GET_GLOBBED_PATH_PARALLELISM)

def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)

def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -18,13 +18,14 @@
package org.apache.spark.sql.execution.datasources

import java.util.{Locale, ServiceConfigurationError, ServiceLoader}
import java.util.concurrent.Callable

import scala.collection.JavaConverters._
import scala.language.{existentials, implicitConversions}
import scala.util.{Failure, Success, Try}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
@@ -44,7 +45,7 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.Utils
import org.apache.spark.util.{ThreadUtils, Utils}

/**
* The main class responsible for representing a pluggable Data Source in Spark SQL. In addition to
@@ -154,7 +155,7 @@ case class DataSource(
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
DataSource.getGlobbedPaths(sparkSession, fs, hadoopConf, qualified)
}.toArray
new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
}
@@ -384,7 +385,8 @@ case class DataSource(
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val globbedPaths = allPaths.flatMap(
DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, checkFilesExist)).toArray
DataSource.checkAndGlobPathIfNecessary(hadoopConf,
_, checkFilesExist, sparkSession)).toArray

val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
@@ -668,11 +670,12 @@ object DataSource extends Logging {
private def checkAndGlobPathIfNecessary(
hadoopConf: Configuration,
path: String,
checkFilesExist: Boolean): Seq[Path] = {
checkFilesExist: Boolean,
sparkSession: SparkSession): Seq[Path] = {
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
val globPath = getGlobbedPaths(sparkSession, fs, hadoopConf, qualified)

if (globPath.isEmpty) {
throw new AnalysisException(s"Path does not exist: $qualified")
@@ -684,4 +687,36 @@
}
globPath
}

/**
* Return all paths represented by the wildcard string.
* Use a local thread pool to do this when there are too many paths.
*/
private def getGlobbedPaths(
sparkSession: SparkSession,
fs: FileSystem,
hadoopConf: Configuration,
qualified: Path): Seq[Path] = {
val paths = SparkHadoopUtil.get.expandGlobPath(fs, qualified)
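// If the glob expands to only a few parent directories, the existing sequential
// globPathIfNecessary is cheap enough; otherwise resolve each expanded parent
// pattern on a bounded local thread pool.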
if (paths.size < sparkSession.sessionState.conf.parallelGetGlobbedPathParallelism) {
SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
} else {
val parallelGetGlobbedPathParallelism =
sparkSession.sessionState.conf.parallelGetGlobbedPathParallelism
val numParallelism = Math.min(paths.size, parallelGetGlobbedPathParallelism * 2)
val threadPool = ThreadUtils.newDaemonCachedThreadPool(
"parallel-get-globbed-paths-thread-pool", numParallelism)
val result = paths.map { pathStr =>
threadPool.submit(new Callable[Seq[Path]] {
override def call(): Seq[Path] = {
val path = new Path(pathStr)
val fs = path.getFileSystem(hadoopConf)
SparkHadoopUtil.get.globPathIfNecessary(fs, path)
}
})
}.flatMap(_.get)
threadPool.shutdownNow()
result
}
}
}