[SPARK-32810][SQL][2.4] CSV/JSON data sources should avoid globbing paths when inferring schema #29663

Closed
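
The user-facing problem: writing a DataFrame to a directory whose name contains glob metacharacters (for example "[abc]" or "{def}") and reading it back without an explicit schema fails during schema inference, because the CSV/JSON text sources re-glob file paths that have already been resolved. A minimal reproduction sketch (the location is illustrative, and the initial load path must escape the metacharacters so they are matched literally):

// Hypothetical location; any file system path behaves the same way.
val basePath = "/tmp/spark-32810"
spark.range(3).coalesce(1).write.csv(s"$basePath/[abc]")

// "\\[abc\\]" escapes the brackets for the initial glob resolution. Before this
// patch, schema inference re-globbed the resolved file paths and failed with
// "Path does not exist"; with this patch the read succeeds.
val df = spark.read.csv(s"$basePath/\\[abc\\]")
df.show()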
@@ -65,8 +65,9 @@ import org.apache.spark.util.Utils
* metadata. For example, when reading a partitioned table from a file system, partition columns
* will be inferred from the directory layout even if they are not specified.
*
* @param paths A list of file system paths that hold data. These will be globbed before and
* qualified. This option only works when reading from a [[FileFormat]].
* @param paths A list of file system paths that hold data. These will be globbed (if the
*              "__globPaths__" option is true) and qualified before use. This option only works
*              when reading from a [[FileFormat]].
* @param userSpecifiedSchema An optional specification of the schema of the data. When present
* we skip attempting to infer the schema.
* @param partitionColumns A list of column names that the relation is partitioned by. This list is
@@ -97,6 +98,15 @@ case class DataSource(
private val caseInsensitiveOptions = CaseInsensitiveMap(options)
private val equality = sparkSession.sessionState.conf.resolver

/**
* Whether or not paths should be globbed before being used to access files.
*/
def globPaths: Boolean = {
options.get(DataSource.GLOB_PATHS_KEY)
.map(_ == "true")
.getOrElse(true)
}

bucketSpec.map { bucket =>
SchemaUtils.checkColumnNameDuplication(
bucket.bucketColumnNames, "in the bucket definition", equality)
@@ -223,7 +233,7 @@ case class DataSource(
// For glob pattern, we do not check it because the glob pattern might only make sense
// once the streaming job starts and some upstream source starts dropping data.
val hdfsPath = new Path(path)
if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
if (!globPaths || !SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
val fs = hdfsPath.getFileSystem(newHadoopConfiguration())
if (!fs.exists(hdfsPath)) {
throw new AnalysisException(s"Path does not exist: $path")
@@ -550,7 +560,11 @@ case class DataSource(
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
val globPath = if (globPaths) {
SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
} else {
qualified :: Nil
}

if (checkEmptyGlobPath && globPath.isEmpty) {
throw new AnalysisException(s"Path does not exist: $qualified")
@@ -741,4 +755,9 @@ object DataSource extends Logging {
""".stripMargin)
}
}

/**
* The key in the "options" map for deciding whether or not to glob paths before use.
*/
val GLOB_PATHS_KEY = "__globPaths__"
}
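
To make the flag's effect concrete, here is a self-contained sketch of the two code paths that checkAndGlobPathIfNecessary now chooses between. resolvePaths is a hypothetical helper written only for illustration, not part of the patch:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

def resolvePaths(path: String, globPaths: Boolean, conf: Configuration): Seq[Path] = {
  val hdfsPath = new Path(path)
  val fs = hdfsPath.getFileSystem(conf)
  val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
  if (globPaths) {
    // Expand glob metacharacters such as *, [] and {} against the file system,
    // as SparkHadoopUtil.globPathIfNecessary does.
    Option(fs.globStatus(qualified)).map(_.toSeq.map(_.getPath)).getOrElse(Seq.empty)
  } else {
    // Take the path literally: a directory named "[abc]" is just a directory,
    // not a character class.
    qualified :: Nil
  }
}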
@@ -276,7 +276,7 @@ object TextInputCSVDataSource extends CSVDataSource {
sparkSession,
paths = paths,
className = classOf[TextFileFormat].getName,
options = options.parameters
options = options.parameters ++ Map(DataSource.GLOB_PATHS_KEY -> "false")
).resolveRelation(checkFilesExist = false))
.select("value").as[String](Encoders.STRING)
} else {
@@ -119,7 +119,7 @@ object TextInputJsonDataSource extends JsonDataSource {
sparkSession,
paths = inputPaths.map(_.getPath.toString),
className = classOf[TextFileFormat].getName,
options = parsedOptions.parameters
options = parsedOptions.parameters.originalMap ++ Map(DataSource.GLOB_PATHS_KEY -> "false")
).resolveRelation(checkFilesExist = false))
.select("value").as(Encoders.STRING)
}
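
Both text-based schema-inference paths above build an inner DataSource over leaf file paths that were already resolved, and the new option tells that inner DataSource to take those paths literally. A condensed sketch of the shared pattern (leafFilePaths and readerOptions are illustrative placeholders; the surrounding call is abbreviated from the hunks above):

// The inner text-based DataSource used to sample lines for schema inference
// receives __globPaths__ = "false", so the file paths it is given are not
// treated as glob patterns a second time.
val relation = DataSource(
  sparkSession,
  paths = leafFilePaths,
  className = classOf[TextFileFormat].getName,
  options = readerOptions ++ Map(DataSource.GLOB_PATHS_KEY -> "false")
).resolveRelation(checkFilesExist = false)
val lines = sparkSession.baseRelationToDataFrame(relation)
  .select("value").as[String](Encoders.STRING)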
@@ -946,4 +946,27 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
}
}
}

test("SPARK-32810: CSV and JSON data sources should be able to read files with " +
"escaped glob metacharacter in the paths") {
def escape(str: String): String = {
"""(\[|\]|\{|\})""".r.replaceAllIn(str, """\\$1""")
}

withTempDir { dir =>
val basePath = dir.getCanonicalPath

// test CSV writer / reader without specifying schema
val csvTableName = "[abc]"
spark.range(3).coalesce(1).write.csv(s"$basePath/$csvTableName")
val csvDf = spark.read.csv(s"$basePath/${escape(csvTableName)}")
assert(csvDf.collect sameElements Array(Row("0"), Row("1"), Row("2")))

// test JSON writer / reader without specifying schema
val jsonTableName = "{def}"
spark.range(3).coalesce(1).write.json(s"$basePath/$jsonTableName")
val jsonDf = spark.read.json(s"$basePath/${escape(jsonTableName)}")
assert(jsonDf.collect sameElements Array(Row(0), Row(1), Row(2)))
}
}
}
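
For reference, the escape helper in the test only backslash-prefixes the four metacharacters [ ] { }, so the read paths match the directory names literally, e.g.:

escape("[abc]")   // "\[abc\]"
escape("{def}")   // "\{def\}"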