HIVE-26103: Port Iceberg fixes to the iceberg module #3164

Merged · 15 commits · Apr 4, 2022
4 changes: 0 additions & 4 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -5693,10 +5693,6 @@ public static enum ConfVars {
HIVE_SERVER2_ICEBERG_METADATA_GENERATOR_THREADS("hive.server2.iceberg.metadata.generator.threads", 10,
"Number of threads used to scan partition directories for data files and update/generate iceberg metadata"),

HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES("hive.iceberg.metadata.refresh.max.retries", 2,
"Max retry count for trying to access the metadata location in order to refresh metadata during " +
" Iceberg table load."),

/* BLOBSTORE section */

HIVE_BLOBSTORE_SUPPORTED_SCHEMES("hive.blobstore.supported.schemes", "s3,s3a,s3n",
15 changes: 15 additions & 0 deletions iceberg/checkstyle/checkstyle.xml
@@ -46,6 +46,21 @@
<property name="format" value="sparkContext\(\)\.hadoopConfiguration\(\)"/>
<property name="message" value="Are you sure that you want to use sparkContext().hadoopConfiguration()? In most cases, you should use sessionState().newHadoopConf() instead, so that the Hadoop configurations specified in the Spark session configuration will come into effect."/>
</module>
<module name="RegexpSingleline">
<property name="format" value="new HashMap&lt;&gt;\(.*\)"/>
<property name="message"
value="Prefer using Maps.newHashMap instead."/>
</module>
<module name="RegexpSingleline">
<property name="format" value="new ArrayList&lt;&gt;\(.*\)"/>
<property name="message"
value="Prefer using Lists.newArrayList() instead."/>
</module>
<module name="RegexpSingleline">
<property name="format" value="new HashSet&lt;&gt;\(.*\)"/>
<property name="message"
value="Prefer using Sets.newHashSet() instead."/>
</module>
<module name="SuppressionFilter"> <!-- baseline-gradle: README.md -->
<property name="file" value="${config_loc}/checkstyle-suppressions.xml"/>
</module>
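The three new rules steer code toward the Guava-style collection factories (via Iceberg's relocated Guava, as imported elsewhere in this PR) instead of raw constructors. A minimal sketch of the flagged and preferred forms:

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

public class CollectionFactories {
  void preferredForms() {
    // Flagged by the new rules: new HashMap<>(), new ArrayList<>(), new HashSet<>()
    Map<String, String> props = Maps.newHashMap(); // instead of new HashMap<>()
    List<String> names = Lists.newArrayList();     // instead of new ArrayList<>()
    Set<String> ids = Sets.newHashSet();           // instead of new HashSet<>()
  }
}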
org/apache/iceberg/hive/HiveCatalog.java
@@ -36,10 +36,12 @@
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -49,49 +51,31 @@
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
public static final String LIST_ALL_TABLES = "list-all-tables";
public static final String LIST_ALL_TABLES_DEFAULT = "false";

private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);

private String name;
private Configuration conf;
private FileIO fileIO;
private ClientPool<IMetaStoreClient, TException> clients;
private boolean listAllTables = false;

public HiveCatalog() {
}

/**
* Hive Catalog constructor.
*
* @param conf Hadoop Configuration
* @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog. Will be removed in
* v0.13.0
*/
@Deprecated
public HiveCatalog(Configuration conf) {
this.name = "hive";
this.conf = conf;
this.fileIO = new HadoopFileIO(conf);
Map<String, String> properties = ImmutableMap.of(
CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
conf.get(CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
String.valueOf(CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT)),
CatalogProperties.CLIENT_POOL_SIZE,
conf.get(CatalogProperties.CLIENT_POOL_SIZE,
String.valueOf(CatalogProperties.CLIENT_POOL_SIZE_DEFAULT))
);
this.clients = new CachedClientPool(conf, properties);
}

@Override
public void initialize(String inputName, Map<String, String> properties) {
this.name = inputName;
@@ -108,6 +92,8 @@ public void initialize(String inputName, Map<String, String> properties) {
this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, properties.get(CatalogProperties.WAREHOUSE_LOCATION));
}

this.listAllTables = Boolean.parseBoolean(properties.getOrDefault(LIST_ALL_TABLES, LIST_ALL_TABLES_DEFAULT));

String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
this.fileIO = fileIOImpl == null ? new HadoopFileIO(conf) : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);

@@ -122,12 +108,20 @@ public List<TableIdentifier> listTables(Namespace namespace) {

try {
List<String> tableNames = clients.run(client -> client.getAllTables(database));
List<Table> tableObjects = clients.run(client -> client.getTableObjectsByName(database, tableNames));
List<TableIdentifier> tableIdentifiers = tableObjects.stream()
.filter(table -> table.getParameters() == null ? false : BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE
.equalsIgnoreCase(table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)))
.map(table -> TableIdentifier.of(namespace, table.getTableName()))
.collect(Collectors.toList());
List<TableIdentifier> tableIdentifiers;

if (listAllTables) {
tableIdentifiers = tableNames.stream()
.map(t -> TableIdentifier.of(namespace, t))
.collect(Collectors.toList());
} else {
List<Table> tableObjects = clients.run(client -> client.getTableObjectsByName(database, tableNames));
tableIdentifiers = tableObjects.stream()
.filter(table -> table.getParameters() != null && BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE
.equalsIgnoreCase(table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)))
.map(table -> TableIdentifier.of(namespace, table.getTableName()))
.collect(Collectors.toList());
}

LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, tableIdentifiers);
return tableIdentifiers;
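The new list-all-tables flag changes what listTables returns: by default only Iceberg tables (filtered on the table_type parameter) are listed, while setting the property to true returns every table name in the database and skips the extra getTableObjectsByName round trip. A minimal usage sketch, assuming a reachable metastore (catalog name and namespace are illustrative):

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class ListAllTablesExample {
  public static void main(String[] args) {
    HiveCatalog catalog = new HiveCatalog();
    catalog.setConf(new Configuration());
    // LIST_ALL_TABLES ("list-all-tables") defaults to "false", i.e. Iceberg tables only.
    catalog.initialize("hive", ImmutableMap.of(HiveCatalog.LIST_ALL_TABLES, "true"));
    // With the flag on, this returns every table in the database, Iceberg or not.
    List<TableIdentifier> tables = catalog.listTables(Namespace.of("default"));
    tables.forEach(System.out::println);
  }
}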
@@ -235,6 +229,23 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
}
}

@Override
public org.apache.iceberg.Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
Preconditions.checkArgument(isValidIdentifier(identifier), "Invalid identifier: %s", identifier);

// Throw an exception if this table already exists in the catalog.
if (tableExists(identifier)) {
throw new org.apache.iceberg.exceptions.AlreadyExistsException("Table already exists: %s", identifier);
}

TableOperations ops = newTableOps(identifier);
InputFile metadataFile = fileIO.newInputFile(metadataFileLocation);
TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile);
ops.commit(null, metadata);

return new BaseTable(ops, identifier.toString());
}

@Override
public void createNamespace(Namespace namespace, Map<String, String> meta) {
Preconditions.checkArgument(
@@ -533,4 +544,9 @@ public void setConf(Configuration conf) {
public Configuration getConf() {
return conf;
}

@VisibleForTesting
void setListAllTables(boolean listAllTables) {
this.listAllTables = listAllTables;
}
}
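The new registerTable override attaches an existing Iceberg metadata file to the catalog under a fresh identifier, failing fast if the identifier is already taken. A sketch, reusing a catalog initialized as in the previous snippet (the metadata location is illustrative):

import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;

// Throws AlreadyExistsException if "db.tbl" already exists in the catalog;
// otherwise reads the metadata file and commits it as the table's current state.
Table table = catalog.registerTable(
    TableIdentifier.of("db", "tbl"),
    "hdfs://nn:8020/warehouse/db/tbl/metadata/00001-example.metadata.json");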

This file was deleted.

org/apache/iceberg/hive/HiveClientPool.java
@@ -21,6 +21,8 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -35,9 +37,8 @@ public class HiveClientPool extends ClientPoolImpl<IMetaStoreClient, TException>
// use appropriate ctor depending on whether we're working with Hive1, Hive2, or Hive3 dependencies
// we need to do this because there is a breaking API change between Hive1, Hive2, and Hive3
private static final DynMethods.StaticMethod GET_CLIENT = DynMethods.builder("getProxy")
.impl(RetryingMetaStoreClient.class, HiveConf.class)
.impl(RetryingMetaStoreClient.class, HiveConf.class, Boolean.TYPE)
.impl(RetryingMetaStoreClient.class, Configuration.class, Boolean.TYPE)
.impl(RetryingMetaStoreClient.class, HiveConf.class, HiveMetaHookLoader.class, String.class) // Hive 1 and 2
.impl(RetryingMetaStoreClient.class, Configuration.class, HiveMetaHookLoader.class, String.class) // Hive 3
.buildStatic();

private final HiveConf hiveConf;
@@ -53,7 +54,7 @@ public HiveClientPool(int poolSize, Configuration conf) {
protected IMetaStoreClient newClient() {
try {
try {
return GET_CLIENT.invoke(hiveConf, true);
return GET_CLIENT.invoke(hiveConf, (HiveMetaHookLoader) tbl -> null, HiveMetaStoreClient.class.getName());
} catch (RuntimeException e) {
// any MetaException would be wrapped into RuntimeException during reflection, so let's double-check type here
if (e.getCause() instanceof MetaException) {
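The client proxy is now obtained through the getProxy overloads that take a HiveMetaHookLoader and a client class name (one signature for Hive 1/2, one for Hive 3), with a no-op hook loader supplied at the call site. Pool callers are unchanged; a minimal usage sketch, assuming a reachable metastore:

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hive.HiveClientPool;

public class ClientPoolExample {
  public static void main(String[] args) throws Exception {
    // A pool of two cached IMetaStoreClient instances.
    HiveClientPool pool = new HiveClientPool(2, new Configuration());
    // run() borrows a client, applies the action, and returns it to the pool.
    List<String> databases = pool.run(client -> client.getAllDatabases());
    databases.forEach(System.out::println);
  }
}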
org/apache/iceberg/hive/HiveSchemaConverter.java
@@ -19,7 +19,6 @@

package org.apache.iceberg.hive;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
@@ -30,6 +29,7 @@
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
@@ -62,7 +62,7 @@ static Type convert(TypeInfo typeInfo, boolean autoConvert) {
}

List<Types.NestedField> convertInternal(List<String> names, List<TypeInfo> typeInfos, List<String> comments) {
List<Types.NestedField> result = new ArrayList<>(names.size());
List<Types.NestedField> result = Lists.newArrayListWithExpectedSize(names.size());
for (int i = 0; i < names.size(); ++i) {
result.add(Types.NestedField.optional(id++, names.get(i), convertType(typeInfos.get(i)),
comments.isEmpty() || i >= comments.size() ? null : comments.get(i)));
org/apache/iceberg/hive/HiveSchemaUtil.java
@@ -19,7 +19,6 @@

package org.apache.iceberg.hive;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -31,6 +30,7 @@
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -72,9 +72,9 @@ public static Schema convert(List<FieldSchema> fieldSchemas) {
* @return An equivalent Iceberg Schema
*/
public static Schema convert(List<FieldSchema> fieldSchemas, boolean autoConvert) {
List<String> names = new ArrayList<>(fieldSchemas.size());
List<TypeInfo> typeInfos = new ArrayList<>(fieldSchemas.size());
List<String> comments = new ArrayList<>(fieldSchemas.size());
List<String> names = Lists.newArrayListWithExpectedSize(fieldSchemas.size());
List<TypeInfo> typeInfos = Lists.newArrayListWithExpectedSize(fieldSchemas.size());
List<String> comments = Lists.newArrayListWithExpectedSize(fieldSchemas.size());

for (FieldSchema col : fieldSchemas) {
names.add(col.getName());
@@ -237,10 +237,10 @@ public static Pair<String, Optional<String>> getReorderedColumn(List<FieldSchema
}

public static class SchemaDifference {
private final List<FieldSchema> missingFromFirst = new ArrayList<>();
private final List<FieldSchema> missingFromSecond = new ArrayList<>();
private final List<FieldSchema> typeChanged = new ArrayList<>();
private final List<FieldSchema> commentChanged = new ArrayList<>();
private final List<FieldSchema> missingFromFirst = Lists.newArrayList();
private final List<FieldSchema> missingFromSecond = Lists.newArrayList();
private final List<FieldSchema> typeChanged = Lists.newArrayList();
private final List<FieldSchema> commentChanged = Lists.newArrayList();

public List<FieldSchema> getMissingFromFirst() {
return missingFromFirst;
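For context, the touched convert overload maps a Hive column list onto an Iceberg schema, making each column an optional Iceberg field with a fresh field id. A small sketch, assuming the autoConvert flag behaves as its name suggests (permitting inexact Hive-to-Iceberg type conversions); the columns are illustrative:

import java.util.List;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.iceberg.Schema;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class SchemaConvertExample {
  public static void main(String[] args) {
    List<FieldSchema> cols = Lists.newArrayList(
        new FieldSchema("id", "bigint", "row id"),
        new FieldSchema("name", "string", null));
    // Each column becomes an optional Iceberg field; comments carry over when present.
    Schema schema = HiveSchemaUtil.convert(cols, false);
    System.out.println(schema);
  }
}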