[SPARK-23445] ColumnStat refactoring #20624
Conversation
Test build #87500 has finished for PR 20624 at commit
* [[ColumnStat.fromExternalString]].
*
* As part of the protocol, the returned map always contains a key called "version".
* In the case min/max values are null (None), they won't appear in the map.
Now that all fields are optional, we should update this comment.
@@ -305,15 +260,15 @@ object ColumnStat extends Logging {
      percentiles: Option[ArrayData]): ColumnStat = {
    // The first 6 fields are basic column stats, the 7th is ndvs for histogram bins.
    val cs = ColumnStat(
-     distinctCount = BigInt(row.getLong(0)),
+     distinctCount = Option(BigInt(row.getLong(0))),
nit: we should use Some(value) if value is expected to be non-null.
I'd keep it an Option, just to be prepared for more flexibility and more optionality, unless you have a strong opinion. (note: this code has moved to AnalyzeColumnCommand)
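For context on the Some vs Option nit above: Option(x) converts null to None, while Some(x) wraps its argument unconditionally, so Some(null) is a non-empty Option containing null. A minimal, self-contained sketch (not Spark code) illustrating the difference:

```scala
object OptionVsSome {
  def main(args: Array[String]): Unit = {
    val nullable: String = null

    // Option(...) is the safe constructor when the value may be null
    // (e.g. values coming from a Java API): it converts null to None.
    assert(Option(nullable) == None)
    assert(Option("a") == Some("a"))

    // Some(...) wraps the value unconditionally, so Some(null) is a
    // non-empty Option containing null -- usually a latent bug.
    assert(Some(nullable).isDefined)

    // row.getLong(0) in the diff returns a primitive Long, which can
    // never be null, hence the reviewer's suggestion to use Some(...).
    println("ok")
  }
}
```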
@@ -32,13 +32,18 @@ object AggregateEstimation {
    val childStats = agg.child.stats
    // Check if we have column stats for all group-by columns.
    val colStatsExist = agg.groupingExpressions.forall { e =>
-     e.isInstanceOf[Attribute] && childStats.attributeStats.contains(e.asInstanceOf[Attribute])
+     e.isInstanceOf[Attribute] && (
+       childStats.attributeStats.get(e.asInstanceOf[Attribute]) match {
nit: childStats.attributeStats.get(e.asInstanceOf[Attribute]).exists(_.hasCountStats)
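The suggested one-liner relies on Option.exists, which returns false for None and applies the predicate to the contained value otherwise, making it equivalent to the Some/None match in the diff. A standalone sketch with a simplified stand-in for the stats map (the Stat case class here is illustrative, not the actual Spark type):

```scala
object ExistsSketch {
  // Simplified stand-in for ColumnStat: only the fields relevant here.
  case class Stat(distinctCount: Option[BigInt], nullCount: Option[BigInt]) {
    def hasCountStats: Boolean = distinctCount.isDefined && nullCount.isDefined
  }

  val attributeStats: Map[String, Stat] = Map(
    "a" -> Stat(Some(BigInt(10)), Some(BigInt(0))),
    "b" -> Stat(None, Some(BigInt(0)))
  )

  // The pattern-match form from the diff...
  def viaMatch(col: String): Boolean = attributeStats.get(col) match {
    case Some(s) => s.hasCountStats
    case None => false
  }

  // ...and the suggested exists(...) one-liner are equivalent.
  def viaExists(col: String): Boolean =
    attributeStats.get(col).exists(_.hasCountStats)

  def main(args: Array[String]): Unit = {
    for (col <- Seq("a", "b", "missing")) {
      assert(viaMatch(col) == viaExists(col))
    }
    assert(viaExists("a") && !viaExists("b") && !viaExists("missing"))
    println("ok")
  }
}
```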
    nullCount: Option[BigInt] = None,
    avgLen: Option[Long] = None,
    maxLen: Option[Long] = None,
    histogram: Option[Histogram] = None) {
Nit: indents.
try {
  Some(CatalogColumnStat(
    distinctCount = map.get(s"${colName}.${KEY_DISTINCT_COUNT}").map(v => BigInt(v.toLong)),
Do we have a migration issue here? Now the key is changed. Can Spark 2.4 read the catalog props written by Spark 2.3?
Neither the keys nor the format of stats in the metastore changed. After this patch it remains backwards compatible with stats created before.
What changed here is that the map passed to this method used to contain stats for just one column, stripped of the columnName prefix, and now I'm passing a map that has all statistics for all columns, with keys prefixed by columnName.
It reduces complexity in statsFromProperties, see https://github.com/apache/spark/pull/20624/files#diff-159191585e10542f013cb3a714f26075R1057
It used to create a filtered map for every column, stripping the prefix together with the column name. Now it just passes the map of all columns' stat properties, and an individual column picks up what it needs.
I'll add a bit of doc / comments about that.
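The described scheme can be sketched standalone (property names and values below are invented for illustration, not the actual Spark implementation): the full table-property map is passed around, and each column picks out only its own prefixed keys.

```scala
object PrefixedStatsSketch {
  // All column-stat properties for a table, keyed "colName.statName",
  // as described in the comment above (values are made up).
  val colStatsProps: Map[String, String] = Map(
    "id.version" -> "1",
    "id.distinctCount" -> "100",
    "id.nullCount" -> "0",
    "name.version" -> "1",
    "name.distinctCount" -> "42"
  )

  // Each column looks up only the keys carrying its own prefix,
  // stripping the prefix on the way out.
  def statsForColumn(colName: String): Map[String, String] =
    colStatsProps.collect {
      case (key, value) if key.startsWith(s"$colName.") =>
        key.stripPrefix(s"$colName.") -> value
    }

  def main(args: Array[String]): Unit = {
    assert(statsForColumn("id") ==
      Map("version" -> "1", "distinctCount" -> "100", "nullCount" -> "0"))
    // A column simply doesn't see stats it doesn't have.
    assert(statsForColumn("name").get("nullCount").isEmpty)
    println("ok")
  }
}
```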
Could you add a test case? BTW, forward compatibility is also needed, since the Hive metastore may be shared by different Spark versions.
IIUC the format doesn't change; we just change the way we save/restore stats in the metastore, which looks cleaner.
The format doesn't change.
There is an existing test, StatisticsSuite."verify serialized column stats after analyzing columns", which checks that the format of the serialized stats in the metastore doesn't change, by comparing it to a manually constructed map of properties.
I will add a test that verifies it the other way: it adds the properties manually as TBLPROPERTIES, and verifies that they are successfully parsed.
Added a "verify column stats can be deserialized from tblproperties" test.
* The key is the name of the column and name of the field (e.g. "colName.distinctCount"),
* and the value is the string representation for the value.
* min/max values are stored as Strings. They can be deserialized using
* [[ColumnStat.fromExternalString]].
nit: Shall we move fromExternalString from ColumnStat to CatalogColumnStat?
I think that actually everything from the ColumnStat object should move:

fromExternalString / toExternalString -> CatalogColumnStat

And also:

supportsDatatype / supportsHistogram -> AnalyzeColumnCommand
statExprs / rowToColumnStat -> AnalyzeColumnCommand

because they are tied to that specific method of stats collection.
}

/** Convert [[CatalogColumnStat]] to [[ColumnStat]]. */
def toPlanStat(
toPlanStat is the same name as CatalogStatistics.toPlanStat. Should we use toColumnStat?
I intentionally made it the same. CatalogStatistics.toPlanStat converts it to Statistics; CatalogColumnStat.toPlanStat converts it to ColumnStat. The name signifies that it is used to convert both of these objects to their counterparts that are used in the query plan.
Test build #87657 has finished for PR 20624 at commit
LGTM except a few comments.
        colStats += field.name -> cs
      }
      val colStats = new mutable.HashMap[String, CatalogColumnStat]
      val statPropsForField = new mutable.HashMap[String, mutable.HashMap[String, String]]
This is useless.
}

// Find all the column names by matching the KEY_VERSION properties for them.
val fieldNames = colStatsProps.keys.filter {
fieldNames is not being used.
-     val colStats = stats.attributeStats.get(col)
-     if (colStats.get.nullCount > 0) {
+     val colStats = stats.attributeStats.get(col).get
+     if (!colStats.hasCountStats || colStats.nullCount.get > 0) {
Do we need to check whether it is defined before calling .get?
hasCountStats == distinctCount.isDefined && nullCount.isDefined. So if evaluation reaches the second part of the ||, then hasCountStats is true, which implies nullCount.isDefined.
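The short-circuiting argument can be sketched standalone (the Stat case class below is a simplified stand-in, not the Spark class): since || evaluates its right operand only when the left is false, nullCount.get on the right is reached only when hasCountStats is true, which guarantees nullCount is defined.

```scala
object ShortCircuitSketch {
  case class Stat(distinctCount: Option[BigInt], nullCount: Option[BigInt]) {
    def hasCountStats: Boolean = distinctCount.isDefined && nullCount.isDefined
  }

  def needsNullHandling(colStats: Stat): Boolean =
    // If hasCountStats is false, || short-circuits and nullCount.get is
    // never evaluated; if it is true, nullCount is defined, so the .get
    // on the right-hand side cannot throw.
    !colStats.hasCountStats || colStats.nullCount.get > 0

  def main(args: Array[String]): Unit = {
    assert(needsNullHandling(Stat(None, None)))                        // missing stats
    assert(needsNullHandling(Stat(Some(BigInt(10)), Some(BigInt(5))))) // has nulls
    assert(!needsNullHandling(Stat(Some(BigInt(10)), Some(BigInt(0))))) // no nulls
    println("ok")
  }
}
```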
    avgLen = Option(row.getLong(4)),
    maxLen = Option(row.getLong(5))
  )
  if (row.isNullAt(6) || !cs.nullCount.isDefined) {
!cs.nullCount.isDefined -> cs.nullCount.isEmpty
  case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
  // This version of Spark does not use min/max for binary/string types so we ignore it.
  case _ =>
    throw new AnalysisException("Column statistics deserialization is not supported for " +
deserialization -> serialization?
Test build #87671 has finished for PR 20624 at commit
LGTM Thanks! Merged to master
What changes were proposed in this pull request?
Refactor ColumnStat to be more flexible.

* Split ColumnStat and CatalogColumnStat, just like CatalogStatistics is split from Statistics. This detaches how the statistics are stored from how they are processed in the query plan. CatalogColumnStat keeps min and max as String, making it not depend on dataType information.
* For CatalogColumnStat, parse column names from property names in the metastore (KEY_VERSION property), not from the metastore schema. This means that CatalogColumnStats can be created for columns even if the schema itself is not stored in the metastore.
* min, max and histogram for columns were optional already. Having them all optional is more consistent, and gives flexibility to e.g. drop some of the fields through transformations if they are difficult / impossible to calculate.

The added flexibility will make it possible to have alternative implementations for stats, and separates stats collection from stats and estimation processing in plans.
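The split described above can be sketched as follows (a simplified model, not the actual Spark classes; the field names follow the PR, but the types and the toPlanStat parsing are illustrative only): the catalog-side class keeps min/max as strings, and conversion to the plan-side class parses them using the column's data type.

```scala
object SplitSketch {
  // Plan-side stats: typed values, used during estimation.
  case class ColumnStat(
      distinctCount: Option[BigInt] = None,
      min: Option[Any] = None,
      max: Option[Any] = None,
      nullCount: Option[BigInt] = None)

  // Catalog-side stats: min/max kept as strings, so storage does not
  // depend on dataType; parsing happens only on conversion.
  case class CatalogColumnStat(
      distinctCount: Option[BigInt] = None,
      min: Option[String] = None,
      max: Option[String] = None,
      nullCount: Option[BigInt] = None) {

    // Illustrative stand-in for fromExternalString: only "int" handled.
    def toPlanStat(dataType: String): ColumnStat = {
      def parse(s: String): Any = dataType match {
        case "int" => s.toInt
        case _ => s
      }
      ColumnStat(distinctCount, min.map(parse), max.map(parse), nullCount)
    }
  }

  def main(args: Array[String]): Unit = {
    val catalog = CatalogColumnStat(
      distinctCount = Some(BigInt(100)), min = Some("1"), max = Some("99"))
    val plan = catalog.toPlanStat("int")
    assert(plan.min == Some(1) && plan.max == Some(99))
    assert(plan.nullCount.isEmpty) // all fields optional
    println("ok")
  }
}
```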
How was this patch tested?
Refactored existing tests to work with the refactored ColumnStat and CatalogColumnStat.

New tests added in StatisticsSuite checking that backwards / forwards compatibility is not broken.