@@ -56,8 +56,10 @@
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.Context.Operation;
import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -452,65 +454,74 @@ public boolean canSetColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsT
}

@Override
public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
List<ColumnStatistics> colStats) {
public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable, List<ColumnStatistics> colStats) {
Table tbl = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
String snapshotId = String.format("%s-STATS-%d", tbl.name(), tbl.currentSnapshot().snapshotId());
invalidateStats(getStatsPath(tbl));
byte[] serializeColStats = SerializationUtils.serialize((Serializable) colStats);
try (PuffinWriter writer = Puffin.write(tbl.io().newOutputFile(getStatsPath(tbl).toString()))
.createdBy(Constants.HIVE_ENGINE).build()) {
writer.add(
new Blob(
tbl.name() + "-" + snapshotId,
ImmutableList.of(1),
tbl.currentSnapshot().snapshotId(),
tbl.currentSnapshot().sequenceNumber(),
ByteBuffer.wrap(serializeColStats),
PuffinCompressionCodec.NONE,
ImmutableMap.of()));
writer.finish();
return true;
} catch (IOException e) {
LOG.error(String.valueOf(e));
return writeColStats(colStats.get(0), tbl, snapshotId);
}

private boolean writeColStats(ColumnStatistics tableColStats, Table tbl, String snapshotId) {
try {
boolean rewriteStats = removeColStatsIfExists(tbl);
if (!rewriteStats) {
checkAndMergeColStats(tableColStats, tbl);
}
// Currently, we are only serializing table level stats.
byte[] serializeColStats = SerializationUtils.serialize(tableColStats);
try (PuffinWriter writer = Puffin.write(tbl.io().newOutputFile(getColStatsPath(tbl).toString()))
.createdBy(Constants.HIVE_ENGINE).build()) {
writer.add(new Blob(tbl.name() + "-" + snapshotId, ImmutableList.of(1), tbl.currentSnapshot().snapshotId(),
tbl.currentSnapshot().sequenceNumber(), ByteBuffer.wrap(serializeColStats), PuffinCompressionCodec.NONE,
ImmutableMap.of()));
writer.finish();
return true;
} catch (IOException e) {
LOG.warn("Unable to write stats to puffin file", e.getMessage());
return false;
}
} catch (InvalidObjectException | IOException e) {
LOG.warn("Unable to invalidate or merge stats: ", e.getMessage());
return false;
}
}

@Override
public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
if (canSetColStatistics(hmsTable)) {
Path statsPath = getStatsPath(table);
try {
FileSystem fs = statsPath.getFileSystem(conf);
if (fs.exists(statsPath)) {
return true;
}
} catch (IOException e) {
LOG.warn("Exception when trying to find Iceberg column stats for table:{} , snapshot:{} , " +
"statsPath: {} , stack trace: {}", table.name(), table.currentSnapshot(), statsPath, e);
}
return canSetColStatistics(hmsTable) && canProvideColStats(table, table.currentSnapshot().snapshotId());
}

private boolean canProvideColStats(Table table, long snapshotId) {
Path statsPath = getColStatsPath(table, snapshotId);
try {
FileSystem fs = statsPath.getFileSystem(conf);
return fs.exists(statsPath);
} catch (IOException e) {
LOG.warn("Exception when trying to find Iceberg column stats for table:{} , snapshot:{} , " +
"statsPath: {} , stack trace: {}", table.name(), table.currentSnapshot(), statsPath, e);
}
return false;
}

@Override
public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
String statsPath = getStatsPath(table).toString();
Path statsPath = getColStatsPath(table);
LOG.info("Using stats from puffin file at: {}", statsPath);
try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath)).build()) {
return readColStats(table, statsPath).getStatsObj();
}

private ColumnStatistics readColStats(Table table, Path statsPath) {
try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath.toString())).build()) {
List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
Map<BlobMetadata, List<ColumnStatistics>> collect =
Streams.stream(reader.readAll(blobMetadata)).collect(Collectors.toMap(Pair::first,
blobMetadataByteBufferPair -> SerializationUtils.deserialize(
ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
return collect.get(blobMetadata.get(0)).get(0).getStatsObj();
} catch (IOException e) {
LOG.error("Error when trying to read iceberg col stats from puffin files: ", e);
Map<BlobMetadata, ColumnStatistics> collect = Streams.stream(reader.readAll(blobMetadata)).collect(
Collectors.toMap(Pair::first, blobMetadataByteBufferPair -> SerializationUtils.deserialize(
ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
return collect.get(blobMetadata.get(0));
} catch (IOException | IndexOutOfBoundsException e) {
LOG.warn(" Unable to read iceberg col stats from puffin files: ", e);
return new ColumnStatistics();
}
return null;
}

@Override
@@ -535,18 +546,31 @@ private String getStatsSource() {
return HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ICEBERG_STATS_SOURCE, Constants.ICEBERG).toLowerCase();
}

private Path getStatsPath(Table table) {
return new Path(table.location() + STATS + table.name() + table.currentSnapshot().snapshotId());
private Path getColStatsPath(Table table) {
return getColStatsPath(table, table.currentSnapshot().snapshotId());
}

private void invalidateStats(Path statsPath) {
try {
FileSystem fs = statsPath.getFileSystem(conf);
if (fs.exists(statsPath)) {
fs.delete(statsPath, true);
private Path getColStatsPath(Table table, long snapshotId) {
return new Path(table.location() + STATS + table.name() + snapshotId);
}

private boolean removeColStatsIfExists(Table tbl) throws IOException {
Path statsPath = getColStatsPath(tbl);
FileSystem fs = statsPath.getFileSystem(conf);
if (fs.exists(statsPath)) {
// Stats may already have been written for this snapshot (e.g. by ANALYZE TABLE or the stats updater thread); remove them so they can be rewritten
return fs.delete(statsPath, true);
}
return false;
}

private void checkAndMergeColStats(ColumnStatistics statsObjNew, Table tbl) throws InvalidObjectException {
Long previousSnapshotId = tbl.currentSnapshot().parentId();
if (previousSnapshotId != null && canProvideColStats(tbl, previousSnapshotId)) {
ColumnStatistics statsObjOld = readColStats(tbl, getColStatsPath(tbl, previousSnapshotId));
if (statsObjOld != null && statsObjOld.getStatsObjSize() != 0 && !statsObjNew.getStatsObj().isEmpty()) {
MetaStoreServerUtils.mergeColStats(statsObjNew, statsObjOld);
}
} catch (IOException e) {
LOG.error("Failed to invalidate stale column stats: {}", e.getMessage());
}
}
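For readers following the new flow end to end, below is a minimal, self-contained sketch of the Puffin write/read round trip the handler performs. It reuses only API calls that already appear in this diff (Puffin.write/read, Blob, PuffinCompressionCodec, ByteBuffers, Pair, SerializationUtils); the class and method names are hypothetical, the "hive-engine" literal stands in for Constants.HIVE_ENGINE, and the stats path is whatever getColStatsPath() computes.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.iceberg.Table;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.BlobMetadata;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinCompressionCodec;
import org.apache.iceberg.puffin.PuffinReader;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.Pair;

class PuffinStatsSketch {

  // Serialize the table-level ColumnStatistics and store it as a single Puffin blob
  // tied to the current snapshot, as writeColStats() above does.
  static boolean writePuffinStats(Table tbl, String statsPath, ColumnStatistics stats) {
    byte[] payload = SerializationUtils.serialize(stats);
    String blobType = tbl.name() + "-STATS-" + tbl.currentSnapshot().snapshotId();
    try (PuffinWriter writer = Puffin.write(tbl.io().newOutputFile(statsPath))
        .createdBy("hive-engine").build()) {
      writer.add(new Blob(blobType, List.of(1), tbl.currentSnapshot().snapshotId(),
          tbl.currentSnapshot().sequenceNumber(), ByteBuffer.wrap(payload),
          PuffinCompressionCodec.NONE, Map.of()));
      writer.finish();
      return true;
    } catch (IOException e) {
      return false;  // the handler logs the failure and reports that no stats were written
    }
  }

  // Read the first blob back and deserialize it; like getColStatistics()/readColStats()
  // above, fall back to an empty ColumnStatistics when the file is missing or unreadable.
  static ColumnStatistics readPuffinStats(Table tbl, String statsPath) {
    try (PuffinReader reader = Puffin.read(tbl.io().newInputFile(statsPath)).build()) {
      List<BlobMetadata> blobs = reader.fileMetadata().blobs();
      for (Pair<BlobMetadata, ByteBuffer> blob : reader.readAll(blobs)) {
        return SerializationUtils.deserialize(ByteBuffers.toByteArray(blob.second()));
      }
    } catch (IOException | IndexOutOfBoundsException e) {
      // fall through
    }
    return new ColumnStatistics();
  }
}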

@@ -229,6 +229,37 @@ public void testColumnStatsAccurate() throws Exception {
}
}

@Test
public void testMergeStatsWithInsert() {
TableIdentifier identifier = TableIdentifier.of("default", "customers");

shell.setHiveSessionValue(HiveConf.ConfVars.HIVESTATSAUTOGATHER.varname, true);
testTables.createTable(shell, identifier.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
PartitionSpec.unpartitioned(), fileFormat, ImmutableList.of());

if (testTableType != TestTables.TestTableType.HIVE_CATALOG) {
// If the location is set and we have to gather stats, then we have to update the table stats now
shell.executeStatement("ANALYZE TABLE " + identifier + " COMPUTE STATISTICS FOR COLUMNS");
}

String insert = testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, identifier, false);
shell.executeStatement(insert);

checkColStat(identifier.name(), "customer_id", true);
checkColStatMinMaxDistinctValue(identifier.name(), "customer_id", 0, 2, 3, 0);

insert = testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_1, identifier, false);
shell.executeStatement(insert);

checkColStat(identifier.name(), "customer_id", true);
checkColStatMinMaxDistinctValue(identifier.name(), "customer_id", 0, 5, 6, 0);

insert = testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, identifier, false);
shell.executeStatement(insert);
checkColStat(identifier.name(), "customer_id", true);
checkColStatMinMaxDistinctValue(identifier.name(), "customer_id", 0, 5, 6, 0);
}
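// Expected merge behind the assertions above (assuming the fixtures are as their names suggest:
// CUSTOMER_RECORDS covers customer_id 0..2 and OTHER_CUSTOMER_RECORDS_1 adds 3..5 - the fixture
// contents are not shown in this diff): the previous snapshot's Puffin stats (min 0, max 2, ndv 3)
// are merged with the delta stats of the new insert (min 3, max 5, ndv 3), giving (min 0, max 5,
// ndv 6); the third insert introduces no new extremes or distinct ids, so the expected stats stay
// at (min 0, max 5, ndv 6).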

private void checkColStat(String tableName, String colName, boolean accurate) {
List<Object[]> rows = shell.executeStatement("DESCRIBE " + tableName + " " + colName);

@@ -274,4 +305,47 @@ private void checkColStatMaxLengthDistinctValue(String tableName, String colName
Assert.assertEquals("distinct_count", rows.get(5)[0]);
Assert.assertEquals(String.valueOf(distinct), rows.get(5)[1]);
}

private void checkColStatMinMaxDistinctValue(String tableName, String colName, int minValue, int maxValue,
int distinct, int nulls) {

shell.executeStatement("set hive.iceberg.stats.source=metastore");
List<Object[]> rows = shell.executeStatement("DESCRIBE FORMATTED " + tableName + " " + colName);

// Check min
Assert.assertEquals("min", rows.get(2)[0]);
Assert.assertEquals(String.valueOf(minValue), rows.get(2)[1]);

// Check max
Assert.assertEquals("max", rows.get(3)[0]);
Assert.assertEquals(String.valueOf(maxValue), rows.get(3)[1]);

// Check num of nulls
Assert.assertEquals("num_nulls", rows.get(4)[0]);
Assert.assertEquals(String.valueOf(nulls), rows.get(4)[1]);

// Check distinct
Assert.assertEquals("distinct_count", rows.get(5)[0]);
Assert.assertEquals(String.valueOf(distinct), rows.get(5)[1]);

shell.executeStatement("set hive.iceberg.stats.source=iceberg");
rows = shell.executeStatement("DESCRIBE FORMATTED " + tableName + " " + colName);

// Check min
Assert.assertEquals("min", rows.get(2)[0]);
Assert.assertEquals(String.valueOf(minValue), rows.get(2)[1]);

// Check max
Assert.assertEquals("max", rows.get(3)[0]);
Assert.assertEquals(String.valueOf(maxValue), rows.get(3)[1]);

// Check num of nulls
Assert.assertEquals("num_nulls", rows.get(4)[0]);
Assert.assertEquals(String.valueOf(nulls), rows.get(4)[1]);

// Check distinct
Assert.assertEquals("distinct_count", rows.get(5)[0]);
Assert.assertEquals(String.valueOf(distinct), rows.get(5)[1]);

}
}
@@ -0,0 +1,21 @@
-- Mask random uuid
--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
set hive.stats.autogather=true;
set hive.stats.column.autogather=true;

set hive.iceberg.stats.source=iceberg;
drop table if exists tbl_ice_puffin;
create external table tbl_ice_puffin(a int, b string, c int) stored by iceberg tblproperties ('format-version'='2');
insert into tbl_ice_puffin values (1, 'one', 50), (2, 'two', 51),(2, 'two', 51),(2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54), (111, 'one', 55), (333, 'two', 56);
desc formatted tbl_ice_puffin a;
desc formatted tbl_ice_puffin c;
explain select * from tbl_ice_puffin order by a, b, c;
insert into tbl_ice_puffin values (1000, 'one', 1000), (5000, 'two', 5000);
desc formatted tbl_ice_puffin a;
desc formatted tbl_ice_puffin c;
explain select * from tbl_ice_puffin order by a, b, c;
insert into tbl_ice_puffin values (10, 'one', 100000), (5000, 'two', 510000);
explain select * from tbl_ice_puffin order by a, b, c;
desc formatted tbl_ice_puffin a;
desc formatted tbl_ice_puffin c;
-- Result: a = (min: 1, max: 5000), c = (min: 50, max: 510000)
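-- Worked trace of the expected merge: after the first insert a has (min 1, max 333) and c has (min 50, max 56);
-- the second insert extends them to (min 1, max 5000) and (min 50, max 5000); the third insert only raises
-- the max of c to 510000, so the merged stats end at a = (min 1, max 5000) and c = (min 50, max 510000).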
@@ -96,7 +96,7 @@ avg_col_len 3.4444444444444446
max_col_len 5
num_trues
num_falses
bit_vector HL
bit_vector
Member: why is this change?

Member Author (@simhadri-g, Jul 18, 2023): I suspect it is removed here because of HIVE-23846 (Avoid unnecessary serialization and deserialization of bitvectors): in the method getNdvEstimator of ColumnStatsDataInspector, isSetBitVectors() is called, which serializes the bit vectors again when we already have a deserialized ndvEstimator.

P.S.: HIVE-17286 introduced HL to avoid expensive string serialisation for NDV.
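For context, here is a toy illustration (plain JDK code, not Hive's NDV estimator; the values are made up) of why the sketch stored in bit_vector matters once per-insert delta stats have to be merged with the previous snapshot's stats:

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NdvMergeDemo {
  public static void main(String[] args) {
    List<Integer> previousSnapshot = List.of(0, 1, 2);   // distinct_count = 3
    List<Integer> newInsert = List.of(2, 3, 4, 5);       // distinct_count = 4
    // From the two counts alone the merged distinct_count is only bounded to [4, 7];
    // the exact value cannot be recovered from "3" and "4".
    Set<Integer> union = new HashSet<>(previousSnapshot);
    union.addAll(newInsert);
    System.out.println("true merged ndv = " + union.size());   // prints 6
    // The HyperLogLog sketch serialized into bit_vector plays the role of this set with
    // constant memory, which is why dropping it (the missing "HL" above) degrades merged
    // distinct_count estimates.
  }
}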

Member: I still don't get how adding the minor delta stats merge caused that behavior change. @kasakrisz, any hints?

Contributor: I did a quick search, and bit_vector shows HL for non-Iceberg tables in our q test outs when column stats are present for that column. @simhadri-g, could you please look into why it was removed in this case?

Member Author: sure, thanks @kasakrisz

Member: The bit vector that contains the NDV values is being overwritten by updatecolstats, which is called by the ALTER TABLE command. That's what I want to fix in the ALTER TABLE PR. Since the bit vector is being overwritten, HyperLogLog is not being used, so HL is missing.

Member Author: An ALTER TABLE command is run during the doCommit stage, which seems to unnecessarily overwrite the bit_vector. This should be fixed for Iceberg tables. I have created a follow-up JIRA to address it: https://issues.apache.org/jira/browse/HIVE-27528

Member: @simhadri-g, please add a link to a follow-up ticket

comment
COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"a\":\"true\",\"b\":\"true\",\"c\":\"true\"}}
PREHOOK: query: update tbl_ice_puffin set b='two' where b='one' or b='three'
@@ -76,9 +76,9 @@ Stage-3
<-Map 1 [SIMPLE_EDGE] vectorized
PARTITION_ONLY_SHUFFLE [RS_13]
PartitionCols:_col1
Select Operator [SEL_12] (rows=22 width=91)
Select Operator [SEL_12] (rows=22 width=87)
Output:["_col0","_col1"]
TableScan [TS_0] (rows=22 width=91)
TableScan [TS_0] (rows=22 width=87)
default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"]
Reducer 3 vectorized
File Output Operator [FS_21]
@@ -90,7 +90,7 @@ Stage-3
PARTITION_ONLY_SHUFFLE [RS_16]
Group By Operator [GBY_15] (rows=1 width=400)
Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"]
Select Operator [SEL_14] (rows=22 width=91)
Select Operator [SEL_14] (rows=22 width=87)
Output:["a","ccy"]
Please refer to the previous Select Operator [SEL_12]

@@ -170,9 +170,9 @@ Stage-3
<-Map 1 [SIMPLE_EDGE] vectorized
PARTITION_ONLY_SHUFFLE [RS_13]
PartitionCols:iceberg_bucket(_col1, 2)
Select Operator [SEL_12] (rows=22 width=91)
Select Operator [SEL_12] (rows=22 width=87)
Output:["_col0","_col1"]
TableScan [TS_0] (rows=22 width=91)
TableScan [TS_0] (rows=22 width=87)
default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"]
Reducer 3 vectorized
File Output Operator [FS_21]
@@ -184,7 +184,7 @@ Stage-3
PARTITION_ONLY_SHUFFLE [RS_16]
Group By Operator [GBY_15] (rows=1 width=400)
Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"]
Select Operator [SEL_14] (rows=22 width=91)
Select Operator [SEL_14] (rows=22 width=87)
Output:["a","ccy"]
Please refer to the previous Select Operator [SEL_12]

@@ -264,9 +264,9 @@ Stage-3
<-Map 1 [SIMPLE_EDGE] vectorized
PARTITION_ONLY_SHUFFLE [RS_13]
PartitionCols:_col1, iceberg_bucket(_col2, 3)
Select Operator [SEL_12] (rows=22 width=99)
Select Operator [SEL_12] (rows=22 width=94)
Output:["_col0","_col1","_col2"]
TableScan [TS_0] (rows=22 width=99)
TableScan [TS_0] (rows=22 width=94)
default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"]
Reducer 3 vectorized
File Output Operator [FS_21]
@@ -278,7 +278,7 @@ Stage-3
PARTITION_ONLY_SHUFFLE [RS_16]
Group By Operator [GBY_15] (rows=1 width=568)
Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)","min(c)","max(c)","count(c)","compute_bit_vector_hll(c)"]
Select Operator [SEL_14] (rows=22 width=99)
Select Operator [SEL_14] (rows=22 width=94)
Output:["a","ccy","c"]
Please refer to the previous Select Operator [SEL_12]

@@ -403,7 +403,7 @@ Stage-3
Output:["_col0","_col1","_col2"]
Filter Operator [FIL_14] (rows=4 width=99)
predicate:(b = 'EUR')
TableScan [TS_0] (rows=22 width=99)
TableScan [TS_0] (rows=22 width=94)
default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"]
Reducer 3 vectorized
File Output Operator [FS_24]
@@ -461,7 +461,7 @@ Stage-3
Output:["_col0","_col1","_col2"]
Filter Operator [FIL_12] (rows=1 width=99)
predicate:((c = 100L) and (b = 'USD'))
TableScan [TS_0] (rows=22 width=99)
TableScan [TS_0] (rows=22 width=94)
default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"]
PARTITION_ONLY_SHUFFLE [RS_17]
Group By Operator [GBY_16] (rows=1 width=568)