[SPARK-26368][SQL] Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex #23317

Closed (wants to merge 1 commit)
@@ -122,21 +122,14 @@ case class DataSource(
    * be any further inference in any triggers.
    *
    * @param format the file format object for this DataSource
-   * @param fileIndex optional [[InMemoryFileIndex]] for getting partition schema and file list
+   * @param getFileIndex [[InMemoryFileIndex]] for getting partition schema and file list
    * @return A pair of the data schema (excluding partition columns) and the schema of the partition
    *         columns.
    */
   private def getOrInferFileFormatSchema(
       format: FileFormat,
-      fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = {
-    // The operations below are expensive therefore try not to do them if we don't need to, e.g.,
[Review comment, Contributor] +1 for moving it to the caller side.

-    // in streaming mode, we have already inferred and registered partition columns, we will
-    // never have to materialize the lazy val below
-    lazy val tempFileIndex = fileIndex.getOrElse {
-      val globbedPaths =
-        checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
-      createInMemoryFileIndex(globbedPaths)
-    }
+      getFileIndex: () => InMemoryFileIndex): (StructType, StructType) = {
+    lazy val tempFileIndex = getFileIndex()

     val partitionSchema = if (partitionColumns.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
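
Note on the new callee shape (not part of the diff): a minimal, self-contained sketch of the pattern this hunk adopts, using toy types and hypothetical names rather than the Spark classes. Binding the factory to a `lazy val` means the index is built at most once, and only if schema inference actually needs it.

```scala
// Toy sketch, assuming nothing about the real Spark classes.
object LazyFactoryExample {
  final case class Index(files: Seq[String])

  // The callee takes a factory instead of an Option with a default; it never
  // decides how the index is built, and the lazy val forces it at most once.
  def inferSchema(knownSchema: Option[String], getIndex: () => Index): String = {
    lazy val index = getIndex()
    knownSchema.getOrElse(s"inferred from ${index.files.size} file(s)")
  }

  def main(args: Array[String]): Unit = {
    // Schema already known: the factory is never invoked.
    println(inferSchema(Some("value STRING"), () => sys.error("should not run")))
    // Schema unknown: the factory runs exactly once.
    println(inferSchema(None, () => Index(Seq("/data/part-00000"))))
  }
}
```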
@@ -236,7 +229,15 @@ case class DataSource(
"you may be able to create a static DataFrame on that directory with " +
"'spark.read.load(directory)' and infer schema from it.")
}
val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)

val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, () => {
// The operations below are expensive therefore try not to do them if we don't need to,
// e.g., in streaming mode, we have already inferred and registered partition columns,
// we will never have to materialize the lazy val below
val globbedPaths =
checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
createInMemoryFileIndex(globbedPaths)
})
SourceInfo(
s"FileSource[$path]",
StructType(dataSchema ++ partitionSchema),
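
For the streaming caller in this hunk, the point of wrapping the glob and index construction in the closure is that a source whose partition columns were already inferred and registered never pays for the file listing. A hedged sketch with hypothetical helpers (not Spark APIs) that makes the deferral observable with a counter:

```scala
// Hypothetical helpers, not Spark APIs; the counter shows when the listing runs.
object DeferredListingExample {
  var listings = 0
  def globPaths(): Seq[String] = { listings += 1; Seq("/data/date=2018-12-14/part-00000") }
  def createIndex(paths: Seq[String]): Map[String, Long] = paths.map(_ -> 0L).toMap

  def main(args: Array[String]): Unit = {
    // The expensive work lives entirely inside the thunk, as in the diff above.
    val getIndex: () => Map[String, Long] = () => createIndex(globPaths())

    // Merely constructing the thunk is free: nothing has been listed yet
    // (the streaming case, where the schema is already registered).
    assert(listings == 0)

    // Forcing the thunk does the listing exactly once.
    val index = getIndex()
    assert(listings == 1 && index.nonEmpty)
  }
}
```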
@@ -370,7 +371,7 @@ case class DataSource(
         } else {
           val index = createInMemoryFileIndex(globbedPaths)
           val (resultDataSchema, resultPartitionSchema) =
-            getOrInferFileFormatSchema(format, Some(index))
+            getOrInferFileFormatSchema(format, () => index)
           (index, resultDataSchema, resultPartitionSchema)
         }

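The read-path call in the last hunk goes the other way: the index already exists, so adapting it to the new signature is just `() => index`. A tiny sketch (toy `Index` type, not Spark's) showing that the thunk is a free wrapper around the existing object:

```scala
object WrapExistingIndexExample {
  final case class Index(files: Seq[String])

  def main(args: Array[String]): Unit = {
    val index = Index(Seq("/data/p=1/part-00000", "/data/p=2/part-00000")) // built eagerly, once
    val getIndex: () => Index = () => index
    assert(getIndex() eq index) // the thunk hands back the same instance, nothing is recomputed
  }
}
```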