HIVE-25102: Cache Iceberg table objects within same query (#2261) (Laszlo Pinter, reviewed by Marton Bod and Peter Vary)
lcspinter committed May 28, 2021
1 parent b1929f3 commit 6d850940e974f63ea97afa2429fec4409d5e05c6
Showing 8 changed files with 117 additions and 7 deletions.
Catalogs.java
@@ -87,8 +87,8 @@ public static Table loadTable(Configuration conf) {
    * The table identifier ({@link Catalogs#NAME}) and the catalog name ({@link InputFormatConfig#CATALOG_NAME}),
    * or table path ({@link Catalogs#LOCATION}) should be specified by the controlling properties.
    * <p>
-   * Used by HiveIcebergSerDe and HiveIcebergStorageHandler
-   * @param conf a Hadoop
+   * Used by HiveIcebergSerDe and HiveIcebergStorageHandler.
+   * @param conf a Hadoop configuration
    * @param props the controlling properties
    * @return an Iceberg table
    */
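The javadoc above describes the controlling-properties contract for Catalogs.loadTable: either a table identifier plus a catalog name, or a table path. A minimal sketch of a call site, for orientation only (the table and catalog names are hypothetical, not from the commit; conf is a placeholder Hadoop Configuration):

    // Hypothetical call site for Catalogs.loadTable(Configuration, Properties).
    Properties props = new Properties();
    props.setProperty(Catalogs.NAME, "default.tbl");              // table identifier (hypothetical)
    props.setProperty(InputFormatConfig.CATALOG_NAME, "iceberg"); // catalog name (hypothetical)
    Table table = Catalogs.loadTable(conf, props);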
HiveIcebergMetaHook.java
@@ -111,7 +111,8 @@ public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable)

       // If not using HiveCatalog check for existing table
       try {
-        this.icebergTable = Catalogs.loadTable(conf, catalogProperties);
+        this.icebergTable = IcebergTableUtil.getTable(conf, catalogProperties);
+
         Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA) == null,
             "Iceberg table already created - can not use provided schema");
@@ -169,7 +170,7 @@ public void preDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
     if (deleteIcebergTable && Catalogs.hiveCatalog(conf, catalogProperties)) {
       // Store the metadata and the id for deleting the actual table data
       String metadataLocation = hmsTable.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
-      this.deleteIo = Catalogs.loadTable(conf, catalogProperties).io();
+      this.deleteIo = IcebergTableUtil.getTable(conf, catalogProperties).io();
       this.deleteMetadata = TableMetadataParser.read(deleteIo, metadataLocation);
     }
   }
@@ -207,7 +208,7 @@ public void preAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, E
     super.preAlterTable(hmsTable, context);
     catalogProperties = getCatalogProperties(hmsTable);
     try {
-      icebergTable = Catalogs.loadTable(conf, catalogProperties);
+      icebergTable = IcebergTableUtil.getTable(conf, catalogProperties);
     } catch (NoSuchTableException nte) {
       // If the iceberg table does not exist, and the hms table is external and not temporary and not acid
       // we will create it in commitAlterTable
HiveIcebergSerDe.java
@@ -98,8 +98,8 @@ public void initialize(@Nullable Configuration configuration, Properties serDePr
       }
     } else {
       try {
+        Table table = IcebergTableUtil.getTable(configuration, serDeProperties);
         // always prefer the original table schema if there is one
-        Table table = Catalogs.loadTable(configuration, serDeProperties);
         this.tableSchema = table.schema();
         this.partitionColumns = table.spec().fields().stream().map(PartitionField::name).collect(Collectors.toList());
         LOG.info("Using schema from existing table {}", SchemaParser.toJson(tableSchema));
HiveIcebergStorageHandler.java
@@ -329,7 +329,7 @@ public static Schema schema(Configuration config) {
   @VisibleForTesting
   static void overlayTableProperties(Configuration configuration, TableDesc tableDesc, Map<String, String> map) {
     Properties props = tableDesc.getProperties();
-    Table table = Catalogs.loadTable(configuration, props);
+    Table table = IcebergTableUtil.getTable(configuration, props);
     String schemaJson = SchemaParser.toJson(table.schema());

     Maps.fromProperties(props).entrySet().stream()
IcebergTableUtil.java (new file)
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.Catalogs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergTableUtil {

  private static final Logger LOG = LoggerFactory.getLogger(IcebergTableUtil.class);

  private IcebergTableUtil() {

  }

  /**
   * Load the Iceberg table either from the {@link QueryState} or through the configured catalog. First look for the
   * table object stored in the query state; if it is not there, the table has not been loaded yet within this
   * query, so load it through the Catalogs API and then store it in the query state.
   * @param configuration a Hadoop configuration
   * @param properties controlling properties
   * @return an Iceberg table
   */
  static Table getTable(Configuration configuration, Properties properties) {
    Table table = null;
    QueryState queryState = null;
    String tableIdentifier = properties.getProperty(Catalogs.NAME);
    if (SessionState.get() != null) {
      queryState = SessionState.get().getQueryState(configuration.get(HiveConf.ConfVars.HIVEQUERYID.varname));
      if (queryState != null) {
        table = (Table) queryState.getResource(tableIdentifier);
      } else {
        LOG.debug("QueryState is not available in SessionState. Loading {} from configured catalog.", tableIdentifier);
      }
    } else {
      LOG.debug("SessionState is not available. Loading {} from configured catalog.", tableIdentifier);
    }

    if (table == null) {
      table = Catalogs.loadTable(configuration, properties);
      if (queryState != null) {
        queryState.addResource(tableIdentifier, table);
      }
    }

    return table;
  }
}
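Taken together, getTable memoizes table objects per query: the first lookup loads the table through the Catalogs API and stores it on the QueryState, and later lookups in the same query reuse the cached object. A minimal usage sketch, assuming a live SessionState with a QueryState registered for the current query id (the table name, conf, and props are placeholders, not from the commit):

    // Sketch only: two lookups with the same controlling properties within one query.
    Properties props = new Properties();
    props.setProperty(Catalogs.NAME, "default.tbl");       // hypothetical identifier
    Table first = IcebergTableUtil.getTable(conf, props);  // miss: loads via Catalogs, caches in QueryState
    Table second = IcebergTableUtil.getTable(conf, props); // hit: served from the QueryState resource map
    // first == second holds only while a QueryState is registered for the query id;
    // without one, every call falls back to Catalogs.loadTable.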
Driver.java
@@ -117,6 +117,9 @@ public Driver(QueryState queryState, QueryInfo queryInfo, HiveTxnManager txnMana
     driverContext = new DriverContext(queryState, queryInfo, new HookRunner(queryState.getConf(), CONSOLE),
         txnManager);
     driverTxnHandler = new DriverTxnHandler(driverContext, driverState);
+    if (SessionState.get() != null) {
+      SessionState.get().addQueryState(getConf().get(HiveConf.ConfVars.HIVEQUERYID.varname), queryState);
+    }
   }

   @Override
@@ -213,6 +216,11 @@ private void runInternal(String command, boolean alreadyCompiled) throws Command
       releaseResources();
     }

+    if (SessionState.get() != null) {
+      // Remove any query state reference from the session state
+      SessionState.get().removeQueryState(getConf().get(HiveConf.ConfVars.HIVEQUERYID.varname));
+    }
+
     driverState.executionFinishedWithLocking(isFinishedWithError);
   }

QueryState.java
@@ -18,6 +18,7 @@

 package org.apache.hadoop.hive.ql;

+import java.util.HashMap;
 import java.util.Map;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
@@ -59,6 +60,11 @@ public class QueryState {

   static public final String USERID_TAG = "userid";

+  /**
+   * map of resources involved in the query.
+   */
+  private final Map<String, Object> resourceMap = new HashMap<>();
+
   /**
    * Private constructor, use QueryState.Builder instead.
    * @param conf The query specific configuration object
@@ -141,6 +147,14 @@ public static void setApplicationTag(HiveConf queryConf, String queryTag) {
     queryConf.set(TezConfiguration.TEZ_APPLICATION_TAGS, jobTag);
   }

+  public void addResource(String resourceIdentifier, Object resource) {
+    resourceMap.put(resourceIdentifier, resource);
+  }
+
+  public Object getResource(String resourceIdentifier) {
+    return resourceMap.get(resourceIdentifier);
+  }
+
   /**
    * Generating the new QueryState object. Making sure, that the new queryId is generated.
    * @param conf The HiveConf which should be used
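The resource map added above is deliberately generic (String keys to Object values), so any per-query object can be cached under a caller-chosen identifier. A hypothetical sketch of a caller (the key, queryState, and table variables are placeholders):

    // Hypothetical caller: park an object on the query state, read it back later in the same query.
    queryState.addResource("default.tbl", table);                 // cache under a caller-chosen key
    Table cached = (Table) queryState.getResource("default.tbl"); // callers cast, since values are plain Objects
    // An absent key returns null, which is how IcebergTableUtil.getTable detects a cache miss.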
SessionState.java
@@ -74,6 +74,7 @@
 import org.apache.hadoop.hive.metastore.cache.CachedStore;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.cleanup.CleanupService;
 import org.apache.hadoop.hive.ql.MapRedStats;
 import org.apache.hadoop.hive.ql.cleanup.SyncCleanupService;
@@ -349,6 +350,20 @@ public class SessionState implements ISessionAuthState{

   private final AtomicLong sparkSessionId = new AtomicLong();

+  private final Map<String, QueryState> queryStateMap = new HashMap<>();
+
+  public QueryState getQueryState(String queryId) {
+    return queryStateMap.get(queryId);
+  }
+
+  public void addQueryState(String queryId, QueryState queryState) {
+    queryStateMap.put(queryId, queryState);
+  }
+
+  public void removeQueryState(String queryId) {
+    queryStateMap.remove(queryId);
+  }
+
   @Override
   public HiveConf getConf() {
     return sessionConf;