
Read Parquet data file with projection #244

Open
viirya opened this issue Mar 10, 2024 · 13 comments · Fixed by #245

@viirya (Member) commented Mar 10, 2024

We can now read a Parquet file with TableScan as a stream of Arrow RecordBatches. However, it reads all columns without applying any column projection; TableScanBuilder.select is currently a no-op. It would be better to propagate the selected columns to TableScan so the scan operation applies the projection.

@sdd (Contributor) commented Mar 10, 2024

Firstly, it's great to see someone else helping out on this - getting projection and filtering working on reads will unlock the most important (for me anyway 😅) use cases, so thanks for the contribution!

One change is needed though: the parquet file columns should be matched by field ID rather than by name, since the schema can evolve. See https://iceberg.apache.org/spec/#column-projection for more details. This makes it a bit trickier, but you're thinking in the right direction!
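To make the id-based matching concrete, here is a minimal, hypothetical Rust sketch. The `Field` type, `resolve_by_id` helper, and sample data are illustrative stand-ins, not iceberg-rust types:

```rust
// Hypothetical sketch (not iceberg-rust types): resolve projected columns by
// Iceberg field id instead of by name, so renames in the table schema still
// hit the right column in older parquet files.

#[derive(Debug, Clone)]
struct Field {
    id: i32,
    name: String,
}

/// Map each selected field id to the positional index of the parquet column
/// carrying that id. Names may drift across schema versions; ids never do.
fn resolve_by_id(parquet_fields: &[Field], selected_ids: &[i32]) -> Vec<usize> {
    selected_ids
        .iter()
        .filter_map(|id| parquet_fields.iter().position(|f| f.id == *id))
        .collect()
}

fn main() {
    // The table schema renamed "temp" to "temperature", but this older file
    // still carries the old name with the same field id.
    let parquet_fields = vec![
        Field { id: 1, name: "ts".to_string() },
        Field { id: 2, name: "temp".to_string() },
    ];
    // Matching by id 2 (not by the new name) still finds the right column.
    assert_eq!(resolve_by_id(&parquet_fields, &[2]), vec![1]);
}
```

A name-based lookup would miss the renamed column entirely; the id survives every rename.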

@viirya (Member, Author) commented Mar 10, 2024

Thank you @sdd. I will take a look at the doc tomorrow and update the PR accordingly.

@liurenjie1024 (Collaborator) commented

Hi @viirya, I've taken a look at the PR and found that something is missing. Due to Iceberg's schema evolution, the parquet schema may differ from the Iceberg schema. Also, projection is not simply extracting fields by id, since the schema may be deeply nested. We need to implement #251 and #252 first.

Blocked by #251 and #252.

@liurenjie1024 liurenjie1024 added this to the 0.3.0 Release milestone Mar 11, 2024
@viirya (Member, Author) commented Mar 11, 2024

Thank you @liurenjie1024.

I have looked at the doc @sdd mentioned that describes Iceberg column projection. It looks like the projection is specified by field id, as @sdd said, due to schema evolution.

At the user API level, I think columns should still be selected by name. For example, currently in TableScanBuilder the select API selects columns by name; we can find the field ids internally from the column names and use the matched field ids to do the column projection.

But currently the select API cannot select deeply nested fields. Is that also covered by #251 or #252?

I will take a look at #251 and #252 and see if I can implement them first.

@liurenjie1024 (Collaborator) commented Mar 12, 2024

But currently the select API cannot select deeply nested fields.

Hi @viirya

In fact, the current API is supposed to allow users to select deeply nested fields, like select("person.name", "person.age"), which is related to #251.

#252 is used to find the actual array index in the parquet file.
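As a rough illustration of how a dotted selector such as "person.name" could be resolved to a field id, here is a sketch that walks a nested schema one path segment at a time. All types here are hypothetical stand-ins for the real schema types:

```rust
// Illustrative sketch: resolve a dotted path like "person.name" to a field id
// by descending through struct fields segment by segment.

#[derive(Debug)]
enum FieldType {
    Primitive,
    Struct(Vec<Field>),
}

#[derive(Debug)]
struct Field {
    id: i32,
    name: String,
    field_type: FieldType,
}

/// Walk the schema along the path, returning the id of the final segment,
/// or None if any segment is missing.
fn resolve_path(mut fields: &[Field], path: &str) -> Option<i32> {
    let mut result = None;
    for part in path.split('.') {
        let field = fields.iter().find(|f| f.name == part)?;
        result = Some(field.id);
        fields = match &field.field_type {
            FieldType::Struct(children) => children.as_slice(),
            FieldType::Primitive => &[],
        };
    }
    result
}

fn main() {
    // A small hypothetical schema: person [id=1] { name [id=5] }.
    let schema = vec![Field {
        id: 1,
        name: "person".to_string(),
        field_type: FieldType::Struct(vec![Field {
            id: 5,
            name: "name".to_string(),
            field_type: FieldType::Primitive,
        }]),
    }];
    assert_eq!(resolve_path(&schema, "person.name"), Some(5));
    assert_eq!(resolve_path(&schema, "person.height"), None);
}
```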

@viirya (Member, Author) commented Mar 12, 2024

Ah, I see. Thanks for the pointer. I've started looking at the Java implementation for #251.

@liurenjie1024 (Collaborator) commented

Problem Statement

When converting a parquet file to arrow in iceberg, there are several problems to take into consideration:

  1. Field id mapping. Iceberg assigns every schema field an id, and the id is also stored in the parquet schema. When doing projection, we should match columns by field id, rather than by name.
  2. Type promotion. Iceberg supports lazy schema evolution, e.g. int -> long, so when reading from parquet we may need to promote an int array to a long array.
  3. Default values. Also due to lazy schema evolution, a field may be missing from the parquet file entirely; in that case we fill in nulls, or ideally the default values from the schema.
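A minimal sketch of points 2 and 3 on plain vectors rather than Arrow arrays (the function names are illustrative, not an existing API):

```rust
// Sketch of the two evolution fixes: promote int32 data read from parquet to
// the table's long type, and materialize a column missing from the file as
// all nulls.

/// Type promotion: an int column read from the file, promoted to long.
fn promote_int_to_long(col: &[Option<i32>]) -> Vec<Option<i64>> {
    col.iter().map(|v| v.map(i64::from)).collect()
}

/// Default values: a field added to the table after this file was written
/// does not exist in the file, so fill it with nulls (or a schema default).
fn missing_column(num_rows: usize) -> Vec<Option<i64>> {
    vec![None; num_rows]
}

fn main() {
    assert_eq!(promote_int_to_long(&[Some(7), None]), vec![Some(7i64), None]);
    assert_eq!(missing_column(3), vec![None, None, None]);
}
```

In the real reader these operations would act on Arrow arrays inside each RecordBatch, but the shape of the transformation is the same.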

Example

Let's use an example to illustrate these problems. Say the current iceberg table schema is the following:

schema {
  struct person [id = 1] {
     struct address [id = 2] {
        string city [id = 3]
        string street [id = 4]
     }
     string name [id = 5]
  }
  struct hometown [id = 6] {
     string city [id = 7]
     string state [id = 8]
  }
  long age [id = 9]
}

And a parquet file with the following schema:

schema {
  struct person [id = 1] {
     struct address [id = 2] {
        string city [id = 3]
        string street [id = 4]
     }
     string name [id = 5]
  }
  struct hometown [id = 6] {
     string city [id = 7]
  }
  int age [id = 9]
}

Now we want to do the following projection: ("person.address", "person.name", "hometown.state", "age")

The result schema is supposed to be the following:

schema {
  struct address [id = 2] {
     string city [id = 3]
     string street [id = 4]
  }
  string name [id = 5]
  string state [id = 8]
  long age [id = 9]
}

Solution

After #251 and #252, we have the necessary building blocks for projection. Here is a proposed algorithm:

  1. Collect the leaf column ids after schema pruning, and translate them into a ProjectionMask to do column pruning when reading the parquet file.
  2. Implement something like the Python implementation's ArrowProjectionVisitor to translate each returned record batch into a RecordBatch matching the iceberg schema.
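Step 1 could be sketched roughly like this, with illustrative stand-in types for the schema tree (not the iceberg-rust schema types):

```rust
// Sketch of step 1: walk a (pruned) schema tree and collect leaf field ids
// depth-first; these are the leaves a parquet ProjectionMask would be built
// from.

enum FieldType {
    Primitive,
    Struct(Vec<Field>),
}

struct Field {
    id: i32,
    field_type: FieldType,
}

fn collect_leaf_ids(fields: &[Field], out: &mut Vec<i32>) {
    for f in fields {
        match &f.field_type {
            FieldType::Primitive => out.push(f.id),
            FieldType::Struct(children) => collect_leaf_ids(children, out),
        }
    }
}

fn main() {
    // The example table schema above: person { address { city, street },
    // name }, hometown { city, state }, age.
    let leaf = |id| Field { id, field_type: FieldType::Primitive };
    let schema = vec![
        Field {
            id: 1,
            field_type: FieldType::Struct(vec![
                Field { id: 2, field_type: FieldType::Struct(vec![leaf(3), leaf(4)]) },
                leaf(5),
            ]),
        },
        Field { id: 6, field_type: FieldType::Struct(vec![leaf(7), leaf(8)]) },
        leaf(9),
    ];
    let mut ids = Vec::new();
    collect_leaf_ids(&schema, &mut ids);
    assert_eq!(ids, vec![3, 4, 5, 7, 8, 9]);
}
```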

@viirya (Member, Author) commented Mar 26, 2024

Thanks @liurenjie1024.

I read through the summary above.

I think #245 has already done the first item, collecting leaf column ids after schema pruning and translating them into a ProjectionMask. The select API can select root or nested columns. When building the table scan, #245 takes the field ids of the selected fields and passes them to the arrow reader builder.

When the arrow reader reads files, it uses these field ids to find the corresponding leaf column indices in the parquet schema. The leaf column indices are used to construct a ProjectionMask, which is passed to the parquet record batch reader to project columns while reading.
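The id-to-leaf-index mapping described here might look roughly like the sketch below, assuming we already know the field id carried by each parquet leaf column in file order (the `leaf_indices` helper is hypothetical; the resulting indices are the kind of input parquet's ProjectionMask::leaves takes):

```rust
// Sketch: selected field ids -> positional leaf indices in the parquet file.

fn leaf_indices(parquet_leaf_ids: &[i32], selected_ids: &[i32]) -> Vec<usize> {
    parquet_leaf_ids
        .iter()
        .enumerate()
        .filter(|&(_, id)| selected_ids.contains(id))
        .map(|(idx, _)| idx)
        .collect()
}

fn main() {
    // Parquet leaves from the example file, in file order:
    // city=3, street=4, name=5, hometown.city=7, age=9.
    let parquet_leaves = [3, 4, 5, 7, 9];
    // Selected leaf ids: 3, 4 (person.address), 5 (person.name),
    // 8 (hometown.state), 9 (age).
    let selected = [3, 4, 5, 8, 9];
    // Id 8 has no leaf in this file, so it yields no index; it must be
    // null-filled later instead of projected.
    assert_eq!(leaf_indices(&parquet_leaves, &selected), vec![0, 1, 2, 4]);
}
```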

@viirya (Member, Author) commented Mar 26, 2024

I think this "Implement sth like ArrowProjectionVisitor" step is for the following two items?

Type promotion. Iceberg support lazy schema evolution, e.g. int -> long. So when reading from parquet, we need to promote int array to long array.
Default value. Due to iceberg's lazy schema evolution, when reading from parquet, one field maybe missing in parquet file, and in this case we can fill in null values, or some default values in schema ideally.

Currently #245 has not implemented these yet. I think we can finish the basic projection in #245, and I will continue working on the ArrowProjectionVisitor part to handle type promotion and default values.

@liurenjie1024 (Collaborator) commented

I think this Implement sth like ArrowProjectionVisitor is for the following two items?

Yes, but with one extra requirement: reconstructing struct arrays. For example, when we select (person.address.street, person.name), we have projection mask [1, 2], and the returned schema is

schema {
  struct person {
     struct address {
        string street
     }
     string name
  }
}

But what we expect is

schema {
  string street
  string name
}

Currently #245 has not implemented these yet. I think we can finish the basic projection in #245 and I will continue work on the ArrowProjectionVisitor to finish type promotion and default value.

This sounds reasonable, but we need to add a verification that the selected fields are primitive types rather than nested fields, which works for most cases.

@viirya (Member, Author) commented Mar 27, 2024

I think this Implement sth like ArrowProjectionVisitor is for the following two items?

Yes, but with one extra requirement: reconstructing struct arrays. For example, when we select (person.address.street, person.name), we have projection mask [1, 2], and the returned schema is

schema {
  struct person {
     struct address {
        string street
     }
     string name
  }
}

But what we expect is

schema {
  string street
  string name
}

Yeah. I think the Parquet reader cannot return the flattened schema, only the pruned struct person. As in query engines such as Spark, we need an additional projection to flatten the pruned person into the two root columns street and name.
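That extra flattening projection could be sketched like this, with plain nested values standing in for Arrow struct arrays (the `Column` type and `flatten` helper are hypothetical):

```rust
// Sketch: hoist the selected leaves out of the pruned struct returned by the
// reader, so they become root columns of the result batch.

#[derive(Debug, Clone, PartialEq)]
enum Column {
    Str(Vec<String>),
    Struct(Vec<(String, Column)>),
}

/// Depth-first hoisting: every non-struct child becomes a root column.
fn flatten(fields: &[(String, Column)], out: &mut Vec<(String, Column)>) {
    for (name, col) in fields {
        match col {
            Column::Struct(children) => flatten(children, out),
            leaf => out.push((name.clone(), leaf.clone())),
        }
    }
}

fn main() {
    // Reader output: person { address { street }, name }.
    let pruned = vec![(
        "person".to_string(),
        Column::Struct(vec![
            (
                "address".to_string(),
                Column::Struct(vec![(
                    "street".to_string(),
                    Column::Str(vec!["1 Main St".to_string()]),
                )]),
            ),
            ("name".to_string(), Column::Str(vec!["Alice".to_string()])),
        ]),
    )];
    let mut flat = Vec::new();
    flatten(&pruned, &mut flat);
    let names: Vec<&str> = flat.iter().map(|(n, _)| n.as_str()).collect();
    // The struct wrappers are gone; street and name are root columns.
    assert_eq!(names, vec!["street", "name"]);
}
```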

@viirya (Member, Author) commented Apr 1, 2024

@liurenjie1024 Thanks for reviewing and merging #245.

Implement sth like ArrowProjectionVisitor in python to translate record batch to actual RecordBatch matching iceberg's schema.

I will work on this soon. Do we want to reuse this ticket?

@liurenjie1024 (Collaborator) commented

@liurenjie1024 Thanks for reviewing and merging #245.

Implement sth like ArrowProjectionVisitor in python to translate record batch to actual RecordBatch matching iceberg's schema.

I will work on this soon. Do we want to reuse this ticket?

Yes. Given the discussions in this ticket, I think reusing it will make it easier to keep the context.
