Skip to content
Permalink
Browse files
HIVE-25065: Implement ALTER TABLE for setting iceberg table properties (
#2230) (Laszlo Pinter, reviewed by Marton Bod and Peter Vary)
  • Loading branch information
lcspinter committed Apr 29, 2021
1 parent 8048019 commit f877f02733cb1ba5a6fc13053d870e2bc7c818ab
Showing 5 changed files with 75 additions and 5 deletions.
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
@@ -53,6 +54,7 @@
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hive.HiveSchemaUtil;
@@ -61,6 +63,7 @@
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
@@ -238,9 +241,9 @@ public void preAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, E
}

@Override
public void commitAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable,
public void commitAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, EnvironmentContext context,
PartitionSpecProxy partitionSpecProxy) throws MetaException {
super.commitAlterTable(hmsTable, partitionSpecProxy);
super.commitAlterTable(hmsTable, context, partitionSpecProxy);
if (canMigrateHiveTable) {
catalogProperties = getCatalogProperties(hmsTable);
catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(preAlterTableProperties.schema));
@@ -251,6 +254,21 @@ public void commitAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable
}
HiveTableUtil.importFiles(preAlterTableProperties.tableLocation, preAlterTableProperties.format,
partitionSpecProxy, preAlterTableProperties.partitionKeys, catalogProperties, conf);
} else {
Map<String, String> contextProperties = context.getProperties();
if (contextProperties.containsKey(ALTER_TABLE_OPERATION_TYPE) &&
allowedAlterTypes.contains(contextProperties.get(ALTER_TABLE_OPERATION_TYPE))) {
Map<String, String> hmsTableParameters = hmsTable.getParameters();
Splitter splitter = Splitter.on(PROPERTIES_SEPARATOR);
UpdateProperties icebergUpdateProperties = icebergTable.updateProperties();
if (contextProperties.containsKey(SET_PROPERTIES)) {
splitter.splitToList(contextProperties.get(SET_PROPERTIES))
.forEach(k -> icebergUpdateProperties.set(k, hmsTableParameters.get(k)));
} else if (contextProperties.containsKey(UNSET_PROPERTIES)) {
splitter.splitToList(contextProperties.get(UNSET_PROPERTIES)).forEach(icebergUpdateProperties::remove);
}
icebergUpdateProperties.commit();
}
}
}

@@ -535,6 +535,37 @@ public void testCreateTableWithoutColumnComments() {
}
}

