Feat: Add support for Trino Iceberg tables #2129
Conversation
I refactored this slightly to reduce memory usage: rather than spinning up an entire Postgres instance per metastore, there is now a single Postgres instance with a database per metastore.
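A minimal sketch of the layout described above: one shared Postgres instance, with a database created per metastore instead of a container per metastore. The function and naming scheme here are illustrative, not the PR's actual fixture code.

```python
def metastore_database_ddl(metastore_name: str) -> str:
    # Each Hive metastore gets its own database inside the single shared
    # Postgres instance, rather than its own server process.
    return f'CREATE DATABASE "metastore_{metastore_name}"'
```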
I found this gateway definition was repeated in multiple places: here, and where the engine_adapter fixture was created.
So rather than repeating the logic to generate it, I made it its own fixture.
I'm not super familiar with sqlglot; is this a valid approach?
@georgesittas Can you confirm the best approach here?
I'm not quite sure - what SQL code is this supposed to generate? Is there a way we can add a test to ensure we generate the correct DDL statement?
One observation: instead of this="partitioning", we may need to do something like this=exp.var("partitioning") so that this holds an expression rather than a plain string (which is usually the case for Property expressions, IIRC).
@georgesittas I added some tests in test_trino.py to demonstrate the generated SQL.
The tl;dr is that the SQL should be identical to Hive, with the exception of the partitioned_by property being called partitioning instead.
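As an illustration of that difference, here is hypothetical DDL for the two connectors; the authoritative generated SQL is in the test_trino.py tests mentioned above.

```python
# Illustrative Trino DDL only -- the property name is the sole difference
# between the Hive and Iceberg table properties.
hive_ddl = "CREATE TABLE t (ds VARCHAR) WITH (partitioned_by=ARRAY['ds'])"
iceberg_ddl = "CREATE TABLE t (ds VARCHAR) WITH (partitioning=ARRAY['ds'])"

assert hive_ddl.replace("partitioned_by", "partitioning") == iceberg_ddl
```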
eakmanrq left a comment
This is amazing and a highly requested feature! Thanks for all the attention to detail you put into this.
One thing that is missing is documentation. Can you update the Trino documentation to include a note about Iceberg support?
Also, from reviewing this PR, it seems the code currently assumes a single catalog is used when connecting to Trino. For example, it always checks "current_catalog" to determine the connector, but the object being created could actually be in a different catalog and therefore use a different connector. If that is correct, I think that constraint is fine, but it should also be documented.
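One way around the single-catalog assumption, sketched here as a hypothetical helper: build the connector probe against a specific catalog name rather than the connection's current_catalog. The query shape mirrors the connector_name discussion below; it is illustrative, not the PR's exact code.

```python
def connector_name_sql(catalog_name: str) -> str:
    # Probe the connector of a *specific* catalog instead of assuming the
    # connection's current_catalog.
    return (
        "SELECT connector_name FROM system.metadata.catalogs "
        f"WHERE catalog_name = '{catalog_name}'"
    )
```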
sqlmesh/core/engine_adapter/trino.py
Is it expected that we could end up with multiple catalogs in the response? I'm wondering if instead we should raise if we get multiple and do seq_get(connector_name, 0) or self.DEFAULT_CATALOG_TYPE (seq_get is part of sqlglot).
Catalogs in Trino need to have unique names or Trino will fail to start, so there should only ever be a single row returned.
fetchone() returns a single row as a tuple, so the length check verifies that there is a value in the first column, connector_name; if there is, it gets returned.
If no records are returned, we just return the default.
Ah, I see now. You could then use seq_get(connector_name, 0) or self.DEFAULT_CATALOG_TYPE, but this is fine too.
I like that; it's more concise. I added it.
sqlmesh/core/engine_adapter/base.py
It seems like we should raise UnsupportedCatalogOperationError here if self.CATALOG_SUPPORT.is_unsupported. It is strange, for example, for MySQL to return a mysql current catalog when it doesn't support catalogs at all.
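A standalone sketch of the suggested guard. The real CatalogSupport and UnsupportedCatalogOperationError types live inside SQLMesh; everything here is re-declared for illustration, and the function is hypothetical.

```python
from enum import Enum, auto


class CatalogSupport(Enum):
    UNSUPPORTED = auto()
    SINGLE_CATALOG_ONLY = auto()
    FULL_SUPPORT = auto()

    @property
    def is_unsupported(self) -> bool:
        return self is CatalogSupport.UNSUPPORTED


class UnsupportedCatalogOperationError(Exception):
    pass


def get_current_catalog(support: CatalogSupport, catalog: str) -> str:
    # Raise rather than returning a misleading catalog name for engines
    # (e.g. MySQL) that have no catalog concept at all.
    if support.is_unsupported:
        raise UnsupportedCatalogOperationError("catalogs are not supported")
    return catalog
```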
Fair call; it didn't even occur to me that catalogs might not be supported by some databases.
Thanks for your review @eakmanrq, I'll make some tweaks based on the feedback.
Actually, I wasn't intending to commit you guys to supporting Iceberg until I was sure it was working myself :) This PR was an initial implementation, but I'm expecting other issues to crop up, so I wasn't going to state it was "supported" just yet. However, I can update the docs if you're OK with that.
You're absolutely right, I just wanted to gauge feedback before spending too much time making it flexible.
We currently define "supported" as being able to pass all the integration tests. Since that is the case with your change, I think it would be correct to consider it supported.
Force-pushed from 3527990 to 9b38b2f
Cool, works for me! I've added a section to the docs.
```diff
- property: exp.Property
- property = exp.PartitionedByProperty(
-     this=exp.Schema(expressions=partitioned_by),
- )
+ property: exp.Property = exp.PartitionedByProperty(
+     this=exp.Schema(expressions=partitioned_by),
+ )
```
- Use the catalog of the table instead of the catalog of the connection when probing the catalog type
- Add Trino unit tests showing the Iceberg code paths and generated SQL
- Add a section on Iceberg to the Trino docs
Force-pushed from b76ba6d to 32a56cc
Thanks! I can't merge it; GitHub shows the following message:
partitioned_by for Hive tables and partitioning for Iceberg tables. Ref: Issue #1998