HIVE-25314: Implement SHOW CREATE TABLE command for Iceberg tables (#2458) (Laszlo Pinter, reviewed by Marton Bod)
lcspinter committed Jul 13, 2021
1 parent 9a4a36a commit 96710345bc96813b867c1b855fb1528b671dbd26
Showing 12 changed files with 357 additions and 55 deletions.
@@ -73,15 +73,12 @@
public class HiveIcebergMetaHook implements HiveMetaHook {
private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class);
private static final Set<String> PARAMETERS_TO_REMOVE = ImmutableSet
- .of(InputFormatConfig.TABLE_SCHEMA, Catalogs.LOCATION, Catalogs.NAME);
+ .of(InputFormatConfig.TABLE_SCHEMA, Catalogs.LOCATION, Catalogs.NAME, InputFormatConfig.PARTITION_SPEC);
private static final Set<String> PROPERTIES_TO_REMOVE = ImmutableSet
// We don't want to push down the metadata location props to Iceberg from HMS,
// since the snapshot pointer in HMS would always be one step ahead
.of(BaseMetastoreTableOperations.METADATA_LOCATION_PROP,
- BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP,
- // Initially we'd like to cache the partition spec in HMS, but not push it down later to Iceberg during alter
- // table commands since by then the HMS info can be stale + Iceberg does not store its partition spec in the props
- InputFormatConfig.PARTITION_SPEC);
+ BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP);
private static final EnumSet<AlterTableType> SUPPORTED_ALTER_OPS = EnumSet.of(
AlterTableType.ADDCOLS, AlterTableType.REPLACE_COLUMNS, AlterTableType.ADDPROPS, AlterTableType.DROPPROPS,
AlterTableType.SETPARTITIONSPEC);
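
These two sets filter in opposite directions: entries in PARAMETERS_TO_REMOVE are stripped from the HMS table parameters, while PROPERTIES_TO_REMOVE keeps HMS-only bookkeeping (the metadata location pointers) from being pushed down into the Iceberg table properties. A minimal sketch of that double filtering, with a hypothetical hmsTable variable standing in for the real hook state:

// Illustration only; the actual logic lives in HiveIcebergMetaHook, not in this helper shape.
Map<String, String> hmsParams = new HashMap<>(hmsTable.getParameters());
PARAMETERS_TO_REMOVE.forEach(hmsParams::remove);      // never persisted into HMS
Map<String, String> icebergProps = new HashMap<>(hmsParams);
PROPERTIES_TO_REMOVE.forEach(icebergProps::remove);   // never pushed down to Iceberg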
@@ -21,13 +21,15 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.type.Date;
@@ -42,6 +44,7 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
+ import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -270,6 +273,31 @@ public boolean supportsPartitionTransform() {
return true;
}

@Override
public List<PartitionTransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
return table.spec().fields().stream().map(f -> {
PartitionTransformSpec spec = new PartitionTransformSpec();
spec.setColumnName(table.schema().findColumnName(f.sourceId()));
// right now the only way to fetch the transform type and its params is through the toString() call
String transformName = f.transform().toString().toUpperCase();
// if the transform name contains '[' it means it has some config params
if (transformName.contains("[")) {
spec.setTransformType(PartitionTransformSpec.TransformType
.valueOf(transformName.substring(0, transformName.indexOf("["))));
spec.setTransformParam(Optional.of(Integer
.valueOf(transformName.substring(transformName.indexOf("[") + 1, transformName.indexOf("]")))));
} else {
spec.setTransformType(PartitionTransformSpec.TransformType.valueOf(transformName));
spec.setTransformParam(Optional.empty());
}

return spec;
}).collect(Collectors.toList());
}
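
Since Iceberg only exposes the transform parameters through toString(), the branch above turns strings such as "truncate[2]" or "bucket[2]" (both appear in the q-file below) into a transform type plus an optional integer parameter. A standalone illustration of that parsing, with the input value chosen for the example:

// "bucket[2]" is what Transform#toString() yields for a 2-bucket transform.
String transformName = "bucket[2]".toUpperCase();                                          // "BUCKET[2]"
String type = transformName.substring(0, transformName.indexOf("["));                     // "BUCKET"
Integer param = Integer.valueOf(
    transformName.substring(transformName.indexOf("[") + 1, transformName.indexOf("]"))); // 2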

@Override
public String getFileFormatPropertyKey() {
return TableProperties.DEFAULT_FILE_FORMAT;
@@ -26,7 +26,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.QueryState;
- import org.apache.hadoop.hive.ql.parse.PartitionTransform;
+ import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
@@ -68,15 +68,15 @@ static Table getTable(Configuration configuration, Properties properties) {

/**
* Create {@link PartitionSpec} based on the partition information stored in
- * {@link org.apache.hadoop.hive.ql.parse.PartitionTransform.PartitionTransformSpec}.
+ * {@link PartitionTransformSpec}.
* @param configuration a Hadoop configuration
* @param schema iceberg table schema
* @return iceberg partition spec, always non-null
*/
public static PartitionSpec spec(Configuration configuration, Schema schema) {
- List<PartitionTransform.PartitionTransformSpec> partitionTransformSpecList = SessionStateUtil
+ List<PartitionTransformSpec> partitionTransformSpecList = SessionStateUtil
.getResource(configuration, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC)
- .map(o -> (List<PartitionTransform.PartitionTransformSpec>) o).orElseGet(() -> null);
+ .map(o -> (List<PartitionTransformSpec>) o).orElseGet(() -> null);

if (partitionTransformSpecList == null) {
LOG.debug("Iceberg partition transform spec is not found in QueryState.");
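
The getResource call above implies a producer that stashed the transform spec list during query compilation. A hedged sketch of that producer side, assuming SessionStateUtil offers an addResource counterpart to getResource (signature assumed, not shown in this diff):

// Assumed counterpart of SessionStateUtil.getResource(); verify the exact signature.
SessionStateUtil.addResource(configuration, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC,
    partitionTransformSpecList);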
@@ -85,27 +85,27 @@ public static PartitionSpec spec(Configuration configuration, Schema schema) {

PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
partitionTransformSpecList.forEach(spec -> {
- switch (spec.transformType) {
+ switch (spec.getTransformType()) {
case IDENTITY:
- builder.identity(spec.name);
+ builder.identity(spec.getColumnName());
break;
case YEAR:
- builder.year(spec.name);
+ builder.year(spec.getColumnName());
break;
case MONTH:
- builder.month(spec.name);
+ builder.month(spec.getColumnName());
break;
case DAY:
- builder.day(spec.name);
+ builder.day(spec.getColumnName());
break;
case HOUR:
- builder.hour(spec.name);
+ builder.hour(spec.getColumnName());
break;
case TRUNCATE:
- builder.truncate(spec.name, spec.transformParam.get());
+ builder.truncate(spec.getColumnName(), spec.getTransformParam().get());
break;
case BUCKET:
- builder.bucket(spec.name, spec.transformParam.get());
+ builder.bucket(spec.getColumnName(), spec.getTransformParam().get());
break;
break;
}
});
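
The switch above maps one-to-one onto Iceberg's PartitionSpec.Builder methods. A hedged usage sketch with an invented two-column schema (imports: org.apache.iceberg.PartitionSpec, org.apache.iceberg.Schema, org.apache.iceberg.types.Types):

// Invented schema for illustration; field ids and names are arbitrary.
Schema schema = new Schema(
    Types.NestedField.required(1, "ts", Types.TimestampType.withoutZone()),
    Types.NestedField.required(2, "id", Types.IntegerType.get()));
PartitionSpec spec = PartitionSpec.builderFor(schema)
    .day("ts")        // DAY branch
    .bucket("id", 2)  // BUCKET branch with transformParam = 2
    .build();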
@@ -134,34 +134,34 @@ public static void updateSpec(Configuration configuration, Table table) {
updatePartitionSpec.apply();

// add new partitions which are not yet present
- List<PartitionTransform.PartitionTransformSpec> partitionTransformSpecList = SessionStateUtil
+ List<PartitionTransformSpec> partitionTransformSpecList = SessionStateUtil
.getResource(configuration, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC)
- .map(o -> (List<PartitionTransform.PartitionTransformSpec>) o).orElseGet(() -> null);
+ .map(o -> (List<PartitionTransformSpec>) o).orElseGet(() -> null);
IntStream.range(0, partitionTransformSpecList.size())
.filter(i -> !intersectingPartitionNames.contains(newPartitionSpec.fields().get(i).name()))
.forEach(i -> {
- PartitionTransform.PartitionTransformSpec spec = partitionTransformSpecList.get(i);
- switch (spec.transformType) {
+ PartitionTransformSpec spec = partitionTransformSpecList.get(i);
+ switch (spec.getTransformType()) {
case IDENTITY:
- updatePartitionSpec.addField(spec.name);
+ updatePartitionSpec.addField(spec.getColumnName());
break;
case YEAR:
- updatePartitionSpec.addField(Expressions.year(spec.name));
+ updatePartitionSpec.addField(Expressions.year(spec.getColumnName()));
break;
case MONTH:
- updatePartitionSpec.addField(Expressions.month(spec.name));
+ updatePartitionSpec.addField(Expressions.month(spec.getColumnName()));
break;
case DAY:
- updatePartitionSpec.addField(Expressions.day(spec.name));
+ updatePartitionSpec.addField(Expressions.day(spec.getColumnName()));
break;
case HOUR:
- updatePartitionSpec.addField(Expressions.hour(spec.name));
+ updatePartitionSpec.addField(Expressions.hour(spec.getColumnName()));
break;
case TRUNCATE:
- updatePartitionSpec.addField(Expressions.truncate(spec.name, spec.transformParam.get()));
+ updatePartitionSpec.addField(Expressions.truncate(spec.getColumnName(), spec.getTransformParam().get()));
break;
case BUCKET:
- updatePartitionSpec.addField(Expressions.bucket(spec.name, spec.transformParam.get()));
+ updatePartitionSpec.addField(Expressions.bucket(spec.getColumnName(), spec.getTransformParam().get()));
break;
break;
}
});
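
The same Expressions-based terms can be exercised against a live table through Table#updateSpec(), presumably where the updatePartitionSpec instance above originates. A hedged sketch with invented column names:

// "table" is an org.apache.iceberg.Table; column names are illustrative only.
table.updateSpec()
    .addField("identity_col")                  // IDENTITY branch
    .addField(Expressions.month("event_ts"))   // MONTH branch
    .addField(Expressions.bucket("id", 2))     // BUCKET branch with transformParam = 2
    .commit();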
@@ -763,7 +763,7 @@ public void testIcebergAndHmsTableProperties() throws Exception {
Assert.assertEquals(expectedIcebergProperties, icebergTable.properties());

if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
- Assert.assertEquals(11, hmsParams.size());
+ Assert.assertEquals(10, hmsParams.size());
Assert.assertEquals("initial_val", hmsParams.get("custom_property"));
Assert.assertEquals("TRUE", hmsParams.get(InputFormatConfig.EXTERNAL_TABLE_PURGE));
Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL"));
@@ -776,10 +776,9 @@ public void testIcebergAndHmsTableProperties() throws Exception {
getCurrentSnapshotForHiveCatalogTable(icebergTable));
Assert.assertNull(hmsParams.get(BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP));
Assert.assertNotNull(hmsParams.get(hive_metastoreConstants.DDL_TIME));
- Assert.assertNotNull(hmsParams.get(InputFormatConfig.PARTITION_SPEC));
Assert.assertNotNull(hmsParams.get(serdeConstants.SERIALIZATION_FORMAT));
} else {
- Assert.assertEquals(8, hmsParams.size());
+ Assert.assertEquals(7, hmsParams.size());
Assert.assertNull(hmsParams.get(TableProperties.ENGINE_HIVE_ENABLED));
}

@@ -802,7 +801,7 @@ public void testIcebergAndHmsTableProperties() throws Exception {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
- Assert.assertEquals(14, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop
+ Assert.assertEquals(13, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop
Assert.assertEquals("true", hmsParams.get("new_prop_1"));
Assert.assertEquals("false", hmsParams.get("new_prop_2"));
Assert.assertEquals("new_val", hmsParams.get("custom_property"));
@@ -812,7 +811,7 @@ public void testIcebergAndHmsTableProperties() throws Exception {
Assert.assertEquals(hmsParams.get(BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP), prevSnapshot);
Assert.assertEquals(hmsParams.get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP), newSnapshot);
} else {
- Assert.assertEquals(8, hmsParams.size());
+ Assert.assertEquals(7, hmsParams.size());
}

// Remove some Iceberg props and see if they're removed from HMS table props as well
@@ -0,0 +1,15 @@
DROP TABLE IF EXISTS ice_t;
CREATE EXTERNAL TABLE ice_t (i int, s string, ts timestamp, d date) STORED BY ICEBERG;
SHOW CREATE TABLE ice_t;

DROP TABLE IF EXISTS ice_t_transform;
CREATE EXTERNAL TABLE ice_t_transform (year_field date, month_field date, day_field date, hour_field timestamp, truncate_field string, bucket_field int, identity_field int) PARTITIONED BY SPEC (year(year_field), month(month_field), day(day_field), hour(hour_field), truncate(2, truncate_field), bucket(2, bucket_field), identity_field) STORED BY ICEBERG;
SHOW CREATE TABLE ice_t_transform;

DROP TABLE IF EXISTS ice_t_transform_prop;
CREATE EXTERNAL TABLE ice_t_transform_prop (id int, year_field date, month_field date, day_field date, hour_field timestamp, truncate_field string, bucket_field int, identity_field int) STORED BY ICEBERG TBLPROPERTIES ('iceberg.mr.table.partition.spec'='{"spec-id":0,"fields":[{"name":"year_field_year","transform":"year","source-id":1,"field-id":1000},{"name":"month_field_month","transform":"month","source-id":2,"field-id":1001},{"name":"day_field_day","transform":"day","source-id":3,"field-id":1002},{"name":"hour_field_hour","transform":"hour","source-id":4,"field-id":1003},{"name":"truncate_field_trunc","transform":"truncate[2]","source-id":5,"field-id":1004},{"name":"bucket_field_bucket","transform":"bucket[2]","source-id":6,"field-id":1005},{"name":"identity_field","transform":"identity","source-id":7,"field-id":1006}]}');
SHOW CREATE TABLE ice_t_transform_prop;

DROP TABLE IF EXISTS ice_t_identity_part;
CREATE EXTERNAL TABLE ice_t_identity_part (a int) PARTITIONED BY (b string) STORED BY ICEBERG;
SHOW CREATE TABLE ice_t_identity_part;
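
These qtests drive the new SHOW CREATE TABLE path end to end. Roughly the same check could be written programmatically against the test shell used in testIcebergAndHmsTableProperties above; the return shape is assumed here:

// Assumed shape of the test shell API (shell as used in the test diff above).
List<Object[]> rows = shell.executeStatement("SHOW CREATE TABLE ice_t");
rows.forEach(r -> System.out.println(r[0]));  // each row holds one line of the generated DDL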
