[Spark] Add a new option to workaround incorrect schema automatically created in external catalog #4431
Conversation
@harperjiang Could you please review this PR? This is a continuation of #2310. The following are replies to your existing comments:

We need to run ALTER TABLE under any of those case conditions, so we need to keep the current placement.

This was great feedback. I reused the existing config and logic instead of introducing similar logic.

I compared the results of those test cases and noticed that the existing test cases did not capture the original issue. I tried to implement unit/integration test cases, but it was not easy, so I concluded it was better to add a test scenario document rather than build overly complex test cases that could introduce deep dependencies. Hope this works.
Tagging @dennyglee for visibility.

Thanks @moomindani - including @roeap for review - thanks!

Hi @roeap, could you please take a look at this?

Hey @moomindani - could you take a look at the conflicts for this? @roeap and I will find time to review this next week as well.

@moomindani @dennyglee - taking a look now!

Resolved the conflict with the other refactoring commit. Moved the code change to CreateDeltaTableLike.scala. Tested the new implementation and verified the same result.

@prakharjain09 could you please take a look at this PR?

@moomindani Could you clarify the current plan: is #2310 still targeted to be merged and this PR is a follow-up, or is this PR intended as a replacement of #2310?

@LukasRupprecht #2310 is not being worked on today; this PR is its replacement. We want to unblock these use cases by merging this PR instead.
```scala
if (conf.getConf(DeltaSQLConf.FORCE_ALTER_TABLE_DATA_SCHEMA)) {
  spark.sessionState.catalog.alterTableDataSchema(cleaned.identifier, cleaned.schema)
}
```
As @harperjiang mentioned on the previous PR, this shouldn't be necessary anymore if DELTA_UPDATE_CATALOG_ENABLED is set.
@moomindani I saw your comment on the previous PR that you've tried setting this config but it didn't work. Could you share the exact steps that you used to test whether the schema is correctly written to the catalog? If this is not working, then something is wrong (either with the test setup or the code) and we should fix it.
@LukasRupprecht Here are the details. Is this enough? Three different people tried the approach described in the previous PR, but no one succeeded, so I do not think this is a config issue on my side.
Environment
- AWS
- us-east-1 Region
- AWS Glue for Apache Spark (Glue version 5.0 / Apache Spark 3.5.4)
- Glue Data Catalog enabled (in the Glue job console, `Use Glue data catalog as the Hive metastore` is selected)
- Delta Lake 3.3.0 enabled (in the Glue job console, the job parameter `--datalake-formats delta` is provided)
Spark conf
```
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
--conf spark.databricks.delta.catalog.update.enabled=true
```
Script
```python
additional_options = {
    "path": "s3://<path_to_data>/database/table/",
    "write.parquet.compression-codec": "snappy",
}
df.write.format("delta").saveAsTable("database.table")
```
Steps
- Create a Glue job with the above configurations
- Run the job
- Check `database.table` in the Glue Data Catalog to see whether the schema turned out to be `col (array)` (one way to check this from within a session is sketched after this list)
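For reference, a minimal Scala sketch of that check, assuming a session (e.g. spark-shell) configured the same way as the job above and the placeholder `database.table` names from the script; it reads the table definition back through the session catalog, so the printed schema is what the metastore (here, the Glue Data Catalog) stores rather than what the Delta log contains:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Fetch the table definition as stored in the external catalog.
val stored = spark.sessionState.catalog
  .getTableMetadata(TableIdentifier("table", Some("database")))

// On affected setups this prints a single placeholder column such as `col: array<string>`
// instead of the DataFrame's real schema.
println(stored.schema.treeString)
```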
I think we need to figure out why the existing config is not working as expected before we add a new config that does the exact same thing.

To start, could you:
- Add a unit test to DeltaUpdateCatalogSuite that creates a table and checks that the schema is correct in the in-memory catalog used in the test when `DELTA_UPDATE_CATALOG_ENABLED` is set (there should already be tests like this in that suite, but we can create one from scratch just to verify).
- Add the same unit test again, but now with `DELTA_UPDATE_CATALOG_ENABLED` off and `FORCE_ALTER_TABLE_DATA_SCHEMA` on. This should result in the same schema being uploaded to the catalog as in the first unit test.

If the two unit tests indeed work the same, then we know `DELTA_UPDATE_CATALOG_ENABLED` works at least in the tests and we can start debugging things in your setup. (A rough sketch of what such tests could look like follows.)
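A rough sketch of the two requested tests, assuming DeltaUpdateCatalogSuite provides the usual Spark testing helpers (`sql`, `withSQLConf`, `withTable`) and that `FORCE_ALTER_TABLE_DATA_SCHEMA` is the conf entry added by this PR; names and assertions are illustrative only:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.delta.sources.DeltaSQLConf

test("schema is written to the catalog with DELTA_UPDATE_CATALOG_ENABLED") {
  withSQLConf(DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED.key -> "true") {
    withTable("t1") {
      sql("CREATE TABLE t1 (id BIGINT, name STRING) USING delta")
      // Read the schema back from the catalog used by the test.
      val stored = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
      assert(stored.schema.fieldNames.toSeq == Seq("id", "name"))
    }
  }
}

test("schema is written to the catalog with FORCE_ALTER_TABLE_DATA_SCHEMA only") {
  withSQLConf(
      DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED.key -> "false",
      DeltaSQLConf.FORCE_ALTER_TABLE_DATA_SCHEMA.key -> "true") {
    withTable("t2") {
      sql("CREATE TABLE t2 (id BIGINT, name STRING) USING delta")
      // Expect the same catalog schema as in the first test.
      val stored = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2"))
      assert(stored.schema.fieldNames.toSeq == Seq("id", "name"))
    }
  }
}
```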
@LukasRupprecht I have added the two unit tests as you requested. Test 1 succeeded but Test 2 failed.

From this observation, it is clear that the two configs behave differently. This is also visible in the integration test results linked from the PR overview: https://docs.google.com/document/d/e/2PACX-1vRl4lWFyx1ASeYCUoj575fLM0dFKhXM5MekWS__NVnwJJESfMflGS71OKHoFjEoHFjmidLHeEXHoreb/pub.

To make this extremely clear, I pushed the commit including those unit tests (but it failed).

As far as I understand, this issue occurs only with external Hive catalogs (including the Glue Data Catalog), and DELTA_UPDATE_CATALOG_ENABLED is not enough.
Let me trace through what happens when DELTA_UPDATE_CATALOG_ENABLED is used:
Step 1: Delta Sets Schema in CatalogTable
```scala
// In cleanupTableDefinition()
if (conf.getConf(DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED)) {
  table.copy(
    schema = truncatedSchema,  // ← Delta sets the schema here
    properties = UpdateCatalog.updatedProperties(snapshot),
    storage = storageProps,    // ← This contains Delta-specific storage info
    tracksPartitionsInCatalog = true)
}
```
Step 2: Spark Tries to Create Table in External Catalog
```scala
// In updateCatalog()
spark.sessionState.catalog.createTable(cleaned, ignoreIfExists = false, validateLocation = false)
```
Step 3: Hive Metastore Processes the Table Definition
When Spark calls createTable() on an external Hive-compatible catalog, here's what happens:
- SerDe Detection: Hive looks at the table's storage format to determine the appropriate SerDe
- Delta Format Check: Hive sees Delta-specific storage properties but doesn't recognize Delta as a valid SerDe
- Fallback Behavior: Since Delta isn't a registered Hive SerDe, Hive falls back to default behavior
- Schema Stripping: The default behavior is to ignore/strip the provided schema and use an empty schema
Step 4: The Schema Gets Lost
```scala
// What Delta sends to Hive:
CatalogTable(
  schema = StructType(Array(
    StructField("id", LongType),
    StructField("name", StringType),
    StructField("value", DoubleType)
  )),
  storage = CatalogStorageFormat(
    locationUri = Some("s3://bucket/path"),
    inputFormat = Some("org.apache.hadoop.mapred.SequenceFileInputFormat"),           // Delta-specific
    outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat"),
    serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")                // Default, not Delta-aware
  )
)

// What Hive actually stores:
CatalogTable(
  schema = StructType(Array()),         // ← EMPTY! Schema was stripped
  storage = CatalogStorageFormat(...)   // Storage info might be modified too
)
```
Why FORCE_ALTER_TABLE_DATA_SCHEMA Works
Your approach bypasses this SerDe validation entirely:
Step 1: Create Table with Empty Schema
```scala
// First, create the table normally (the schema gets stripped, as described above)
spark.sessionState.catalog.createTable(cleaned, ...)
// Result: the table exists in the catalog with an empty schema
```
Step 2: Force Schema Update
```scala
// Then, directly update the schema using Spark's catalog API
if (conf.getConf(DeltaSQLConf.FORCE_ALTER_TABLE_DATA_SCHEMA)) {
  spark.sessionState.catalog.alterTableDataSchema(cleaned.identifier, cleaned.schema)
}
```
Why This Works:
- `alterTableDataSchema()` is a direct catalog metadata operation
- It doesn't go through SerDe validation; it directly updates the catalog's metadata store
- It bypasses Hive's SerDe compatibility checks entirely
- The catalog accepts the schema update because it is a metadata-only operation (see the sketch below)
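A minimal sketch of that metadata-only path, assuming an existing `database.table` entry in a Hive-compatible metastore and an illustrative two-column schema; `SessionCatalog.alterTableDataSchema` rewrites only the data columns stored for the table, leaving the SerDe, location, and other storage details untouched:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val ident = TableIdentifier("table", Some("database"))
val deltaSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))

// Only the column metadata in the catalog changes; no SerDe validation is involved.
spark.sessionState.catalog.alterTableDataSchema(ident, deltaSchema)
```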
@moomindani thanks for the detailed explanation; we think that makes sense.

The only concern is that this solution is too HMS-specific. Can you add HMS to the conf name (something like HMS_FORCE_UPDATE_DATA_SCHEMA) and provide some context in the doc (e.g., that this is a problem known only to HMS)?
Sure, I will do that and update this PR shortly with a better explanation and config name.
Updated.
…_TABLE_DATA_SCHEMA behavior
- Test 1: Verifies DELTA_UPDATE_CATALOG_ENABLED stores the schema correctly in the catalog
- Test 2: Verifies FORCE_ALTER_TABLE_DATA_SCHEMA behavior when DELTA_UPDATE_CATALOG_ENABLED is disabled
- These tests help validate the different approaches to catalog schema management

Test name exceeded the 100-character limit - split the test name using string concatenation to comply with scalastyle rules
Which Delta project/connector is this regarding?
- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)
Description
This PR is a continuation of #2310.
- This PR resolves issue 1 in #1679.
- The PR changes how the catalog schema is saved in the Hive Metastore.
Technical details:
- Added a new boolean parameter `spark.databricks.delta.schema.forceAlterTableDataSchema` in `DeltaSqlConf`.
- When this parameter is true, in the `updateCatalog` function of the `CreateDeltaTableCommand` class, the table schema is updated after the table is created in the catalog, using the session catalog function `alterTableDataSchema` (a sketch of the config entry follows this list).
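A hedged sketch of how such an entry could look in `DeltaSQLConf`, following the existing `buildConf` pattern there (which prefixes keys with `spark.databricks.delta.`); the doc string and default are assumptions, and since the reviewers asked for an HMS-specific name, the final form may differ:

```scala
val FORCE_ALTER_TABLE_DATA_SCHEMA =
  buildConf("schema.forceAlterTableDataSchema")
    .doc("When true, issue an explicit ALTER TABLE (alterTableDataSchema) after a Delta " +
      "table is created in the catalog, so that external Hive-compatible catalogs store " +
      "the real data schema instead of a placeholder.")
    .booleanConf
    .createWithDefault(false)
```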
How was this patch tested?

See https://docs.google.com/document/d/e/2PACX-1vRl4lWFyx1ASeYCUoj575fLM0dFKhXM5MekWS__NVnwJJESfMflGS71OKHoFjEoHFjmidLHeEXHoreb/pub
Reason not to add unit tests
The new config and logic are designed to trigger ALTER TABLE.
Since we don't have a straightforward way to mock or spy on the catalog in this test environment, additional test cases are not added.
Reason not to add integration tests
The original issue occurs with external catalogs such as the AWS Glue Data Catalog.
Since we do not want to introduce an extra dependency into the integration tests just to test this patch, additional test cases are not added.
While we can't test directly against the AWS Glue Data Catalog in unit tests, I created the doc above to summarize the integration test results.
Reason not to cover the existing test in the previous PR
The previous PR #2310 had two test cases, but I verified that neither captured the original issue.
Does this PR introduce any user-facing changes?
Yes. This PR introduces a new configuration, `spark.databricks.delta.schema.forceAlterTableDataSchema`, to force ALTER TABLE when a Delta table is created in the catalog (a usage sketch follows).
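A hedged usage sketch of the new configuration, assuming a Scala session with the same Delta setup as the reproduction script above; `df` stands for the DataFrame being written and the S3 path is the placeholder from that script:

```scala
// Enable the proposed conf so that updateCatalog issues the extra ALTER TABLE.
spark.conf.set("spark.databricks.delta.schema.forceAlterTableDataSchema", "true")

// Create the table; the catalog entry should now carry the real schema.
df.write
  .format("delta")
  .option("path", "s3://<path_to_data>/database/table/")
  .saveAsTable("database.table")
```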