HIVE-26157: Change Iceberg storage handler authz URI to metadata location. (#3226) (Laszlo Pinter, reviewed by Peter Vary)
lcspinter committed May 13, 2022
1 parent edd36e5 commit 3b3da9ed7f3813bae3e959670df55682fea648d3
Showing 6 changed files with 154 additions and 5 deletions.
@@ -41,6 +41,7 @@
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.Context.Operation;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
@@ -78,6 +79,7 @@
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.iceberg.BaseTable;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpecParser;
@@ -457,9 +459,24 @@ public boolean isValidMetadataTable(String metaTableName) {
  public URI getURIForAuth(org.apache.hadoop.hive.metastore.api.Table hmsTable) throws URISyntaxException {
    String dbName = hmsTable.getDbName();
    String tableName = hmsTable.getTableName();
-   return new URI(ICEBERG_URI_PREFIX + dbName + "/" + tableName);
+   StringBuilder authURI = new StringBuilder(ICEBERG_URI_PREFIX).append(dbName).append("/").append(tableName)
+       .append("?snapshot=");
+   Optional<String> locationProperty = SessionStateUtil.getProperty(conf, hive_metastoreConstants.META_TABLE_LOCATION);
+   if (locationProperty.isPresent()) {
+     Preconditions.checkArgument(locationProperty.get() != null,
+         "Table location is not set in SessionState. Authorization URI cannot be supplied.");
+     // this property is set during the CREATE operation, before the Hive table is created;
+     // we return a dummy Iceberg metadata file
+     authURI.append(URI.create(locationProperty.get()).getPath()).append("/metadata/dummy.metadata.json");
+   } else {
+     Table table = IcebergTableUtil.getTable(conf, hmsTable);
+     authURI.append(URI.create(((BaseTable) table).operations().current().metadataFileLocation()).getPath());
+   }
+   LOG.debug("Iceberg storage handler authorization URI {}", authURI);
+   return new URI(HiveConf.EncoderDecoderFactory.URL_ENCODER_DECODER.encode(authURI.toString()));
  }

  @Override
  public void validateSinkDesc(FileSinkDesc sinkDesc) throws SemanticException {
    HiveStorageHandler.super.validateSinkDesc(sinkDesc);
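For context: before this change the authorization URI was simply iceberg://<db>/<table>; it now also carries the path of the table's current metadata file in a snapshot query parameter, and the final string is URL-encoded via Hive's URL_ENCODER_DECODER. A minimal standalone sketch of the resulting shape, using a hypothetical metadata location and plain java.net.URI in place of Hive's encoder:

import java.net.URI;
import java.net.URISyntaxException;

public class AuthUriShapeSketch {
  public static void main(String[] args) throws URISyntaxException {
    // Hypothetical current metadata file of table default.target:
    String metadataFile = "s3://bucket/warehouse/default/target/metadata/00001-abc.metadata.json";
    // Only the path component of the metadata location is appended:
    String authUri = "iceberg://" + "default" + "/" + "target" + "?snapshot=" + URI.create(metadataFile).getPath();
    // Prints: iceberg://default/target?snapshot=/warehouse/default/target/metadata/00001-abc.metadata.json
    System.out.println(new URI(authUri));
    // During CREATE TABLE no metadata file exists yet, so the handler appends
    // "<table-location-path>/metadata/dummy.metadata.json" instead (the session-state branch above).
  }
}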
@@ -30,8 +30,10 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdatePartitionSpec;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -43,6 +45,25 @@ private IcebergTableUtil() {

}

+ /**
+  * Constructs the table properties needed for loading the Iceberg table, using information retrieved from the
+  * hmsTable. It then calls {@link IcebergTableUtil#getTable(Configuration, Properties)} with these properties.
+  * @param configuration a Hadoop configuration
+  * @param hmsTable the HMS table
+  * @return the Iceberg table
+  */
+ static Table getTable(Configuration configuration, org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+   Properties properties = new Properties();
+   properties.setProperty(Catalogs.NAME, TableIdentifier.of(hmsTable.getDbName(), hmsTable.getTableName()).toString());
+   properties.setProperty(Catalogs.LOCATION, hmsTable.getSd().getLocation());
+   hmsTable.getParameters().computeIfPresent(InputFormatConfig.CATALOG_NAME,
+       (k, v) -> {
+         properties.setProperty(k, v);
+         return v;
+       });
+   return getTable(configuration, properties);
+ }
+
  /**
   * Load the Iceberg table either from the {@link QueryState} or through the configured catalog. Look for the table
   * object stored in the query state. If it's null, it means the table was not loaded yet within the same query
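A rough usage sketch of the new helper, mirroring the non-CREATE branch of getURIForAuth above (a fragment, not part of the commit: conf and hmsTable are assumed to be in scope, and the helper is package-private to org.apache.iceberg.mr.hive):

// `conf` is the storage handler's Configuration; `hmsTable` is an HMS API Table.
Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable);
// The handler then reaches the current metadata file by casting to BaseTable:
String metadataLocation = ((BaseTable) icebergTable).operations().current().metadataFileLocation();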
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizerFactory;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizerImpl;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactory;
import org.mockito.Mockito;

public class CustomTestHiveAuthorizerFactory implements HiveAuthorizerFactory {

  private static HiveAuthorizer authorizer;

  @Override
  public HiveAuthorizer createHiveAuthorizer(HiveMetastoreClientFactory metastoreClientFactory,
      HiveConf conf, HiveAuthenticationProvider hiveAuthenticator, HiveAuthzSessionContext ctx) {
    authorizer = Mockito.mock(HiveAuthorizerImpl.class);
    return authorizer;
  }

  public static HiveAuthorizer getAuthorizer() {
    return authorizer;
  }
}
@@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
@@ -37,6 +38,9 @@
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseMetastoreTableOperations;
@@ -79,6 +83,8 @@
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;

import static org.apache.iceberg.TableProperties.GC_ENABLED;
import static org.apache.iceberg.types.Types.NestedField.optional;
@@ -128,7 +134,6 @@ public static Collection<Object[]> parameters() {
    for (TestTables.TestTableType testTableType : TestTables.ALL_TABLE_TYPES) {
      testParams.add(new Object[] {testTableType});
    }
-
    return testParams;
  }

@@ -1412,15 +1417,60 @@ public void testCommandsWithPartitionClauseThrow() {
  @Test
  public void testAuthzURI() throws TException, InterruptedException, URISyntaxException {
    TableIdentifier target = TableIdentifier.of("default", "target");
-   testTables.createTable(shell, target.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+   Table table = testTables.createTable(shell, target.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
        PartitionSpec.unpartitioned(), FileFormat.PARQUET, ImmutableList.of());
    org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable(target);

    HiveIcebergStorageHandler storageHandler = new HiveIcebergStorageHandler();
    storageHandler.setConf(shell.getHiveConf());
    URI uriForAuth = storageHandler.getURIForAuth(hmsTable);

-   Assert.assertEquals("iceberg://" + hmsTable.getDbName() + "/" + hmsTable.getTableName(), uriForAuth.toString());
+   Assert.assertEquals("iceberg://" + target.namespace() + "/" + target.name() + "?snapshot=" +
+       URI.create(((BaseTable) table).operations().current().metadataFileLocation()).getPath(),
+       HiveConf.EncoderDecoderFactory.URL_ENCODER_DECODER.decode(uriForAuth.toString()));
  }

+ @Test
+ public void testAuthzURIWithAuthEnabledAndMockCommandAuthorizer() throws HiveException {
+   shell.setHiveSessionValue("hive.security.authorization.enabled", true);
+   shell.setHiveSessionValue("hive.security.authorization.manager",
+       "org.apache.iceberg.mr.hive.CustomTestHiveAuthorizerFactory");
+   TableIdentifier target = TableIdentifier.of("default", "target");
+   Table table = testTables.createTable(shell, target.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+       PartitionSpec.unpartitioned(), FileFormat.PARQUET, ImmutableList.of());
+   HiveAuthorizer authorizer = CustomTestHiveAuthorizerFactory.getAuthorizer();
+   ArgumentCaptor<List<HivePrivilegeObject>> outputHObjsCaptor = ArgumentCaptor.forClass(List.class);
+   Mockito.verify(authorizer).checkPrivileges(Mockito.any(), Mockito.any(), outputHObjsCaptor.capture(),
+       Mockito.any());
+   Optional<HivePrivilegeObject> hivePrivObject = outputHObjsCaptor.getValue().stream()
+       .filter(hpo -> hpo.getType().equals(HivePrivilegeObject.HivePrivilegeObjectType.STORAGEHANDLER_URI)).findAny();
+   if (hivePrivObject.isPresent()) {
+     Assert.assertEquals("iceberg://" + target.namespace() + "/" + target.name() + "?snapshot=" +
+         new Path(((BaseTable) table).operations().current().metadataFileLocation()).getParent().toUri()
+             .getPath() + "/dummy.metadata.json",
+         HiveConf.EncoderDecoderFactory.URL_ENCODER_DECODER.decode(hivePrivObject.get().getObjectName()));
+   } else {
+     Assert.fail("StorageHandler auth URI is not found");
+   }
+ }

+ @Test
+ public void testAuthzURIWithAuthEnabled() throws TException, InterruptedException, URISyntaxException {
+   shell.setHiveSessionValue("hive.security.authorization.enabled", true);
+   TableIdentifier target = TableIdentifier.of("default", "target");
+   Table table = testTables.createTable(shell, target.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+       PartitionSpec.unpartitioned(), FileFormat.PARQUET, ImmutableList.of());
+   org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable(target);
+
+   HiveIcebergStorageHandler storageHandler = new HiveIcebergStorageHandler();
+   storageHandler.setConf(shell.getHiveConf());
+   URI uriForAuth = storageHandler.getURIForAuth(hmsTable);
+   Assert.assertEquals("iceberg://" + target.namespace() + "/" + target.name() + "?snapshot=" +
+       URI.create(((BaseTable) table).operations().current().metadataFileLocation()).getPath(),
+       HiveConf.EncoderDecoderFactory.URL_ENCODER_DECODER.decode(uriForAuth.toString()));
+ }

@Test
@@ -13717,6 +13717,22 @@ ASTNode analyzeCreateTable(
        // outputs is empty, which means this create table happens in the current
        // database.
        rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), crtTblDesc)));
+       String tblLocation = null;
+       if (location != null) {
+         tblLocation = location;
+       } else {
+         try {
+           Warehouse wh = new Warehouse(conf);
+           tblLocation = wh.getDefaultTablePath(db.getDatabase(qualifiedTabName.getDb()), qualifiedTabName.getTable(),
+               isExt).toUri().getPath();
+         } catch (MetaException | HiveException e) {
+           throw new SemanticException(e);
+         }
+       }
+       if (!SessionStateUtil.addResource(conf, hive_metastoreConstants.META_TABLE_LOCATION, tblLocation)) {
+         throw new SemanticException(
+             "Query state attached to Session state must not be null. Table location cannot be saved.");
+       }
        break;
      case ctt: // CREATE TRANSACTIONAL TABLE
        if (isExt && !isDefaultTableTypeChanged) {
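This analyzer-side block is what feeds the CREATE branch of getURIForAuth: the resolved (or default) table location is stashed under META_TABLE_LOCATION before the table, and hence its first metadata file, exists. A small standalone sketch of the placeholder path that results, using a hypothetical location:

import java.net.URI;

public class DummyMetadataPathSketch {
  public static void main(String[] args) {
    // Hypothetical location stored under META_TABLE_LOCATION at analysis time:
    String stashedLocation = "hdfs://nn:8020/warehouse/default/target";
    // getURIForAuth appends a placeholder metadata file under that path:
    String authPath = URI.create(stashedLocation).getPath() + "/metadata/dummy.metadata.json";
    System.out.println(authPath); // /warehouse/default/target/metadata/dummy.metadata.json
  }
}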
@@ -101,7 +101,7 @@ public static boolean addCommitInfo(Configuration conf, String tableName, String

  private static Optional<QueryState> getQueryState(Configuration conf) {
    return Optional.ofNullable(SessionState.get())
-       .map(session -> session.getQueryState(conf.get(HiveConf.ConfVars.HIVEQUERYID.varname)));
+       .map(session -> session.getQueryState(conf.get(HiveConf.ConfVars.HIVEQUERYID.varname, "")));
  }
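The one-line change above is a null guard: Hadoop's Configuration.get(name) returns null when the key is unset, while the two-argument overload returns the given default instead. A tiny standalone sketch (assumes hadoop-common on the classpath; hive.query.id is the value of HiveConf.ConfVars.HIVEQUERYID.varname):

import org.apache.hadoop.conf.Configuration;

public class QueryIdDefaultSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false); // empty config, defaults not loaded
    System.out.println(conf.get("hive.query.id"));     // null when the query id was never set
    System.out.println(conf.get("hive.query.id", "")); // "" instead of null
    // getQueryState("") then simply finds no query state, rather than being handed a null key.
  }
}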

/**
