5 changes: 5 additions & 0 deletions data/conf/iceberg/llap/hive-site.xml
@@ -397,4 +397,9 @@
<name>hive.lock.sleep.between.retries</name>
<value>2</value>
</property>

<property>
<name>write.metadata.delete-after-commit.enabled</name>
@deniskuzZ (Member) commented on Dec 9, 2025:

I think hive-site.xml configs need to have an iceberg. prefix.
As an example, please see https://docs.cloudera.com/data-warehouse/cloud/managing-warehouses/topics/dw-iceberg-manifest-caching-configure.html

@difin (Contributor, Author) commented on Dec 9, 2025:

Manifest caching configs in Hive already support being set in the session conf with the same catalog prefix as the RESTCatalog configs, i.e. iceberg.catalog-default. or iceberg.catalog.<name>.:

@Test
public void testDefaultCatalogProperties() {
  String catalogProperty = "io.manifest.cache-enabled";
  // Set global property
  final String defaultCatalogProperty = CatalogUtils.CATALOG_DEFAULT_CONFIG_PREFIX + catalogProperty;

Can we keep the existing prefix approach for consistency?
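
For illustration only, a minimal sketch of the prefix resolution described above, assuming the two prefixes quoted in this thread; the helper name and exact lookup order are assumptions, not code from this PR:

import org.apache.hadoop.conf.Configuration;

// Hypothetical helper (not part of this PR): resolve a catalog property from
// the session conf, preferring the per-catalog prefix over catalog-default.
final class CatalogPropertySketch {
  static String resolve(Configuration conf, String catalogName, String property) {
    // e.g. iceberg.catalog.my_catalog.io.manifest.cache-enabled
    String perCatalog = conf.get("iceberg.catalog." + catalogName + "." + property);
    if (perCatalog != null) {
      return perCatalog;
    }
    // e.g. iceberg.catalog-default.io.manifest.cache-enabled
    return conf.get("iceberg.catalog-default." + property);
  }
}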

@deniskuzZ (Member) commented on Dec 10, 2025:

Yes, that is exactly what I was looking for.
However, maybe we could simplify and drop the catalog-default prefix to be consistent with other engines.
See Impala:
https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/util/IcebergUtil.java#L1258-L1283

cc @ayushtkn
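
For comparison, a rough sketch of the simplified scheme being suggested, where session keys carry only a bare iceberg. prefix that is stripped before the properties reach the catalog; the helper below is hypothetical, and is neither Impala's code nor this PR's:

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;

// Hypothetical sketch: copy session entries under a bare "iceberg." prefix into
// catalog properties, so "iceberg.io.manifest.cache-enabled" becomes
// "io.manifest.cache-enabled".
final class BarePrefixSketch {
  static Map<String, String> collect(Configuration conf) {
    Map<String, String> props = new HashMap<>();
    String prefix = "iceberg.";
    for (Map.Entry<String, String> entry : conf) {
      if (entry.getKey().startsWith(prefix)) {
        props.put(entry.getKey().substring(prefix.length()), entry.getValue());
      }
    }
    return props;
  }
}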

@deniskuzZ (Member) commented on Dec 10, 2025:

@difin in that case why do we need any other code changes like setWriteMetadataCleanupProperties?

<value>false</value>
</property>
</configuration>
5 changes: 5 additions & 0 deletions data/conf/iceberg/tez/hive-site.xml
@@ -320,4 +320,9 @@
<name>hive.lock.sleep.between.retries</name>
<value>2</value>
</property>

<property>
<name>write.metadata.delete-after-commit.enabled</name>
<value>false</value>
</property>
</configuration>
5 changes: 5 additions & 0 deletions data/conf/llap/hive-site.xml
@@ -403,4 +403,9 @@
<name>hive.lock.sleep.between.retries</name>
<value>2</value>
</property>

<property>
<name>write.metadata.delete-after-commit.enabled</name>
<value>false</value>
</property>
</configuration>
@@ -91,6 +91,7 @@ public class BaseHiveIcebergMetaHook implements HiveMetaHook {
.of(InputFormatConfig.TABLE_SCHEMA, Catalogs.LOCATION, Catalogs.NAME, InputFormatConfig.PARTITION_SPEC);
static final String ORC_FILES_ONLY = "iceberg.orc.files.only";
private static final String ZORDER_FIELDS_JSON_KEY = "zorderFields";
private static final boolean HIVE_ICEBERG_METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT = true;

protected final Configuration conf;
protected Table icebergTable = null;
@@ -202,6 +203,65 @@ public void preCreateTable(CreateTableRequest request) {
// Remove hive primary key columns from table request, as iceberg doesn't support hive primary key.
request.setPrimaryKeys(null);
setSortOrder(hmsTable, schema, catalogProperties);
setWriteMetadataCleanupProperties(hmsTable.getParameters());
}

private void setWriteMetadataCleanupProperties(Map<String, String> hmsParams) {

boolean isEnabled = isConfigEnabled(
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
HIVE_ICEBERG_METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT,
hmsParams);

if (isEnabled) {
catalogProperties.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "true");

addCatalogProperty(
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT,
hmsParams);
}
}

/*
* Determines whether a given configuration is enabled, following this precedence:
* 1. If the configuration exists in the HMS table parameters (hmsParams), use its boolean value.
* 2. Otherwise, check the session configuration (conf) and parse its boolean value if present.
* 3. If neither HMS parameters nor session configuration provide a value, return the specified defaultValue.
* This ensures that table-level settings override session settings, with a fallback to the default.
*/
private boolean isConfigEnabled(String configName, boolean defaultValue, Map<String, String> hmsParams) {

String hmsParamValue = hmsParams.get(configName);
if (hmsParamValue != null) {
return Boolean.parseBoolean(hmsParamValue);
}

String hiveConfValue = conf.get(configName);
if (hiveConfValue != null) {
return Boolean.parseBoolean(hiveConfValue);
}

return defaultValue;
}

/*
* Adds a catalog property to catalogProperties using the following precedence:
* 1. If the property exists in the HMS table parameters (hmsParams), that value is always used.
* 2. Otherwise, the value is taken from the session configuration (conf),
* falling back to the provided defaultValue if not set in the session.
* This ensures table-level parameters override session, with a fallback to the default.
*/
private void addCatalogProperty(String key, Object defaultValue, Map<String, String> hmsParams) {

// If the table property exists, always use it
if (hmsParams.containsKey(key)) {
catalogProperties.put(key, hmsParams.get(key));
return;
}

// Use the session conf, falling back to the default value
catalogProperties.put(key, conf.get(key, defaultValue.toString()));
}

/**
@@ -996,6 +996,7 @@ public void testAlterTableProperties() {
public void testIcebergAndHmsTableProperties() throws Exception {
TableIdentifier identifier = TableIdentifier.of("default", "customers");

shell.getHiveConf().setBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, false);
shell.executeStatement(String.format("CREATE EXTERNAL TABLE default.customers " +
"STORED BY ICEBERG %s" +
"TBLPROPERTIES ('%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s')",
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableProperties;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;

import static org.apache.iceberg.TableMetadataParser.getFileExtension;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Tests write metadata cleanup: automatic deletion of old metadata files after commits,
 * driven by write.metadata.delete-after-commit.enabled and write.metadata.previous-versions-max
 * set at the session or table level.
*/
public class TestHiveIcebergWriteMetadataCleanup {

protected static TestHiveShell shell;

protected TestTables testTables;

@Rule
public TemporaryFolder temp = new TemporaryFolder();

@Rule
public Timeout timeout = new Timeout(500_000, TimeUnit.MILLISECONDS);

@BeforeClass
public static void beforeClass() {
shell = HiveIcebergStorageHandlerTestUtils.shell();
}

@AfterClass
public static void afterClass() throws Exception {
shell.stop();
}

@Before
public void before() throws IOException {
testTables = HiveIcebergStorageHandlerTestUtils.testTables(shell, TestTables.TestTableType.HIVE_CATALOG, temp);
HiveIcebergStorageHandlerTestUtils.init(shell, testTables, temp);
HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
}

@After
public void after() throws Exception {
HiveIcebergStorageHandlerTestUtils.close(shell);
// Mixing mr and tez jobs within the same JVM can cause problems. Mr jobs set the ExecMapper status to done=false
// at the beginning and to done=true at the end. However, tez jobs also rely on this value to see if they should
// proceed, but they do not reset it to done=false at the beginning. Therefore, without calling this after each test
// case, any tez job that follows a completed mr job will erroneously read done=true and will not proceed.
ExecMapper.setDone(false);
}

private void insertFirstFiveCustomers() {
shell.executeStatement("insert into customers values (0, 'Alice', 'Brown')");
shell.executeStatement("insert into customers values (1, 'Bob', 'Brown')");
shell.executeStatement("insert into customers values (2, 'Charlie', 'Brown')");
shell.executeStatement("insert into customers values (3, 'David', 'Brown')");
shell.executeStatement("insert into customers values (4, 'Eve', 'Brown')");
}

private void insertNextFiveCustomers() {
shell.executeStatement("insert into customers values (5, 'Frank', 'Brown')");
shell.executeStatement("insert into customers values (6, 'Grace', 'Brown')");
shell.executeStatement("insert into customers values (7, 'Heidi', 'Brown')");
shell.executeStatement("insert into customers values (8, 'Ivan', 'Brown')");
shell.executeStatement("insert into customers values (9, 'Judy', 'Brown')");
}

@Test
public void testWriteMetadataCleanupEnabled() {

// Enable write metadata cleanup on session level
shell.getHiveConf().setBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, true);
shell.getHiveConf().setInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 4);

Table table = testTables.createTable(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, PartitionSpec.unpartitioned(), FileFormat.ORC, null, 2);

insertFirstFiveCustomers();
assertMetadataFiles(table, 5 /* Iceberg keeps METADATA_PREVIOUS_VERSIONS_MAX old files + 1 current */);

// Override max previous versions on table level
shell.executeStatement(String.format("alter table customers set tblproperties('%s'='%d')",
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 7));

insertNextFiveCustomers();
assertMetadataFiles(table, 8);
}

@Test
public void testWriteMetadataCleanupDisabled() {

// Disable write metadata cleanup on session level
shell.getHiveConf().setBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, false);

Table table = testTables.createTable(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, PartitionSpec.unpartitioned(), FileFormat.ORC, null, 2);

insertFirstFiveCustomers();
insertNextFiveCustomers();

assertMetadataFiles(table, 11);
}

@Test
public void testWriteMetadataCleanupEnabledOnSessionLevelDisabledOnTableLevel() {

// Enable metadata cleanup configs on session level
shell.getHiveConf().setBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, true);
shell.getHiveConf().setInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 4);

Table table = testTables.createTable(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, PartitionSpec.unpartitioned(), FileFormat.ORC, null, 2);

// Disable metadata cleanup configs on table level
shell.executeStatement(String.format("alter table customers set tblproperties('%s'='%s')",
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "false"));

insertFirstFiveCustomers();
insertNextFiveCustomers();

assertMetadataFiles(table, 12);
}

@Test
public void testWriteMetadataCleanupDisabledOnSessionLevelEnabledOnTableLevel() {

// Disable metadata cleanup on session level
shell.getHiveConf().setBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, false);

Table table = testTables.createTable(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, PartitionSpec.unpartitioned(), FileFormat.ORC, null, 2);

// Enable metadata cleanup on table level
shell.executeStatement(String.format("alter table customers set tblproperties('%s'='%s', '%s'='%d')",
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "true",
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 4));

insertFirstFiveCustomers();
insertNextFiveCustomers();

assertMetadataFiles(table, 5);
}

private void assertMetadataFiles(Table table, int expectedCount) {
  // Strip the URI scheme (e.g. "file:") so the table location can be read as a local path,
  // then count the uncompressed *.metadata.json files in the table's metadata directory.
  List<String> metadataFiles =
      Arrays.stream(new File(table.location().replaceAll("^[a-zA-Z]+:", "") + "/metadata")
          .listFiles())
          .map(File::getAbsolutePath)
          .filter(f -> f.endsWith(getFileExtension(TableMetadataParser.Codec.NONE)))
          .toList();
  assertThat(metadataFiles).hasSize(expectedCount);
}
}