[#9504] feat(flink): support generic table for Gravitino flink connector #9689

jerryshao merged 6 commits into apache:main
Conversation
Blocked by #9590
Pull request overview
This pull request adds support for generic (non-Hive) tables in the Gravitino Flink connector to maintain compatibility with Flink's native Hive catalog behavior. The implementation enables Gravitino to properly handle tables created by native Flink clients that store schema information in table properties rather than as Hive schema.
Changes:
- Introduces `FlinkGenericTableUtil` to detect, serialize, and deserialize generic tables
- Overrides create/get/alter table methods in `GravitinoHiveCatalog` to handle generic tables separately from raw Hive tables
- Updates `HiveSchemaAndTablePropertiesConverter` to set `is_generic=false` for Hive tables and validate connector types
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| `FlinkGenericTableUtil.java` | New utility class for detecting generic vs Hive tables and handling property serialization/deserialization with the `flink.*` prefix |
| `GravitinoHiveCatalog.java` | Overrides table operations to route generic tables to property-based storage and Hive tables to schema-based storage |
| `HiveSchemaAndTablePropertiesConverter.java` | Adds connector validation and sets the `is_generic=false` flag for Hive tables |
| `BaseCatalog.java` | Minor refactoring to compute indices earlier (no functional change) |
| `FlinkHiveCatalogIT.java` | Comprehensive integration tests verifying bidirectional compatibility between Gravitino and native Flink for both generic and Hive tables |
| `TestFlinkGenericTableUtil.java` | Unit tests for generic table detection and property conversion logic |
| `TestHiveCatalogOperations.java` | Test for creating generic tables with empty columns |
| `TestDatabaseName.java` | Adds a test database enum entry |
| `flink-catalog-hive.md` | Documents generic table support and the requirement to specify `connector=hive` for raw Hive tables |
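The Hive-vs-generic detection rule that `FlinkGenericTableUtil` implements (mirroring Flink's `HiveCatalog.isHiveTable`, per the PR description) can be sketched as follows. The class and constant names here are illustrative, not the actual source from this PR:

```java
import java.util.Map;

// Illustrative sketch of the detection rule: a table is a Hive table when
// is_generic=false, or when neither masked connector key is present at all.
// Names are hypothetical; the PR implements this in FlinkGenericTableUtil.
final class GenericTableDetectionSketch {
  private static final String IS_GENERIC = "is_generic";
  private static final String FLINK_CONNECTOR = "flink.connector";
  private static final String FLINK_CONNECTOR_TYPE = "flink.connector.type";

  static boolean isHiveTable(Map<String, String> tableProperties) {
    String isGeneric = tableProperties.get(IS_GENERIC);
    if (isGeneric != null) {
      // Explicit flag wins: is_generic=true means not a Hive table.
      return !Boolean.parseBoolean(isGeneric);
    }
    // No flag: fall back to the absence of masked connector keys.
    return !tableProperties.containsKey(FLINK_CONNECTOR)
        && !tableProperties.containsKey(FLINK_CONNECTOR_TYPE);
  }
}
```
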
...in/java/org/apache/gravitino/flink/connector/hive/HiveSchemaAndTablePropertiesConverter.java
Outdated
@jerryshao PTAL
...test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
> Hive schema.
>
> :::note
> You must set `connector=hive` explicitly when creating a raw Hive table. Otherwise, the table is created as a managed generic table. The managed table is not recommended to use and is deprecated in Flink.
The documentation states "The managed table is not recommended to use and is deprecated in Flink," but this statement lacks clarity and context. It would be more helpful to:
- Specify which version of Flink deprecated managed tables
- Clarify what users should use instead
- Add a reference/link to the relevant Flink documentation about this deprecation
This helps users understand the implications and make informed decisions.
Suggested change:

> You must set `connector=hive` explicitly when creating a raw Hive table. Otherwise, the table is created as a managed generic table. Starting from Apache Flink 1.15, managed generic tables in the Hive catalog are deprecated and should be avoided. Instead, use external generic tables (by specifying an explicit connector) or native Hive tables. For more details, see the Flink documentation on Hive generic tables: https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/hive/hive_catalog/#generic-tables
Updated the document; the managed-table related API is deprecated in Flink 1.18.
...ctor/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
```java
}
if (!(newTable instanceof ResolvedCatalogTable)) {
  throw new CatalogException("Generic table must be a resolved catalog table");
}
```
The `applyGenericTableAlter` method ignores the `tableChanges` parameter in the second `alterTable` overload (line 180). While this appears intentional, since generic tables re-serialize the entire schema into properties (line 219), it would be valuable to add a comment explaining this design decision. For example:

```java
// For generic tables, we re-serialize the entire table schema and partition keys
// into flink.* properties, so the individual tableChanges are not needed.
// The newTable parameter contains the final state after applying all changes.
```

This helps future maintainers understand why the parameter is unused and prevents potential confusion or incorrect modifications.
```java
  }
});

catalog().asTableCatalog().alterTable(identifier, changes.toArray(new TableChange[0]));
```
The `applyGenericTableAlter` method calls `catalog().asTableCatalog().alterTable()` at line 244 without wrapping it in a try-catch block. This is inconsistent with other methods in this class:
- `createTable` (lines 117-137) wraps the call in try-catch and converts exceptions to appropriate Flink exceptions
- `getTable` (lines 143-156) wraps the call in try-catch and converts exceptions
- `loadGravitinoTable` (lines 199-210) wraps the call in try-catch

The `alterTable()` call could throw various exceptions (like `NoSuchTableException`, `IllegalArgumentException`, etc.) that should be converted to `CatalogException` or `TableNotExistException` for consistency with Flink's catalog API. Consider adding proper exception handling similar to other methods in this class.
Suggested change:

```java
try {
  catalog().asTableCatalog().alterTable(identifier, changes.toArray(new TableChange[0]));
} catch (NoSuchTableException e) {
  throw new CatalogException(e);
} catch (RuntimeException e) {
  throw new CatalogException(e);
}
```
```java
if (catalogTable.getUnresolvedSchema().getColumns().isEmpty()) {
  catalogTable =
      CatalogPropertiesUtil.deserializeCatalogTable(flinkProperties, "generic.table.schema");
}
```
The fallback logic at lines 77-79 attempts to deserialize with a different schema prefix ("generic.table.schema") if the initial deserialization results in empty columns. However, there's no test coverage for this fallback path.
Consider adding a test case that:
- Creates properties that trigger this fallback condition
- Verifies the fallback deserialization works correctly
- Documents when this fallback is needed (e.g., for backward compatibility with specific Flink versions)
This ensures the fallback logic is tested and its purpose is clear to future maintainers.
This is the logic from the Flink connector; it seems unnecessary to add a test for this.
Updated the PR to fix the comments from AI. @jerryshao PTAL, thx
Please fix the conflict.
What changes were proposed in this pull request?

Support generic tables in the Gravitino Flink connector.

Create table path (`flink-connector/flink/.../catalog/BaseCatalog.java` + `flink-connector/flink/.../hive/HiveSchemaAndTablePropertiesConverter.java`)

- Detect Hive vs generic by mirroring Flink: a table is a Hive table if `connector=hive`; otherwise it is generic.
- For generic tables:
  - Serialize the `ResolvedCatalogTable` with `CatalogPropertiesUtil.serializeCatalogTable(...)`.
  - Mask with the `flink.` prefix (same as `HiveTableUtil.maskFlinkProperties(...)`).
  - Preserve the `connector` / `connector.type` options in the masked properties.
  - Add `is_generic=true` only when Flink would add it (no connector keys).
  - Store these properties into the Gravitino table properties.
  - Schema handling: Gravitino validates schema presence. If the HMS schema must stay empty for generic tables, introduce a generic-table exception in the Gravitino Hive catalog write path, or allow empty columns when `is_generic=true` or `flink.*` properties indicate a generic table.
- For Hive tables: keep the current flow in `HiveSchemaAndTablePropertiesConverter`: normalize serde/storage properties and set `connector=hive` in the Flink options.
connector=hivein Flink options.Load table path (
flink-connector/flink/.../catalog/BaseCatalog.java+flink-connector/flink/.../hive/HiveSchemaAndTablePropertiesConverter.java)HiveCatalog.isHiveTable(Table):is_genericexists:isHiveTable = !Boolean.parseBoolean(is_generic)isHiveTable = !has(flink.connector) && !has(flink.connector.type)flink.prefix to build a Flink properties map.CatalogPropertiesUtil.deserializeCatalogTable(...)to reconstruct schema and partitionkeys.
connector=ManagedTableFactory.DEFAULT_IDENTIFIER, remove theconnectoroption (same asHiveCatalog.instantiateCatalogTable(...)).Alter table path (Table changes in
BaseCatalogand property conversion)flink.*properties and update Gravitino properties; avoid writing HMS columns.
Please refer to the design doc for more details: https://docs.google.com/document/d/1Nr09p1kkQ1pTmoLDs1tI2gpEIur8fhSFanmvEZkoG3c/edit?tab=t.0#heading=h.ad5fz2xu9563
Why are the changes needed?
Fix: #9504
Does this PR introduce any user-facing change?
Yes. If the user creates a table without specifying a connector, a managed generic table is created instead of a Hive table.
How was this patch tested?
Added unit tests and ITs.
Tested locally: Gravitino creates a generic JDBC table that the native Flink connector can read, and vice versa.