HIVE-25008: Migrate hive table data into Iceberg format. (#2173) (Laszlo Pinter, reviewed by Marton Bod and Peter Vary)
lcspinter committed Apr 23, 2021
1 parent 25892ea commit 326abf9685de39cf4f1b3222d84fe9cbc465710a
Showing 18 changed files with 2,371 additions and 20 deletions.
@@ -5459,7 +5459,10 @@ public static enum ConfVars {

HIVE_DESCRIBE_PARTITIONED_TABLE_IGNORE_STATS("hive.describe.partitionedtable.ignore.stats", false,
"Disable partitioned table stats collection for 'DESCRIBE FORMATTED' or 'DESCRIBE EXTENDED' commands."),


HIVE_SERVER2_ICEBERG_METADATA_GENERATOR_THREADS("hive.server2.iceberg.metadata.generator.threads", 10,
"Number of threads used to scan partition directories for data files and update/generate iceberg metadata"),

/* BLOBSTORE section */

HIVE_BLOBSTORE_SUPPORTED_SCHEMES("hive.blobstore.supported.schemes", "s3,s3a,s3n",
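
The hive.server2.iceberg.metadata.generator.threads property added in the hunk above caps the thread pool used to scan partition directories and generate Iceberg metadata during migration. A minimal sketch of overriding the default of 10 through a plain Hadoop Configuration; the value 16 is an arbitrary example, not a recommendation:

import org.apache.hadoop.conf.Configuration;

public class MetadataGeneratorThreadsExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Raise the pool used to scan partition directories and generate Iceberg metadata.
    // 16 is an arbitrary example value; the ConfVars default above is 10.
    conf.setInt("hive.server2.iceberg.metadata.generator.threads", 16);
    System.out.println(conf.getInt("hive.server2.iceberg.metadata.generator.threads", 10));
  }
}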
@@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.orc.OrcMetrics;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

/**
* @deprecated use org.apache.iceberg.data.DataUtil once Iceberg 0.12 is released.
*/
@Deprecated
public class DataUtil {

private DataUtil() {
}

private static final PathFilter HIDDEN_PATH_FILTER =
p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");

/**
* Returns the data files in a partition by listing the partition location.
* <p>
* For Parquet and ORC partitions, this will read metrics from the file footer. For Avro partitions,
* metrics are set to null.
* @deprecated use org.apache.iceberg.data.DataUtil#listPartition() once Iceberg 0.12 is released.
*
* @param partitionKeys partition key, e.g., "a=1/b=2"
* @param uri partition location URI
* @param format partition format, avro, parquet or orc
* @param spec a partition spec
* @param conf a Hadoop conf
* @param metricsConfig a metrics conf
* @return a List of DataFile
*/
@Deprecated
public static List<DataFile> listPartition(Map<String, String> partitionKeys, String uri, String format,
PartitionSpec spec, Configuration conf, MetricsConfig metricsConfig) {
return listPartition(partitionKeys, uri, format, spec, conf, metricsConfig, null);
}

/**
* Returns the data files in a partition by listing the partition location.
* <p>
* For Parquet and ORC partitions, this will read metrics from the file footer. For Avro partitions,
* metrics are set to null.
* <p>
* Note: certain metrics, like NaN counts, that are only supported by Iceberg file writers but not file footers,
* will not be populated.
* @deprecated use org.apache.iceberg.data.DataUtil#listPartition() once Iceberg 0.12 is released.
*
* @param partitionKeys partition key, e.g., "a=1/b=2"
* @param uri partition location URI
* @param format partition format, avro, parquet or orc
* @param spec a partition spec
* @param conf a Hadoop conf
* @param metricsConfig a metrics conf
* @param mapping a name mapping
* @return a List of DataFile
*/
@Deprecated
public static List<DataFile> listPartition(Map<String, String> partitionKeys, String uri, String format,
PartitionSpec spec, Configuration conf, MetricsConfig metricsConfig,
NameMapping mapping) {
if (format.contains("avro")) {
return listAvroPartition(partitionKeys, uri, spec, conf);
} else if (format.contains("parquet")) {
return listParquetPartition(partitionKeys, uri, spec, conf, metricsConfig, mapping);
} else if (format.contains("orc")) {
return listOrcPartition(partitionKeys, uri, spec, conf, metricsConfig, mapping);
} else {
throw new UnsupportedOperationException("Unknown partition format: " + format);
}
}

private static List<DataFile> listAvroPartition(Map<String, String> partitionPath, String partitionUri,
PartitionSpec spec, Configuration conf) {
try {
Path partition = new Path(partitionUri);
FileSystem fs = partition.getFileSystem(conf);
return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
.filter(FileStatus::isFile)
.map(stat -> {
// Avro file statistics cannot be calculated without reading the file.
// Setting the rowCount to 0 is just a workaround so that the DataFiles.Builder.build() doesn't fail.
Metrics metrics = new Metrics(0L, null, null, null);
String partitionKey = spec.fields().stream()
.map(PartitionField::name)
.map(name -> String.format("%s=%s", name, partitionPath.get(name)))
.collect(Collectors.joining("/"));

return DataFiles.builder(spec)
.withPath(stat.getPath().toString())
.withFormat("avro")
.withFileSizeInBytes(stat.getLen())
.withMetrics(metrics)
.withPartitionPath(partitionKey)
.build();

}).collect(Collectors.toList());
} catch (IOException e) {
throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
}
}

private static List<DataFile> listParquetPartition(Map<String, String> partitionPath, String partitionUri,
PartitionSpec spec, Configuration conf,
MetricsConfig metricsSpec, NameMapping mapping) {
try {
Path partition = new Path(partitionUri);
FileSystem fs = partition.getFileSystem(conf);

return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
.filter(FileStatus::isFile)
.map(stat -> {
Metrics metrics;
try {
ParquetMetadata metadata = ParquetFileReader.readFooter(conf, stat);
metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
} catch (IOException e) {
throw new RuntimeException("Unable to read the footer of the parquet file: " +
stat.getPath(), e);
}
String partitionKey = spec.fields().stream()
.map(PartitionField::name)
.map(name -> String.format("%s=%s", name, partitionPath.get(name)))
.collect(Collectors.joining("/"));

return DataFiles.builder(spec)
.withPath(stat.getPath().toString())
.withFormat("parquet")
.withFileSizeInBytes(stat.getLen())
.withMetrics(metrics)
.withPartitionPath(partitionKey)
.build();
}).collect(Collectors.toList());
} catch (IOException e) {
throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
}
}

private static List<DataFile> listOrcPartition(Map<String, String> partitionPath, String partitionUri,
PartitionSpec spec, Configuration conf,
MetricsConfig metricsSpec, NameMapping mapping) {
try {
Path partition = new Path(partitionUri);
FileSystem fs = partition.getFileSystem(conf);

return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
.filter(FileStatus::isFile)
.map(stat -> {
Metrics metrics = OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath(), conf),
metricsSpec, mapping);
String partitionKey = spec.fields().stream()
.map(PartitionField::name)
.map(name -> String.format("%s=%s", name, partitionPath.get(name)))
.collect(Collectors.joining("/"));

return DataFiles.builder(spec)
.withPath(stat.getPath().toString())
.withFormat("orc")
.withFileSizeInBytes(stat.getLen())
.withMetrics(metrics)
.withPartitionPath(partitionKey)
.build();

}).collect(Collectors.toList());
} catch (IOException e) {
throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
}
}


}
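
A hedged usage sketch of the deprecated helper above: the schema, partition value, and HDFS location are made up for illustration, and metrics for the Parquet files are read from their footers as described in the javadoc:

import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.mr.hive.DataUtil;
import org.apache.iceberg.types.Types;

public class ListPartitionExample {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "dt", Types.StringType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("dt").build();

    // Hypothetical partition key/value and location of a Hive partition directory.
    Map<String, String> partitionKeys = Map.of("dt", "2021-04-23");
    String uri = "hdfs://namenode:8020/warehouse/tbl/dt=2021-04-23";

    List<DataFile> dataFiles = DataUtil.listPartition(
        partitionKeys, uri, "parquet", spec, new Configuration(), MetricsConfig.getDefault());
    dataFiles.forEach(f -> System.out.println(f.path() + " rows=" + f.recordCount()));
  }
}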
@@ -19,12 +19,20 @@

package org.apache.iceberg.mr.hive;

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
@@ -67,6 +75,8 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
private boolean deleteIcebergTable;
private FileIO deleteIo;
private TableMetadata deleteMetadata;
private boolean canMigrateHiveTable;
private PreAlterTableProperties preAlterTableProperties;

public HiveIcebergMetaHook(Configuration conf) {
this.conf = conf;
@@ -118,20 +128,7 @@ public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable)

catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
catalogProperties.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec));

// Allow purging table data if the table is created now and not set otherwise
if (hmsTable.getParameters().get(InputFormatConfig.EXTERNAL_TABLE_PURGE) == null) {
hmsTable.getParameters().put(InputFormatConfig.EXTERNAL_TABLE_PURGE, "TRUE");
}

// If the table is not managed by Hive catalog then the location should be set
if (!Catalogs.hiveCatalog(conf, catalogProperties)) {
Preconditions.checkArgument(hmsTable.getSd() != null && hmsTable.getSd().getLocation() != null,
"Table location not set");
}

// Remove creation related properties
PARAMETERS_TO_REMOVE.forEach(hmsTable.getParameters()::remove);
updateHmsTableProperties(hmsTable);
}

@Override
@@ -191,6 +188,91 @@ public void commitDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable,
}
}

@Override
public void preAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, EnvironmentContext context)
throws MetaException {
HiveMetaHook.super.preAlterTable(hmsTable, context);
catalogProperties = getCatalogProperties(hmsTable);
try {
icebergTable = Catalogs.loadTable(conf, catalogProperties);
} catch (NoSuchTableException nte) {
// If the iceberg table does not exist, and the hms table is external and not temporary and not acid
// we will create it in commitAlterTable
StorageDescriptor sd = hmsTable.getSd();
canMigrateHiveTable = MetaStoreUtils.isExternalTable(hmsTable) && !hmsTable.isTemporary() &&
!AcidUtils.isTransactionalTable(hmsTable);
if (!canMigrateHiveTable) {
throw new MetaException("Converting non-external, temporary or transactional hive table to iceberg " +
"table is not allowed.");
}

preAlterTableProperties = new PreAlterTableProperties();
preAlterTableProperties.tableLocation = sd.getLocation();
preAlterTableProperties.format = sd.getInputFormat();
preAlterTableProperties.schema = schema(catalogProperties, hmsTable);
preAlterTableProperties.spec = spec(preAlterTableProperties.schema, catalogProperties, hmsTable);
preAlterTableProperties.partitionKeys = hmsTable.getPartitionKeys();

context.getProperties().put(HiveMetaHook.ALLOW_PARTITION_KEY_CHANGE, "true");
// If there are partition keys specified remove them from the HMS table and add them to the column list
if (hmsTable.isSetPartitionKeys()) {
hmsTable.getSd().getCols().addAll(hmsTable.getPartitionKeys());
hmsTable.setPartitionKeysIsSet(false);
}
sd.setInputFormat(HiveIcebergInputFormat.class.getCanonicalName());
sd.setOutputFormat(HiveIcebergOutputFormat.class.getCanonicalName());
sd.setSerdeInfo(new SerDeInfo("icebergSerde", HiveIcebergSerDe.class.getCanonicalName(),
Collections.emptyMap()));
updateHmsTableProperties(hmsTable);
}
}

@Override
public void commitAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable,
PartitionSpecProxy partitionSpecProxy) throws MetaException {
HiveMetaHook.super.commitAlterTable(hmsTable, partitionSpecProxy);
if (canMigrateHiveTable) {
catalogProperties = getCatalogProperties(hmsTable);
catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(preAlterTableProperties.schema));
catalogProperties.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(preAlterTableProperties.spec));
setFileFormat();
if (Catalogs.hiveCatalog(conf, catalogProperties)) {
catalogProperties.put(TableProperties.ENGINE_HIVE_ENABLED, true);
}
HiveTableUtil.importFiles(preAlterTableProperties.tableLocation, preAlterTableProperties.format,
partitionSpecProxy, preAlterTableProperties.partitionKeys, catalogProperties, conf);
}
}

private void setFileFormat() {
String format = preAlterTableProperties.format.toLowerCase();
if (format.contains("orc")) {
catalogProperties.put(TableProperties.DEFAULT_FILE_FORMAT, "orc");
} else if (format.contains("parquet")) {
catalogProperties.put(TableProperties.DEFAULT_FILE_FORMAT, "parquet");
} else if (format.contains("avro")) {
catalogProperties.put(TableProperties.DEFAULT_FILE_FORMAT, "avro");
}
}

private void updateHmsTableProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
// Set the table type even for non HiveCatalog based tables
hmsTable.getParameters().put(BaseMetastoreTableOperations.TABLE_TYPE_PROP,
BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase());

// Allow purging table data if the table is created now and not set otherwise
hmsTable.getParameters().putIfAbsent(InputFormatConfig.EXTERNAL_TABLE_PURGE, "TRUE");

// If the table is not managed by Hive catalog then the location should be set
if (!Catalogs.hiveCatalog(conf, catalogProperties)) {
Preconditions.checkArgument(hmsTable.getSd() != null && hmsTable.getSd().getLocation() != null,
"Table location not set");
}

// Remove creation related properties
PARAMETERS_TO_REMOVE.forEach(hmsTable.getParameters()::remove);
}

/**
* Calculates the properties we would like to send to the catalog.
* <ul>
@@ -256,4 +338,12 @@ private static PartitionSpec spec(Schema schema, Properties properties,
return PartitionSpec.unpartitioned();
}
}

private class PreAlterTableProperties {
private String tableLocation;
private String format;
private Schema schema;
private PartitionSpec spec;
private List<FieldSchema> partitionKeys;
}
}
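
The preAlterTable/commitAlterTable hooks above are what turn an existing external Hive table into an Iceberg table in place. A hedged end-to-end sketch over JDBC: the HiveServer2 URL and table name are placeholders, and the ALTER TABLE statement that sets the Iceberg storage handler is shown as one way to reach the meta hook's alter-table path, as an illustration rather than authoritative syntax:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class MigrateHiveTableExample {
  public static void main(String[] args) throws Exception {
    // Placeholder HiveServer2 endpoint and table name.
    try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
         Statement stmt = conn.createStatement()) {
      // Switching the table to the Iceberg storage handler drives
      // HiveIcebergMetaHook.preAlterTable/commitAlterTable, which create the Iceberg
      // metadata and import the existing data files via HiveTableUtil.importFiles.
      stmt.execute("ALTER TABLE default.legacy_tbl SET TBLPROPERTIES ("
          + "'storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler')");
    }
  }
}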
