[SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL #34575

Closed · wants to merge 22 commits · Changes from 12 commits
@@ -965,7 +965,7 @@ class Analyzer(override val catalogManager: CatalogManager)
}

private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
case r: DataSourceV2Relation => r.withMetadataColumns()
case s: ExposesMetadataColumns => s.withMetadataColumns()
case p: Project =>
p.copy(
projectList = p.metadataOutput ++ p.projectList,
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.catalyst.trees.TreePattern
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.catalyst.util.quoteIfNeeded
import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, METADATA_COL_ATTR_KEY}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types._
import org.apache.spark.util.collection.BitSet
@@ -438,3 +438,22 @@ object VirtualColumn {
val groupingIdName: String = "spark_grouping_id"
val groupingIdAttribute: UnresolvedAttribute = UnresolvedAttribute(groupingIdName)
}

/**
 * The internal representation of a hidden metadata column:
 * `__metadata_col` is set to `true` in the AttributeReference's metadata.
 * - apply() creates a metadata attribute reference
 * - unapply() checks whether an attribute reference is a metadata attribute reference
 */
object MetadataAttribute {
def apply(name: String, dataType: DataType, nullable: Boolean = true): AttributeReference =
AttributeReference(name, dataType, nullable,
new MetadataBuilder().putBoolean(METADATA_COL_ATTR_KEY, value = true).build())()

def unapply(attr: AttributeReference): Option[AttributeReference] = {
if (attr.metadata.contains(METADATA_COL_ATTR_KEY)
&& attr.metadata.getBoolean(METADATA_COL_ATTR_KEY)) {
Some(attr)
} else None
}
}
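To make the apply()/unapply() pairing concrete, here is a minimal usage sketch. The attribute names and values are made up for illustration; only MetadataAttribute itself comes from this change, and the code assumes access to Spark's internal catalyst classes.

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, MetadataAttribute}
import org.apache.spark.sql.types.LongType

// apply(): build an attribute whose metadata carries __metadata_col = true
val fileSize: AttributeReference = MetadataAttribute("file_size", LongType)

// unapply(): pick out metadata attributes among ordinary ones
val attrs = Seq(fileSize, AttributeReference("id", LongType)())
val metadataOnly = attrs.collect { case MetadataAttribute(attr) => attr }
// metadataOnly contains only fileSize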
@@ -276,3 +276,10 @@ object LogicalPlanIntegrity {
checkIfSameExprIdNotReused(plan) && hasUniqueExprIdsForOutput(plan)
}
}

/**
* A logical plan node that can generate metadata columns
*/
trait ExposesMetadataColumns extends LogicalPlan {
def withMetadataColumns(): LogicalPlan
}
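A minimal sketch of how a relation might implement this trait. The relation and its columns are hypothetical; only ExposesMetadataColumns and MetadataAttribute come from this PR.

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, MetadataAttribute}
import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan}
import org.apache.spark.sql.types.StringType

// A toy leaf relation: withMetadataColumns() returns a copy of the node whose
// output additionally exposes the relation's metadata attributes.
case class DummyFileRelation(
    dataCols: Seq[AttributeReference],
    metadataCols: Seq[AttributeReference] = Nil)
  extends LeafNode with ExposesMetadataColumns {

  override def output: Seq[Attribute] = dataCols ++ metadataCols

  override def withMetadataColumns(): LogicalPlan =
    copy(metadataCols = Seq(MetadataAttribute("file_path", StringType)))
}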
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils}
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, MetadataColumn, SupportsMetadataColumns, Table, TableCapability}
import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics, SupportsReportStatistics}
@@ -44,7 +44,7 @@ case class DataSourceV2Relation(
catalog: Option[CatalogPlugin],
identifier: Option[Identifier],
options: CaseInsensitiveStringMap)
extends LeafNode with MultiInstanceRelation with NamedRelation {
extends LeafNode with MultiInstanceRelation with NamedRelation with ExposesMetadataColumns {

import DataSourceV2Implicits._

@@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
@@ -194,10 +195,17 @@ case class FileSourceScanExec(
disableBucketedScan: Boolean = false)
extends DataSourceScanExec {

lazy val metadataStructCol: Option[AttributeReference] =
output.collectFirst { case MetadataAttribute(attr) => attr }

Contributor: Are we assuming that only one column in output corresponds to MetadataAttribute(_)? Is there some place in the code where this is enforced?
Contributor: This is probably a design question: do we prefer four flat columns or one struct-type column?

Author: "4 flat columns" was the original design. As per review suggestions, it changed from 4 flat columns to 4 fields under a single struct:

  • wrapping them in _metadata reduces the chance of name conflicts with users' columns
  • all available metadata can be queried by selecting _metadata
  • it is arguably easier to maintain and extend

There is a clear disadvantage to the struct-type approach: if users only select _metadata.file_size, we still need to fill all the fields, which hurts performance. But this could be improved in follow-up PRs.
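To make the user-facing behavior of the struct design concrete, here is a hypothetical usage sketch. The table path is made up; the _metadata column and its field names come from FileFormat.METADATA_STRUCT in this PR.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("metadata-column-sketch").getOrCreate()
// Hypothetical file-based table; the hidden column works the same for any file source.
val df = spark.read.parquet("/tmp/some/table")

// The struct is hidden from SELECT *, but can be requested explicitly,
// returning every metadata field at once.
df.select(col("*"), col("_metadata")).show()

// Or project individual fields; with the struct design the remaining fields
// are still filled per row, which is the disadvantage mentioned above.
df.select(col("_metadata.file_path"), col("_metadata.file_size")).show()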


// Note that some vals referring to the file-based relation are intentionally lazy
// so that this plan can also be canonicalized on the executor side. See SPARK-23731.
override lazy val supportsColumnar: Boolean = {
relation.fileFormat.supportBatch(relation.sparkSession, schema)
// schema without the file metadata struct column
val fileSchema = if (metadataStructCol.isEmpty) schema else {
output.filter(_.exprId != metadataStructCol.get.exprId).toStructType
}
relation.fileFormat.supportBatch(relation.sparkSession, fileSchema)
}

private lazy val needsUnsafeRowConversion: Boolean = {
@@ -212,7 +220,12 @@ case class FileSourceScanExec(
relation.fileFormat.vectorTypes(
requiredSchema = requiredSchema,
partitionSchema = relation.partitionSchema,
relation.sparkSession.sessionState.conf)
relation.sparkSession.sessionState.conf).map { vectorTypes =>
// for columnar file formats, append the metadata struct column's vector type class, if any
vectorTypes ++ (if (metadataStructCol.isDefined) {
Seq(classOf[OnHeapColumnVector].getName)
} else Seq.empty)
}

private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty

@@ -355,7 +368,15 @@ case class FileSourceScanExec(
@transient
private lazy val pushedDownFilters = {
val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
dataFilters.flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
// TODO: we should be able to push down filters containing the metadata struct in order to skip files
dataFilters
.filterNot(
_.references.exists {
case MetadataAttribute(_) => true
case _ => false
}
)
.flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
Contributor: Suggested change (same logic, more compact formatting):
dataFilters.filterNot(_.references.exists {
  case MetadataAttribute(_) => true
  case _ => false
}).flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))

}

override lazy val metadata: Map[String, String] = {
@@ -597,7 +618,8 @@ case class FileSourceScanExec(
}
}

new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions,
requiredSchema, metadataStructCol)
}

/**
@@ -653,7 +675,8 @@ case class FileSourceScanExec(
val partitions =
FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
new FileScanRDD(fsRelation.sparkSession, readFile, partitions,
requiredSchema, metadataStructCol)
}

// Filters unused DynamicPruningExpression expressions - one which has been replaced
@@ -36,7 +36,8 @@ object PartitionedFileUtil {
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
val hosts = getBlockHosts(getBlockLocations(file), offset, size)
PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)
PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts,
file.getModificationTime, file.getLen)
}
} else {
Seq(getPartitionedFile(file, filePath, partitionValues))
@@ -48,7 +49,8 @@
filePath: Path,
partitionValues: InternalRow): PartitionedFile = {
val hosts = getBlockHosts(getBlockLocations(file), 0, file.getLen)
PartitionedFile(partitionValues, filePath.toUri.toString, 0, file.getLen, hosts)
PartitionedFile(partitionValues, filePath.toUri.toString, 0, file.getLen, hosts,
file.getModificationTime, file.getLen)
}

private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType}


/**
@@ -171,6 +171,29 @@ trait FileFormat {
def supportFieldName(name: String): Boolean = true
}

object FileFormat {

val FILE_PATH = "file_path"

val FILE_NAME = "file_name"
Contributor: Wondering, do we also plan to deprecate the existing InputFileName expression in Spark?

Contributor: Good point. I think we should, as InputFileName is really fragile and can't be used with joins, for example.


val FILE_SIZE = "file_size"

val FILE_MODIFICATION_TIME = "file_modification_time"

val METADATA_NAME = "_metadata"

// the supported metadata struct fields for the Hadoop FS relation
val METADATA_STRUCT: StructType = new StructType()
.add(StructField(FILE_PATH, StringType))
.add(StructField(FILE_NAME, StringType))
.add(StructField(FILE_SIZE, LongType))
.add(StructField(FILE_MODIFICATION_TIME, LongType))
Contributor: Should this be TimestampType?

Author (@Yaohua628, Dec 21, 2021): I think it's more of a design choice; both are fine and I don't have a strong opinion. A long matches what the file modification time gives you directly, while a timestamp is more readable. WDYT?

Contributor: I think this one is an easy decision. Timestamp type is much better, as people can do WHERE _metadata.modificationTime < TIMESTAMP'2020-12-12 12:12:12' or other datetime operations, and df.show can also display the value in a more readable format.

Author: Got it, that makes sense. Addressed.


// creates the file metadata struct column
def createFileMetadataCol: AttributeReference = MetadataAttribute(METADATA_NAME, METADATA_STRUCT)
}
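A small sketch tying the definitions above together; it uses only names introduced in this diff, though it assumes access to Spark's internal catalyst classes.

import org.apache.spark.sql.catalyst.expressions.MetadataAttribute
import org.apache.spark.sql.execution.datasources.FileFormat

// The hidden column is an attribute named "_metadata" typed as the four-field struct.
val metadataCol = FileFormat.createFileMetadataCol
assert(metadataCol.name == FileFormat.METADATA_NAME)
assert(metadataCol.dataType == FileFormat.METADATA_STRUCT)
// And it is recognized by the MetadataAttribute extractor added in this PR.
assert(MetadataAttribute.unapply(metadataCol).isDefined)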

/**
 * The base class for file formats that are based on text files.
*/