diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index a8d5741ebd4f..91f84cef51d8 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -56,8 +56,10 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.Context.Operation;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -452,27 +454,33 @@ public boolean canSetColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsT
   }
 
   @Override
-  public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
-      List<ColumnStatistics> colStats) {
+  public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable, List<ColumnStatistics> colStats) {
     Table tbl = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
     String snapshotId = String.format("%s-STATS-%d", tbl.name(), tbl.currentSnapshot().snapshotId());
-    invalidateStats(getStatsPath(tbl));
-    byte[] serializeColStats = SerializationUtils.serialize((Serializable) colStats);
-    try (PuffinWriter writer = Puffin.write(tbl.io().newOutputFile(getStatsPath(tbl).toString()))
-        .createdBy(Constants.HIVE_ENGINE).build()) {
-      writer.add(
-          new Blob(
-              tbl.name() + "-" + snapshotId,
-              ImmutableList.of(1),
-              tbl.currentSnapshot().snapshotId(),
-              tbl.currentSnapshot().sequenceNumber(),
-              ByteBuffer.wrap(serializeColStats),
-              PuffinCompressionCodec.NONE,
-              ImmutableMap.of()));
-      writer.finish();
-      return true;
-    } catch (IOException e) {
-      LOG.error(String.valueOf(e));
+    return writeColStats(colStats.get(0), tbl, snapshotId);
+  }
+
+  private boolean writeColStats(ColumnStatistics tableColStats, Table tbl, String snapshotId) {
+    try {
+      boolean rewriteStats = removeColStatsIfExists(tbl);
+      if (!rewriteStats) {
+        checkAndMergeColStats(tableColStats, tbl);
+      }
+      // Currently, only table-level stats are serialized.
+      byte[] serializeColStats = SerializationUtils.serialize(tableColStats);
+      try (PuffinWriter writer = Puffin.write(tbl.io().newOutputFile(getColStatsPath(tbl).toString()))
+          .createdBy(Constants.HIVE_ENGINE).build()) {
+        writer.add(new Blob(tbl.name() + "-" + snapshotId, ImmutableList.of(1), tbl.currentSnapshot().snapshotId(),
+            tbl.currentSnapshot().sequenceNumber(), ByteBuffer.wrap(serializeColStats), PuffinCompressionCodec.NONE,
+            ImmutableMap.of()));
+        writer.finish();
+        return true;
+      } catch (IOException e) {
+        LOG.warn("Unable to write stats to puffin file", e);
+        return false;
+      }
+    } catch (InvalidObjectException | IOException e) {
+      LOG.warn("Unable to invalidate or merge stats", e);
       return false;
     }
   }
@@ -480,17 +488,17 @@ public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTabl
   @Override
   public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-    if (canSetColStatistics(hmsTable)) {
-      Path statsPath = getStatsPath(table);
-      try {
-        FileSystem fs = statsPath.getFileSystem(conf);
-        if (fs.exists(statsPath)) {
-          return true;
-        }
-      } catch (IOException e) {
-        LOG.warn("Exception when trying to find Iceberg column stats for table:{} , snapshot:{} , " +
-            "statsPath: {} , stack trace: {}", table.name(), table.currentSnapshot(), statsPath, e);
-      }
+    return canSetColStatistics(hmsTable) && canProvideColStats(table, table.currentSnapshot().snapshotId());
+  }
+
+  private boolean canProvideColStats(Table table, long snapshotId) {
+    Path statsPath = getColStatsPath(table, snapshotId);
+    try {
+      FileSystem fs = statsPath.getFileSystem(conf);
+      return fs.exists(statsPath);
+    } catch (IOException e) {
+      LOG.warn("Exception when trying to find Iceberg column stats for table:{} , snapshot:{} , " +
+          "statsPath: {} , stack trace: {}", table.name(), table.currentSnapshot(), statsPath, e);
     }
     return false;
   }
@@ -498,19 +506,22 @@ public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table
   @Override
   public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-    String statsPath = getStatsPath(table).toString();
+    Path statsPath = getColStatsPath(table);
     LOG.info("Using stats from puffin file at: {}", statsPath);
-    try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath)).build()) {
+    return readColStats(table, statsPath).getStatsObj();
+  }
+
+  private ColumnStatistics readColStats(Table table, Path statsPath) {
+    try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath.toString())).build()) {
       List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
-      Map<BlobMetadata, List<ColumnStatistics>> collect =
-          Streams.stream(reader.readAll(blobMetadata)).collect(Collectors.toMap(Pair::first,
-              blobMetadataByteBufferPair -> SerializationUtils.deserialize(
-                  ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
-      return collect.get(blobMetadata.get(0)).get(0).getStatsObj();
-    } catch (IOException e) {
-      LOG.error("Error when trying to read iceberg col stats from puffin files: ", e);
+      Map<BlobMetadata, ColumnStatistics> collect = Streams.stream(reader.readAll(blobMetadata)).collect(
+          Collectors.toMap(Pair::first, blobMetadataByteBufferPair -> SerializationUtils.deserialize(
+              ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
+      return collect.get(blobMetadata.get(0));
+    } catch (IOException | IndexOutOfBoundsException e) {
+      LOG.warn("Unable to read iceberg col stats from puffin files", e);
+      return new ColumnStatistics();
     }
-    return null;
   }
 
   @Override
@@ -535,18 +546,31 @@ private String getStatsSource() {
     return HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ICEBERG_STATS_SOURCE, Constants.ICEBERG).toLowerCase();
   }
 
-  private Path getStatsPath(Table table) {
-    return new Path(table.location() + STATS + table.name() + table.currentSnapshot().snapshotId());
+  private Path getColStatsPath(Table table) {
+    return getColStatsPath(table, table.currentSnapshot().snapshotId());
   }
 
-  private void invalidateStats(Path statsPath) {
-    try {
-      FileSystem fs = statsPath.getFileSystem(conf);
-      if (fs.exists(statsPath)) {
-        fs.delete(statsPath, true);
+  private Path getColStatsPath(Table table, long snapshotId) {
+    return new Path(table.location() + STATS + table.name() + snapshotId);
+  }
+
+  private boolean removeColStatsIfExists(Table tbl) throws IOException {
+    Path statsPath = getColStatsPath(tbl);
+    FileSystem fs = statsPath.getFileSystem(conf);
+    if (fs.exists(statsPath)) {
+      // Stats for this snapshot may already exist (e.g. written by ANALYZE TABLE or the stats updater thread)
+      return fs.delete(statsPath, true);
+    }
+    return false;
+  }
+
+  private void checkAndMergeColStats(ColumnStatistics statsObjNew, Table tbl) throws InvalidObjectException {
+    Long previousSnapshotId = tbl.currentSnapshot().parentId();
+    if (previousSnapshotId != null && canProvideColStats(tbl, previousSnapshotId)) {
+      ColumnStatistics statsObjOld = readColStats(tbl, getColStatsPath(tbl, previousSnapshotId));
+      if (statsObjOld != null && statsObjOld.getStatsObjSize() != 0 && !statsObjNew.getStatsObj().isEmpty()) {
+        MetaStoreServerUtils.mergeColStats(statsObjNew, statsObjOld);
       }
-    } catch (IOException e) {
-      LOG.error("Failed to invalidate stale column stats: {}", e.getMessage());
     }
   }
 
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
index 4ed42dac9fde..e09cc732e3b3 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
@@ -229,6 +229,37 @@ public void testColumnStatsAccurate() throws Exception {
     }
   }
 
+  @Test
+  public void testMergeStatsWithInsert() {
+    TableIdentifier identifier = TableIdentifier.of("default", "customers");
+
+    shell.setHiveSessionValue(HiveConf.ConfVars.HIVESTATSAUTOGATHER.varname, true);
+    testTables.createTable(shell, identifier.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        PartitionSpec.unpartitioned(), fileFormat, ImmutableList.of());
+
+    if (testTableType != TestTables.TestTableType.HIVE_CATALOG) {
+      // If the location is set and we have to gather stats, then we have to update the table stats now
+      shell.executeStatement("ANALYZE TABLE " + identifier + " COMPUTE STATISTICS FOR COLUMNS");
+    }
+
+    String insert = testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, identifier, false);
+    shell.executeStatement(insert);
+
+    checkColStat(identifier.name(), "customer_id", true);
+    checkColStatMinMaxDistinctValue(identifier.name(), "customer_id", 0, 2, 3, 0);
+
+    insert = testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_1, identifier, false);
+    shell.executeStatement(insert);
+
+    checkColStat(identifier.name(), "customer_id", true);
+    checkColStatMinMaxDistinctValue(identifier.name(), "customer_id", 0, 5, 6, 0);
+
+    insert = testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, identifier, false);
+    shell.executeStatement(insert);
+    checkColStat(identifier.name(), "customer_id", true);
+    checkColStatMinMaxDistinctValue(identifier.name(), "customer_id", 0, 5, 6, 0);
+  }
+
   private void checkColStat(String tableName, String colName, boolean accurate) {
     List<Object[]> rows = shell.executeStatement("DESCRIBE " + tableName + " " + colName);
 
@@ -274,4 +305,47 @@ private void checkColStatMaxLengthDistinctValue(String tableName, String colName
     Assert.assertEquals("distinct_count", rows.get(5)[0]);
     Assert.assertEquals(String.valueOf(distinct), rows.get(5)[1]);
   }
+
+  private void checkColStatMinMaxDistinctValue(String tableName, String colName, int minValue, int maxValue,
+      int distinct, int nulls) {
+
+    shell.executeStatement("set hive.iceberg.stats.source=metastore");
+    List<Object[]> rows = shell.executeStatement("DESCRIBE FORMATTED " + tableName + " " + colName);
+
+    // Check min
+    Assert.assertEquals("min", rows.get(2)[0]);
+    Assert.assertEquals(String.valueOf(minValue), rows.get(2)[1]);
+
+    // Check max
+    Assert.assertEquals("max", rows.get(3)[0]);
+    Assert.assertEquals(String.valueOf(maxValue), rows.get(3)[1]);
+
+    // Check num of nulls
+    Assert.assertEquals("num_nulls", rows.get(4)[0]);
+    Assert.assertEquals(String.valueOf(nulls), rows.get(4)[1]);
+
+    // Check distinct
+    Assert.assertEquals("distinct_count", rows.get(5)[0]);
+    Assert.assertEquals(String.valueOf(distinct), rows.get(5)[1]);
+
+    shell.executeStatement("set hive.iceberg.stats.source=iceberg");
+    rows = shell.executeStatement("DESCRIBE FORMATTED " + tableName + " " + colName);
+
+    // Check min
+    Assert.assertEquals("min", rows.get(2)[0]);
+    Assert.assertEquals(String.valueOf(minValue), rows.get(2)[1]);
+
+    // Check max
+    Assert.assertEquals("max", rows.get(3)[0]);
+    Assert.assertEquals(String.valueOf(maxValue), rows.get(3)[1]);
+
+    // Check num of nulls
+    Assert.assertEquals("num_nulls", rows.get(4)[0]);
+    Assert.assertEquals(String.valueOf(nulls), rows.get(4)[1]);
+
+    // Check distinct
+    Assert.assertEquals("distinct_count", rows.get(5)[0]);
+    Assert.assertEquals(String.valueOf(distinct), rows.get(5)[1]);
+
+  }
 }
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/puffin_col_stats.q b/iceberg/iceberg-handler/src/test/queries/positive/puffin_col_stats.q
new file mode 100644
index 000000000000..93269513f53e
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/puffin_col_stats.q
@@ -0,0 +1,21 @@
+-- Mask random uuid
+--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
+set hive.stats.autogather=true;
+set hive.stats.column.autogather=true;
+
+set hive.iceberg.stats.source=iceberg;
+drop table if exists tbl_ice_puffin;
+create external table tbl_ice_puffin(a int, b string, c int) stored by iceberg tblproperties ('format-version'='2');
+insert into tbl_ice_puffin values (1, 'one', 50), (2, 'two', 51),(2, 'two', 51),(2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54), (111, 'one', 55), (333, 'two', 56);
+desc formatted tbl_ice_puffin a;
+desc formatted tbl_ice_puffin c;
+explain select * from tbl_ice_puffin order by a, b, c;
+insert into tbl_ice_puffin values (1000, 'one', 1000), (5000, 'two', 5000);
+desc formatted tbl_ice_puffin a;
+desc formatted tbl_ice_puffin c;
+explain select * from tbl_ice_puffin order by a, b, c;
+insert into tbl_ice_puffin values (10, 'one', 100000), (5000, 'two', 510000);
+explain select * from tbl_ice_puffin order by a, b, c;
+desc formatted tbl_ice_puffin a;
+desc formatted tbl_ice_puffin c;
+-- Result: a = (min: 1, max: 5000), c = (min: 50, max: 510000)
\ No newline at end of file
diff --git a/iceberg/iceberg-handler/src/test/results/positive/col_stats.q.out b/iceberg/iceberg-handler/src/test/results/positive/col_stats.q.out
index 8c905528e79d..d46fc29798e8 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/col_stats.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/col_stats.q.out
@@ -96,7 +96,7 @@ avg_col_len 3.4444444444444446
 max_col_len 5
 num_trues
 num_falses
-bit_vector HL
+bit_vector
 comment
 COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"a\":\"true\",\"b\":\"true\",\"c\":\"true\"}}
 PREHOOK: query: update tbl_ice_puffin set b='two' where b='one' or b='three'
diff --git a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out
index 037d1b439f73..328198edfce1 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out
@@ -76,9 +76,9 @@ Stage-3 <-Map 1 [SIMPLE_EDGE] vectorized PARTITION_ONLY_SHUFFLE [RS_13] PartitionCols:_col1 - Select Operator [SEL_12] (rows=22 width=91) + Select Operator [SEL_12] (rows=22 width=87) Output:["_col0","_col1"] - TableScan [TS_0] (rows=22 width=91) + TableScan [TS_0] (rows=22 width=87) default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] Reducer 3 vectorized File Output Operator [FS_21]
@@ -90,7 +90,7 @@ Stage-3 PARTITION_ONLY_SHUFFLE [RS_16] Group By Operator [GBY_15] (rows=1 width=400) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"] - Select Operator [SEL_14] (rows=22 width=91) + Select Operator [SEL_14] (rows=22 width=87) Output:["a","ccy"] Please refer to the previous Select Operator [SEL_12]
@@ -170,9 +170,9 @@ Stage-3 <-Map 1 [SIMPLE_EDGE] vectorized PARTITION_ONLY_SHUFFLE [RS_13] PartitionCols:iceberg_bucket(_col1, 2) - Select Operator [SEL_12] (rows=22 width=91) + Select Operator [SEL_12] (rows=22 width=87) Output:["_col0","_col1"] - TableScan [TS_0] (rows=22 width=91) + TableScan [TS_0] (rows=22 width=87) default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] Reducer 3 vectorized File Output Operator [FS_21]
@@ -184,7 
+184,7 @@ Stage-3 PARTITION_ONLY_SHUFFLE [RS_16] Group By Operator [GBY_15] (rows=1 width=400) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"] - Select Operator [SEL_14] (rows=22 width=91) + Select Operator [SEL_14] (rows=22 width=87) Output:["a","ccy"] Please refer to the previous Select Operator [SEL_12] @@ -264,9 +264,9 @@ Stage-3 <-Map 1 [SIMPLE_EDGE] vectorized PARTITION_ONLY_SHUFFLE [RS_13] PartitionCols:_col1, iceberg_bucket(_col2, 3) - Select Operator [SEL_12] (rows=22 width=99) + Select Operator [SEL_12] (rows=22 width=94) Output:["_col0","_col1","_col2"] - TableScan [TS_0] (rows=22 width=99) + TableScan [TS_0] (rows=22 width=94) default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"] Reducer 3 vectorized File Output Operator [FS_21] @@ -278,7 +278,7 @@ Stage-3 PARTITION_ONLY_SHUFFLE [RS_16] Group By Operator [GBY_15] (rows=1 width=568) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)","min(c)","max(c)","count(c)","compute_bit_vector_hll(c)"] - Select Operator [SEL_14] (rows=22 width=99) + Select Operator [SEL_14] (rows=22 width=94) Output:["a","ccy","c"] Please refer to the previous Select Operator [SEL_12] @@ -403,7 +403,7 @@ Stage-3 Output:["_col0","_col1","_col2"] Filter Operator [FIL_14] (rows=4 width=99) predicate:(b = 'EUR') - TableScan [TS_0] (rows=22 width=99) + TableScan [TS_0] (rows=22 width=94) default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"] Reducer 3 vectorized File Output Operator [FS_24] @@ -461,7 +461,7 @@ Stage-3 Output:["_col0","_col1","_col2"] Filter Operator [FIL_12] (rows=1 width=99) predicate:((c = 100L) and (b = 'USD')) - TableScan [TS_0] (rows=22 width=99) + TableScan [TS_0] (rows=22 width=94) default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"] PARTITION_ONLY_SHUFFLE [RS_17] Group By Operator [GBY_16] (rows=1 width=568) diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_mixed.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_mixed.q.out index 50ce82dc2484..73d039d76e0f 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_mixed.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_mixed.q.out @@ -589,13 +589,13 @@ STAGE PLANS: minReductionHashAggr: 0.99 mode: hash outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9 - Statistics: Num rows: 1 Data size: 373 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 2 Data size: 746 Basic stats: COMPLETE Column stats: COMPLETE Reduce Output Operator key expressions: _col0 (type: double), _col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type: decimal(4,2)) null sort order: zzzzzzzzz sort order: +++++++++ Map-reduce partition columns: _col0 (type: double), _col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date), 
_col8 (type: decimal(4,2)) - Statistics: Num rows: 1 Data size: 373 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 2 Data size: 746 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col9 (type: float) Execution mode: vectorized, llap LLAP IO: all inputs (cache only) @@ -607,14 +607,14 @@ STAGE PLANS: keys: KEY._col0 (type: double), KEY._col1 (type: boolean), KEY._col2 (type: int), KEY._col3 (type: bigint), KEY._col4 (type: binary), KEY._col5 (type: string), KEY._col6 (type: timestamp), KEY._col7 (type: date), KEY._col8 (type: decimal(4,2)) mode: mergepartial outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9 - Statistics: Num rows: 1 Data size: 373 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 2 Data size: 746 Basic stats: COMPLETE Column stats: COMPLETE Select Operator expressions: _col9 (type: float), _col0 (type: double), _col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type: decimal(4,2)) outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9 - Statistics: Num rows: 1 Data size: 373 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 2 Data size: 746 Basic stats: COMPLETE Column stats: COMPLETE File Output Operator compressed: false - Statistics: Num rows: 1 Data size: 373 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 2 Data size: 746 Basic stats: COMPLETE Column stats: COMPLETE table: input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat @@ -691,13 +691,13 @@ STAGE PLANS: minReductionHashAggr: 0.99 mode: hash outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9 - Statistics: Num rows: 1 Data size: 373 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 2 Data size: 746 Basic stats: COMPLETE Column stats: COMPLETE Reduce Output Operator key expressions: _col0 (type: double), _col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type: decimal(4,2)) null sort order: zzzzzzzzz sort order: +++++++++ Map-reduce partition columns: _col0 (type: double), _col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), _col4 (type: binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type: decimal(4,2)) - Statistics: Num rows: 1 Data size: 373 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 2 Data size: 746 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col9 (type: float) Execution mode: vectorized, llap LLAP IO: all inputs (cache only) @@ -709,14 +709,14 @@ STAGE PLANS: keys: KEY._col0 (type: double), KEY._col1 (type: boolean), KEY._col2 (type: int), KEY._col3 (type: bigint), KEY._col4 (type: binary), KEY._col5 (type: string), KEY._col6 (type: timestamp), KEY._col7 (type: date), KEY._col8 (type: decimal(4,2)) mode: mergepartial outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9 - Statistics: Num rows: 1 Data size: 373 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 2 Data size: 746 Basic stats: COMPLETE Column stats: COMPLETE Select Operator expressions: _col9 (type: float), _col0 (type: double), _col1 (type: boolean), _col2 (type: int), _col3 (type: bigint), 
_col4 (type: binary), _col5 (type: string), _col6 (type: timestamp), _col7 (type: date), _col8 (type: decimal(4,2)) outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9 - Statistics: Num rows: 1 Data size: 373 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 2 Data size: 746 Basic stats: COMPLETE Column stats: COMPLETE File Output Operator compressed: false - Statistics: Num rows: 1 Data size: 373 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 2 Data size: 746 Basic stats: COMPLETE Column stats: COMPLETE table: input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat diff --git a/iceberg/iceberg-handler/src/test/results/positive/puffin_col_stats.q.out b/iceberg/iceberg-handler/src/test/results/positive/puffin_col_stats.q.out new file mode 100644 index 000000000000..063a32d823e9 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/results/positive/puffin_col_stats.q.out @@ -0,0 +1,234 @@ +PREHOOK: query: drop table if exists tbl_ice_puffin +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists tbl_ice_puffin +POSTHOOK: type: DROPTABLE +PREHOOK: query: create external table tbl_ice_puffin(a int, b string, c int) stored by iceberg tblproperties ('format-version'='2') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tbl_ice_puffin +POSTHOOK: query: create external table tbl_ice_puffin(a int, b string, c int) stored by iceberg tblproperties ('format-version'='2') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tbl_ice_puffin +PREHOOK: query: insert into tbl_ice_puffin values (1, 'one', 50), (2, 'two', 51),(2, 'two', 51),(2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54), (111, 'one', 55), (333, 'two', 56) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@tbl_ice_puffin +POSTHOOK: query: insert into tbl_ice_puffin values (1, 'one', 50), (2, 'two', 51),(2, 'two', 51),(2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54), (111, 'one', 55), (333, 'two', 56) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@tbl_ice_puffin +PREHOOK: query: desc formatted tbl_ice_puffin a +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@tbl_ice_puffin +POSTHOOK: query: desc formatted tbl_ice_puffin a +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@tbl_ice_puffin +col_name a +data_type int +min 1 +max 333 +num_nulls 0 +distinct_count 7 +avg_col_len +max_col_len +num_trues +num_falses +bit_vector HL +comment +COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"a\":\"true\",\"b\":\"true\",\"c\":\"true\"}} +PREHOOK: query: desc formatted tbl_ice_puffin c +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@tbl_ice_puffin +POSTHOOK: query: desc formatted tbl_ice_puffin c +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@tbl_ice_puffin +col_name c +data_type int +min 50 +max 56 +num_nulls 0 +distinct_count 7 +avg_col_len +max_col_len +num_trues +num_falses +bit_vector HL +comment +COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"a\":\"true\",\"b\":\"true\",\"c\":\"true\"}} +PREHOOK: query: explain select * from tbl_ice_puffin order by a, b, c +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_puffin +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain select * from tbl_ice_puffin order by a, b, c 
+POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_puffin +POSTHOOK: Output: hdfs://### HDFS PATH ### +Plan optimized by CBO. + +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) + +Stage-0 + Fetch Operator + limit:-1 + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_8] + Select Operator [SEL_7] (rows=9 width=95) + Output:["_col0","_col1","_col2"] + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_6] + Select Operator [SEL_5] (rows=9 width=95) + Output:["_col0","_col1","_col2"] + TableScan [TS_0] (rows=9 width=95) + default@tbl_ice_puffin,tbl_ice_puffin,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"] + +PREHOOK: query: insert into tbl_ice_puffin values (1000, 'one', 1000), (5000, 'two', 5000) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@tbl_ice_puffin +POSTHOOK: query: insert into tbl_ice_puffin values (1000, 'one', 1000), (5000, 'two', 5000) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@tbl_ice_puffin +PREHOOK: query: desc formatted tbl_ice_puffin a +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@tbl_ice_puffin +POSTHOOK: query: desc formatted tbl_ice_puffin a +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@tbl_ice_puffin +col_name a +data_type int +min 1 +max 5000 +num_nulls 0 +distinct_count 9 +avg_col_len +max_col_len +num_trues +num_falses +bit_vector +comment +COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"a\":\"true\",\"b\":\"true\",\"c\":\"true\"}} +PREHOOK: query: desc formatted tbl_ice_puffin c +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@tbl_ice_puffin +POSTHOOK: query: desc formatted tbl_ice_puffin c +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@tbl_ice_puffin +col_name c +data_type int +min 50 +max 5000 +num_nulls 0 +distinct_count 9 +avg_col_len +max_col_len +num_trues +num_falses +bit_vector +comment +COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"a\":\"true\",\"b\":\"true\",\"c\":\"true\"}} +PREHOOK: query: explain select * from tbl_ice_puffin order by a, b, c +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_puffin +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain select * from tbl_ice_puffin order by a, b, c +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_puffin +POSTHOOK: Output: hdfs://### HDFS PATH ### +Plan optimized by CBO. 
+ +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) + +Stage-0 + Fetch Operator + limit:-1 + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_8] + Select Operator [SEL_7] (rows=11 width=95) + Output:["_col0","_col1","_col2"] + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_6] + Select Operator [SEL_5] (rows=11 width=95) + Output:["_col0","_col1","_col2"] + TableScan [TS_0] (rows=11 width=95) + default@tbl_ice_puffin,tbl_ice_puffin,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"] + +PREHOOK: query: insert into tbl_ice_puffin values (10, 'one', 100000), (5000, 'two', 510000) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@tbl_ice_puffin +POSTHOOK: query: insert into tbl_ice_puffin values (10, 'one', 100000), (5000, 'two', 510000) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@tbl_ice_puffin +PREHOOK: query: explain select * from tbl_ice_puffin order by a, b, c +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_puffin +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain select * from tbl_ice_puffin order by a, b, c +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_puffin +POSTHOOK: Output: hdfs://### HDFS PATH ### +Plan optimized by CBO. + +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) + +Stage-0 + Fetch Operator + limit:-1 + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_8] + Select Operator [SEL_7] (rows=13 width=95) + Output:["_col0","_col1","_col2"] + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_6] + Select Operator [SEL_5] (rows=13 width=95) + Output:["_col0","_col1","_col2"] + TableScan [TS_0] (rows=13 width=95) + default@tbl_ice_puffin,tbl_ice_puffin,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"] + +PREHOOK: query: desc formatted tbl_ice_puffin a +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@tbl_ice_puffin +POSTHOOK: query: desc formatted tbl_ice_puffin a +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@tbl_ice_puffin +col_name a +data_type int +min 1 +max 5000 +num_nulls 0 +distinct_count 9 +avg_col_len +max_col_len +num_trues +num_falses +bit_vector +comment +COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"a\":\"true\",\"b\":\"true\",\"c\":\"true\"}} +PREHOOK: query: desc formatted tbl_ice_puffin c +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@tbl_ice_puffin +POSTHOOK: query: desc formatted tbl_ice_puffin c +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@tbl_ice_puffin +col_name c +data_type int +min 50 +max 510000 +num_nulls 0 +distinct_count 9 +avg_col_len +max_col_len +num_trues +num_falses +bit_vector +comment +COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"a\":\"true\",\"b\":\"true\",\"c\":\"true\"}} diff --git a/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_mixed.q.out b/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_mixed.q.out index 34696284306a..1e37901ddd3c 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_mixed.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/vectorized_iceberg_read_mixed.q.out @@ -538,14 +538,14 @@ Stage-0 Stage-1 Reducer 2 vectorized File Output Operator [FS_11] - Select Operator [SEL_10] (rows=1 width=373) + Select Operator [SEL_10] (rows=2 width=373) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"] - Group By Operator [GBY_9] (rows=1 width=373) + Group By Operator [GBY_9] 
(rows=2 width=373) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["max(VALUE._col0)"],keys:KEY._col0, KEY._col1, KEY._col2, KEY._col3, KEY._col4, KEY._col5, KEY._col6, KEY._col7, KEY._col8 <-Map 1 [SIMPLE_EDGE] vectorized SHUFFLE [RS_8] PartitionCols:_col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8 - Group By Operator [GBY_7] (rows=1 width=373) + Group By Operator [GBY_7] (rows=2 width=373) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["max(t_float)"],keys:t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal TableScan [TS_0] (rows=2 width=373) default@tbl_ice_mixed_all_types,tbl_ice_mixed_all_types,Tbl:COMPLETE,Col:COMPLETE,Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"] @@ -603,14 +603,14 @@ Stage-0 Stage-1 Reducer 2 vectorized File Output Operator [FS_11] - Select Operator [SEL_10] (rows=1 width=373) + Select Operator [SEL_10] (rows=2 width=373) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"] - Group By Operator [GBY_9] (rows=1 width=373) + Group By Operator [GBY_9] (rows=2 width=373) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["max(VALUE._col0)"],keys:KEY._col0, KEY._col1, KEY._col2, KEY._col3, KEY._col4, KEY._col5, KEY._col6, KEY._col7, KEY._col8 <-Map 1 [SIMPLE_EDGE] vectorized SHUFFLE [RS_8] PartitionCols:_col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8 - Group By Operator [GBY_7] (rows=1 width=373) + Group By Operator [GBY_7] (rows=2 width=373) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["max(t_float)"],keys:t_double, t_boolean, t_int, t_bigint, t_binary, t_string, t_timestamp, t_date, t_decimal TableScan [TS_0] (rows=2 width=373) default@tbl_ice_mixed_all_types,tbl_ice_mixed_all_types,Tbl:COMPLETE,Col:COMPLETE,Output:["t_float","t_double","t_boolean","t_int","t_bigint","t_binary","t_string","t_timestamp","t_date","t_decimal"] diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/DescTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/DescTableOperation.java index c39dbc860e6e..940f80526d2a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/DescTableOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/DescTableOperation.java @@ -208,9 +208,12 @@ private void getColumnDataColPathSpecified(Table table, Partition part, List partitions = new ArrayList(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java index c04e68ba3bc4..16f347a84450 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.parse.StorageFormat.StorageHandlerTypes; import org.apache.hadoop.hive.ql.parse.TransformSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; @@ -272,8 +273,7 @@ default boolean canProvideBasicStatistics() { * @param colStats * 
    * @return boolean
    */
-  default boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table table,
-      List<ColumnStatistics> colStats) {
+  default boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table table, List<ColumnStatistics> colStats) {
     return false;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java
index 8bdf6647993e..e2777a128bc4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java
@@ -63,6 +63,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class ColStatsProcessor implements IStatsProcessor {
   private static transient final Logger LOG = LoggerFactory.getLogger(ColStatsProcessor.class);