[SPARK-13883][SQL] Parquet Implementation of FileFormat.buildReader #11709
Conversation
sqlContext.conf.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP))

// Try to push down filters when filter push-down is enabled.
if (sqlContext.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
@liancheng any idea why this isn't working?
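For context, a minimal sketch of the two ways a SQLConf setting gets read in this code path, using only the entries visible in the snippet above; whether the string-based lookup is the culprit here is speculation:

```scala
// String-keyed lookup: returns the raw string value, so the caller
// must parse it manually (hence the .toBoolean).
val pushdownEnabled: Boolean =
  sqlContext.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean

// Typed lookup through the conf entry itself, as done a few lines up for
// PARQUET_INT96_AS_TIMESTAMP: returns a Boolean directly, no parsing.
val pushdownEnabledTyped: Boolean =
  sqlContext.conf.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED)
```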
Test build #53150 has finished for PR 11709 at commit
Test build #53212 has finished for PR 11709 at commit
Test build #53229 has finished for PR 11709 at commit
Test build #53237 has finished for PR 11709 at commit
Test build #53262 has finished for PR 11709 at commit
Test build #53367 has finished for PR 11709 at commit
val iter = new RecordReaderIterator(parquetReader)
val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()
val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
Can we append the partition values as batches?
We do; this only happens when we fall back to the old parquet-mr reader. Otherwise the columns are appended on line 370. I can move this into the `if` to make that clearer.
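For readers following the thread, a minimal sketch of that fallback path, reusing the names from the snippet above (`iter`, `joinedRow`, `appendPartitionColumns`, `file.partitionValues`); illustrative, not the PR's exact code:

```scala
// Row-at-a-time fallback for the old parquet-mr reader: glue each data row
// to the split's constant partition values, then run the generated projection
// so downstream operators see a single flat UnsafeRow.
iter.map { dataRow =>
  appendPartitionColumns(joinedRow(dataRow, file.partitionValues))
}
```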
val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)

// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
if (parquetReader.isInstanceOf[UnsafeRowParquetRecordReader]) {
UnsafeRowParquetRecordReader could still produce UnsafeRows (without partition values) if enableVectorizedParquetReader is false.
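A hedged sketch of the stricter guard this comment suggests, assuming an `enableVectorizedParquetReader` flag is in scope (names reused from the snippets above):

```scala
// Skip the append only when the reader is the Parquet-internal one AND it
// actually ran vectorized; otherwise the rows still lack partition values
// and must go through the JoinedRow path.
val rowsAlreadyComplete =
  parquetReader.isInstanceOf[UnsafeRowParquetRecordReader] &&
    enableVectorizedParquetReader
if (rowsAlreadyComplete) {
  iter.asInstanceOf[Iterator[InternalRow]]
} else {
  iter.map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
}
```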
Test build #53369 has finished for PR 11709 at commit
Test build #53372 has finished for PR 11709 at commit
@@ -807,6 +809,11 @@ public final int appendStruct(boolean isNull) {
  public final boolean isArray() { return resultArray != null; }

  /**
   * Marks this column as being constant.
   */
  public final void setIsConstant() { isConstant = true; }
Should we have a special ColumnVector for constants (one that always returns the same value)?
I think that's a good suggestion. Let's do it as a follow-up, though.
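A rough sketch of what that follow-up could look like; the real ColumnVector hierarchy is Java, and this Scala fragment with hypothetical names only illustrates the shape of the idea:

```scala
// A constant column answers every row with the same value, so it needs no
// per-row storage and no setIsConstant bookkeeping at all.
final class ConstantIntVector(value: Int, isNull: Boolean) {
  def getInt(rowId: Int): Int = value          // same answer for every rowId
  def isNullAt(rowId: Int): Boolean = isNull   // one null flag for the column
  def numNulls(batchSize: Int): Int = if (isNull) batchSize else 0
}
```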
Test build #53558 has finished for PR 11709 at commit
resolve merge conflicts in vectorized parquet reader
Test build #53571 has finished for PR 11709 at commit
Fix ParquetRelation
Test build #53565 has finished for PR 11709 at commit
Test build #53575 has finished for PR 11709 at commit
Conflicts: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
Test build #53719 has finished for PR 11709 at commit
LGTM
Thanks, merging to master!
if (fileSchema.containsPath(colPath)) {
  ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
  if (!fd.equals(requestedSchema.getColumns().get(i))) {
    throw new IOException("Schema evolution not supported.");
It would be helpful to include `fd` in the exception message.
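For example (sketched in Scala for consistency with the other snippets in this thread, though the actual file is Java), the message could name both descriptors so a mismatch is debuggable from the logs:

```scala
throw new IOException(
  s"Schema evolution not supported: file column $fd does not match " +
    s"requested column ${requestedSchema.getColumns.get(i)}")
```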
This PR implements the new `buildReader` interface for the Parquet `FileFormat`. A simple implementation of `FileScanRDD` is also included. This code should be tested by the many existing tests for parquet.

Author: Michael Armbrust <michael@databricks.com>
Author: Sameer Agarwal <sameer@databricks.com>
Author: Nong Li <nong@databricks.com>

Closes apache#11709 from marmbrus/parquetReader.
val vectorizedReader = new VectorizedParquetRecordReader()
vectorizedReader.initialize(split, hadoopAttemptContext)
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
@marmbrus Not quite sure about the intention of this line. Are we "reserving" column batches for partition columns here so that partition values can be filled later after data columns are fetched?
I was wondering why we need to pass the partition schema and values to buildReader, since partitioning should have already been handled during the planning phase. Then I found they are only used here. It seems that this is pretty much a use case specific to the Parquet vectorized reader.
Yeah, this perhaps belongs at a higher layer (probably in FileScanRDD), but that would require us to vectorize everything or take a giant performance hit. This line is telling the vectorized reader to append the partition columns as static columns. This allows us to avoid an extra copy to append them for the optimized path.
Basically, I'd just take the non-vectorized version below, put it in a utility function, and use it everywhere. If we vectorize all the sources, that will be the only part we have to remove, and then this can be done in FileScanRDD.
I think you do not want to do the actual partition appending in the planner like we were doing before, because then you can't easily have Spark partitions (splits) that read from different partitions. This is what made the bucketing logic so convoluted in the old code path. This approach makes bucketing and collapsing small files into a single partition much simpler.
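A conceptual sketch of what `initBatch` does with the partition values, per the explanation above; the method names follow Spark's vectorized classes of this era, but treat it as an illustration rather than a quote of the Java source:

```scala
// Trailing columns of each batch are reserved for partition columns: fill
// each one once from the split's partition-values row and mark it constant,
// so batch reads never copy partition values per row.
partitionSchema.fields.indices.foreach { i =>
  val col = batch.column(dataSchema.length + i)            // after data columns
  ColumnVectorUtils.populate(col, file.partitionValues, i) // write the one value
  col.setIsConstant()                                      // never overwritten
}
```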
Sorry for the confusion. When I mentioned "planning phase", what I really meant was that ideally the data source implementation shouldn't care about partitioning at all. But I mixed up partition discovery and partition value appending. I agree with your comments. Thanks for the explanations.