[SPARK-43039][SQL] Support custom fields in the file source _metadata column. #40677
```diff
@@ -27,10 +27,9 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
```
```diff
@@ -165,6 +164,17 @@ trait FileFormat {
     }
   }
 
+  /**
+   * Create a file metadata struct column containing fields supported by the given file format.
+   */
+  def createFileMetadataCol(): AttributeReference = {
+    // Strip out the fields' metadata to avoid exposing it to the user. [[FileSourceStrategy]]
+    // avoids confusion by mapping back to [[metadataSchemaFields]].
+    val fields = metadataSchemaFields
+      .map(FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation)
+    FileSourceMetadataAttribute(FileFormat.METADATA_NAME, StructType(fields), nullable = false)
+  }
+
   /**
    * Returns whether this format supports the given [[DataType]] in read/write path.
    * By default all data types are supported.
```
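For orientation, the struct this method builds backs the hidden `_metadata` column that users can project explicitly. A minimal usage sketch, assuming an active `SparkSession` named `spark`, a placeholder input path, and an `id` data column:

```scala
// Project the hidden _metadata struct alongside regular data columns.
// The base fields (file_path, file_name, file_size, file_block_start,
// file_block_length, file_modification_time) exist for every file source;
// any extra fields depend on the format's metadataSchemaFields.
val df = spark.read.parquet("/tmp/data")
df.select("id", "_metadata.file_name", "_metadata.file_size").show()
```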
```diff
@@ -176,6 +186,23 @@ trait FileFormat {
    * By default all field names are supported.
    */
   def supportFieldName(name: String): Boolean = true
+
+  /**
+   * All fields the file format's _metadata struct defines.
+   *
+   * Each field's metadata should define [[METADATA_COL_ATTR_KEY]],
+   * [[FILE_SOURCE_METADATA_COL_ATTR_KEY]], and either
+   * [[FILE_SOURCE_CONSTANT_METADATA_COL_ATTR_KEY]] or
+   * [[FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY]] as appropriate.
+   *
+   * Constant attributes will be extracted automatically from
+   * [[PartitionedFile.extraConstantMetadataColumnValues]], while generated metadata columns
+   * always map to some hidden/internal column the underlying reader provides.
+   *
+   * NOTE: It is not possible to change the semantics of the base metadata fields by overriding
+   * this method. Technically, a file format could choose to suppress them, but that is not
+   * recommended.
+   */
+  def metadataSchemaFields: Seq[StructField] = FileFormat.BASE_METADATA_FIELDS
 }
 
 object FileFormat {
```
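To illustrate the new extension point, here is a hedged sketch of a format mixin that appends one file-constant field. The trait name and the `ingest_batch` field are hypothetical; `FileSourceConstantMetadataStructField` is the same helper the base fields use, assumed here to live in `org.apache.spark.sql.catalyst.expressions`:

```scala
import org.apache.spark.sql.catalyst.expressions.FileSourceConstantMetadataStructField
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.types.{StringType, StructField}

// Hypothetical mixin: a format composing this trait exposes one extra
// file-constant metadata field on top of the base _metadata fields. Per-file
// values are expected to arrive via
// PartitionedFile.extraConstantMetadataColumnValues.
trait BatchTaggedFileFormat extends FileFormat {
  override def metadataSchemaFields: Seq[StructField] =
    super.metadataSchemaFields :+
      FileSourceConstantMetadataStructField("ingest_batch", StringType, nullable = true)
}
```

Because overriding cannot change the base fields' semantics, appending to `super.metadataSchemaFields` rather than replacing it is the safe pattern.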
```diff
@@ -192,15 +219,6 @@
 
   val FILE_MODIFICATION_TIME = "file_modification_time"
 
-  val ROW_INDEX = "row_index"
-
-  // A name for a temporary column that holds row indexes computed by the file format reader
-  // until they can be placed in the _metadata struct.
-  val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
-
-  val ROW_INDEX_FIELD = FileSourceGeneratedMetadataStructField(
-    ROW_INDEX, ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType, nullable = false)
-
   val METADATA_NAME = "_metadata"
 
   /**
@@ -223,27 +241,6 @@
     FileSourceConstantMetadataStructField(FILE_BLOCK_LENGTH, LongType, nullable = false),
     FileSourceConstantMetadataStructField(FILE_MODIFICATION_TIME, TimestampType, nullable = false))
 
-  /**
-   * Supported metadata fields of the given [[FileFormat]].
-   */
-  def metadataSchemaFields(fileFormat: FileFormat): Seq[StructField] = fileFormat match {
-    case _: ParquetFileFormat =>
-      BASE_METADATA_FIELDS :+ ROW_INDEX_FIELD
-    case _ =>
-      BASE_METADATA_FIELDS
-  }
-
-  /**
-   * Create a file metadata struct column containing fields supported by the given [[FileFormat]].
-   */
-  def createFileMetadataCol(fileFormat: FileFormat): AttributeReference = {
-    // Strip out the fields' metadata to avoid exposing it to the user. [[FileSourceStrategy]]
-    // avoids confusion by mapping back to [[metadataSchemaFields]].
-    val fields = metadataSchemaFields(fileFormat)
-      .map(FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation)
-    FileSourceMetadataAttribute(FileFormat.METADATA_NAME, StructType(fields))
-  }
-
   // create an internal row given required metadata fields and file information
   def createMetadataInternalRow(
       fieldNames: Seq[String],
```
```diff
@@ -253,7 +250,7 @@
     // We are not aware of `FILE_BLOCK_START` and `FILE_BLOCK_LENGTH` before splitting files
     assert(!fieldNames.contains(FILE_BLOCK_START) && !fieldNames.contains(FILE_BLOCK_LENGTH))
     updateMetadataInternalRow(new GenericInternalRow(fieldNames.length), fieldNames,
-      filePath, fileSize, 0L, fileSize, fileModificationTime)
+      filePath, fileSize, 0L, fileSize, fileModificationTime, Map.empty)
   }
 
   // update an internal row given required metadata fields and file information
@@ -264,9 +261,11 @@
       fileSize: Long,
       fileBlockStart: Long,
       fileBlockLength: Long,
-      fileModificationTime: Long): InternalRow = {
+      fileModificationTime: Long,
+      otherConstantMetadataColumnValues: Map[String, Any]): InternalRow = {
     fieldNames.zipWithIndex.foreach { case (name, i) =>
       name match {
+        // NOTE: The base metadata fields are hard-wired here and cannot be overridden.
         case FILE_PATH => row.update(i, UTF8String.fromString(filePath.toString))
         case FILE_NAME => row.update(i, UTF8String.fromString(filePath.getName))
         case FILE_SIZE => row.update(i, fileSize)
@@ -276,10 +275,13 @@
           // the modificationTime from the file is in millisecond,
           // while internally, the TimestampType `file_modification_time` is stored in microsecond
           row.update(i, fileModificationTime * 1000L)
-        case ROW_INDEX =>
-          // Do nothing. Only the metadata fields that have identical values for each row of the
-          // file are set by this function, while fields that have different values (such as row
-          // index) are set separately.
+        case other =>
+          // Other metadata columns use the file-provided value (if any). Automatically convert
+          // raw values (including nulls) to literals as a courtesy.
+          Literal(otherConstantMetadataColumnValues.get(other).orNull) match {
+            case Literal(null, _) => row.setNullAt(i)
+            case literal => row.update(i, literal.value)
+          }
       }
     }
     row
```
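To make the fall-through case concrete, a hedged sketch of a direct call that fills one base field and one custom field. The field name, path, and values are made up, and this assumes the method is accessible from the call site:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.datasources.FileFormat

// Arguments in order: row, fieldNames, filePath, fileSize, fileBlockStart,
// fileBlockLength, fileModificationTime, otherConstantMetadataColumnValues.
val row = FileFormat.updateMetadataInternalRow(
  new GenericInternalRow(2),
  Seq(FileFormat.FILE_NAME, "ingest_batch"),
  new Path("/tmp/part-00000.parquet"),
  1024L, 0L, 1024L, 1680000000000L,
  Map("ingest_batch" -> "batch-42"))
// Slot 0 gets the file name; slot 1 gets "batch-42" wrapped in Literal(...).
// A missing "ingest_batch" key would match Literal(null, _) and null the slot.
```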
The second file in the diff introduces `FileStatusWithMetadata` and threads it through `PartitionDirectory`:

```diff
@@ -23,11 +23,30 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.StructType
 
+/**
+ * A file status augmented with optional metadata, which tasks and file readers can use however
+ * they see fit. For example, a custom [[FileIndex]] and [[FileFormat]] working together could
+ * expose this extra metadata as file-constant fields of the file source metadata column.
+ */
+case class FileStatusWithMetadata(fileStatus: FileStatus, metadata: Map[String, Any] = Map.empty) {
+  // Wrapper methods to improve source compatibility in code that still expects a [[FileStatus]].
+  def getPath: Path = fileStatus.getPath
+  def getLen: Long = fileStatus.getLen
+  def getModificationTime: Long = fileStatus.getModificationTime
+  def isDirectory: Boolean = fileStatus.isDirectory
+}
+
 /**
  * A collection of data files from a partitioned relation, along with the partition values in the
  * form of an [[InternalRow]].
  */
-case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus])
+case class PartitionDirectory(values: InternalRow, files: Seq[FileStatusWithMetadata])
+
+object PartitionDirectory {
+  // For backward compat with code that does not know about extra file metadata
+  def apply(values: InternalRow, files: Array[FileStatus]): PartitionDirectory =
+    PartitionDirectory(values, files.map(FileStatusWithMetadata(_)))
+}
 
 /**
  * An interface for objects capable of enumerating the root paths of a relation as well as the
```
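Tying the two files together, a hedged sketch of how a custom [[FileIndex]] implementation might attach the per-file values that a custom constant field (the hypothetical `ingest_batch` again) would later read back; the helper and its parameters are made up:

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileStatusWithMetadata, PartitionDirectory}

// Wrap each listed file with a per-file value. The scan is expected to copy
// this map into PartitionedFile.extraConstantMetadataColumnValues, which is
// what updateMetadataInternalRow consumes for non-base metadata fields.
def tagFiles(
    partitionValues: InternalRow,
    files: Seq[FileStatus],
    batchOf: FileStatus => String): PartitionDirectory = {
  PartitionDirectory(
    partitionValues,
    files.map(f => FileStatusWithMetadata(f, Map("ingest_batch" -> batchOf(f)))))
}
```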