[SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL #34575

Closed
wants to merge 22 commits
Changes from 3 commits
@@ -965,7 +965,7 @@ class Analyzer(override val catalogManager: CatalogManager)
}

private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
case r: DataSourceV2Relation => r.withMetadataColumns()
case s: ExposesMetadataColumns => s.withMetadataColumns()
case p: Project =>
p.copy(
projectList = p.metadataOutput ++ p.projectList,
@@ -438,3 +438,70 @@ object VirtualColumn {
val groupingIdName: String = "spark_grouping_id"
val groupingIdAttribute: UnresolvedAttribute = UnresolvedAttribute(groupingIdName)
}

/**
* The internal representation of a hidden metadata column.
*/
class MetadataAttribute(
Review comment (Contributor Author): Will think about this new class. Maybe have something like an AttributeReferenceBase trait.

override val name: String,
override val dataType: DataType,
override val nullable: Boolean = true,
override val metadata: Metadata = Metadata.empty)(
override val exprId: ExprId = NamedExpression.newExprId,
override val qualifier: Seq[String] = Seq.empty[String])
extends AttributeReference(name, dataType, nullable, metadata)(exprId, qualifier) {
Review comment (Contributor): Let's not extend AttributeReference, otherwise copy can cause issues.


// used to resolve supported metadata column references (e.g. different casings)
override def withName(newName: String): MetadataAttribute = {
if (name == newName) {
this
} else {
MetadataAttribute(newName, dataType, nullable, metadata)(exprId, qualifier)
}
}

override def withNullability(newNullability: Boolean): MetadataAttribute = {
if (nullable == newNullability) {
this
} else {
MetadataAttribute(name, dataType, newNullability, metadata)(exprId, qualifier)
}
}

override def withQualifier(newQualifier: Seq[String]): MetadataAttribute = {
if (qualifier == newQualifier) {
this
} else {
MetadataAttribute(name, dataType, nullable, metadata)(exprId, newQualifier)
}
}

override def withExprId(newExprId: ExprId): MetadataAttribute = {
if (exprId == newExprId) {
this
} else {
MetadataAttribute(name, dataType, nullable, metadata)(newExprId, qualifier)
}
}

override def withDataType(newType: DataType): MetadataAttribute = {
MetadataAttribute(name, newType, nullable, metadata)(exprId, qualifier)
}

override def newInstance(): MetadataAttribute =
MetadataAttribute(name, dataType, nullable, metadata)(exprId, qualifier)

override def withMetadata(newMetadata: Metadata): MetadataAttribute = {
MetadataAttribute(name, dataType, nullable, newMetadata)(exprId, qualifier)
}
}

object MetadataAttribute {

def apply(name: String, dataType: DataType): MetadataAttribute =
new MetadataAttribute(name, dataType, true)()

def apply(name: String, dataType: DataType, nullable: Boolean, metadata: Metadata)
(exprId: ExprId, qualifier: Seq[String]): MetadataAttribute =
new MetadataAttribute(name, dataType, nullable, metadata)(exprId, qualifier)
}
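
For orientation, here is a small sketch (an editor's illustration, not part of this diff) of how a relation's output could be split into regular data columns and the hidden metadata column. The column names are placeholders and _metadata is simplified to a StringType rather than the real struct:

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, MetadataAttribute}
import org.apache.spark.sql.types.{LongType, StringType}

// hypothetical output of a file-source relation: two data columns plus the hidden column
val output: Seq[Attribute] = Seq(
  AttributeReference("id", LongType)(),
  AttributeReference("event", StringType)(),
  MetadataAttribute("_metadata", StringType)) // simplified; the real column is a struct

// MetadataAttribute is still an AttributeReference, so normal resolution keeps working,
// but the hidden column can be singled out by its runtime class
val (metadataCols, dataCols) = output.partition(_.isInstanceOf[MetadataAttribute])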
@@ -276,3 +276,10 @@ object LogicalPlanIntegrity {
checkIfSameExprIdNotReused(plan) && hasUniqueExprIdsForOutput(plan)
}
}

/**
* A logical plan node with exposed metadata columns
*/
Review comment (Contributor): A logical plan node that can generate metadata columns
trait ExposesMetadataColumns extends LogicalPlan {
def withMetadataColumns(): ExposesMetadataColumns
Review comment (Contributor): Why do we need it to return ExposesMetadataColumns? Can't it just return LogicalPlan?

}
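
For context, a hedged sketch of how a leaf node might implement this trait; the relation and its fields are invented for illustration, while the concrete implementer in this diff is DataSourceV2Relation:

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, MetadataAttribute}
import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode}
import org.apache.spark.sql.types.StringType

// a minimal, hypothetical relation that can expose one hidden metadata column
case class ExampleFileRelation(
    cols: Seq[AttributeReference],
    metaCols: Seq[AttributeReference] = Nil)
  extends LeafNode with ExposesMetadataColumns {

  override def output: Seq[Attribute] = cols ++ metaCols

  // returning the concrete type still satisfies the trait; see the review question
  // above about whether a plain LogicalPlan return type would suffice
  override def withMetadataColumns(): ExampleFileRelation =
    copy(metaCols = Seq(MetadataAttribute("_metadata", StringType))) // simplified type
}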
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils}
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, MetadataColumn, SupportsMetadataColumns, Table, TableCapability}
import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics, SupportsReportStatistics}
@@ -44,7 +44,7 @@ case class DataSourceV2Relation(
catalog: Option[CatalogPlugin],
identifier: Option[Identifier],
options: CaseInsensitiveStringMap)
extends LeafNode with MultiInstanceRelation with NamedRelation {
extends LeafNode with MultiInstanceRelation with NamedRelation with ExposesMetadataColumns {

import DataSourceV2Implicits._

@@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
@@ -194,10 +195,17 @@ case class FileSourceScanExec(
disableBucketedScan: Boolean = false)
extends DataSourceScanExec {

// the hidden _metadata struct column in this scan's output, if the query selected it
lazy val outputMetadataStruct: Option[MetadataAttribute] =
output.collectFirst { case meta: MetadataAttribute => meta }

// Note that some vals referring the file-based relation are lazy intentionally
// so that this plan can be canonicalized on executor side too. See SPARK-23731.
override lazy val supportsColumnar: Boolean = {
relation.fileFormat.supportBatch(relation.sparkSession, schema)
// schema without file metadata column
val fileSchema = if (outputMetadataStruct.isEmpty) schema else {
StructType.fromAttributes(output.filterNot(_.isInstanceOf[MetadataAttribute]))
}
relation.fileFormat.supportBatch(relation.sparkSession, fileSchema)
}

private lazy val needsUnsafeRowConversion: Boolean = {
@@ -212,7 +220,16 @@
relation.fileFormat.vectorTypes(
requiredSchema = requiredSchema,
partitionSchema = relation.partitionSchema,
relation.sparkSession.sessionState.conf)
relation.sparkSession.sessionState.conf).map { vectorTypes =>
val metadataVectorClz =
if (relation.sparkSession.sessionState.conf.offHeapColumnVectorEnabled) {
classOf[OffHeapColumnVector].getName
} else {
classOf[OnHeapColumnVector].getName
Review comment (Contributor): Since we will change to use a constant vector soon, how about we always use OnHeapColumnVector for now, to simplify the code?

Review comment (Contributor Author): Good idea, thanks!
}
// for column-based file format, append metadata columns' vector type classes if any
vectorTypes ++ (if (outputMetadataStruct.isDefined) Seq(metadataVectorClz) else Seq.empty)
}

private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty

@@ -355,7 +372,9 @@ case class FileSourceScanExec(
@transient
private lazy val pushedDownFilters = {
val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
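// predicates that reference metadata columns are produced by Spark itself, not read from
// the files, so they are skipped here rather than translated into pushed-down source filters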
dataFilters.flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
dataFilters
.filterNot(_.references.exists(_.isInstanceOf[MetadataAttribute]))
.flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
}

override lazy val metadata: Map[String, String] = {
@@ -597,7 +616,8 @@ case class FileSourceScanExec(
}
}

new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions,
requiredSchema, outputMetadataStruct)
}

/**
@@ -653,7 +673,8 @@ case class FileSourceScanExec(
val partitions =
FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
new FileScanRDD(fsRelation.sparkSession, readFile, partitions,
requiredSchema, outputMetadataStruct)
}

// Filters unused DynamicPruningExpression expressions - one which has been replaced
@@ -36,7 +36,8 @@ object PartitionedFileUtil {
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
val hosts = getBlockHosts(getBlockLocations(file), offset, size)
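// the file's modification time and total length are threaded through PartitionedFile so they
// can later be surfaced via the hidden _metadata column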
PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)
PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts,
file.getModificationTime, file.getLen)
}
} else {
Seq(getPartitionedFile(file, filePath, partitionValues))
@@ -48,7 +49,8 @@
filePath: Path,
partitionValues: InternalRow): PartitionedFile = {
val hosts = getBlockHosts(getBlockLocations(file), 0, file.getLen)
PartitionedFile(partitionValues, filePath.toUri.toString, 0, file.getLen, hosts)
PartitionedFile(partitionValues, filePath.toUri.toString, 0, file.getLen, hosts,
file.getModificationTime, file.getLen)
}

private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType}


/**
Expand Down Expand Up @@ -171,6 +171,25 @@ trait FileFormat {
def supportFieldName(name: String): Boolean = true
}

object FileFormat {

val FILE_PATH = "file_path"

val FILE_NAME = "file_name"
Review comment (Contributor): Wondering, do we also plan to deprecate the existing InputFileName expression in Spark?

Review comment (Contributor): Good point. I think we should, as InputFileName is really fragile and can't be used with a join, for example.

val FILE_SIZE = "file_size"

val FILE_MODIFICATION_TIME = "file_modification_time"

// the supported metadata columns for a Hadoop FS relation
val FILE_METADATA_COLUMNS: MetadataAttribute = MetadataAttribute("_metadata",
new StructType()
.add(StructField(FILE_PATH, StringType))
.add(StructField(FILE_NAME, StringType))
.add(StructField(FILE_SIZE, LongType))
.add(StructField(FILE_MODIFICATION_TIME, LongType)))
}
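
Putting the pieces together, the intent is that this hidden struct becomes addressable by name even though it does not appear in SELECT * or in the table schema. A usage sketch under that assumption; the SparkSession setup, the path, and the event column are placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("metadata-columns").getOrCreate()

// _metadata is hidden from SELECT * and df.schema, but can be selected explicitly by name
spark.read.format("parquet").load("/tmp/events")
  .select(
    col("event"), // ordinary data column (placeholder name)
    col("_metadata.file_path"),
    col("_metadata.file_name"),
    col("_metadata.file_size"),
    col("_metadata.file_modification_time"))
  .show()

As the reviewers note above, this differs from the InputFileName expression: the metadata values travel with the relation's output as ordinary columns, so they remain usable after joins.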

/**
* The base class file format that is based on text file.
*/