HIVE-26277: NPEs and rounding issues in ColumnStatsAggregator classes (Alessandro Solimando reviewed by Stamatis Zampetakis)

1. Add and invoke checkStatisticsList to prevent NPEs in aggregators;
they all rely on a non-empty list of statistics.
2. Cast integers to double in divisions to make computations more
accurate and avoid rounding issues (see the sketch after this list).
3. Align logger names to match the class they are in and avoid
misleading log messages.
4. Add documentation for ndvTuner based on current understanding of how
it should work.
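
As a minimal, self-contained illustration of item 2 (the values and class name are made up for the example, not taken from the patch), integer division truncates before the result is stored in a double:

public class RoundingSketch {
  public static void main(String[] args) {
    long highValue = 10L, lowValue = 0L, numDVs = 4L;
    // Before the fix: the subtraction and division happen in long arithmetic,
    // so 10 / 4 truncates to 2 and only then widens to 2.0.
    double truncated = (highValue - lowValue) / numDVs;
    // After the fix: casting the difference to double keeps the fraction.
    double precise = ((double) (highValue - lowValue)) / numDVs;
    System.out.println(truncated + " vs " + precise); // prints "2.0 vs 2.5"
  }
}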

Closes apache#3339

Move (and complete) ndvTuner documentation from tests to production classes
asolimando authored and DongWei-4 committed Oct 28, 2022
1 parent 8cfc8ee commit a4a6db1
Showing 19 changed files with 2,028 additions and 16 deletions.

@@ -32,6 +32,8 @@ public class BinaryColumnStatsAggregator extends ColumnStatsAggregator {
@Override
public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWithSourceInfo,
List<String> partNames, boolean areAllPartsFound) throws MetaException {
checkStatisticsList(colStatsWithSourceInfo);

ColumnStatisticsObj statsObj = null;
String colType = null;
String colName = null;

@@ -32,6 +32,8 @@ public class BooleanColumnStatsAggregator extends ColumnStatsAggregator {
@Override
public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWithSourceInfo,
List<String> partNames, boolean areAllPartsFound) throws MetaException {
checkStatisticsList(colStatsWithSourceInfo);

ColumnStatisticsObj statsObj = null;
String colType = null;
String colName = null;

@@ -27,9 +27,28 @@

public abstract class ColumnStatsAggregator {
public boolean useDensityFunctionForNDVEstimation;
/**
* The tuner controls the derivation of the NDV value when aggregating statistics from multiple partitions. It accepts
* values in the range [0, 1]: values closer to 0 push the aggregated NDV towards the lower bound, while values
* closer to 1 push it towards the upper bound.
* <p>
* For example, consider the aggregation of three partitions with NDV values 2, 3, and 4, respectively. The NDV
* lower bound is 4 (the highest among individual NDVs), and the upper bound is 9 (the sum of individual NDVs). In
* this case the aggregated NDV will be in the range [4, 9] touching the bounds when the tuner is equal to 0, or 1
* respectively.
* </p>
* <p>
* It is optional and concrete implementations can choose to ignore it completely.
* </p>
*/
public double ndvTuner;

public abstract ColumnStatisticsObj aggregate(
List<ColStatsObjWithSourceInfo> colStatsWithSourceInfo, List<String> partNames,
boolean areAllPartsFound) throws MetaException;

void checkStatisticsList(List<ColStatsObjWithSourceInfo> colStatsWithSourceInfo) {
if (colStatsWithSourceInfo.isEmpty()) {
throw new IllegalArgumentException("Column statistics list must not be empty when aggregating");
}
}
}
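
The Javadoc above describes the tuner as an interpolation knob between the two NDV bounds. A minimal sketch, assuming a straightforward linear interpolation between those bounds (the helper class and method names are illustrative and not part of this patch):

public final class NdvTunerSketch {
  // ndvTuner == 0 yields the lower bound (highest per-partition NDV),
  // ndvTuner == 1 yields the upper bound (sum of per-partition NDVs).
  static long tunedNdv(long lowerBound, long higherBound, double ndvTuner) {
    return (long) (lowerBound + (higherBound - lowerBound) * ndvTuner);
  }

  public static void main(String[] args) {
    // Partitions with NDVs 2, 3 and 4, as in the Javadoc example: bounds are [4, 9].
    System.out.println(tunedNdv(4, 9, 0.0)); // 4
    System.out.println(tunedNdv(4, 9, 0.5)); // 6 (midpoint, truncated from 6.5)
    System.out.println(tunedNdv(4, 9, 1.0)); // 9
  }
}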

@@ -49,6 +49,8 @@ public class DateColumnStatsAggregator extends ColumnStatsAggregator implements
@Override
public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWithSourceInfo,
List<String> partNames, boolean areAllPartsFound) throws MetaException {
checkStatisticsList(colStatsWithSourceInfo);

ColumnStatisticsObj statsObj = null;
String colType = null;
String colName = null;
@@ -99,9 +101,10 @@ public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWit
for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) {
ColumnStatisticsObj cso = csp.getColStatsObj();
DateColumnStatsDataInspector newData = dateInspectorFromStats(cso);
lowerBound = Math.max(lowerBound, newData.getNumDVs());
higherBound += newData.getNumDVs();
if (newData.isSetLowValue() && newData.isSetHighValue()) {
densityAvgSum += (diff(newData.getHighValue(), newData.getLowValue())) / newData.getNumDVs();
densityAvgSum += ((double) diff(newData.getHighValue(), newData.getLowValue())) / newData.getNumDVs();
}
if (ndvEstimator != null) {
ndvEstimator.mergeEstimators(newData.getNdvEstimator());
@@ -124,7 +127,8 @@ public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWit
aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
} else {
long estimation;
if (useDensityFunctionForNDVEstimation) {
if (useDensityFunctionForNDVEstimation && aggregateData != null
&& aggregateData.isSetLowValue() && aggregateData.isSetHighValue()) {
// We have estimation, lowerbound and higherbound. We use estimation
// if it is between lowerbound and higherbound.
double densityAvg = densityAvgSum / partNames.size();
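
For context on the comment above, here is a sketch of the density-based estimation that this guarded branch performs (the names and the clamping helper are illustrative; the real aggregators inline this logic). densityAvg approximates the value range covered per distinct value, so dividing the aggregated range by it yields an NDV estimate, which is then kept within [lowerBound, higherBound]:

public final class DensityNdvSketch {
  static long densityBasedNdv(long aggregatedRange, double densityAvg, long lowerBound, long higherBound) {
    // Estimate NDV from the aggregated range and the average density, then clamp to the bounds.
    long estimation = (long) (aggregatedRange / densityAvg);
    return Math.max(lowerBound, Math.min(estimation, higherBound));
  }

  public static void main(String[] args) {
    // A range of 100 with an average density of 2.5 suggests roughly 40 distinct values.
    System.out.println(densityBasedNdv(100, 2.5, 10, 60)); // 40
  }
}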
@@ -161,7 +165,7 @@ public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWit
String partName = csp.getPartName();
DateColumnStatsData newData = cso.getStatsData().getDateStats();
if (useDensityFunctionForNDVEstimation) {
densityAvgSum += diff(newData.getHighValue(), newData.getLowValue()) / newData.getNumDVs();
densityAvgSum += ((double) diff(newData.getHighValue(), newData.getLowValue())) / newData.getNumDVs();
}
adjustedIndexMap.put(partName, (double) indexMap.get(partName));
adjustedStatsMap.put(partName, cso.getStatsData());
@@ -190,7 +194,7 @@ public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWit
csd.setDateStats(aggregateData);
adjustedStatsMap.put(pseudoPartName.toString(), csd);
if (useDensityFunctionForNDVEstimation) {
densityAvgSum += diff(aggregateData.getHighValue(), aggregateData.getLowValue())
densityAvgSum += ((double) diff(aggregateData.getHighValue(), aggregateData.getLowValue()))
/ aggregateData.getNumDVs();
}
// reset everything
@@ -223,7 +227,7 @@ public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWit
csd.setDateStats(aggregateData);
adjustedStatsMap.put(pseudoPartName.toString(), csd);
if (useDensityFunctionForNDVEstimation) {
densityAvgSum += diff(aggregateData.getHighValue(), aggregateData.getLowValue())
densityAvgSum += ((double) diff(aggregateData.getHighValue(), aggregateData.getLowValue()))
/ aggregateData.getNumDVs();
}
}

@@ -50,6 +50,8 @@ public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implemen
@Override
public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWithSourceInfo,
List<String> partNames, boolean areAllPartsFound) throws MetaException {
checkStatisticsList(colStatsWithSourceInfo);

ColumnStatisticsObj statsObj = null;
String colType = null;
String colName = null;
@@ -128,7 +130,8 @@ public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWit
aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
} else {
long estimation;
if (useDensityFunctionForNDVEstimation) {
if (useDensityFunctionForNDVEstimation && aggregateData != null
&& aggregateData.isSetLowValue() && aggregateData.isSetHighValue()) {
// We have estimation, lowerbound and higherbound. We use estimation
// if it is between lowerbound and higherbound.
double densityAvg = densityAvgSum / partNames.size();

@@ -48,6 +48,8 @@ public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implement
@Override
public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWithSourceInfo,
List<String> partNames, boolean areAllPartsFound) throws MetaException {
checkStatisticsList(colStatsWithSourceInfo);

ColumnStatisticsObj statsObj = null;
String colType = null;
String colName = null;

@@ -48,6 +48,8 @@ public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
@Override
public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWithSourceInfo,
List<String> partNames, boolean areAllPartsFound) throws MetaException {
checkStatisticsList(colStatsWithSourceInfo);

ColumnStatisticsObj statsObj = null;
String colType = null;
String colName = null;
@@ -100,7 +102,7 @@ public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWit
LongColumnStatsDataInspector newData = longInspectorFromStats(cso);
lowerBound = Math.max(lowerBound, newData.getNumDVs());
higherBound += newData.getNumDVs();
densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
densityAvgSum += ((double) (newData.getHighValue() - newData.getLowValue())) / newData.getNumDVs();
if (ndvEstimator != null) {
ndvEstimator.mergeEstimators(newData.getNdvEstimator());
}
@@ -159,7 +161,7 @@ public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWit
String partName = csp.getPartName();
LongColumnStatsData newData = cso.getStatsData().getLongStats();
if (useDensityFunctionForNDVEstimation) {
densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
densityAvgSum += ((double) (newData.getHighValue() - newData.getLowValue())) / newData.getNumDVs();
}
adjustedIndexMap.put(partName, (double) indexMap.get(partName));
adjustedStatsMap.put(partName, cso.getStatsData());
@@ -188,7 +190,7 @@ public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWit
csd.setLongStats(aggregateData);
adjustedStatsMap.put(pseudoPartName.toString(), csd);
if (useDensityFunctionForNDVEstimation) {
densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs();
densityAvgSum += ((double) (aggregateData.getHighValue() - aggregateData.getLowValue())) / aggregateData.getNumDVs();
}
// reset everything
pseudoPartName = new StringBuilder();
@@ -221,7 +223,7 @@ public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWit
csd.setLongStats(aggregateData);
adjustedStatsMap.put(pseudoPartName.toString(), csd);
if (useDensityFunctionForNDVEstimation) {
densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs();
densityAvgSum += ((double) (aggregateData.getHighValue() - aggregateData.getLowValue())) / aggregateData.getNumDVs();
}
}
}

@@ -42,11 +42,13 @@
public class StringColumnStatsAggregator extends ColumnStatsAggregator implements
IExtrapolatePartStatus {

private static final Logger LOG = LoggerFactory.getLogger(LongColumnStatsAggregator.class);
private static final Logger LOG = LoggerFactory.getLogger(StringColumnStatsAggregator.class);

@Override
public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWithSourceInfo,
List<String> partNames, boolean areAllPartsFound) throws MetaException {
checkStatisticsList(colStatsWithSourceInfo);

ColumnStatisticsObj statsObj = null;
String colType = null;
String colName = null;

@@ -49,6 +49,8 @@ public class TimestampColumnStatsAggregator extends ColumnStatsAggregator implem
@Override
public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWithSourceInfo,
List<String> partNames, boolean areAllPartsFound) throws MetaException {
checkStatisticsList(colStatsWithSourceInfo);

ColumnStatisticsObj statsObj = null;
String colType = null;
String colName = null;
@@ -99,9 +101,10 @@ public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWit
for (ColStatsObjWithSourceInfo csp : colStatsWithSourceInfo) {
ColumnStatisticsObj cso = csp.getColStatsObj();
TimestampColumnStatsDataInspector newData = timestampInspectorFromStats(cso);
lowerBound = Math.max(lowerBound, newData.getNumDVs());
higherBound += newData.getNumDVs();
if (newData.isSetLowValue() && newData.isSetHighValue()) {
densityAvgSum += (diff(newData.getHighValue(), newData.getLowValue())) / newData.getNumDVs();
densityAvgSum += ((double) (diff(newData.getHighValue(), newData.getLowValue())) / newData.getNumDVs());
}
if (ndvEstimator != null) {
ndvEstimator.mergeEstimators(newData.getNdvEstimator());
@@ -124,7 +127,8 @@ public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWit
aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
} else {
long estimation;
if (useDensityFunctionForNDVEstimation) {
if (useDensityFunctionForNDVEstimation && aggregateData != null
&& aggregateData.isSetLowValue() && aggregateData.isSetHighValue() ) {
// We have estimation, lowerbound and higherbound. We use estimation
// if it is between lowerbound and higherbound.
double densityAvg = densityAvgSum / partNames.size();
@@ -161,7 +165,7 @@ public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWit
String partName = csp.getPartName();
TimestampColumnStatsData newData = cso.getStatsData().getTimestampStats();
if (useDensityFunctionForNDVEstimation) {
densityAvgSum += diff(newData.getHighValue(), newData.getLowValue()) / newData.getNumDVs();
densityAvgSum += ((double) diff(newData.getHighValue(), newData.getLowValue()) / newData.getNumDVs());
}
adjustedIndexMap.put(partName, (double) indexMap.get(partName));
adjustedStatsMap.put(partName, cso.getStatsData());
@@ -190,7 +194,7 @@ public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWit
csd.setTimestampStats(aggregateData);
adjustedStatsMap.put(pseudoPartName.toString(), csd);
if (useDensityFunctionForNDVEstimation) {
densityAvgSum += diff(aggregateData.getHighValue(), aggregateData.getLowValue())
densityAvgSum += ((double) diff(aggregateData.getHighValue(), aggregateData.getLowValue()))
/ aggregateData.getNumDVs();
}
// reset everything
@@ -223,7 +227,7 @@ public ColumnStatisticsObj aggregate(List<ColStatsObjWithSourceInfo> colStatsWit
csd.setTimestampStats(aggregateData);
adjustedStatsMap.put(pseudoPartName.toString(), csd);
if (useDensityFunctionForNDVEstimation) {
densityAvgSum += diff(aggregateData.getHighValue(), aggregateData.getLowValue())
densityAvgSum += ((double) diff(aggregateData.getHighValue(), aggregateData.getLowValue()))
/ aggregateData.getNumDVs();
}
}

@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hive.metastore;

import org.apache.hadoop.hive.common.ndv.fm.FMSketch;
import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;

public class StatisticsTestUtils {

private static final String HIVE_ENGINE = "hive";

private StatisticsTestUtils() {
throw new AssertionError("Suppress default constructor for non-instantiation");
}

/**
* Creates a {@link ColStatsObjWithSourceInfo} object for a given table, partition and column information,
* using the given statistics data.
* @param data the column statistics data
* @param tbl the target table for stats
* @param column the target column for stats
* @param partName the target partition for stats
* @return column statistics objects with source info.
*/
public static ColStatsObjWithSourceInfo createStatsWithInfo(ColumnStatisticsData data, Table tbl,
FieldSchema column, String partName) {
ColumnStatisticsObj statObj = new ColumnStatisticsObj(column.getName(), column.getType(), data);
return new ColStatsObjWithSourceInfo(statObj, tbl.getCatName(), tbl.getDbName(), column.getName(), partName);
}

/**
* Creates an FM sketch object initialized with the given values.
* @param values the values to be added
* @return an FM sketch initialized with the given values.
*/
public static FMSketch createFMSketch(long... values) {
FMSketch fm = new FMSketch(1);
for (long value : values) {
fm.addToEstimator(value);
}
return fm;
}

/**
* Creates an FM sketch object initialized with the given values.
* @param values the values to be added
* @return an FM sketch initialized with the given values.
*/
public static FMSketch createFMSketch(String... values) {
FMSketch fm = new FMSketch(1);
for (String value : values) {
fm.addToEstimator(value);
}
return fm;
}

/**
* Creates an HLL object initialized with the given values.
* @param values the values to be added
* @return an HLL object initialized with the given values.
*/
public static HyperLogLog createHll(long... values) {
HyperLogLog hll = HyperLogLog.builder().build();
for (long value : values) {
hll.addLong(value);
}
return hll;
}

/**
* Creates an HLL object initialized with the given values.
* @param values the values to be added
* @return an HLL object initialized with the given values.
*/
public static HyperLogLog createHll(String... values) {
HyperLogLog hll = HyperLogLog.builder().build();
for (String value : values) {
hll.addBytes(value.getBytes());
}
return hll;
}
}
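
A hypothetical usage of the new helpers from a test in the same package (the values are arbitrary; this assumes the estimators expose estimateNumDistinctValues(), as Hive's NDV estimator interface does):

HyperLogLog hll = StatisticsTestUtils.createHll(1L, 2L, 2L, 3L);
long ndv = hll.estimateNumDistinctValues(); // approximately 3 distinct values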
