[SPARK-34165][SQL] Add count_distinct as an option to Dataset#summary
### What changes were proposed in this pull request?

Add `count_distinct` as an optional statistic argument to `Dataset#summary` (the method already supports `count`, `min`, `max`, etc.)

### Why are the changes needed?

The `summary()` method is used for lightweight exploratory data analysis.  A distinct count of all the columns is one of the most common exploratory data analysis queries.

Distinct counts can be expensive, so this shouldn't be enabled by default.  The proposed implementation is completely backwards compatible.
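Without this option, the same result takes hand-rolling one aggregate per column. A minimal sketch of that manual equivalent (assuming a `SparkSession` is in scope and `df` is any `DataFrame`):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, countDistinct}

// Manual equivalent of summary("count_distinct"): build one
// distinct-count aggregate per column and collect them into a
// single-row DataFrame.
def distinctCounts(df: DataFrame): DataFrame = {
  val aggs = df.columns.map(c => countDistinct(col(c)).as(c))
  df.agg(aggs.head, aggs.tail: _*)
}
```

Note the shapes differ: this sketch returns one row of long counts, while `summary` returns statistics as string values, one row per statistic.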

### Does this PR introduce _any_ user-facing change?

Yes, users can now call `df.summary("count_distinct")`, which wasn't an option before.  Users can still call `df.summary()` without any arguments and the output is the same.  `count_distinct` was not added as one of the `defaultStatistics`.
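A sketch of the new call, assuming a `SparkSession` named `spark` (the data mirrors the `person3` fixture added in the tests below):

```scala
import spark.implicits._

val person3 = Seq(
  ("Luis", 1, 99), ("Luis", 16, 99), ("Luis", 16, 176),
  ("Fernando", 32, 99), ("Fernando", 32, 164),
  ("David", 60, 99), ("Amy", 24, 99)).toDF("name", "age", "height")

// Per the unit tests, this yields a "count" row of 7/7/7 and a
// "count_distinct" row of 4/5/3 for name/age/height.
person3.summary("count", "count_distinct").show()
```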

### How was this patch tested?

Unit tests.

### Additional comments

If this idea is accepted, we should add a PySpark implementation in this PR, as suggested by zero323.

Closes #31254 from MrPowers/SPARK-34165.

Authored-by: MrPowers <matthewkevinpowers@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
MrPowers authored and srowen committed Jan 28, 2021
1 parent 15445a8 commit 9ed0e3c
Showing 3 changed files with 48 additions and 0 deletions.
16 changes: 16 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2670,6 +2670,8 @@ class Dataset[T] private[sql](
* <li>min</li>
* <li>max</li>
* <li>arbitrary approximate percentiles specified as a percentage (e.g. 75%)</li>
* <li>count_distinct</li>
* <li>approx_count_distinct</li>
* </ul>
*
* If no statistics are given, this function computes count, mean, stddev, min,
@@ -2712,6 +2714,20 @@ class Dataset[T] private[sql](
* ds.select("age", "height").summary().show()
* }}}
*
* Specify statistics to output custom summaries:
*
* {{{
* ds.summary("count", "count_distinct").show()
* }}}
*
* The distinct count isn't included by default.
*
   * You can also compute approximate distinct counts, which are faster:
*
* {{{
* ds.summary("count", "approx_count_distinct").show()
* }}}
*
* See also [[describe]] for basic statistics.
*
* @param statistics Statistics from above list to be computed.
@@ -254,6 +254,10 @@ object StatFunctions extends Logging {
} else {
stats.toLowerCase(Locale.ROOT) match {
case "count" => (child: Expression) => Count(child).toAggregateExpression()
case "count_distinct" => (child: Expression) =>
Count(child).toAggregateExpression(isDistinct = true)
case "approx_count_distinct" => (child: Expression) =>
HyperLogLogPlusPlus(child).toAggregateExpression()
case "mean" => (child: Expression) => Average(child).toAggregateExpression()
case "stddev" => (child: Expression) => StddevSamp(child).toAggregateExpression()
case "min" => (child: Expression) => Min(child).toAggregateExpression()
28 changes: 28 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -830,6 +830,15 @@ class DataFrameSuite extends QueryTest
("David", 60, 192),
("Amy", 24, 180)).toDF("name", "age", "height")

private lazy val person3: DataFrame = Seq(
("Luis", 1, 99),
("Luis", 16, 99),
("Luis", 16, 176),
("Fernando", 32, 99),
("Fernando", 32, 164),
("David", 60, 99),
("Amy", 24, 99)).toDF("name", "age", "height")

test("describe") {
val describeResult = Seq(
Row("count", "4", "4", "4"),
@@ -921,6 +930,25 @@ class DataFrameSuite extends QueryTest
checkAnswer(emptyDescription, emptySummaryResult)
}

test("SPARK-34165: Add count_distinct to summary") {
val summaryDF = person3.summary("count", "count_distinct")

val summaryResult = Seq(
Row("count", "7", "7", "7"),
Row("count_distinct", "4", "5", "3"))

def getSchemaAsSeq(df: DataFrame): Seq[String] = df.schema.map(_.name)
assert(getSchemaAsSeq(summaryDF) === Seq("summary", "name", "age", "height"))
checkAnswer(summaryDF, summaryResult)

val approxSummaryDF = person3.summary("count", "approx_count_distinct")
val approxSummaryResult = Seq(
Row("count", "7", "7", "7"),
Row("approx_count_distinct", "4", "5", "3"))
    assert(getSchemaAsSeq(approxSummaryDF) === Seq("summary", "name", "age", "height"))
checkAnswer(approxSummaryDF, approxSummaryResult)
}

test("summary advanced") {
val stats = Array("count", "50.01%", "max", "mean", "min", "25%")
val orderMatters = person2.summary(stats: _*)
