[SPARK-37769][SQL][FOLLOWUP] Filtering files if metadata columns are present in the data filter

### What changes were proposed in this pull request?
Follow-up to PR #34575: prune files during file listing using the data filters that reference only metadata columns.
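A minimal sketch of the kind of query that benefits (the path and file name below are hypothetical): the predicate references only the hidden `_metadata` column, so the file index can evaluate it against each file's path, name, size, and modification time while listing files and skip files that cannot match, before any rows are read.

```scala
import org.apache.spark.sql.functions.col

// Hypothetical data directory and file name, for illustration only.
// The filter touches only metadata fields, so non-matching files are
// pruned at planning time rather than scanned and filtered row by row.
spark.read.format("json").load("/tmp/people")
  .select("name", "age", "_metadata.file_name")
  .where(col("_metadata.file_name") === "part-00000-first.json")
  .show()
```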

### Why are the changes needed?
Performance improvements: files that cannot satisfy a metadata-only filter are skipped during file listing instead of being scanned.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing UTs and a new UT.

Closes #35055 from Yaohua628/spark-37769.

Authored-by: yaohua <yaohua.zhao@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Yaohua628 authored and cloud-fan committed Jan 19, 2022
1 parent 61abae3 commit 817d1d7
Showing 4 changed files with 110 additions and 17 deletions.
@@ -30,6 +30,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String


/**
@@ -192,6 +193,36 @@ object FileFormat {

  // create a file metadata struct col
  def createFileMetadataCol: AttributeReference = MetadataAttribute(METADATA_NAME, METADATA_STRUCT)

  // create an internal row given required metadata fields and file information
  def createMetadataInternalRow(
      fieldNames: Seq[String],
      filePath: Path,
      fileSize: Long,
      fileModificationTime: Long): InternalRow =
    updateMetadataInternalRow(new GenericInternalRow(fieldNames.length), fieldNames,
      filePath, fileSize, fileModificationTime)

  // update an internal row given required metadata fields and file information
  def updateMetadataInternalRow(
      row: InternalRow,
      fieldNames: Seq[String],
      filePath: Path,
      fileSize: Long,
      fileModificationTime: Long): InternalRow = {
    fieldNames.zipWithIndex.foreach { case (name, i) =>
      name match {
        case FILE_PATH => row.update(i, UTF8String.fromString(filePath.toString))
        case FILE_NAME => row.update(i, UTF8String.fromString(filePath.getName))
        case FILE_SIZE => row.update(i, fileSize)
        case FILE_MODIFICATION_TIME =>
          // the modification time from the file is in milliseconds, while the
          // TimestampType `file_modification_time` is stored internally in microseconds
          row.update(i, fileModificationTime * 1000L)
      }
    }
    row
  }
}

/**
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.datasources.FileFormat._
import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.NextIterator

/**
@@ -136,18 +135,8 @@ class FileScanRDD(
       */
      private def updateMetadataRow(): Unit = {
        if (metadataColumns.nonEmpty && currentFile != null) {
          val path = new Path(currentFile.filePath)
          metadataColumns.zipWithIndex.foreach { case (attr, i) =>
            attr.name match {
              case FILE_PATH => metadataRow.update(i, UTF8String.fromString(path.toString))
              case FILE_NAME => metadataRow.update(i, UTF8String.fromString(path.getName))
              case FILE_SIZE => metadataRow.update(i, currentFile.fileSize)
              case FILE_MODIFICATION_TIME =>
                // the modificationTime from the file is in millisecond,
                // while internally, the TimestampType is stored in microsecond
                metadataRow.update(i, currentFile.modificationTime * 1000L)
            }
          }
          updateMetadataInternalRow(metadataRow, metadataColumns.map(_.name),
            new Path(currentFile.filePath), currentFile.fileSize, currentFile.modificationTime)
        }
      }

@@ -27,6 +27,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.execution.datasources.FileFormat.createMetadataInternalRow
import org.apache.spark.sql.types.StructType

/**
@@ -71,8 +72,37 @@ abstract class PartitioningAwareFileIndex(
    def isNonEmptyFile(f: FileStatus): Boolean = {
      isDataPath(f.getPath) && f.getLen > 0
    }

    // retrieve the file metadata filters and reduce to a final filter expression
    val fileMetadataFilterOpt = dataFilters.filter(_.references.forall {
      case MetadataAttribute(_) => true
      case _ => false
    }).reduceOption(expressions.And)

    // - create bound references for the filters: put the metadata struct at position 0 for each file
    // - retrieve the final metadata struct (possibly pruned) from the filters
    val boundedFilterMetadataStructOpt = fileMetadataFilterOpt.map { fileMetadataFilter =>
      val metadataStruct = fileMetadataFilter.references.head.dataType
      val boundedFilter = Predicate.createInterpreted(fileMetadataFilter.transform {
        case _: AttributeReference => BoundReference(0, metadataStruct, nullable = true)
      })
      (boundedFilter, metadataStruct)
    }

    def matchFileMetadataPredicate(f: FileStatus): Boolean = {
      // use Option.forall: if there is no metadata filter (and hence no metadata struct), return true
      boundedFilterMetadataStructOpt.forall { case (boundedFilter, metadataStruct) =>
        val row = InternalRow.fromSeq(Seq(
          createMetadataInternalRow(metadataStruct.asInstanceOf[StructType].names,
            f.getPath, f.getLen, f.getModificationTime)
        ))
        boundedFilter.eval(row)
      }
    }

    val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
      PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil
      PartitionDirectory(InternalRow.empty, allFiles()
        .filter(f => isNonEmptyFile(f) && matchFileMetadataPredicate(f))) :: Nil
    } else {
      if (recursiveFileLookup) {
        throw new IllegalArgumentException(
@@ -83,7 +113,8 @@
          val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
            case Some(existingDir) =>
              // Directory has children files in it, return them
              existingDir.filter(f => matchPathPattern(f) && isNonEmptyFile(f))
              existingDir.filter(f => matchPathPattern(f) && isNonEmptyFile(f) &&
                matchFileMetadataPredicate(f))

            case None =>
              // Directory does not exist, or has no children files
@@ -279,16 +279,58 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
  }

  metadataColumnsTest("filter", schema) { (df, f0, _) =>
    val filteredDF = df.select("name", "age", METADATA_FILE_NAME)
      .where(Column(METADATA_FILE_NAME) === f0(METADATA_FILE_NAME))

    // check the filtered file
    val partitions = filteredDF.queryExecution.sparkPlan.collectFirst {
      case p: FileSourceScanExec => p.selectedPartitions
    }.get

    assert(partitions.length == 1) // 1 partition
    assert(partitions.head.files.length == 1) // 1 file in that partition
    assert(partitions.head.files.head.getPath.toString == f0(METADATA_FILE_PATH)) // the file is f0

    // check result
    checkAnswer(
      df.select("name", "age", METADATA_FILE_NAME)
        .where(Column(METADATA_FILE_NAME) === f0(METADATA_FILE_NAME)),
      filteredDF,
      Seq(
        // _file_name == f0's name, so we will only have 1 row
        Row("jack", 24, f0(METADATA_FILE_NAME))
      )
    )
  }

  metadataColumnsTest("filter on metadata and user data", schema) { (df, _, f1) =>

    val filteredDF = df.select("name", "age", "info",
      METADATA_FILE_NAME, METADATA_FILE_PATH,
      METADATA_FILE_SIZE, METADATA_FILE_MODIFICATION_TIME)
      // mix metadata column + user column
      .where(Column(METADATA_FILE_NAME) === f1(METADATA_FILE_NAME) and Column("name") === "lily")
      // only metadata columns
      .where(Column(METADATA_FILE_PATH) === f1(METADATA_FILE_PATH))
      // only user column
      .where("age == 31")

    // check the filtered file
    val partitions = filteredDF.queryExecution.sparkPlan.collectFirst {
      case p: FileSourceScanExec => p.selectedPartitions
    }.get

    assert(partitions.length == 1) // 1 partition
    assert(partitions.head.files.length == 1) // 1 file in that partition
    assert(partitions.head.files.head.getPath.toString == f1(METADATA_FILE_PATH)) // the file is f1

    // check result
    checkAnswer(
      filteredDF,
      Seq(Row("lily", 31, Row(54321L, "ucb"),
        f1(METADATA_FILE_NAME), f1(METADATA_FILE_PATH),
        f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
    )
  }

  Seq(true, false).foreach { caseSensitive =>
    metadataColumnsTest(s"upper/lower case when case " +
      s"sensitive is $caseSensitive", schemaWithNameConflicts) { (df, f0, f1) =>
