Vectorized Reads of Parquet with Identity Partitions #1287
rdblue merged 2 commits into apache:master from
Conversation
Previously, vectorization was disabled whenever an underlying Iceberg table used Parquet files and also used identity transforms in its partitioning. To fix this, we extend the DummyVectorReader into a ConstantVectorReader, which is used when a column's value can be determined from the PartitionSpec. Then, when constructing the reader, we use a ConstantColumnVector to fill in the missing column.
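As a rough illustration of the idea (a minimal sketch; the class and method names below are placeholders, not the actual Iceberg types or signatures): a reader backing an identity-partitioned column never decodes Parquet pages and instead repeats the value derived from the partition data for every row in the batch.

```java
import java.util.Arrays;

// Hypothetical sketch of a constant-backed column reader: every batch is
// filled with the identity-partition value instead of being read from Parquet.
final class ConstantReaderSketch {
  private final Object constant;

  ConstantReaderSketch(Object partitionValue) {
    this.constant = partitionValue;  // value determined from the partition data
  }

  Object[] readBatch(int numRows) {
    Object[] batch = new Object[numRows];
    Arrays.fill(batch, constant);    // same value repeated for every row
    return batch;
  }
}
```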
CC @aokolnychyi

@samarthjain, can you help review this one?
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
    .option("vectorization-enabled", String.valueOf(vectorized))
-   .load(table.location()).orderBy("id").collectAsList();
+   .load(table.location()).orderBy("id")
+   .select("id", "date", "level", "message")
Isn't this the default? Why was it necessary to add select?
When I added the Hive import, the schema comes back in a different order. I think this may be an issue with the import code? I'm not sure, but I know the default column order does not come out the same way :/
That's suspicious. We'll have to look into why the schema has the wrong order. I see select before all the writes, so it shouldn't need the reorder here.
I'll try to figure out the actual issue today, but I agree it shouldn't work this way. My assumption is that the Hive table schema is just being listed in a different order or when we use SparkSchemaUtil the order is getting scrambled.
I spent some time digging into this. When you call saveAsTable it ends up in this bit of code in DataFrameWriter:

```scala
val tableDesc = CatalogTable(
  identifier = tableIdent,
  tableType = tableType,
  storage = storage,
  schema = new StructType,
  provider = Some(source),
  partitionColumnNames = partitioningColumns.getOrElse(Nil),
  bucketSpec = getBucketSpec)
```

which strips out whatever incoming schema you have (the schema is set to an empty StructType), so the new table is created without any information about the actual ordering of the columns you used in the create.
Then, when the Relation is resolved, the attributes are looked up again and the schema is created from the Attribute output. Long story short: saveAsTable doesn't care about your field ordering as far as I can tell. This is all in Spark, and I'm not sure we can do anything about it here.
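A hedged sketch of the practical workaround the test settles on above: pin the projection explicitly on read so the comparison doesn't depend on the order the catalog recorded. The session, location, and column names below are assumptions mirroring the test, not code from this PR.

```java
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class ReadWithExplicitOrder {
  // Read the Iceberg table at 'location' with an explicit column order so a
  // row-by-row comparison is stable regardless of the catalog's field order.
  static List<Row> readOrdered(SparkSession spark, String location) {
    Dataset<Row> rows = spark.read()
        .format("iceberg")
        .load(location)
        .select("id", "date", "level", "message")  // pin the projection order
        .orderBy("id");
    return rows.collectAsList();
  }
}
```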
I'm fine with this, then. Thanks for looking into it!
Looks mostly good to me!
I wasn't really happy about doing the instance checking in Java (if dummy, then cast); it makes me long for Scala :P
I do think this is probably a minimal set of changes to get this in without breaking too much open.
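A minimal sketch of the check-then-cast pattern being referred to; the reader types here are placeholders rather than the exact Iceberg classes:

```java
// Placeholder types to illustrate the instanceof-then-cast dispatch in Java.
final class ReaderDispatchSketch {
  interface VectorReader {}

  static final class ConstantVectorReader implements VectorReader {
    final Object value;  // value determined from the identity partition

    ConstantVectorReader(Object value) {
      this.value = value;
    }
  }

  static Object resolve(VectorReader reader, Object valueFromParquet) {
    if (reader instanceof ConstantVectorReader) {
      // identity-partition column: no Parquet pages to decode
      return ((ConstantVectorReader) reader).value;
    }
    return valueFromParquet;  // normal vectorized read path
  }
}
```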
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
One minor comment, but generally looks good to me.
Renamed Classes Parameterized Others
Thanks @samarthjain and @rdblue, I applied all your comments! The only thing I couldn't address was the Hive (saveAsTable) reordering issue. But hopefully I can get some time to work on making a save.format(iceberg) do some column pruning with identity transforms and simplify this test later?

Merged. Thanks, @RussellSpitzer! Good to have this feature done.