@Test
public void testAlterTableProperties() {
  // Verifies that ALTER TABLE SET/UNSET TBLPROPERTIES on a Hive table backed by the
  // Iceberg storage handler is propagated to the Iceberg table metadata.
  TableIdentifier identifier = TableIdentifier.of("default", "customers");
  shell.executeStatement("CREATE EXTERNAL TABLE customers (" +
      "t_int INT, " +
      "t_string STRING) " +
      "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
      testTables.locationForCreateTableSQL(identifier) +
      testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
  String propKey = "dummy";
  String propValue = "dummy_val";

  // Add a new property.
  shell.executeStatement(String.format("ALTER TABLE customers SET TBLPROPERTIES('%s'='%s')", propKey, propValue));
  // Check the Iceberg table parameters.
  Table icebergTable = testTables.loadTable(identifier);
  Assert.assertTrue(icebergTable.properties().containsKey(propKey));
  // JUnit's assertEquals takes (expected, actual) — arguments were reversed, which
  // inverts the failure message on a mismatch.
  Assert.assertEquals(propValue, icebergTable.properties().get(propKey));

  // Update the existing property.
  propValue = "new_dummy_val";
  shell.executeStatement(String.format("ALTER TABLE customers SET TBLPROPERTIES('%s'='%s')", propKey, propValue));
  // Check the Iceberg table parameters.
  icebergTable.refresh();
  Assert.assertTrue(icebergTable.properties().containsKey(propKey));
  Assert.assertEquals(propValue, icebergTable.properties().get(propKey));

  // Remove the existing property.
  // NOTE(review): UNSET TBLPROPERTIES conventionally takes only keys; the key=value
  // form is accepted by the grammar here, but the value is irrelevant to the removal.
  shell.executeStatement(String.format("ALTER TABLE customers UNSET TBLPROPERTIES('%s'='%s')", propKey, propValue));
  // Check the Iceberg table parameters.
  icebergTable.refresh();
  Assert.assertFalse(icebergTable.properties().containsKey(propKey));
}

@Test
public void testIcebergAndHmsTableProperties() throws Exception {
TableIdentifier identifier = TableIdentifier.of("default", "customers");
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -136,6 +137,22 @@ private void finalizeAlterTableWithWriteIdOp(Table table, Table oldTable, List<P

try {
environmentContext.putToProperties(HiveMetaHook.ALTER_TABLE_OPERATION_TYPE, desc.getType().name());
if (desc.getType() == AlterTableType.ADDPROPS) {
Map<String, String> oldTableParameters = oldTable.getParameters();
environmentContext.putToProperties(HiveMetaHook.SET_PROPERTIES,
table.getParameters().entrySet().stream()
.filter(e -> !oldTableParameters.containsKey(e.getKey()) ||
!oldTableParameters.get(e.getKey()).equals(e.getValue()))
.map(Map.Entry::getKey)
.collect(Collectors.joining(HiveMetaHook.PROPERTIES_SEPARATOR)));
} else if (desc.getType() == AlterTableType.DROPPROPS) {
Map<String, String> newTableParameters = table.getParameters();
environmentContext.putToProperties(HiveMetaHook.UNSET_PROPERTIES,
oldTable.getParameters().entrySet().stream()
.filter(e -> !newTableParameters.containsKey(e.getKey()))
.map(Map.Entry::getKey)
.collect(Collectors.joining(HiveMetaHook.PROPERTIES_SEPARATOR)));
}
if (partitions == null) {
long writeId = desc.getWriteId() != null ? desc.getWriteId() : 0;
context.getDb().alterTable(desc.getDbTableName(), table, desc.isCascade(), environmentContext, true,
@@ -48,6 +48,9 @@ public interface HiveMetaHook {
public List<String> allowedAlterTypes = ImmutableList.of("ADDPROPS", "DROPPROPS");
String ALTERLOCATION = "ALTERLOCATION";
String ALLOW_PARTITION_KEY_CHANGE = "allow_partition_key_change";
String SET_PROPERTIES = "set_properties";
String UNSET_PROPERTIES = "unset_properties";
String PROPERTIES_SEPARATOR = "'";

/**
* Called before a new table definition is added to the metastore
@@ -126,9 +129,10 @@ public default void preAlterTable(Table table, EnvironmentContext context) throw
/**
* Called after a table is altered in the metastore during ALTER TABLE.
* @param table new table definition
* @param context environment context, containing information about the alter operation type
* @param partitionSpecProxy list of partitions wrapped in {@link PartitionSpecProxy}
*/
default void commitAlterTable(Table table, PartitionSpecProxy partitionSpecProxy) throws MetaException {
default void commitAlterTable(Table table, EnvironmentContext context, PartitionSpecProxy partitionSpecProxy) throws MetaException {
// Do nothing
}

@@ -523,7 +523,7 @@ public void alter_table_with_environmentContext(String dbname, String tbl_name,
client.alter_table_req(req);
if (hook != null) {
PartitionSpecProxy partitionSpecProxy = listPartitionSpecs(dbname, tbl_name, Integer.MAX_VALUE);
hook.commitAlterTable(new_tbl, partitionSpecProxy);
hook.commitAlterTable(new_tbl, envContext, partitionSpecProxy);
}
success = true;
} finally {
@@ -568,7 +568,7 @@ public void alter_table(String catName, String dbName, String tbl_name, Table ne
client.alter_table_req(req);
if (hook != null) {
PartitionSpecProxy partitionSpecProxy = listPartitionSpecs(catName, dbName, tbl_name, Integer.MAX_VALUE);
hook.commitAlterTable(new_tbl, partitionSpecProxy);
hook.commitAlterTable(new_tbl, envContext, partitionSpecProxy);
}
success = true;
} finally {

0 comments on commit f877f02

Please sign in to comment.