HIVE-25179: Support all partition transforms for Iceberg in create table (#2333) (Laszlo Pinter, reviewed by Adam Szita, Marton Bod and Peter Vary)
lcspinter committed Jun 7, 2021
1 parent 323d12f commit eedcd82bc2d61861a27205f925ba0ffab9b6bca8
Showing 18 changed files with 369 additions and 23 deletions.
@@ -135,7 +135,7 @@ public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable)
     // Iceberg schema and specification generated by the code

     Schema schema = schema(catalogProperties, hmsTable);
-    PartitionSpec spec = spec(schema, catalogProperties, hmsTable);
+    PartitionSpec spec = spec(conf, schema, catalogProperties, hmsTable);

     // If there are partition keys specified remove them from the HMS table and add them to the column list
     if (hmsTable.isSetPartitionKeys()) {
@@ -233,7 +233,7 @@ public void preAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, E
     preAlterTableProperties.tableLocation = sd.getLocation();
     preAlterTableProperties.format = sd.getInputFormat();
     preAlterTableProperties.schema = schema(catalogProperties, hmsTable);
-    preAlterTableProperties.spec = spec(preAlterTableProperties.schema, catalogProperties, hmsTable);
+    preAlterTableProperties.spec = spec(conf, preAlterTableProperties.schema, catalogProperties, hmsTable);
     preAlterTableProperties.partitionKeys = hmsTable.getPartitionKeys();

     context.getProperties().put(HiveMetaHook.ALLOW_PARTITION_KEY_CHANGE, "true");
@@ -402,9 +402,17 @@ private Schema schema(Properties properties, org.apache.hadoop.hive.metastore.ap
     }
   }

-  private static PartitionSpec spec(Schema schema, Properties properties,
+  private static PartitionSpec spec(Configuration configuration, Schema schema, Properties properties,
       org.apache.hadoop.hive.metastore.api.Table hmsTable) {

+    PartitionSpec spec = IcebergTableUtil.spec(configuration, schema);
+    if (spec != null) {
+      Preconditions.checkArgument(!hmsTable.isSetPartitionKeys() || hmsTable.getPartitionKeys().isEmpty(),
+          "Provide only one of the following: Hive partition transform specification, or the " +
+              InputFormatConfig.PARTITION_SPEC + " property");
+      return spec;
+    }
+
     if (hmsTable.getParameters().get(InputFormatConfig.PARTITION_SPEC) != null) {
       Preconditions.checkArgument(!hmsTable.isSetPartitionKeys() || hmsTable.getPartitionKeys().isEmpty(),
           "Provide only one of the following: Hive partition specification, or the " +
@@ -254,6 +254,11 @@ public LockType getLockType(WriteEntity writeEntity) {
     return LockType.SHARED_READ;
   }

+  @Override
+  public boolean supportsPartitionTransform() {
+    return true;
+  }
+
   public boolean addDynamicSplitPruningEdge(org.apache.hadoop.hive.ql.metadata.Table table,
       ExprNodeDesc syntheticFilterPredicate) {
     try {
@@ -19,11 +19,15 @@

 package org.apache.iceberg.mr.hive;

+import java.util.List;
 import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.parse.PartitionTransform;
+import org.apache.hadoop.hive.ql.session.SessionStateUtil;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.mr.Catalogs;
 import org.slf4j.Logger;
@@ -46,27 +50,60 @@ private IcebergTableUtil() {
    * @return an Iceberg table
    */
   static Table getTable(Configuration configuration, Properties properties) {
-    Table table = null;
-    QueryState queryState = null;
     String tableIdentifier = properties.getProperty(Catalogs.NAME);
-    if (SessionState.get() != null) {
-      queryState = SessionState.get().getQueryState(configuration.get(HiveConf.ConfVars.HIVEQUERYID.varname));
-      if (queryState != null) {
-        table = (Table) queryState.getResource(tableIdentifier);
-      } else {
-        LOG.debug("QueryState is not available in SessionState. Loading {} from configured catalog.", tableIdentifier);
-      }
-    } else {
-      LOG.debug("SessionState is not available. Loading {} from configured catalog.", tableIdentifier);
-    }
-
-    if (table == null) {
-      table = Catalogs.loadTable(configuration, properties);
-      if (queryState != null) {
-        queryState.addResource(tableIdentifier, table);
-      }
-    }
-
-    return table;
+    return SessionStateUtil.getResource(configuration, tableIdentifier).filter(o -> o instanceof Table)
+        .map(o -> (Table) o).orElseGet(() -> {
+          LOG.debug("Iceberg table {} is not found in QueryState. Loading table from configured catalog",
+              tableIdentifier);
+          Table tab = Catalogs.loadTable(configuration, properties);
+          SessionStateUtil.addResource(configuration, tableIdentifier, tab);
+          return tab;
+        });
   }
+
+  /**
+   * Create {@link PartitionSpec} based on the partition information stored in
+   * {@link org.apache.hadoop.hive.ql.parse.PartitionTransform.PartitionTransformSpec}.
+   * @param configuration a Hadoop configuration
+   * @param schema iceberg table schema
+   * @return iceberg partition spec, or null if no partition transform specification was given
+   */
+  public static PartitionSpec spec(Configuration configuration, Schema schema) {
+    List<PartitionTransform.PartitionTransformSpec> partitionTransformSpecList = SessionStateUtil
+        .getResource(configuration, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC)
+        .map(o -> (List<PartitionTransform.PartitionTransformSpec>) o).orElseGet(() -> null);
+
+    if (partitionTransformSpecList == null) {
+      LOG.debug("Iceberg partition transform spec is not found in QueryState.");
+      return null;
+    }
+
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+    partitionTransformSpecList.forEach(spec -> {
+      switch (spec.transformType) {
+        case IDENTITY:
+          builder.identity(spec.name);
+          break;
+        case YEAR:
+          builder.year(spec.name);
+          break;
+        case MONTH:
+          builder.month(spec.name);
+          break;
+        case DAY:
+          builder.day(spec.name);
+          break;
+        case HOUR:
+          builder.hour(spec.name);
+          break;
+        case TRUNCATE:
+          builder.truncate(spec.name, spec.transformParam.get());
+          break;
+        case BUCKET:
+          builder.bucket(spec.name, spec.transformParam.get());
+          break;
+      }
+    });
+    return builder.build();
+  }
 }
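For context: IcebergTableUtil.spec translates each parsed PartitionTransformSpec into one call on Iceberg's PartitionSpec.Builder. A minimal standalone sketch of the same mapping, driving the Iceberg API directly (schema and column names are illustrative, not from the patch):

import static org.apache.iceberg.types.Types.NestedField.optional;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class PartitionSpecSketch {
  public static void main(String[] args) {
    // Equivalent of: PARTITIONED BY SPEC (year(ts), bucket(16, id), truncate(4, name))
    Schema schema = new Schema(
        optional(1, "id", Types.LongType.get()),
        optional(2, "ts", Types.TimestampType.withZone()),
        optional(3, "name", Types.StringType.get()));

    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .year("ts")           // YEAR     -> builder.year(spec.name)
        .bucket("id", 16)     // BUCKET   -> builder.bucket(spec.name, transformParam)
        .truncate("name", 4)  // TRUNCATE -> builder.truncate(spec.name, transformParam)
        .build();

    System.out.println(spec); // prints the three resulting partition fields
  }
}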
@@ -155,6 +155,41 @@ public void after() throws Exception {
     HiveIcebergStorageHandlerTestUtils.close(shell);
   }

+  @Test
+  public void testPartitionTransform() {
+    Schema schema = new Schema(
+        optional(1, "id", Types.LongType.get()),
+        optional(2, "year_field", Types.DateType.get()),
+        optional(3, "month_field", Types.TimestampType.withZone()),
+        optional(4, "day_field", Types.TimestampType.withoutZone()),
+        optional(5, "hour_field", Types.TimestampType.withoutZone()),
+        optional(6, "truncate_field", Types.StringType.get()),
+        optional(7, "bucket_field", Types.StringType.get()),
+        optional(8, "identity_field", Types.StringType.get())
+    );
+    PartitionSpec spec = PartitionSpec.builderFor(schema)
+        .year("year_field")
+        .month("month_field")
+        .day("day_field")
+        .hour("hour_field")
+        .truncate("truncate_field", 2)
+        .bucket("bucket_field", 2)
+        .identity("identity_field")
+        .build();
+
+    TableIdentifier identifier = TableIdentifier.of("default", "part_test");
+    shell.executeStatement("CREATE EXTERNAL TABLE " + identifier +
+        " PARTITIONED BY SPEC (year(year_field), month(month_field), day(day_field), hour(hour_field), " +
+        "truncate(2, truncate_field), bucket(2, bucket_field), identity_field)" +
+        " STORED BY ICEBERG " +
+        testTables.locationForCreateTableSQL(identifier) +
+        "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
+        SchemaParser.toJson(schema) + "', " +
+        "'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+    Table table = testTables.loadTable(identifier);
+    Assert.assertEquals(spec, table.spec());
+  }
+
   @Test
   public void testCreateDropTable() throws TException, IOException, InterruptedException {
     TableIdentifier identifier = TableIdentifier.of("default", "customers");
@@ -385,6 +385,7 @@ KW_DATACONNECTORS: 'CONNECTORS';
 KW_TYPE: 'TYPE';
 KW_URL: 'URL';
 KW_REMOTE: 'REMOTE';
+KW_SPEC: 'SPEC';

 // Operators
 // NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work.
@@ -238,6 +238,7 @@ TOK_TABCOLLIST;
 TOK_TABCOL;
 TOK_TABLECOMMENT;
 TOK_TABLEPARTCOLS;
+TOK_TABLEPARTCOLSBYSPEC;
 TOK_TABLEPARTCOLNAMES;
 TOK_TABLEROWFORMAT;
 TOK_TABLEROWFORMATFIELD;
@@ -487,6 +488,13 @@ TOK_PARAMETER;
 TOK_PARAMETER_IDX;
 TOK_RESPECT_NULLS;
 TOK_IGNORE_NULLS;
+TOK_IDENTITY;
+TOK_YEAR;
+TOK_MONTH;
+TOK_DAY;
+TOK_HOUR;
+TOK_TRUNCATE;
+TOK_BUCKET;
 }


@@ -690,6 +698,13 @@ import org.apache.hadoop.hive.conf.HiveConf;
     xlateMap.put("KW_DATACONNECTOR", "CONNECTOR");
     xlateMap.put("KW_DATACONNECTORS", "CONNECTORS");
     xlateMap.put("KW_REMOTE", "REMOTE");
+    xlateMap.put("KW_SPEC", "SPEC");
+    xlateMap.put("KW_YEAR", "YEAR");
+    xlateMap.put("KW_MONTH", "MONTH");
+    xlateMap.put("KW_DAY", "DAY");
+    xlateMap.put("KW_HOUR", "HOUR");
+    xlateMap.put("KW_BUCKET", "BUCKET");
+    xlateMap.put("KW_TRUNCATE", "TRUNCATE");

     // Operators
     xlateMap.put("DOT", ".");
@@ -1808,6 +1823,8 @@ createTablePartitionSpec
     : KW_PARTITIONED KW_BY LPAREN (opt1 = createTablePartitionColumnTypeSpec | opt2 = createTablePartitionColumnSpec) RPAREN
     -> {$opt1.tree != null}? $opt1
     -> $opt2
+    | KW_PARTITIONED KW_BY KW_SPEC LPAREN (spec = createTablePartitionTransformSpec) RPAREN
+    -> $spec
     ;
 createTablePartitionColumnTypeSpec
@@ -1824,6 +1841,46 @@ createTablePartitionColumnSpec
     -> ^(TOK_TABLEPARTCOLNAMES columnName+)
     ;
+createTablePartitionTransformSpec
+@init { pushMsg("create table partition by specification", state); }
+@after { popMsg(state); }
+    : columnNameTransformConstraint (COMMA columnNameTransformConstraint)*
+    -> ^(TOK_TABLEPARTCOLSBYSPEC columnNameTransformConstraint+)
+    ;
+columnNameTransformConstraint
+@init { pushMsg("column transform specification", state); }
+@after { popMsg(state); }
+    : partitionTransformType
+    -> ^(TOK_TABCOL partitionTransformType)
+    ;
+partitionTransformType
+@init { pushMsg("partition transform type specification", state); }
+@after { popMsg(state); }
+    : columnName
+    -> {containExcludedCharForCreateTableColumnName($columnName.text)}? {throwColumnNameException()}
+    -> ^(TOK_IDENTITY columnName)
+    | KW_YEAR LPAREN columnName RPAREN
+    -> {containExcludedCharForCreateTableColumnName($columnName.text)}? {throwColumnNameException()}
+    -> ^(TOK_YEAR columnName)
+    | KW_MONTH LPAREN columnName RPAREN
+    -> {containExcludedCharForCreateTableColumnName($columnName.text)}? {throwColumnNameException()}
+    -> ^(TOK_MONTH columnName)
+    | KW_DAY LPAREN columnName RPAREN
+    -> {containExcludedCharForCreateTableColumnName($columnName.text)}? {throwColumnNameException()}
+    -> ^(TOK_DAY columnName)
+    | KW_HOUR LPAREN columnName RPAREN
+    -> {containExcludedCharForCreateTableColumnName($columnName.text)}? {throwColumnNameException()}
+    -> ^(TOK_HOUR columnName)
+    | KW_TRUNCATE LPAREN value = Number COMMA columnName RPAREN
+    -> {containExcludedCharForCreateTableColumnName($columnName.text)}? {throwColumnNameException()}
+    -> ^(TOK_TRUNCATE $value columnName)
+    | KW_BUCKET LPAREN value = Number COMMA columnName RPAREN
+    -> {containExcludedCharForCreateTableColumnName($columnName.text)}? {throwColumnNameException()}
+    -> ^(TOK_BUCKET $value columnName)
+    ;
 tableBuckets
 @init { pushMsg("table buckets specification", state); }
 @after { popMsg(state); }
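To make the new rules concrete: partitionTransformType matches a bare column (identity), a unary time transform, or a parameterized transform whose numeric argument comes first. Hand-made examples of clauses the rule accepts (identifiers are illustrative, not from the patch):

// PARTITIONED BY SPEC (dept)                         -- identity
// PARTITIONED BY SPEC (year(sold_at), day(shipped))  -- unary time transforms
// PARTITIONED BY SPEC (truncate(3, name))            -- Number first, then column
// PARTITIONED BY SPEC (bucket(16, id), dept)         -- transform mixed with identity

Note that TRUNCATE and BUCKET take the numeric argument before the column name, per the KW_TRUNCATE and KW_BUCKET alternatives above.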
@@ -960,6 +960,7 @@ nonReserved
     | KW_UNKNOWN
     | KW_WITHIN
     | KW_TRIM
+    | KW_SPEC
     ;

 //The following SQL2011 reserved keywords are used as function name only, but not as identifiers.
@@ -247,4 +247,12 @@ default boolean directInsertCTAS() {
   default boolean alwaysUnpartitioned() {
     return false;
   }
+
+  /**
+   * Check if the underlying storage handler implementation supports partition transformations.
+   * @return true if the storage handler can support it
+   */
+  default boolean supportsPartitionTransform() {
+    return false;
+  }
 }
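The false default keeps existing storage handlers on the old behavior; a handler opts in by overriding the method, as HiveIcebergStorageHandler does earlier in this commit. A minimal sketch of a hypothetical third-party handler doing the same (the class is illustrative, not part of the patch):

import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;

// Hypothetical handler advertising support for PARTITIONED BY SPEC.
public class TransformAwareStorageHandler extends DefaultStorageHandler {
  @Override
  public boolean supportsPartitionTransform() {
    return true;
  }
}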
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class PartitionTransform {
+
+  private static final Map<Integer, TransformType> TRANSFORMS = Stream
+      .of(new Object[][] { { HiveParser.TOK_IDENTITY, TransformType.IDENTITY },
+          { HiveParser.TOK_YEAR, TransformType.YEAR }, { HiveParser.TOK_MONTH, TransformType.MONTH },
+          { HiveParser.TOK_DAY, TransformType.DAY }, { HiveParser.TOK_HOUR, TransformType.HOUR },
+          { HiveParser.TOK_TRUNCATE, TransformType.TRUNCATE }, { HiveParser.TOK_BUCKET, TransformType.BUCKET } })
+      .collect(Collectors.toMap(e -> (Integer) e[0], e -> (TransformType) e[1]));
+
+  /**
+   * Parse the partition transform specifications from the AST Tree node.
+   * @param node AST Tree node, must not be null
+   * @return list of partition transforms
+   */
+  public static List<PartitionTransformSpec> getPartitionTransformSpec(ASTNode node) {
+    List<PartitionTransformSpec> partSpecList = new ArrayList<>();
+    for (int i = 0; i < node.getChildCount(); i++) {
+      PartitionTransformSpec spec = new PartitionTransformSpec();
+      ASTNode child = (ASTNode) node.getChild(i);
+      for (int j = 0; j < child.getChildCount(); j++) {
+        ASTNode grandChild = (ASTNode) child.getChild(j);
+        switch (grandChild.getToken().getType()) {
+          case HiveParser.TOK_IDENTITY:
+          case HiveParser.TOK_YEAR:
+          case HiveParser.TOK_MONTH:
+          case HiveParser.TOK_DAY:
+          case HiveParser.TOK_HOUR:
+            spec.name = grandChild.getChild(0).getText();
+            spec.transformType = TRANSFORMS.get(grandChild.getToken().getType());
+            break;
+          case HiveParser.TOK_TRUNCATE:
+          case HiveParser.TOK_BUCKET:
+            spec.transformType = TRANSFORMS.get(grandChild.getToken().getType());
+            spec.transformParam = Optional.ofNullable(Integer.valueOf(grandChild.getChild(0).getText()));
+            spec.name = grandChild.getChild(1).getText();
+            break;
+        }
+      }
+      partSpecList.add(spec);
+    }
+
+    return partSpecList;
+  }
+
+  public enum TransformType {
+    IDENTITY, YEAR, MONTH, DAY, HOUR, TRUNCATE, BUCKET
+  }
+
+  public static class PartitionTransformSpec {
+    public String name;
+    public TransformType transformType;
+    public Optional<Integer> transformParam;
+  }
+}
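For context: the parsed list is what IcebergTableUtil.spec (earlier in this commit) reads back from the session under hive_metastoreConstants.PARTITION_TRANSFORM_SPEC. A minimal sketch of the list getPartitionTransformSpec would produce for PARTITIONED BY SPEC (bucket(16, id), day(ts)), built by hand here for illustration (class and column names are made up):

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import org.apache.hadoop.hive.ql.parse.PartitionTransform;

public class TransformSpecSketch {
  public static void main(String[] args) {
    PartitionTransform.PartitionTransformSpec bucket = new PartitionTransform.PartitionTransformSpec();
    bucket.name = "id";
    bucket.transformType = PartitionTransform.TransformType.BUCKET;
    bucket.transformParam = Optional.of(16); // parsed from the leading Number token

    PartitionTransform.PartitionTransformSpec day = new PartitionTransform.PartitionTransformSpec();
    day.name = "ts";
    day.transformType = PartitionTransform.TransformType.DAY;
    // transformParam stays unset: only TRUNCATE and BUCKET carry a parameter

    List<PartitionTransform.PartitionTransformSpec> specs = Arrays.asList(bucket, day);
    System.out.println(specs.size()); // 2
  }
}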
