diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3b422105668b..605f6f51772b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -5693,10 +5693,6 @@ public static enum ConfVars {
HIVE_SERVER2_ICEBERG_METADATA_GENERATOR_THREADS("hive.server2.iceberg.metadata.generator.threads", 10,
"Number of threads used to scan partition directories for data files and update/generate iceberg metadata"),
- HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES("hive.iceberg.metadata.refresh.max.retries", 2,
- "Max retry count for trying to access the metadata location in order to refresh metadata during " +
- " Iceberg table load."),
-
/* BLOBSTORE section */
HIVE_BLOBSTORE_SUPPORTED_SCHEMES("hive.blobstore.supported.schemes", "s3,s3a,s3n",
diff --git a/iceberg/checkstyle/checkstyle.xml b/iceberg/checkstyle/checkstyle.xml
index 2911b45ed25b..a288af5de908 100644
--- a/iceberg/checkstyle/checkstyle.xml
+++ b/iceberg/checkstyle/checkstyle.xml
@@ -46,6 +46,21 @@
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 4737dd64d520..b97ff3daa8b3 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -36,10 +36,12 @@
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -49,49 +51,31 @@
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
+ public static final String LIST_ALL_TABLES = "list-all-tables";
+ public static final String LIST_ALL_TABLES_DEFAULT = "false";
+
private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
private String name;
private Configuration conf;
private FileIO fileIO;
private ClientPool clients;
+ private boolean listAllTables = false;
public HiveCatalog() {
}
- /**
- * Hive Catalog constructor.
- *
- * @param conf Hadoop Configuration
- * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog. Will be removed in
- * v0.13.0
- */
- @Deprecated
- public HiveCatalog(Configuration conf) {
- this.name = "hive";
- this.conf = conf;
- this.fileIO = new HadoopFileIO(conf);
- Map properties = ImmutableMap.of(
- CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
- conf.get(CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
- String.valueOf(CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT)),
- CatalogProperties.CLIENT_POOL_SIZE,
- conf.get(CatalogProperties.CLIENT_POOL_SIZE,
- String.valueOf(CatalogProperties.CLIENT_POOL_SIZE_DEFAULT))
- );
- this.clients = new CachedClientPool(conf, properties);
- }
-
@Override
public void initialize(String inputName, Map properties) {
this.name = inputName;
@@ -108,6 +92,8 @@ public void initialize(String inputName, Map properties) {
this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, properties.get(CatalogProperties.WAREHOUSE_LOCATION));
}
+ this.listAllTables = Boolean.parseBoolean(properties.getOrDefault(LIST_ALL_TABLES, LIST_ALL_TABLES_DEFAULT));
+
String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
this.fileIO = fileIOImpl == null ? new HadoopFileIO(conf) : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);
@@ -122,12 +108,20 @@ public List listTables(Namespace namespace) {
try {
List tableNames = clients.run(client -> client.getAllTables(database));
- List tableObjects = clients.run(client -> client.getTableObjectsByName(database, tableNames));
- List tableIdentifiers = tableObjects.stream()
- .filter(table -> table.getParameters() == null ? false : BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE
- .equalsIgnoreCase(table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)))
- .map(table -> TableIdentifier.of(namespace, table.getTableName()))
- .collect(Collectors.toList());
+ List tableIdentifiers;
+
+ if (listAllTables) {
+ tableIdentifiers = tableNames.stream()
+ .map(t -> TableIdentifier.of(namespace, t))
+ .collect(Collectors.toList());
+ } else {
+ List tableObjects = clients.run(client -> client.getTableObjectsByName(database, tableNames));
+ tableIdentifiers = tableObjects.stream()
+ .filter(table -> table.getParameters() != null && BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE
+ .equalsIgnoreCase(table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)))
+ .map(table -> TableIdentifier.of(namespace, table.getTableName()))
+ .collect(Collectors.toList());
+ }
LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, tableIdentifiers);
return tableIdentifiers;
@@ -235,6 +229,23 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
}
}
+ @Override
+ public org.apache.iceberg.Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
+ Preconditions.checkArgument(isValidIdentifier(identifier), "Invalid identifier: %s", identifier);
+
+ // Throw an exception if this table already exists in the catalog.
+ if (tableExists(identifier)) {
+ throw new org.apache.iceberg.exceptions.AlreadyExistsException("Table already exists: %s", identifier);
+ }
+
+ TableOperations ops = newTableOps(identifier);
+ InputFile metadataFile = fileIO.newInputFile(metadataFileLocation);
+ TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile);
+ ops.commit(null, metadata);
+
+ return new BaseTable(ops, identifier.toString());
+ }
+
@Override
public void createNamespace(Namespace namespace, Map meta) {
Preconditions.checkArgument(
@@ -533,4 +544,9 @@ public void setConf(Configuration conf) {
public Configuration getConf() {
return conf;
}
+
+ @VisibleForTesting
+ void setListAllTables(boolean listAllTables) {
+ this.listAllTables = listAllTables;
+ }
}
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java
deleted file mode 100644
index 4d8e0133ffae..000000000000
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.hive;
-
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.iceberg.CatalogUtil;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-
-public final class HiveCatalogs {
-
- private static final Cache CATALOG_CACHE = Caffeine.newBuilder().build();
-
- private HiveCatalogs() {
- }
-
- /**
- * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog. Will be removed in
- * v0.12.0
- */
- @Deprecated
- public static HiveCatalog loadCatalog(Configuration conf) {
- // metastore URI can be null in local mode
- String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
- return CATALOG_CACHE.get(metastoreUri, uri -> (HiveCatalog)
- CatalogUtil.loadCatalog(HiveCatalog.class.getName(), "hive", ImmutableMap.of(), conf));
- }
-}
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
index e322792ffcb8..e9a3c53519be 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
@@ -21,6 +21,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -35,9 +37,8 @@ public class HiveClientPool extends ClientPoolImpl
// use appropriate ctor depending on whether we're working with Hive1, Hive2, or Hive3 dependencies
// we need to do this because there is a breaking API change between Hive1, Hive2, and Hive3
private static final DynMethods.StaticMethod GET_CLIENT = DynMethods.builder("getProxy")
- .impl(RetryingMetaStoreClient.class, HiveConf.class)
- .impl(RetryingMetaStoreClient.class, HiveConf.class, Boolean.TYPE)
- .impl(RetryingMetaStoreClient.class, Configuration.class, Boolean.TYPE)
+ .impl(RetryingMetaStoreClient.class, HiveConf.class, HiveMetaHookLoader.class, String.class) // Hive 1 and 2
+ .impl(RetryingMetaStoreClient.class, Configuration.class, HiveMetaHookLoader.class, String.class) // Hive 3
.buildStatic();
private final HiveConf hiveConf;
@@ -53,7 +54,7 @@ public HiveClientPool(int poolSize, Configuration conf) {
protected IMetaStoreClient newClient() {
try {
try {
- return GET_CLIENT.invoke(hiveConf, true);
+ return GET_CLIENT.invoke(hiveConf, (HiveMetaHookLoader) tbl -> null, HiveMetaStoreClient.class.getName());
} catch (RuntimeException e) {
// any MetaException would be wrapped into RuntimeException during reflection, so let's double-check type here
if (e.getCause() instanceof MetaException) {
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaConverter.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaConverter.java
index 5968693cce29..422546c11277 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaConverter.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaConverter.java
@@ -19,7 +19,6 @@
package org.apache.iceberg.hive;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
@@ -30,6 +29,7 @@
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
@@ -62,7 +62,7 @@ static Type convert(TypeInfo typeInfo, boolean autoConvert) {
}
List convertInternal(List names, List typeInfos, List comments) {
- List result = new ArrayList<>(names.size());
+ List result = Lists.newArrayListWithExpectedSize(names.size());
for (int i = 0; i < names.size(); ++i) {
result.add(Types.NestedField.optional(id++, names.get(i), convertType(typeInfos.get(i)),
comments.isEmpty() || i >= comments.size() ? null : comments.get(i)));
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
index cc9ad46ac15b..0ebd0571a477 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
@@ -19,7 +19,6 @@
package org.apache.iceberg.hive;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -31,6 +30,7 @@
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -72,9 +72,9 @@ public static Schema convert(List fieldSchemas) {
* @return An equivalent Iceberg Schema
*/
public static Schema convert(List fieldSchemas, boolean autoConvert) {
- List names = new ArrayList<>(fieldSchemas.size());
- List typeInfos = new ArrayList<>(fieldSchemas.size());
- List comments = new ArrayList<>(fieldSchemas.size());
+ List names = Lists.newArrayListWithExpectedSize(fieldSchemas.size());
+ List typeInfos = Lists.newArrayListWithExpectedSize(fieldSchemas.size());
+ List comments = Lists.newArrayListWithExpectedSize(fieldSchemas.size());
for (FieldSchema col : fieldSchemas) {
names.add(col.getName());
@@ -237,10 +237,10 @@ public static Pair> getReorderedColumn(List missingFromFirst = new ArrayList<>();
- private final List missingFromSecond = new ArrayList<>();
- private final List typeChanged = new ArrayList<>();
- private final List commentChanged = new ArrayList<>();
+ private final List missingFromFirst = Lists.newArrayList();
+ private final List missingFromSecond = Lists.newArrayList();
+ private final List typeChanged = Lists.newArrayList();
+ private final List commentChanged = Lists.newArrayList();
public List getMissingFromFirst() {
return missingFromFirst;
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 3ad450cc440f..6afcd7140887 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -24,7 +24,6 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
@@ -36,9 +35,9 @@
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockLevel;
import org.apache.hadoop.hive.metastore.api.LockRequest;
@@ -61,6 +60,7 @@
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.NoSuchIcebergTableException;
import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.ConfigProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -86,10 +86,12 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
+ private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES = "iceberg.hive.metadata-refresh-max-retries";
private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS = "iceberg.hive.table-level-lock-evict-ms";
private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+ private static final int HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT = 2;
private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
private static final BiMap ICEBERG_TO_HMS_TRANSLATION = ImmutableBiMap.of(
@@ -97,9 +99,6 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
GC_ENABLED, "external.table.purge"
);
- // Should be in org.apache.iceberg.hadoop.ConfigProperties, but that is not ported to Hive codebase
- public static final String KEEP_HIVE_STATS = "iceberg.hive.keep.stats";
-
private static Cache commitLockCache;
private static synchronized void initTableLevelLockCache(long evictionTimeout) {
@@ -140,6 +139,7 @@ private static class WaitingForLockException extends RuntimeException {
private final long lockAcquireTimeout;
private final long lockCheckMinWaitTime;
private final long lockCheckMaxWaitTime;
+ private final int metadataRefreshMaxRetries;
private final FileIO fileIO;
private final ClientPool metaClients;
@@ -157,6 +157,8 @@ protected HiveTableOperations(Configuration conf, ClientPool metaClients, FileIO
conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
this.lockCheckMaxWaitTime =
conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+ this.metadataRefreshMaxRetries =
+ conf.getInt(HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES, HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT);
long tableLevelLockCacheEvictionTimeout =
conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
@@ -195,16 +197,16 @@ protected void doRefresh() {
throw new RuntimeException("Interrupted during refresh", e);
}
- refreshFromMetadataLocation(metadataLocation, HiveConf.getIntVar(conf,
- HiveConf.ConfVars.HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES));
+ refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries);
}
@SuppressWarnings("checkstyle:CyclomaticComplexity")
@Override
protected void doCommit(TableMetadata base, TableMetadata metadata) {
- String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
+ String newMetadataLocation = base == null && metadata.metadataFileLocation() != null ?
+ metadata.metadataFileLocation() : writeNewMetadata(metadata, currentVersion() + 1);
boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);
- boolean keepHiveStats = conf.getBoolean(KEEP_HIVE_STATS, false);
+ boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false);
CommitStatus commitStatus = CommitStatus.FAILURE;
boolean updateHiveTable = false;
@@ -253,7 +255,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
Map summary = Optional.ofNullable(metadata.currentSnapshot())
.map(Snapshot::summary)
.orElseGet(ImmutableMap::of);
- setHmsTableParameters(newMetadataLocation, tbl, metadata.properties(), removedProps, hiveEngineEnabled, summary);
+ setHmsTableParameters(newMetadataLocation, tbl, metadata, removedProps, hiveEngineEnabled, summary);
if (!keepHiveStats) {
StatsSetupConst.setBasicStatsState(tbl.getParameters(), StatsSetupConst.FALSE);
@@ -263,29 +265,32 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
try {
persistTable(tbl, updateHiveTable);
commitStatus = CommitStatus.SUCCESS;
- } catch (Throwable persistFailure) {
+ } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
+ throw new AlreadyExistsException(e, "Table already exists: %s.%s", database, tableName);
+
+ } catch (InvalidObjectException e) {
+ throw new ValidationException(e, "Invalid Hive object for %s.%s", database, tableName);
+
+ } catch (Throwable e) {
+ if (e.getMessage() != null && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
+ throw new RuntimeException("Failed to acquire locks from metastore because the underlying metastore " +
+ "table 'HIVE_LOCKS' does not exist. This can occur when using an embedded metastore which does not " +
+ "support transactions. To fix this use an alternative metastore.", e);
+ }
+
LOG.error("Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.",
- database, tableName, persistFailure);
+ database, tableName, e);
commitStatus = checkCommitStatus(newMetadataLocation, metadata);
switch (commitStatus) {
case SUCCESS:
break;
case FAILURE:
- throw persistFailure;
+ throw e;
case UNKNOWN:
- throw new CommitStateUnknownException(persistFailure);
+ throw new CommitStateUnknownException(e);
}
}
- } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
- throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName);
-
} catch (TException | UnknownHostException e) {
- if (e.getMessage() != null && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
- throw new RuntimeException("Failed to acquire locks from metastore because 'HIVE_LOCKS' doesn't " +
- "exist, this probably happened when using embedded metastore or doesn't create a " +
- "transactional meta table. To fix this, use an alternative metastore", e);
- }
-
throw new RuntimeException(String.format("Metastore operation failed for %s.%s", database, tableName), e);
} catch (InterruptedException e) {
@@ -332,7 +337,7 @@ private Table newHmsTable() {
Integer.MAX_VALUE,
null,
Collections.emptyList(),
- new HashMap<>(),
+ Maps.newHashMap(),
null,
null,
TableType.EXTERNAL_TABLE.toString());
@@ -341,18 +346,21 @@ private Table newHmsTable() {
return newTable;
}
- private void setHmsTableParameters(String newMetadataLocation, Table tbl, Map icebergTableProps,
+ private void setHmsTableParameters(String newMetadataLocation, Table tbl, TableMetadata metadata,
Set obsoleteProps, boolean hiveEngineEnabled,
Map summary) {
Map parameters = Optional.ofNullable(tbl.getParameters())
- .orElseGet(HashMap::new);
+ .orElseGet(Maps::newHashMap);
// push all Iceberg table properties into HMS
- icebergTableProps.forEach((key, value) -> {
+ metadata.properties().forEach((key, value) -> {
// translate key names between Iceberg and HMS where needed
String hmsKey = ICEBERG_TO_HMS_TRANSLATION.getOrDefault(key, key);
parameters.put(hmsKey, value);
});
+ if (metadata.uuid() != null) {
+ parameters.put(TableProperties.UUID, metadata.uuid());
+ }
// remove any props from HMS that are no longer present in Iceberg table props
obsoleteProps.forEach(parameters::remove);
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveMetastoreTest.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveMetastoreTest.java
index f98e1a5d90fe..b1fb891f3054 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveMetastoreTest.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveMetastoreTest.java
@@ -19,7 +19,6 @@
package org.apache.iceberg.hive;
-import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -27,6 +26,7 @@
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -47,7 +47,7 @@ public static void startMetastore() throws Exception {
HiveMetastoreTest.hiveConf = metastore.hiveConf();
HiveMetastoreTest.metastoreClient = new HiveMetaStoreClient(hiveConf);
String dbPath = metastore.getDatabasePath(DB_NAME);
- Database db = new Database(DB_NAME, "description", dbPath, new HashMap<>());
+ Database db = new Database(DB_NAME, "description", dbPath, Maps.newHashMap());
metastoreClient.createDatabase(db);
HiveMetastoreTest.catalog = (HiveCatalog)
CatalogUtil.loadCatalog(HiveCatalog.class.getName(), CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE, ImmutableMap.of(
@@ -55,7 +55,7 @@ public static void startMetastore() throws Exception {
}
@AfterClass
- public static void stopMetastore() {
+ public static void stopMetastore() throws Exception {
HiveMetastoreTest.catalog = null;
metastoreClient.close();
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
index 6804fc80e5b0..4c210a956355 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
@@ -22,11 +22,9 @@
import java.io.File;
import java.io.IOException;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
@@ -35,6 +33,7 @@
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hive.iceberg.org.apache.avro.generic.GenericData;
import org.apache.hive.iceberg.org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.Files;
@@ -48,6 +47,7 @@
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.ConfigProperties;
@@ -66,6 +66,7 @@
import static java.nio.file.attribute.PosixFilePermissions.fromString;
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
+import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
@@ -324,8 +325,14 @@ public void testListTables() throws TException, IOException {
org.apache.hadoop.hive.metastore.api.Table hiveTable = createHiveTable(hiveTableName);
metastoreClient.createTable(hiveTable);
+ catalog.setListAllTables(false);
List tableIdents1 = catalog.listTables(TABLE_IDENTIFIER.namespace());
Assert.assertEquals("should only 1 iceberg table .", 1, tableIdents1.size());
+
+ catalog.setListAllTables(true);
+ List tableIdents2 = catalog.listTables(TABLE_IDENTIFIER.namespace());
+ Assert.assertEquals("should be 2 tables in namespace.", 2, tableIdents2.size());
+
Assert.assertTrue(catalog.tableExists(TABLE_IDENTIFIER));
metastoreClient.dropTable(DB_NAME, hiveTableName);
}
@@ -371,6 +378,51 @@ public void testNonDefaultDatabaseLocation() throws IOException, TException {
metastoreClient.dropDatabase(NON_DEFAULT_DATABASE, true, true, true);
}
+ @Test
+ public void testRegisterTable() throws TException {
+ org.apache.hadoop.hive.metastore.api.Table originalTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+
+ Map originalParams = originalTable.getParameters();
+ Assert.assertNotNull(originalParams);
+ Assert.assertTrue(ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(originalParams.get(TABLE_TYPE_PROP)));
+ Assert.assertTrue("EXTERNAL_TABLE".equalsIgnoreCase(originalTable.getTableType()));
+
+ catalog.dropTable(TABLE_IDENTIFIER, false);
+ Assert.assertFalse(catalog.tableExists(TABLE_IDENTIFIER));
+
+ List metadataVersionFiles = metadataVersionFiles(TABLE_NAME);
+ Assert.assertEquals(1, metadataVersionFiles.size());
+
+ catalog.registerTable(TABLE_IDENTIFIER, "file:" + metadataVersionFiles.get(0));
+
+ org.apache.hadoop.hive.metastore.api.Table newTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+
+ Map newTableParameters = newTable.getParameters();
+ Assert.assertNull(newTableParameters.get(PREVIOUS_METADATA_LOCATION_PROP));
+ Assert.assertEquals(originalParams.get(TABLE_TYPE_PROP), newTableParameters.get(TABLE_TYPE_PROP));
+ Assert.assertEquals(originalParams.get(METADATA_LOCATION_PROP), newTableParameters.get(METADATA_LOCATION_PROP));
+ Assert.assertEquals(originalTable.getSd(), newTable.getSd());
+ }
+
+ @Test
+ public void testRegisterExistingTable() throws TException {
+ org.apache.hadoop.hive.metastore.api.Table originalTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+
+ Map originalParams = originalTable.getParameters();
+ Assert.assertNotNull(originalParams);
+ Assert.assertTrue(ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(originalParams.get(TABLE_TYPE_PROP)));
+ Assert.assertTrue("EXTERNAL_TABLE".equalsIgnoreCase(originalTable.getTableType()));
+
+ List metadataVersionFiles = metadataVersionFiles(TABLE_NAME);
+ Assert.assertEquals(1, metadataVersionFiles.size());
+
+ // Try to register an existing table
+ AssertHelpers.assertThrows(
+ "Should complain that the table already exists", AlreadyExistsException.class,
+ "Table already exists",
+ () -> catalog.registerTable(TABLE_IDENTIFIER, "file:" + metadataVersionFiles.get(0)));
+ }
+
@Test
public void testEngineHiveEnabledDefault() throws TException {
// Drop the previously created table to make place for the new one
@@ -415,7 +467,7 @@ public void testEngineHiveEnabledTableProperty() throws TException {
catalog.dropTable(TABLE_IDENTIFIER);
// Enabled by table property - also check that the hive-conf is ignored
- Map tableProperties = new HashMap<>();
+ Map tableProperties = Maps.newHashMap();
tableProperties.put(TableProperties.ENGINE_HIVE_ENABLED, "true");
catalog.getConf().set(ConfigProperties.ENGINE_HIVE_ENABLED, "false");
@@ -436,20 +488,19 @@ public void testEngineHiveEnabledTableProperty() throws TException {
assertHiveEnabled(hmsTable, false);
}
- @Test(timeout = 60000, expected = NotFoundException.class)
- public void testMissingMetadataWontCauseHang() throws Exception {
+ @Test
+ public void testMissingMetadataWontCauseHang() {
catalog.loadTable(TABLE_IDENTIFIER);
- HiveConf.setIntVar(catalog.getConf(), HiveConf.ConfVars.HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES, 3);
File realLocation = new File(metadataLocation(TABLE_NAME));
File fakeLocation = new File(metadataLocation(TABLE_NAME) + "_dummy");
- realLocation.renameTo(fakeLocation);
- try {
- catalog.loadTable(TABLE_IDENTIFIER);
- } finally {
- realLocation.renameTo(realLocation);
- }
+ Assert.assertTrue(realLocation.renameTo(fakeLocation));
+ AssertHelpers.assertThrows(
+ "HiveTableOperations shouldn't hang indefinitely when a missing metadata file is encountered",
+ NotFoundException.class,
+ () -> catalog.loadTable(TABLE_IDENTIFIER));
+ Assert.assertTrue(fakeLocation.renameTo(realLocation));
}
private void assertHiveEnabled(org.apache.hadoop.hive.metastore.api.Table hmsTable, boolean expected) {
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
index 9518540238a4..6ba3a46a27a6 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
@@ -21,7 +21,6 @@
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CachingCatalog;
@@ -29,6 +28,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
@@ -54,7 +54,6 @@
import static org.apache.iceberg.types.Types.NestedField.required;
public class TestHiveCatalog extends HiveMetastoreTest {
- private static final String hiveLocalDir = "file:/tmp/hive/" + UUID.randomUUID().toString();
private static ImmutableMap meta = ImmutableMap.of(
"owner", "apache",
"group", "iceberg",
@@ -272,7 +271,7 @@ public void testCreateTableCustomSortOrder() {
}
@Test
- public void testCreateNamespace() throws TException {
+ public void testCreateNamespace() throws Exception {
Namespace namespace1 = Namespace.of("noLocation");
catalog.createNamespace(namespace1, meta);
Database database1 = metastoreClient.getDatabase(namespace1.toString());
@@ -287,6 +286,9 @@ public void testCreateNamespace() throws TException {
AlreadyExistsException.class, "Namespace '" + namespace1 + "' already exists!", () -> {
catalog.createNamespace(namespace1);
});
+ String hiveLocalDir = temp.newFolder().toURI().toString();
+ // remove the trailing slash from the URI
+ hiveLocalDir = hiveLocalDir.substring(0, hiveLocalDir.length() - 1);
ImmutableMap newMeta = ImmutableMap.builder()
.putAll(meta)
.put("location", hiveLocalDir)
@@ -441,4 +443,29 @@ private String defaultUri(Namespace namespace) throws TException {
"hive.metastore.warehouse.external.dir", "") + "/" + namespace.level(0) + ".db";
}
+ @Test
+ public void testUUIDinTableProperties() throws Exception {
+ Schema schema = new Schema(
+ required(1, "id", Types.IntegerType.get(), "unique ID"),
+ required(2, "data", Types.StringType.get())
+ );
+ TableIdentifier tableIdentifier = TableIdentifier.of(DB_NAME, "tbl");
+ String location = temp.newFolder("tbl").toString();
+
+ try {
+ catalog.buildTable(tableIdentifier, schema)
+ .withLocation(location)
+ .create();
+
+ String tableName = tableIdentifier.name();
+ org.apache.hadoop.hive.metastore.api.Table hmsTable =
+ metastoreClient.getTable(tableIdentifier.namespace().level(0), tableName);
+
+ // check parameters are in expected state
+ Map parameters = hmsTable.getParameters();
+ Assert.assertNotNull(parameters.get(TableProperties.UUID));
+ } finally {
+ catalog.dropTable(tableIdentifier);
+ }
+ }
}
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
index 5c0a22eba988..e22374b6e975 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
@@ -24,9 +24,13 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.types.Types;
import org.apache.thrift.TException;
import org.junit.Assert;
@@ -77,10 +81,11 @@ public void testSuppressUnlockExceptions() throws TException, InterruptedExcepti
}
/**
- * Pretends we throw an error while persisting that actually fails to commit serverside
+ * Pretends we throw an error while persisting and the new metadata location is not found in the
+ * commit-state check; the commit state should be treated as unknown, because in reality the persist may still succeed, just not yet by the time of checking.
*/
@Test
- public void testThriftExceptionFailureOnCommit() throws TException, InterruptedException {
+ public void testThriftExceptionUnknownStateIfNotInHistoryFailureOnCommit() throws TException, InterruptedException {
Table table = catalog.loadTable(TABLE_IDENTIFIER);
HiveTableOperations ops = (HiveTableOperations) ((HasTableOperations) table).operations();
@@ -100,17 +105,15 @@ public void testThriftExceptionFailureOnCommit() throws TException, InterruptedE
failCommitAndThrowException(spyOps);
- AssertHelpers.assertThrows("We should rethrow generic runtime errors if the " +
- "commit actually doesn't succeed", CommitStateUnknownException.class,
- "Cannot determine whether the commit was successful or not, the underlying data files may " +
- "or may not be needed. Manual intervention via the Remove Orphan Files Action can remove these files " +
- "when a connection to the Catalog can be re-established if the commit was actually unsuccessful.",
- () -> spyOps.commit(metadataV2, metadataV1));
+ AssertHelpers.assertThrows("We should assume commit state is unknown if the " +
+ "new location is not found in history in commit state check", CommitStateUnknownException.class,
+ "Datacenter on fire", () -> spyOps.commit(metadataV2, metadataV1));
ops.refresh();
Assert.assertEquals("Current metadata should not have changed", metadataV2, ops.current());
Assert.assertTrue("Current metadata should still exist", metadataFileExists(metadataV2));
- Assert.assertEquals("New non-current metadata file should be added", 3, metadataFileCount(ops.current()));
+ Assert.assertEquals("New metadata files should still exist, new location not in history but" +
+ " the commit may still succeed", 3, metadataFileCount(ops.current()));
}
/**
@@ -276,6 +279,21 @@ public void testThriftExceptionConcurrentCommit() throws TException, Interrupted
2, ops.current().schema().columns().size());
}
+ @Test
+ public void testInvalidObjectException() {
+ TableIdentifier badTi = TableIdentifier.of(DB_NAME, "£tbl");
+ Assert.assertThrows(String.format("Invalid table name for %s.%s", DB_NAME, "`tbl`"),
+ ValidationException.class,
+ () -> catalog.createTable(badTi, schema, PartitionSpec.unpartitioned()));
+ }
+
+ @Test
+ public void testAlreadyExistsException() {
+ Assert.assertThrows(String.format("Table already exists: %s.%s", DB_NAME, TABLE_NAME),
+ AlreadyExistsException.class,
+ () -> catalog.createTable(TABLE_IDENTIFIER, schema, PartitionSpec.unpartitioned()));
+ }
+
private void commitAndThrowException(HiveTableOperations realOperations, HiveTableOperations spyOperations)
throws TException, InterruptedException {
// Simulate a communication error after a successful commit
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
index a956d36031d3..76eb87b21b50 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.hive;
import java.io.File;
+import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
@@ -47,6 +48,7 @@
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportFactory;
+import org.junit.Assert;
import static java.nio.file.Files.createTempDirectory;
import static java.nio.file.attribute.PosixFilePermissions.asFileAttribute;
@@ -80,9 +82,37 @@ public class TestHiveMetastore {
.orNoop()
.buildStatic();
- private File hiveLocalDir;
- private File hiveWarehouseDir;
- private File hiveExternalWarehouseDir;
+ // It's tricky to clear all static fields in an HMS instance in order to switch the Derby root dir.
+ // Therefore, we reuse the same Derby root between tests and remove it after the JVM exits.
+ private static final File HIVE_LOCAL_DIR;
+ private static final File HIVE_WAREHOUSE_DIR;
+ private static final File HIVE_EXTERNAL_WAREHOUSE_DIR;
+ private static final String DERBY_PATH;
+
+ static {
+ try {
+ HIVE_LOCAL_DIR = createTempDirectory("hive", asFileAttribute(fromString("rwxrwxrwx"))).toFile();
+ DERBY_PATH = new File(HIVE_LOCAL_DIR, "metastore_db").getPath();
+ HIVE_WAREHOUSE_DIR = new File(HIVE_LOCAL_DIR, "managed");
+ HIVE_EXTERNAL_WAREHOUSE_DIR = new File(HIVE_LOCAL_DIR, "external");
+ File derbyLogFile = new File(HIVE_LOCAL_DIR, "derby.log");
+ System.setProperty("derby.stream.error.file", derbyLogFile.getAbsolutePath());
+ setupMetastoreDB("jdbc:derby:" + DERBY_PATH + ";create=true");
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ Path localDirPath = new Path(HIVE_LOCAL_DIR.getAbsolutePath());
+ FileSystem fs = Util.getFs(localDirPath, new Configuration());
+ String errMsg = "Failed to delete " + localDirPath;
+ try {
+ Assert.assertTrue(errMsg, fs.delete(localDirPath, true));
+ } catch (IOException e) {
+ throw new RuntimeException(errMsg, e);
+ }
+ }));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to setup local dir for hive metastore", e);
+ }
+ }
+
private HiveConf hiveConf;
private ExecutorService executorService;
private TServer server;
@@ -111,17 +141,6 @@ public void start(HiveConf conf) {
*/
public void start(HiveConf conf, int poolSize) {
try {
- hiveLocalDir = createTempDirectory("hive", asFileAttribute(fromString("rwxrwxrwx"))).toFile();
- hiveWarehouseDir = new File(hiveLocalDir, "managed");
- hiveExternalWarehouseDir = new File(hiveLocalDir, "external");
- File derbyLogFile = new File(hiveLocalDir, "derby.log");
- System.setProperty("derby.stream.error.file", derbyLogFile.getAbsolutePath());
-
- // create and initialize HMS backend DB for ACID and non-ACID tables as well
- MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CONNECT_URL_KEY,
- "jdbc:derby:" + getDerbyPath() + ";create=true");
- TestTxnDbUtil.prepDb(conf);
-
TServerSocket socket = new TServerSocket(0);
int port = socket.getServerSocket().getLocalPort();
initConf(conf, port);
@@ -140,7 +159,8 @@ public void start(HiveConf conf, int poolSize) {
}
}
- public void stop() {
+ public void stop() throws Exception {
+ reset();
if (clientPool != null) {
clientPool.close();
}
@@ -150,9 +170,6 @@ public void stop() {
if (executorService != null) {
executorService.shutdown();
}
- if (hiveLocalDir != null) {
- hiveLocalDir.delete();
- }
if (baseHandler != null) {
baseHandler.shutdown();
}
@@ -164,29 +181,31 @@ public HiveConf hiveConf() {
}
public String getDatabasePath(String dbName) {
- File dbDir = new File(hiveExternalWarehouseDir, dbName + ".db");
+ File dbDir = new File(HIVE_LOCAL_DIR, dbName + ".db");
return dbDir.getPath();
}
public void reset() throws Exception {
- for (String dbName : clientPool.run(client -> client.getAllDatabases())) {
- for (String tblName : clientPool.run(client -> client.getAllTables(dbName))) {
- clientPool.run(client -> {
- client.dropTable(dbName, tblName, true, true, true);
- return null;
- });
- }
+ if (clientPool != null) {
+ for (String dbName : clientPool.run(client -> client.getAllDatabases())) {
+ for (String tblName : clientPool.run(client -> client.getAllTables(dbName))) {
+ clientPool.run(client -> {
+ client.dropTable(dbName, tblName, true, true, true);
+ return null;
+ });
+ }
- if (!DEFAULT_DATABASE_NAME.equals(dbName)) {
- // Drop cascade, functions dropped by cascade
- clientPool.run(client -> {
- client.dropDatabase(dbName, true, true, true);
- return null;
- });
+ if (!DEFAULT_DATABASE_NAME.equals(dbName)) {
+ // Drop cascade, functions dropped by cascade
+ clientPool.run(client -> {
+ client.dropDatabase(dbName, true, true, true);
+ return null;
+ });
+ }
}
}
- Path warehouseRoot = new Path(hiveLocalDir.getAbsolutePath());
+ Path warehouseRoot = new Path(HIVE_LOCAL_DIR.getAbsolutePath());
FileSystem fs = Util.getFs(warehouseRoot, hiveConf);
for (FileStatus fileStatus : fs.listStatus(warehouseRoot)) {
if (!fileStatus.getPath().getName().equals("derby.log") &&
@@ -210,7 +229,7 @@ public R run(ClientPool.Action action) thro
private TServer newThriftServer(TServerSocket socket, int poolSize, HiveConf conf) throws Exception {
HiveConf serverConf = new HiveConf(conf);
- serverConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, "jdbc:derby:" + getDerbyPath() + ";create=true");
+ serverConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, "jdbc:derby:" + DERBY_PATH + ";create=true");
baseHandler = HMS_HANDLER_CTOR.newInstance("new db based metaserver", serverConf);
IHMSHandler handler = GET_BASE_HMS_HANDLER.invoke(serverConf, baseHandler, false);
@@ -226,9 +245,9 @@ private TServer newThriftServer(TServerSocket socket, int poolSize, HiveConf con
private void initConf(HiveConf conf, int port) {
conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://localhost:" + port);
- conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:" + hiveWarehouseDir.getAbsolutePath());
+ conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:" + HIVE_WAREHOUSE_DIR.getAbsolutePath());
conf.set(HiveConf.ConfVars.HIVE_METASTORE_WAREHOUSE_EXTERNAL.varname,
- "file:" + hiveExternalWarehouseDir.getAbsolutePath());
+ "file:" + HIVE_EXTERNAL_WAREHOUSE_DIR.getAbsolutePath());
conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false");
conf.set(HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES.varname, "false");
conf.set("iceberg.hive.client-pool-size", "2");
@@ -237,8 +256,10 @@ private void initConf(HiveConf conf, int port) {
conf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, false);
}
- private String getDerbyPath() {
- File metastoreDB = new File(hiveLocalDir, "metastore_db");
- return metastoreDB.getPath();
+ private static void setupMetastoreDB(String dbURL) throws Exception {
+ HiveConf conf = new HiveConf();
+ MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CONNECT_URL_KEY,
+ "jdbc:derby:" + DERBY_PATH + ";create=true");
+ TestTxnDbUtil.prepDb(conf);
}
}
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveSchemaUtil.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveSchemaUtil.java
index e0ad70cd8a4f..a02ed0909ace 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveSchemaUtil.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveSchemaUtil.java
@@ -19,7 +19,6 @@
package org.apache.iceberg.hive;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@@ -30,6 +29,7 @@
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
@@ -120,7 +120,7 @@ public void testNotSupportedTypes() {
for (FieldSchema notSupportedField : getNotSupportedFieldSchemas()) {
AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
"Unsupported Hive type", () -> {
- HiveSchemaUtil.convert(new ArrayList<>(Arrays.asList(notSupportedField)));
+ HiveSchemaUtil.convert(Lists.newArrayList(Arrays.asList(notSupportedField)));
}
);
}
@@ -171,7 +171,7 @@ public void testConversionWithoutLastComment() {
}
protected List getSupportedFieldSchemas() {
- List fields = new ArrayList<>();
+ List fields = Lists.newArrayList();
fields.add(new FieldSchema("c_float", serdeConstants.FLOAT_TYPE_NAME, "float comment"));
fields.add(new FieldSchema("c_double", serdeConstants.DOUBLE_TYPE_NAME, "double comment"));
fields.add(new FieldSchema("c_boolean", serdeConstants.BOOLEAN_TYPE_NAME, "boolean comment"));
@@ -186,7 +186,7 @@ protected List getSupportedFieldSchemas() {
}
protected List getNotSupportedFieldSchemas() {
- List fields = new ArrayList<>();
+ List fields = Lists.newArrayList();
fields.add(new FieldSchema("c_byte", serdeConstants.TINYINT_TYPE_NAME, ""));
fields.add(new FieldSchema("c_short", serdeConstants.SMALLINT_TYPE_NAME, ""));
fields.add(new FieldSchema("c_char", serdeConstants.CHAR_TYPE_NAME + "(5)", ""));
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
index 47e9f3e0537d..e6ba956eb2a1 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
@@ -19,8 +19,6 @@
package org.apache.iceberg.mr.hive;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
@@ -33,6 +31,7 @@
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.mr.hive.serde.objectinspector.WriteObjectInspector;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
import org.apache.iceberg.types.Type.PrimitiveType;
@@ -154,7 +153,7 @@ public FieldDeserializer list(ListType listTypeInfo, ObjectInspectorPair pair, F
return null;
}
- List