Skip to content

[SPARK-52588][SQL] Approx_top_k: accumulate and estimate #51308

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: master
Choose a base branch
from

Conversation

yhuang-db
Copy link
Contributor

@yhuang-db yhuang-db commented Jun 27, 2025

What changes were proposed in this pull request?

This PR adds two SQL functions; approx_top_k_accumulate, an aggregation function that accumulates input data into a sketch, and approx_top_k_estimate, an expression function that estimates the top k frequent items from a sketch.

1. approx_top_k_accumulate

Syntax

approx_too_k_accumulate(expr[, maxItemsTracked])

Arguments

  • expr: An expression of BOOLEAN, BINARY, STRING, DATE, TIMESTAMP or numeric type.
  • maxItemsTracked: An optional INTEGER literal. If maxItemsTracked is not specified, it defaults to 10000. This is the maximum number of distinct values that can be tracked by the sketch.

Returns

The return of this function is a STRUCT with three fields: (1) Sketch field, which is the BINARY form of the sketch status; (2) ItemTypeNull field, which is a null value indicating the original type of expr. And (3) MaxItemsTracked, which is the maxItemsTracked argument.

2. approx_top_k_estimate

Syntax

approx_top_k_estimate(state[, k])

Arguments

  • state: An expression for the sketch STRUCT that is generated by approx_top_k_accumulate or approx_top_k_combine
  • k: An optional INTEGER literal greater than 0. If k is not specified, it defaults to 5.

Returns

Results are returned as an ARRAY of type STRUCT, where each STRUCT contains an item field for the value (with its original input type) and a count field (of type LONG) with the approximate number of occurrences. The array is sorted by count descending.

Summary of changes:

Tests:

  • DataFrameAggregateSuite.scala
    • End-to-end SQL query tests with approx_top_k_estimate(approx_top_k_accumulate(expr, maxItemsTracked), k) together.
  • ApproxTopKSuite.scala
    • Negative expression tests for invalid parameters.

Implementation:

  • ApproxTopKAggregates.scala
    • approx_top_k_accumulate
  • ApproxTopKExpressions.scala
    • approx_top_k_estimation

Why are the changes needed?

They are useful sibling functions for approx_top_k queries.

Does this PR introduce any user-facing change?

Yes, this PR introduces a new user-facing SQL function. See user examples as below.

> SELECT approx_top_k_estimate(approx_top_k_accumulate(expr, 100), 10) FROM VALUES (0), (0), (1), (1), (2), (3), (4), (4) AS tab(expr);
 [{'item':4,'count':2},{'item':1,'count':2},{'item':0,'count':2},{'item':3,'count':1},{'item':2,'count':1}]

How was this patch tested?

Unit tests for end-to-end SQL queries and invalid input for expressions.

Was this patch authored or co-authored using generative AI tooling?

N/A

@github-actions github-actions bot added the SQL label Jun 27, 2025
@yhuang-db yhuang-db changed the title Spark 52588 [SPARK-52588][SQL] Approx_top_k: accumulate, combine, estimate Jun 27, 2025
@yhuang-db yhuang-db changed the title [SPARK-52588][SQL] Approx_top_k: accumulate, combine, estimate [SPARK-52588][SQL] Approx_top_k: accumulate and estimate Jul 3, 2025
@yhuang-db yhuang-db marked this pull request as ready for review July 3, 2025 21:15
defaultCheck
} else if (!k.foldable) {
TypeCheckFailure("K must be a constant literal")
} else {
Copy link
Member

@gengliangwang gengliangwang Jul 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we also check the StructType of state?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, let's add test for this.


def getSketchStateDataType(itemDataType: DataType): StructType =
StructType(
StructField("Sketch", BinaryType, nullable = false) ::
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sketch => sketch. let's use camelCase

* and the maximum number of items tracked by the sketch.
*
* @param expr the child expression to accumulate items from
* @param maxItemsTracked the maximum number of items to track in the sketch
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also add doc for mutableAggBufferOffset and inputAggBufferOffset

k = Literal(10),
maxItemsTracked = Literal(10000)
)
assert(agg.checkInputDataTypes().isFailure)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's also check the failure message

k = Sum(BoundReference(1, IntegerType, nullable = true)),
maxItemsTracked = Literal(10)
)
assert(badAgg.checkInputDataTypes().isFailure)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's also check the failure message

@@ -2931,6 +2904,112 @@ class DataFrameAggregateSuite extends QueryTest
res,
Row(LocalTime.of(22, 1, 0), LocalTime.of(3, 0, 0)))
}

test("SPARK-52588: accumulate and estimate of Integer with default parameters") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we move the new tests related to approx_top_k_* to a new test suite?

@@ -2931,6 +2904,112 @@ class DataFrameAggregateSuite extends QueryTest
res,
Row(LocalTime.of(22, 1, 0), LocalTime.of(3, 0, 0)))
}

test("SPARK-52588: accumulate and estimate of Integer with default parameters") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also test inputs with different data types, similar to the approx_top_k function.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants