HIVE-27448: Hive Iceberg: Merge column stats
SimhadriG committed Jun 20, 2023
1 parent 7c83f6b commit 7d07744
Showing 6 changed files with 445 additions and 23 deletions.
@@ -55,8 +55,10 @@
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.Context.Operation;
import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -414,7 +416,7 @@ public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTabl
List<ColumnStatistics> colStats) {
Table tbl = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
String snapshotId = String.format("%s-STATS-%d", tbl.name(), tbl.currentSnapshot().snapshotId());
invalidateStats(getStatsPath(tbl));
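// For insert queries, merge the new stats with the parent snapshot's stats; for ANALYZE, invalidate the stale stats file instead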
checkAndMergeOrInvalidateStats(colStats.get(0), hmsTable);
byte[] serializeColStats = SerializationUtils.serialize((Serializable) colStats);
try (PuffinWriter writer = Puffin.write(tbl.io().newOutputFile(getStatsPath(tbl).toString()))
.createdBy(Constants.HIVE_ENGINE).build()) {
@@ -437,17 +439,20 @@ public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTabl
@Override
public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
if (canSetColStatistics(hmsTable)) {
Path statsPath = getStatsPath(table);
try {
FileSystem fs = statsPath.getFileSystem(conf);
if (fs.exists(statsPath)) {
return true;
}
} catch (IOException e) {
LOG.warn("Exception when trying to find Iceberg column stats for table:{} , snapshot:{} , " +
"statsPath: {} , stack trace: {}", table.name(), table.currentSnapshot(), statsPath, e);
return canSetColStatistics(hmsTable) && canProvideColStatistics(hmsTable, table.currentSnapshot().snapshotId());
}

private boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable, long snapshotId) {
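// Column stats can be served for a snapshot only if its Puffin stats file exists on the filesystem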
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
Path statsPath = getStatsPath(table, snapshotId);
try {
FileSystem fs = statsPath.getFileSystem(conf);
if (fs.exists(statsPath)) {
return true;
}
} catch (IOException e) {
LOG.warn("Exception when trying to find Iceberg column stats for table:{} , snapshot:{} , " +
"statsPath: {} , stack trace: {}", table.name(), table.currentSnapshot(), statsPath, e);
}
return false;
}
@@ -457,17 +462,20 @@ public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.meta
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
String statsPath = getStatsPath(table).toString();
LOG.info("Using stats from puffin file at: {}", statsPath);
ColumnStatistics colStats = readColStats(table, statsPath);
return colStats == null ? null : colStats.getStatsObj();
}

private ColumnStatistics readColStats(Table table, String statsPath) {
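// Deserialize every blob of the Puffin file, keyed by its blob metadata; the first blob holds the serialized ColumnStatistics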
try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath)).build()) {
List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
Map<BlobMetadata, List<ColumnStatistics>> collect =
Streams.stream(reader.readAll(blobMetadata)).collect(Collectors.toMap(Pair::first,
blobMetadataByteBufferPair -> SerializationUtils.deserialize(
ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
return collect.get(blobMetadata.get(0)).get(0).getStatsObj();
Map<BlobMetadata, List<ColumnStatistics>> collect = Streams.stream(reader.readAll(blobMetadata)).collect(
Collectors.toMap(Pair::first, blobMetadataByteBufferPair -> SerializationUtils.deserialize(
ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
return collect.get(blobMetadata.get(0)).get(0);
} catch (IOException e) {
LOG.error("Error when trying to read iceberg col stats from puffin files: ", e);
return null;
}
return null;
}

@Override
@@ -493,20 +501,45 @@ private String getStatsSource() {
}

private Path getStatsPath(Table table) {
return new Path(table.location() + STATS + table.name() + table.currentSnapshot().snapshotId());
return getStatsPath(table, table.currentSnapshot().snapshotId());
}

private Path getStatsPath(Table table, long snapshotId) {
return new Path(table.location() + STATS + table.name() + snapshotId);
}

private void invalidateStats(Path statsPath) {
private void checkAndMergeOrInvalidateStats(ColumnStatistics statsObjNew,
org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table tbl = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
Path statsPath = getStatsPath(tbl);
try {
FileSystem fs = statsPath.getFileSystem(conf);
if (fs.exists(statsPath)) {
// ANALYZE TABLE and the stats updater thread recompute stats from scratch, so drop the stale stats file
fs.delete(statsPath, true);
} else {
// Insert queries produce incremental stats, so merge them with the parent snapshot's stats
mergeStats(statsObjNew, tbl, hmsTable);
}
} catch (IOException e) {
LOG.error("Failed to invalidate stale column stats: {}", e.getMessage());
}
}

private void mergeStats(ColumnStatistics statsObjNew, Table tbl, org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
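// Fold the parent snapshot's stats into the incoming ones: MetaStoreServerUtils.mergeColStats merges statsObjOld into statsObjNew in place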
if (tbl.currentSnapshot().parentId() != null) {
long previousSnapshotId = tbl.currentSnapshot().parentId();
if (canProvideColStatistics(hmsTable, previousSnapshotId)) {
ColumnStatistics statsObjOld = readColStats(tbl, getStatsPath(tbl, previousSnapshotId).toString());
try {
MetaStoreServerUtils.mergeColStats(statsObjNew, statsObjOld);
} catch (InvalidObjectException e) {
LOG.error("Unable to merge column stats: ", e.getMessage());
}
}
}
}

/**
* No need for exclusive locks when writing, since Iceberg tables use optimistic concurrency when writing
* and only lock the table during the commit operation.
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import java.util.List;

import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.junit.Assert;
import org.junit.Test;

public class TestHiveIcebergColStats extends HiveIcebergStorageHandlerWithEngineBase {

@Test
public void testStatsWithInsert() {
TableIdentifier identifier = TableIdentifier.of("default", "customers");

shell.setHiveSessionValue(HiveConf.ConfVars.HIVESTATSAUTOGATHER.varname, true);
testTables.createTable(shell, identifier.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
PartitionSpec.unpartitioned(), fileFormat, ImmutableList.of());

if (testTableType != TestTables.TestTableType.HIVE_CATALOG) {
// If the location is set and we have to gather stats, then we have to update the table stats now
shell.executeStatement("ANALYZE TABLE " + identifier + " COMPUTE STATISTICS FOR COLUMNS");
}

String insert = testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, identifier, false);
shell.executeStatement(insert);

checkColStat(identifier.name(), "customer_id", true);
checkColStatMinMaxDistinctValue(identifier.name(), "customer_id", 0, 2, 3, 0);

insert = testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_1, identifier, false);
shell.executeStatement(insert);

checkColStat(identifier.name(), "customer_id", true);
checkColStatMinMaxDistinctValue(identifier.name(), "customer_id", 0, 5, 6, 0);

insert = testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, identifier, false);
shell.executeStatement(insert);
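// The third batch stays within the existing customer_id range, so min/max/distinct are unchanged after the merge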
checkColStat(identifier.name(), "customer_id", true);
checkColStatMinMaxDistinctValue(identifier.name(), "customer_id", 0, 5, 6, 0);
}

private void checkColStat(String tableName, String colName, boolean accurate) {
List<Object[]> rows = shell.executeStatement("DESCRIBE " + tableName + " " + colName);

if (accurate) {
Assert.assertEquals(2, rows.size());
Assert.assertEquals(StatsSetupConst.COLUMN_STATS_ACCURATE, rows.get(1)[0]);
// Check if the value is not {} (empty)
Assert.assertFalse(rows.get(1)[1].toString().matches("\\{\\}\\s*"));
} else {
// If we expect the stats to be not accurate
if (rows.size() == 1) {
// no stats now, we are ok
return;
} else {
Assert.assertEquals(2, rows.size());
Assert.assertEquals(StatsSetupConst.COLUMN_STATS_ACCURATE, rows.get(1)[0]);
// Check if the value is {} (empty)
Assert.assertTrue(rows.get(1)[1].toString().matches("\\{\\}\\s*"));
}
}
}

private void checkColStatMinMaxDistinctValue(String tableName, String colName, int minValue, int maxValue,
int distinct, int nulls) {

shell.executeStatement("set hive.iceberg.stats.source=metastore");
List<Object[]> rows = shell.executeStatement("DESCRIBE FORMATTED " + tableName + " " + colName);

// Check min
Assert.assertEquals("min", rows.get(2)[0]);
Assert.assertEquals(String.valueOf(minValue), rows.get(2)[1]);

// Check max
Assert.assertEquals("max", rows.get(3)[0]);
Assert.assertEquals(String.valueOf(maxValue), rows.get(3)[1]);

// Check num of nulls
Assert.assertEquals("num_nulls", rows.get(4)[0]);
Assert.assertEquals(String.valueOf(nulls), rows.get(4)[1]);

// Check distinct
Assert.assertEquals("distinct_count", rows.get(5)[0]);
Assert.assertEquals(String.valueOf(distinct), rows.get(5)[1]);

shell.executeStatement("set hive.iceberg.stats.source=iceberg");
rows = shell.executeStatement("DESCRIBE FORMATTED " + tableName + " " + colName);

// Check min
Assert.assertEquals("min", rows.get(2)[0]);
Assert.assertEquals(String.valueOf(minValue), rows.get(2)[1]);

// Check max
Assert.assertEquals("max", rows.get(3)[0]);
Assert.assertEquals(String.valueOf(maxValue), rows.get(3)[1]);

// Check num of nulls
Assert.assertEquals("num_nulls", rows.get(4)[0]);
Assert.assertEquals(String.valueOf(nulls), rows.get(4)[1]);

// Check distinct
Assert.assertEquals("distinct_count", rows.get(5)[0]);
Assert.assertEquals(String.valueOf(distinct), rows.get(5)[1]);

}

}
@@ -0,0 +1,21 @@
-- Mask random uuid
--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
set hive.stats.autogather=true;
set hive.stats.column.autogather=true;

set hive.iceberg.stats.source=iceberg;
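-- column stats are read from the table's Puffin files rather than from the metastore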
drop table if exists tbl_ice_puffin;
create external table tbl_ice_puffin(a int, b string, c int) stored by iceberg tblproperties ('format-version'='2');
insert into tbl_ice_puffin values (1, 'one', 50), (2, 'two', 51),(2, 'two', 51),(2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54), (111, 'one', 55), (333, 'two', 56);
desc formatted tbl_ice_puffin a;
desc formatted tbl_ice_puffin c;
explain select * from tbl_ice_puffin order by a, b, c;
insert into tbl_ice_puffin values (1000, 'one', 1000), (5000, 'two', 5000);
desc formatted tbl_ice_puffin a;
desc formatted tbl_ice_puffin c;
explain select * from tbl_ice_puffin order by a, b, c;
insert into tbl_ice_puffin values (10, 'one', 100000), (5000, 'two', 510000);
explain select * from tbl_ice_puffin order by a, b, c;
desc formatted tbl_ice_puffin a;
desc formatted tbl_ice_puffin c;
-- Result: a = (min: 1, max: 5000), c = (min: 50, max: 510000)
Expand Up @@ -96,7 +96,7 @@ avg_col_len 3.4444444444444446
max_col_len 5
num_trues
num_falses
bit_vector HL
bit_vector
comment
COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"a\":\"true\",\"b\":\"true\",\"c\":\"true\"}}
PREHOOK: query: update tbl_ice_puffin set b='two' where b='one' or b='three'