[SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL #34575
Conversation
/**
 * The internal representation of the hidden metadata column
 */
class MetadataAttribute(
Will think about this new class. Maybe have something like an `AttributeReferenceBase` trait.
@cloud-fan @brkyvz It would be great if you could take a look! Thanks!
    override val metadata: Metadata = Metadata.empty)(
    override val exprId: ExprId = NamedExpression.newExprId,
    override val qualifier: Seq[String] = Seq.empty[String])
  extends AttributeReference(name, dataType, nullable, metadata)(exprId, qualifier) {
Let's not extend `AttributeReference`, otherwise `copy` can cause issues.
@@ -276,3 +276,10 @@ object LogicalPlanIntegrity {
    checkIfSameExprIdNotReused(plan) && hasUniqueExprIdsForOutput(plan)
  }
}

/**
 * A logical plan node with exposed metadata columns
A logical plan node that can generate metadata columns
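As a rough illustration of the kind of mix-in this comment describes, here is a minimal sketch; the trait and member names are assumptions for illustration only, not the final API:

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Illustrative sketch only: a logical plan node mixes this in to advertise the
// hidden metadata columns it can generate when they are explicitly referenced.
trait ExposesMetadataColumns { self: LogicalPlan =>
  // Hidden metadata attributes this node can produce on demand.
  def metadataOutput: Seq[AttributeReference]
}
```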
 */
object MetadataAttribute {
  def apply(name: String, dataType: DataType): AttributeReference =
    AttributeReference(name, dataType, true,
Shall we allow non-nullable metadata attributes? We should probably add one more parameter in `apply`: `nullable: Boolean`.
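A minimal sketch of that suggestion, assuming the metadata marker is kept in the attribute metadata (the marker key name and the default value are assumptions):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{DataType, MetadataBuilder}

// Sketch only: expose nullability as an extra parameter of apply.
object MetadataAttribute {
  private val METADATA_COL_ATTR_KEY = "__metadata_col" // assumed marker key

  def apply(name: String, dataType: DataType, nullable: Boolean = true): AttributeReference =
    AttributeReference(name, dataType, nullable,
      new MetadataBuilder().putBoolean(METADATA_COL_ATTR_KEY, value = true).build())()
}
```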
  .add(StructField(FILE_PATH, StringType))
  .add(StructField(FILE_NAME, StringType))
  .add(StructField(FILE_SIZE, LongType))
  .add(StructField(FILE_MODIFICATION_TIME, LongType))
should this be TimestampType?
I think it's more of a design choice? Both are fine, and I don't have a strong opinion on it. `long` matches what the file modification time gives you directly; `timestamp` is more readable. WDYT?
I think this one is an easy decision. Timestamp type is much better, as people can do `WHERE _metadata.modificationTime < TIMESTAMP'2020-12-12 12:12:12'` or other datetime operations. And `df.show` can also display the value in a more user-readable format.
got it, it makes sense! addressed.
LGTM, only one comment about the data type of one metadata column.
@@ -45,7 +45,7 @@ import org.apache.spark.util.NextIterator
  * @param filePath URI of the file to read
  * @param start the beginning offset (in bytes) of the block.
  * @param length number of bytes to read.
- * @param modificationTime The modification time of the input file, in milliseconds.
+ * @param modificationTime The modification time of the input file, in microseconds.
nit: I think we can still put milliseconds here, as it matches `file.getModificationTime`. We can `* 1000` in FileScanRDD when we set the value to the internal row.
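A tiny sketch of the suggested conversion, with an assumed example value:

```scala
// Keep the millisecond value on the file descriptor (matching
// file.getModificationTime) and convert to microseconds, Spark's internal
// TimestampType unit, only when filling the metadata row in FileScanRDD.
def toInternalTimestamp(modificationTimeMillis: Long): Long =
  modificationTimeMillis * 1000L // millis -> micros

// e.g. toInternalTimestamp(1636675200000L) == 1636675200000000L
```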
sure! done
thanks, merging to master!
Sorry for the late review; thanks to @Yaohua628 for the work! Just have some questions. Thanks.
val columnVector = new OnHeapColumnVector(c.numRows(), StringType)
rowId = 0
// use a tight-loop for better performance
while (rowId < c.numRows()) {
  columnVector.putByteArray(rowId, filePathBytes)
  rowId += 1
}
columnVector
It looks like for each batch of input rows, we need to recreate a new on-heap column vector and write the same constant values for each row (i.e. file path, file name, file size, etc.). Just wondering about the performance penalty when reading a large table; how big a table have we tested?

Maybe a simple optimization here is to come up with something like `ConstantColumnVector`, where all values are the same for every row, and we only need to save one copy of the value.
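To make the idea concrete, a rough sketch of such a constant vector for a string-typed metadata value; this is only an illustration of the suggestion, not Spark's actual class, and the accessors that are not meaningful for a constant string simply throw:

```scala
import org.apache.spark.sql.types.{Decimal, StringType}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarArray, ColumnarMap}
import org.apache.spark.unsafe.types.UTF8String

// Read-only vector holding a single string value that it reports for every
// row, so the constant metadata value (file path, file name, ...) does not
// have to be copied per row and per batch.
class ConstantStringColumnVector(value: UTF8String) extends ColumnVector(StringType) {
  override def close(): Unit = {}
  override def hasNull(): Boolean = false
  override def numNulls(): Int = 0
  override def isNullAt(rowId: Int): Boolean = false
  override def getUTF8String(rowId: Int): UTF8String = value

  // Accessors below are unsupported for a constant string column.
  private def unsupported = throw new UnsupportedOperationException("constant string vector")
  override def getBoolean(rowId: Int): Boolean = unsupported
  override def getByte(rowId: Int): Byte = unsupported
  override def getShort(rowId: Int): Short = unsupported
  override def getInt(rowId: Int): Int = unsupported
  override def getLong(rowId: Int): Long = unsupported
  override def getFloat(rowId: Int): Float = unsupported
  override def getDouble(rowId: Int): Double = unsupported
  override def getArray(rowId: Int): ColumnarArray = unsupported
  override def getMap(ordinal: Int): ColumnarMap = unsupported
  override def getDecimal(rowId: Int, precision: Int, scale: Int): Decimal = unsupported
  override def getBinary(rowId: Int): Array[Byte] = unsupported
  override def getChild(ordinal: Int): ColumnVector = unsupported
}
```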
Thanks for reviewing! Makes sense. Also, Bart mentioned something about optimizing `putByteArray` for all rows: #34575 (comment)
val filePathBytes = path.toString.getBytes
val fileNameBytes = path.getName.getBytes
var rowId = 0
metadataColumns.map(_.name).map {
We should already know how to fill the column vector for each metadata column, so the pattern matching can be done outside of execution; then it would not need to do pattern matching per batch here.
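For illustration, one way to hoist the matching out of the per-batch path; the column names and per-file values here are hypothetical placeholders, not the PR's code:

```scala
import org.apache.spark.sql.execution.vectorized.WritableColumnVector

// Sketch: resolve how to fill each metadata column once, up front, so the
// per-batch loop only invokes the prepared fillers and does no name matching.
def prepareFillers(
    columnNames: Seq[String],
    filePathBytes: Array[Byte],
    fileSize: Long): Seq[(WritableColumnVector, Int) => Unit] = {
  columnNames.map {
    case "file_path" =>
      (vec: WritableColumnVector, rowId: Int) => { vec.putByteArray(rowId, filePathBytes); () }
    case "file_size" =>
      (vec: WritableColumnVector, rowId: Int) => vec.putLong(rowId, fileSize)
    case other =>
      (_: WritableColumnVector, _: Int) => throw new IllegalArgumentException(s"unknown column: $other")
  }
}
```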
Some small overhead per batch should be fine.
Yeah, I agree it's not a huge issue, since it's per batch, not per row. But I also think it's not hard to organize the code in the most efficient way.
Thanks for the comments, really appreciate that! But we have to do something per batch, right? Since we cannot be sure of `c.numRows` (small file or last batch), and different file formats can have different configurable max rows per batch: Parquet, ORC. Unless we could have a `ConstantColumnVector` as you mentioned, or something else that only refers to `c.numRows` for each batch?
val FILE_PATH = "file_path"

val FILE_NAME = "file_name"
Wondering, do we also plan to deprecate the existing expression `InputFileName` in Spark?
Good point. I think we should, as `InputFileName` is really fragile and can't be used with joins, for example.
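For comparison, a small sketch of the two approaches side by side; the path and format are placeholders, and `_metadata.file_path` follows the field names proposed in this PR:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.input_file_name

val spark = SparkSession.builder().appName("metadata-demo").master("local[*]").getOrCreate()
val df = spark.read.format("parquet").load("/tmp/data")

// Existing expression, noted above as fragile (e.g. with joins).
df.select(input_file_name()).show(false)

// Hidden metadata column proposed here: a regular (hidden) column that can be
// selected and filtered like any other column.
df.select("_metadata.file_path").show(false)
```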
  case MetadataAttribute(attr) => attr
}

// TODO (yaohua): should be able to prune the metadata struct to only contain what is needed
nit: Shall we file a JIRA?
### What changes were proposed in this pull request?
Follow-up PR of #34575. Support the metadata struct schema pruning for all file formats.

### Why are the changes needed?
Performance improvements.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UTs and a new UT.

Closes #35147 from Yaohua628/spark-37768.

Authored-by: yaohua <yaohua.zhao@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

…present in the data filter

### What changes were proposed in this pull request?
Follow-up PR of #34575. Filtering files if metadata columns are present in the data filter.

### Why are the changes needed?
Performance improvements.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UTs and a new UT.

Closes #35055 from Yaohua628/spark-37769.

Authored-by: yaohua <yaohua.zhao@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?

This PR proposes a new interface in Spark SQL that allows users to query the metadata of the input files for all file formats. Spark SQL will expose them as built-in hidden columns, meaning users can only see them when they explicitly reference them. Currently, this PR proposes to support the following metadata columns inside a metadata struct `_metadata`: `file_path`, `file_name`, `file_size`, and `file_modification_time`.

This proposed hidden file metadata interface has the following behavior: the metadata columns are hidden and are not selected by default (`SELECT *`). In other words, they are not returned unless being explicitly referenced.

Why are the changes needed?

To improve the Spark SQL observability for all file formats that still leverage DSV1.

Does this PR introduce any user-facing change?

Yes.

Example return:

How was this patch tested?

Added a new test suite: FileMetadataColumnsSuite.
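As a usage sketch of the interface described above (the file path is a placeholder, and the field names follow the FILE_* constants shown earlier in this PR):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("metadata-demo").master("local[*]").getOrCreate()
val df = spark.read.format("parquet").load("/tmp/data")

// _metadata is hidden: a plain star selection does not return it.
df.select("*").show()

// It appears only when explicitly referenced, either as the whole struct or a
// single field, and supports ordinary operations such as datetime filters.
df.select("*", "_metadata").show()
df.where("_metadata.file_modification_time < TIMESTAMP'2020-12-12 12:12:12'")
  .select("_metadata.file_name", "_metadata.file_size")
  .show()
```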