HIVE-29437: Iceberg: Fix concurrency issues between compaction and concurrent write operations #6292
Conversation
...erg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java (resolved)
...eberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionUtil.java (resolved)
Force-pushed b842441 to 5433c78
Force-pushed 5433c78 to 508798e
Force-pushed 508798e to 22e610b
 * This is needed because hiveConf() returns the original conf passed to start(),
 * which may not have the connection URL that was set in the handler's serverConf.
 */
public String getConnectionURL() {
Is this the code used to set the DB URL?
private static void setupMetastoreDB(String dbURL) throws Exception {
HiveConf conf = new HiveConf();
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CONNECT_URL_KEY,
"jdbc:derby:" + DERBY_PATH + ";create=true");
TestTxnDbUtil.prepDb(conf);
}
Why do we use baseHandler.getConf()?
Could we reuse MetaStoreInit.getConnectionURL(Configuration conf)?
could we reuse MetaStoreInit.getConnectionURL(Configuration conf)
No, MetaStoreInit.getConnectionURL(Configuration conf) returns the default DB URL, which is the in-memory Derby DB:
jdbc:derby:memory:/Users/dfingerman/workspace/hive-upstream-difin/iceberg/iceberg-handler/target/tmp/junit_metastore_db;create=true
static String getConnectionURL(Configuration conf) {
return MetastoreConf.getVar(conf, ConfVars.CONNECT_URL_KEY, "");
}
The current method returns the correct file-based Derby DB URL:
shell.metastore().getConnectionURL():
jdbc:derby:/var/folders/ks/4pwh80t957gc8q2z7jdblpw80000gq/T/hive3437069780704791128/metastore_db;create=true
This is what took me 1-2 days to figure out: why the compaction command wasn't able to resolve the correct database during compaction command analysis.
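For context, a minimal sketch of what such an accessor could look like on the test metastore helper, assuming a serverConf Configuration field (mentioned in the javadoc quoted above) and the DERBY_PATH constant from the fallback discussed below; this is illustrative, not the exact code in the PR:

/**
 * Returns the JDBC URL of the file-based Derby metastore DB.
 */
public String getConnectionURL() {
  // Prefer the URL the embedded metastore handler actually uses (set in serverConf).
  String url = MetastoreConf.getVar(serverConf, MetastoreConf.ConfVars.CONNECT_URL_KEY);
  if (url != null && !url.isEmpty()) {
    return url;
  }
  // Fallback: construct from DERBY_PATH pattern
  return "jdbc:derby:" + DERBY_PATH + ";create=true";
}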
Are we using the fallback here?
// Fallback: construct from DERBY_PATH pattern
return "jdbc:derby:" + DERBY_PATH + ";create=true";
Someone should be setting CONNECT_URL_KEY; otherwise, how does everything else work?
Is this the place?
private static void setupMetastoreDB(String dbURL) throws Exception {
HiveConf conf = new HiveConf();
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CONNECT_URL_KEY,
"jdbc:derby:" + DERBY_PATH + ";create=true");
TestTxnDbUtil.prepDb(conf);
}
Why not move this to initConf?
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CONNECT_URL_KEY,
"jdbc:derby:" + DERBY_PATH + ";create=true");
why not move to initConf
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CONNECT_URL_KEY, "jdbc:derby:" + DERBY_PATH + ";create=true");
It doesn't work: when the compaction command is being analyzed, it connects to the default in-memory Derby DB.
It only works when this is added in HiveIcebergStorageHandlerWithEngineBase#executeConcurrently:
shell.setHiveSessionValue(HiveConf.ConfVars.METASTORE_CONNECT_URL_KEY.varname,
shell.metastore().getConnectionURL());
OK, but what about metastore().getConnectionURL()? Do we always go with the fallback? Can we set CONNECT_URL_KEY in init and then return it in getConnectionURL()?
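A minimal sketch of that suggested alternative, assuming the test metastore keeps the conf it was started with (the conf field name is illustrative, not from the PR):

// In the test metastore's init/start: pin the connection URL once.
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CONNECT_URL_KEY,
    "jdbc:derby:" + DERBY_PATH + ";create=true");

// The accessor then just reads it back, with no fallback needed.
public String getConnectionURL() {
  return MetastoreConf.getVar(conf, MetastoreConf.ConfVars.CONNECT_URL_KEY);
}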
...eberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java (outdated, resolved)
| shell.setHiveConfValue("tez.counters.max", "1024"); | ||
|
|
||
| // Settings for Hive Iceberg Compaction | ||
| shell.setHiveConfValue(HiveConf.ConfVars.HIVE_LOCK_MANAGER.varname, |
Do not set configs that are already defaults.
HiveConf:
HIVE_LOCK_MANAGER("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager", ""),
Where is this defined as the default?
That is actually not good; DbLockManager should be the default. Isn't ACID the default upstream?
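For reference, a sketch of what explicitly opting into ACID in the test shell could look like; whether these values are already the upstream defaults is exactly the open question here, so treat this as illustrative only:

// Explicit ACID/lock-manager setup for the test shell (skip if already the defaults).
shell.setHiveConfValue(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "true");
shell.setHiveConfValue(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname,
    "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
shell.setHiveConfValue(HiveConf.ConfVars.HIVE_LOCK_MANAGER.varname,
    "org.apache.hadoop.hive.ql.lockmgr.DbLockManager");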
.../src/test/java/org/apache/iceberg/mr/hive/test/utils/HiveIcebergStorageHandlerTestUtils.java (outdated, resolved)
...andler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerWithEngineBase.java (outdated, resolved)
Configuration shellConf = shell.getHiveConf();

if (metastoreConnectUrl != null) {
  shellConf.set(HiveConf.ConfVars.METASTORE_CONNECT_URL_KEY.varname, metastoreConnectUrl);
I am not convinced we should be doing this. Can't we set METASTORE_CONNECT_URL_KEY globally? For example:
System.setProperty(HiveConf.ConfVars.METASTORE_CONNECT_URL_KEY.varname,
    MetastoreConf.getVar(hiveConf, MetastoreConf.ConfVars.CONNECT_URL_KEY));
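A sketch of where such a global override might live in the test base; the @BeforeClass placement and the shell accessor are assumptions for illustration, not code from the PR:

@BeforeClass
public static void pinMetastoreConnectUrl() {
  // Every HiveConf created afterwards picks up this system property,
  // so all sessions resolve the same file-based Derby metastore DB.
  System.setProperty(HiveConf.ConfVars.METASTORE_CONNECT_URL_KEY.varname,
      MetastoreConf.getVar(shell.getHiveConf(), MetastoreConf.ConfVars.CONNECT_URL_KEY));
}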
String[] sql = new String[] {
    "INSERT INTO ice_t SELECT i*100, p*100 FROM ice_t",
    "ALTER TABLE ice_t compact 'MAJOR' and wait"
I don't think that is a proper test.
ALTER TABLE ice_t compact 'MAJOR' would initiate a new IOW query; would it synchronize with the INSERT? See HiveIcebergStorageHandlerStub#waitForAllWritesToComplete.
Please check above; maybe we need to execute the IOW with the compaction session attributes, or fix the synchronization.
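To make the synchronization concern concrete, here is a sketch of driving the two statements concurrently; shell.executeStatement(String) and the thread-pool hand-off are assumptions for illustration (requires java.util.concurrent imports), not the PR's test code:

ExecutorService pool = Executors.newFixedThreadPool(2);
try {
  Future<?> writer = pool.submit(() ->
      shell.executeStatement("INSERT INTO ice_t SELECT i*100, p*100 FROM ice_t"));
  Future<?> compactor = pool.submit(() ->
      shell.executeStatement("ALTER TABLE ice_t compact 'MAJOR' and wait"));
  // Both must finish; the open question is whether the compaction's IOW query
  // waits for the concurrent INSERT (HiveIcebergStorageHandlerStub#waitForAllWritesToComplete).
  writer.get();
  compactor.get();
} finally {
  pool.shutdownNow();
}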
Force-pushed 22e610b to 2e0825d
deniskuzZ left a comment:
+1
Please create a new JIRA for the test part.
HIVE-29437: Iceberg: Fix concurrency issues between compaction and concurrent write operations.
Force-pushed 2e0825d to 3bf65e2
HIVE-29437: Iceberg: Fix concurrency issues between compaction and concurrent write operations.
What changes were proposed in this pull request?
Fixing concurrency issues between compaction and concurrent write operations.
Why are the changes needed?
It was found in downstream testing that when Hive Iceberg compaction runs in parallel with Spark write operations on the same table, compaction sometimes produces wrong results. Before committing, once Hive has the compacted data files that should replace the existing, uncompacted data and delete files in a table or partition, it collects those uncompacted data and delete files so they can be replaced with the compacted ones. The issue is that Hive collects those uncompacted data and delete files from the latest Iceberg snapshot instead of from the original snapshot. Because of concurrent write operations, the latest snapshot may contain different data, which can lead to data corruption.
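As an illustration of the idea behind the fix (not the exact PR code), the files to be replaced should be collected from the snapshot the compaction started on rather than from the table's current snapshot; with the Iceberg API this looks roughly like the following, where compactionStartSnapshotId is a hypothetical value captured when the compaction was planned:

// Uses org.apache.iceberg.Table, org.apache.iceberg.FileScanTask, org.apache.iceberg.io.CloseableIterable.
long startSnapshotId = compactionStartSnapshotId;  // captured at compaction planning time (assumed)
CloseableIterable<FileScanTask> tasks = table.newScan()
    .useSnapshot(startSnapshotId)   // pin the scan to the original snapshot
    .planFiles();
// Collect task.file() and task.deletes() from these tasks to build the replace set,
// instead of scanning the table's latest snapshot, which may include concurrent writes.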
Does this PR introduce any user-facing change?
No
How was this patch tested?
The fix was validated downstream with concurrent Spark write operations and Hive Iceberg compaction.