
Create Iceberg table from existing parquet files with slightly different schemas (schema merge is possible). #601

Open
sergun opened this issue Apr 12, 2024 · 8 comments


@sergun

sergun commented Apr 12, 2024

Question

Hi!

What is the right way to create an Iceberg table from existing parquet files with slightly different schemas, so that a merge of their schemas is possible?
I would like to create the Iceberg table with the iceberg-python library (without Spark).

@kevinjqliu
Collaborator

There's a Table.add_files API which supports directly adding parquet files. But it seems like the parquet files must have the same schema.
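For reference, the call looks roughly like this (a sketch only: `table` is assumed to be a PyIceberg table already loaded from a catalog, and the file paths are hypothetical):

```python
# Registers existing parquet files with the table without rewriting
# them; the files are expected to match the table's schema.
table.add_files(file_paths=["data/1.parquet", "data/2.parquet"])
```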

You can also read all the parquet files into memory with PyArrow, merge the different schemas (using pyarrow.unify_schemas) and then write Arrow as Iceberg table.

Maybe DuckDB works well here since read_parquet can union schemas, see union_by_name and https://duckdb.org/docs/data/parquet/tips.html#use-union_by_name-when-loading-files-with-different-schemas

@sergun
Author

sergun commented Apr 14, 2024

Thank you @kevinjqliu !
Do you know how to read a parquet file with a unified schema in PyArrow?

I successfully merged schemas:

    import pyarrow as pa
    import pyarrow.parquet as pq

    t1 = pq.read_table("data/1.parquet")
    t2 = pq.read_table("data/2.parquet")
    schema = pa.unify_schemas([t1.schema, t2.schema])
    print(schema)

but the next lines give an error:

    t1 = pq.read_table("data/1.parquet", schema=schema)
    t2 = pq.read_table("data/2.parquet", schema=schema)
    # union of t1 and t2 and write to iceberg should follow

with the error:

    pyarrow.lib.ArrowTypeError: struct fields don't match or are in the wrong order: Input fields: struct<z: int64, x: int64> output fields: struct<z: int64, x: int64, y: int64, w: struct<w1: int64>>

Regarding DuckDB: unfortunately, union by name does not work for nested parquet files with schema changes at any level of the nested structures. BTW, it does work for JSON in DuckDB. Here is my question in the DuckDB discussions:
duckdb/duckdb#11633

@kevinjqliu
Collaborator

Looks like your schema is nested, which makes things more complicated. It's pretty difficult to deal with merging nested schemas. I'm not sure if there's an out-of-the-box solution for this.
One possible solution could be to use pandas (or another engine) to merge the data once it is read into memory. Then use the union schema as the Iceberg table's schema.
Another solution can be to read into memory, flatten the schemas, and then write to Iceberg.

That said, most of the difficulties here are not related to Iceberg. One thing I wonder is if PyIceberg can handle schema evolution of nested structs.

@sergun
Author

sergun commented Apr 16, 2024

@kevinjqliu
It is strange to me that PyArrow has pa.unify_schemas(...), which is able (I double-checked) to unify nested schemas (even with type promotions), but there is no "dual" functionality to cast, concat, read, or otherwise apply the unified schema to the corresponding data in a pa.Table. None of e.g. pa.Table.cast(...), pa.concat_tables(...), pa.parquet.read_table(...) works.

@sergun
Author

sergun commented Apr 16, 2024

One thing I wonder is if PyIceberg can handle schema evolution of nested structs.

Looks like it can.
From https://py.iceberg.apache.org/api/#add-column:

from pyiceberg.types import IntegerType, StringType

with table.update_schema() as update:
    update.add_column("retries", IntegerType(), "Number of retries to place the bid")
    # In a struct
    update.add_column("details.confirmed_by", StringType(), "Name of the exchange")

@sergun
Author

sergun commented Apr 16, 2024

BTW: I found some explanation of why a merge of Arrow tables with different schemas is not possible:
apache/arrow#35424
The reason looks weird, but yes, as I remember, e.g. Spark dataframes may have columns with duplicated names.

Probably it is possible to implement table merge in PyArrow after checking that there are no duplicated column names in each struct and at the root level.

@Fokko
Contributor

Fokko commented Apr 16, 2024

The reason looks weird, but yes, as I remember, e.g. Spark dataframes may have columns with duplicated names.

Wow, I learned something today. I hope nobody uses that in real life.

That said, most of the difficulties here are not related to Iceberg. One thing I wonder is if PyIceberg can handle schema evolution of nested structs.

Nested structs, and structs inside maps and lists, are all supported :)

@sergun In PyIceberg we also have a union_by_name that will add the missing columns to the schema. Would that work?
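A minimal sketch of that route, assuming `table` is a PyIceberg table already loaded from a catalog and `new_schema` carries the extra fields:

```python
# union_by_name adds fields present in new_schema but missing from
# the table's schema (including inside structs), without dropping
# or rewriting existing columns.
with table.update_schema() as update:
    update.union_by_name(new_schema)
```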

@soutong

soutong commented May 31, 2024

Is there any Java solution that imports parquet files? @Fokko
