From a776774c5468b297f406ac3821fe34cb8f9b34ed Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Mon, 10 Nov 2025 16:36:09 -0800 Subject: [PATCH 1/8] commit --- .../sql/catalyst/analysis/Analyzer.scala | 2 +- .../catalyst/expressions/pipeOperators.scala | 8 +- .../apache/spark/sql/internal/SQLConf.scala | 12 + .../analyzer-results/pipe-operators.sql.out | 263 +++++++++++------ .../sql-tests/inputs/pipe-operators.sql | 61 +++- .../sql-tests/results/pipe-operators.sql.out | 274 ++++++++++++------ 6 files changed, 443 insertions(+), 177 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 98c514925fa0..796911d5b380 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -463,7 +463,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor ResolveProcedures :: BindProcedures :: ResolveTableSpec :: - ValidateAndStripPipeExpressions :: + ValidateAndStripPipeExpressions(conf.pipeOperatorAllowAggregateInSelect) :: ResolveSQLFunctions :: ResolveSQLTableFunctions :: ResolveAliases :: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala index 2ee68663ad2f..2c9d97e1a9b5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala @@ -64,8 +64,12 @@ object EliminatePipeOperators extends Rule[LogicalPlan] { /** * Validates and strips PipeExpression nodes from a logical plan once the child expressions are * resolved. + * @param allowAggregateInSelect When true, aggregate functions are allowed in non-AGGREGATE + * pipe operator clauses. When false, aggregate functions must be + * used exclusively with the AGGREGATE clause. */ -object ValidateAndStripPipeExpressions extends Rule[LogicalPlan] { +case class ValidateAndStripPipeExpressions(allowAggregateInSelect: Boolean) + extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( _.containsPattern(PIPE_EXPRESSION), ruleId) { case node: LogicalPlan => @@ -77,7 +81,7 @@ object ValidateAndStripPipeExpressions extends Rule[LogicalPlan] { if (p.isAggregate && firstAggregateFunction.isEmpty) { throw QueryCompilationErrors .pipeOperatorAggregateExpressionContainsNoAggregateFunction(p.child) - } else if (!p.isAggregate) { + } else if (!p.isAggregate && !allowAggregateInSelect) { firstAggregateFunction.foreach { a => throw QueryCompilationErrors.pipeOperatorContainsAggregateFunction(a, p.clause) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 36ded2bd7b63..d694c7062358 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3831,6 +3831,15 @@ object SQLConf { .booleanConf .createWithDefault(false) + val PIPE_OPERATOR_ALLOW_AGGREGATE_IN_SELECT = + buildConf("spark.sql.pipeOperator.allowAggregateInSelect") + .doc("When true, aggregate functions can be used in |> SELECT and other pipe operator " + + "clauses without requiring the |> AGGREGATE keyword. When false, aggregate functions " + + "must be used exclusively with the |> AGGREGATE clause for proper aggregation semantics.") + .version("4.1.0") + .booleanConf + .createWithDefault(true) + val TVF_ALLOW_MULTIPLE_TABLE_ARGUMENTS_ENABLED = buildConf("spark.sql.tvf.allowMultipleTableArguments.enabled") .doc("When true, allows multiple table arguments for table-valued functions, " + @@ -7584,6 +7593,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { override def exponentLiteralAsDecimalEnabled: Boolean = getConf(SQLConf.LEGACY_EXPONENT_LITERAL_AS_DECIMAL_ENABLED) + def pipeOperatorAllowAggregateInSelect: Boolean = + getConf(SQLConf.PIPE_OPERATOR_ALLOW_AGGREGATE_IN_SELECT) + def allowNegativeScaleOfDecimalEnabled: Boolean = getConf(SQLConf.LEGACY_ALLOW_NEGATIVE_SCALE_OF_DECIMAL_ENABLED) diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out index dda0722e21d7..6bc6aa34b9ef 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out @@ -636,42 +636,25 @@ Repartition 3, true table t |> select sum(x) as result -- !query analysis -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", - "sqlState" : "0A000", - "messageParameters" : { - "clause" : "SELECT", - "expr" : "sum(x#x)" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 19, - "stopIndex" : 24, - "fragment" : "sum(x)" - } ] -} +Aggregate [sum(x#x) AS result#xL] ++- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv -- !query table t |> select y, length(y) + sum(x) as result -- !query analysis -org.apache.spark.sql.AnalysisException +org.apache.spark.sql.catalyst.ExtendedAnalysisException { - "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", - "sqlState" : "0A000", - "messageParameters" : { - "clause" : "SELECT", - "expr" : "sum(x#x)" - }, + "errorClass" : "MISSING_GROUP_BY", + "sqlState" : "42803", "queryContext" : [ { "objectType" : "", "objectName" : "", - "startIndex" : 34, - "stopIndex" : 39, - "fragment" : "sum(x)" + "startIndex" : 12, + "stopIndex" : 49, + "fragment" : "select y, length(y) + sum(x) as result" } ] } @@ -680,42 +663,25 @@ org.apache.spark.sql.AnalysisException from t |> select sum(x) -- !query analysis -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", - "sqlState" : "0A000", - "messageParameters" : { - "clause" : "SELECT", - "expr" : "sum(x#x)" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 18, - "stopIndex" : 23, - "fragment" : "sum(x)" - } ] -} +Aggregate [sum(x#x) AS sum(x)#xL] ++- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv -- !query from t as t_alias |> select y, sum(x) -- !query analysis -org.apache.spark.sql.AnalysisException +org.apache.spark.sql.catalyst.ExtendedAnalysisException { - "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", - "sqlState" : "0A000", - "messageParameters" : { - "clause" : "SELECT", - "expr" : "sum(x#x)" - }, + "errorClass" : "MISSING_GROUP_BY", + "sqlState" : "42803", "queryContext" : [ { "objectType" : "", "objectName" : "", - "startIndex" : 32, + "startIndex" : 22, "stopIndex" : 37, - "fragment" : "sum(x)" + "fragment" : "select y, sum(x)" } ] } @@ -868,20 +834,16 @@ Project [x#x, y#x, z#x, (z#x + 1) AS plus_one#x] table t |> extend sum(x) as z -- !query analysis -org.apache.spark.sql.AnalysisException +org.apache.spark.sql.catalyst.ExtendedAnalysisException { - "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", - "sqlState" : "0A000", - "messageParameters" : { - "clause" : "EXTEND", - "expr" : "sum(x#x)" - }, + "errorClass" : "MISSING_GROUP_BY", + "sqlState" : "42803", "queryContext" : [ { "objectType" : "", "objectName" : "", - "startIndex" : 19, - "stopIndex" : 24, - "fragment" : "sum(x)" + "startIndex" : 1, + "stopIndex" : 29, + "fragment" : "table t\n|> extend sum(x) as z" } ] } @@ -3683,28 +3645,6 @@ org.apache.spark.sql.AnalysisException } --- !query -table other -|> select sum(a) as result --- !query analysis -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", - "sqlState" : "0A000", - "messageParameters" : { - "clause" : "SELECT", - "expr" : "sum(a#x)" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 23, - "stopIndex" : 28, - "fragment" : "sum(a)" - } ] -} - - -- !query table other |> aggregate @@ -4738,6 +4678,163 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException } +-- !query +table other +|> select sum(a) as result +-- !query analysis +Aggregate [sum(a#x) AS result#xL] ++- SubqueryAlias spark_catalog.default.other + +- Relation spark_catalog.default.other[a#x,b#x] json + + +-- !query +table other +|> select sum(a) as total_a, avg(b) as avg_b +-- !query analysis +Aggregate [sum(a#x) AS total_a#xL, avg(b#x) AS avg_b#x] ++- SubqueryAlias spark_catalog.default.other + +- Relation spark_catalog.default.other[a#x,b#x] json + + +-- !query +table other +|> where b > 1 +|> select sum(a) as result +-- !query analysis +Aggregate [sum(a#x) AS result#xL] ++- Filter (b#x > 1) + +- PipeOperator + +- SubqueryAlias spark_catalog.default.other + +- Relation spark_catalog.default.other[a#x,b#x] json + + +-- !query +table other +|> select sum(a) as total_a +|> select total_a * 2 as doubled +-- !query analysis +Project [(total_a#xL * cast(2 as bigint)) AS doubled#xL] ++- Aggregate [sum(a#x) AS total_a#xL] + +- SubqueryAlias spark_catalog.default.other + +- Relation spark_catalog.default.other[a#x,b#x] json + + +-- !query +table other +|> select a, sum(b) as sum_b group by a +-- !query analysis +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { + "error" : "'group'", + "hint" : "" + } +} + + +-- !query +table other +|> extend sum(a) as total_a +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "MISSING_GROUP_BY", + "sqlState" : "42803", + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 39, + "fragment" : "table other\n|> extend sum(a) as total_a" + } ] +} + + +-- !query +table other +|> where sum(a) > 5 +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "INVALID_WHERE_CONDITION", + "sqlState" : "42903", + "messageParameters" : { + "condition" : "\"(sum(a) > 5)\"", + "expressionList" : "sum(spark_catalog.default.other.a)" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 31, + "fragment" : "table other\n|> where sum(a) > 5" + } ] +} + + +-- !query +table other +|> aggregate sum(a) as total_a +-- !query analysis +Aggregate [sum(a#x) AS total_a#xL] ++- SubqueryAlias spark_catalog.default.other + +- Relation spark_catalog.default.other[a#x,b#x] json + + +-- !query +set spark.sql.pipeOperator.allowAggregateInSelect=false +-- !query analysis +SetCommand (spark.sql.pipeOperator.allowAggregateInSelect,Some(false)) + + +-- !query +table other +|> select sum(a) as result +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", + "sqlState" : "0A000", + "messageParameters" : { + "clause" : "SELECT", + "expr" : "sum(a#x)" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 23, + "stopIndex" : 28, + "fragment" : "sum(a)" + } ] +} + + +-- !query +table other +|> aggregate sum(a) as total_a +-- !query analysis +Aggregate [sum(a#x) AS total_a#xL] ++- SubqueryAlias spark_catalog.default.other + +- Relation spark_catalog.default.other[a#x,b#x] json + + +-- !query +set spark.sql.pipeOperator.allowAggregateInSelect=true +-- !query analysis +SetCommand (spark.sql.pipeOperator.allowAggregateInSelect,Some(true)) + + +-- !query +table other +|> select sum(a) as result +-- !query analysis +Aggregate [sum(a#x) AS result#xL] ++- SubqueryAlias spark_catalog.default.other + +- Relation spark_catalog.default.other[a#x,b#x] json + + -- !query drop table t -- !query analysis diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql index ce70cea0e6e0..326a4c24eb71 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql @@ -1157,10 +1157,6 @@ select 1 as x, 2 as y table other |> aggregate a; --- Using aggregate functions without the AGGREGATE keyword is not allowed. -table other -|> select sum(a) as result; - -- The AGGREGATE keyword requires a GROUP BY clause and/or aggregation function(s). table other |> aggregate; @@ -1756,6 +1752,63 @@ table web_v1 |> order by item_sk, d_date |> limit 100; +-- Configuration test: allowing aggregates in SELECT with spark.sql.pipeOperator.allowAggregateInSelect. +------------------------------------------------------------------------------------------------------- +-- Test the configuration that controls whether aggregate functions can be used in |> SELECT +-- and other pipe operator clauses without requiring the |> AGGREGATE keyword. + +-- Verify that the default behavior (enabled) allows aggregates in SELECT. +table other +|> select sum(a) as result; + +-- Aggregates in SELECT with multiple aggregate functions. +table other +|> select sum(a) as total_a, avg(b) as avg_b; + +-- Aggregates in SELECT with WHERE clause. +table other +|> where b > 1 +|> select sum(a) as result; + +-- Aggregates in SELECT with chaining. +table other +|> select sum(a) as total_a +|> select total_a * 2 as doubled; + +-- Mixed aggregates and non-aggregates in SELECT (should work like regular aggregation). +table other +|> select a, sum(b) as sum_b group by a; + +-- Aggregates in EXTEND. +table other +|> extend sum(a) as total_a; + +-- Aggregates in WHERE should still fail (aggregates not allowed in WHERE generally). +table other +|> where sum(a) > 5; + +-- The |> AGGREGATE keyword should still work with the configuration enabled. +table other +|> aggregate sum(a) as total_a; + +-- Disable the configuration to test the legacy behavior. +set spark.sql.pipeOperator.allowAggregateInSelect=false; + +-- With configuration disabled, aggregates in SELECT now fail (requires |> AGGREGATE). +table other +|> select sum(a) as result; + +-- The |> AGGREGATE keyword is required when configuration is disabled. +table other +|> aggregate sum(a) as total_a; + +-- Re-enable the configuration. +set spark.sql.pipeOperator.allowAggregateInSelect=true; + +-- Verify that aggregates in SELECT work again. +table other +|> select sum(a) as result; + -- Cleanup. ----------- drop table t; diff --git a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out index bf7453361276..f0a9848a89fe 100644 --- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out @@ -583,24 +583,9 @@ struct table t |> select sum(x) as result -- !query schema -struct<> +struct -- !query output -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", - "sqlState" : "0A000", - "messageParameters" : { - "clause" : "SELECT", - "expr" : "sum(x#x)" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 19, - "stopIndex" : 24, - "fragment" : "sum(x)" - } ] -} +1 -- !query @@ -609,20 +594,16 @@ table t -- !query schema struct<> -- !query output -org.apache.spark.sql.AnalysisException +org.apache.spark.sql.catalyst.ExtendedAnalysisException { - "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", - "sqlState" : "0A000", - "messageParameters" : { - "clause" : "SELECT", - "expr" : "sum(x#x)" - }, + "errorClass" : "MISSING_GROUP_BY", + "sqlState" : "42803", "queryContext" : [ { "objectType" : "", "objectName" : "", - "startIndex" : 34, - "stopIndex" : 39, - "fragment" : "sum(x)" + "startIndex" : 12, + "stopIndex" : 49, + "fragment" : "select y, length(y) + sum(x) as result" } ] } @@ -631,24 +612,9 @@ org.apache.spark.sql.AnalysisException from t |> select sum(x) -- !query schema -struct<> +struct -- !query output -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", - "sqlState" : "0A000", - "messageParameters" : { - "clause" : "SELECT", - "expr" : "sum(x#x)" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 18, - "stopIndex" : 23, - "fragment" : "sum(x)" - } ] -} +1 -- !query @@ -657,20 +623,16 @@ from t as t_alias -- !query schema struct<> -- !query output -org.apache.spark.sql.AnalysisException +org.apache.spark.sql.catalyst.ExtendedAnalysisException { - "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", - "sqlState" : "0A000", - "messageParameters" : { - "clause" : "SELECT", - "expr" : "sum(x#x)" - }, + "errorClass" : "MISSING_GROUP_BY", + "sqlState" : "42803", "queryContext" : [ { "objectType" : "", "objectName" : "", - "startIndex" : 32, + "startIndex" : 22, "stopIndex" : 37, - "fragment" : "sum(x)" + "fragment" : "select y, sum(x)" } ] } @@ -821,20 +783,16 @@ table t -- !query schema struct<> -- !query output -org.apache.spark.sql.AnalysisException +org.apache.spark.sql.catalyst.ExtendedAnalysisException { - "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", - "sqlState" : "0A000", - "messageParameters" : { - "clause" : "EXTEND", - "expr" : "sum(x#x)" - }, + "errorClass" : "MISSING_GROUP_BY", + "sqlState" : "42803", "queryContext" : [ { "objectType" : "", "objectName" : "", - "startIndex" : 19, - "stopIndex" : 24, - "fragment" : "sum(x)" + "startIndex" : 1, + "stopIndex" : 29, + "fragment" : "table t\n|> extend sum(x) as z" } ] } @@ -3325,30 +3283,6 @@ org.apache.spark.sql.AnalysisException } --- !query -table other -|> select sum(a) as result --- !query schema -struct<> --- !query output -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", - "sqlState" : "0A000", - "messageParameters" : { - "clause" : "SELECT", - "expr" : "sum(a#x)" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 23, - "stopIndex" : 28, - "fragment" : "sum(a)" - } ] -} - - -- !query table other |> aggregate @@ -4252,6 +4186,172 @@ struct select sum(a) as result +-- !query schema +struct +-- !query output +4 + + +-- !query +table other +|> select sum(a) as total_a, avg(b) as avg_b +-- !query schema +struct +-- !query output +4 2.3333333333333335 + + +-- !query +table other +|> where b > 1 +|> select sum(a) as result +-- !query schema +struct +-- !query output +3 + + +-- !query +table other +|> select sum(a) as total_a +|> select total_a * 2 as doubled +-- !query schema +struct +-- !query output +8 + + +-- !query +table other +|> select a, sum(b) as sum_b group by a +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { + "error" : "'group'", + "hint" : "" + } +} + + +-- !query +table other +|> extend sum(a) as total_a +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "MISSING_GROUP_BY", + "sqlState" : "42803", + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 39, + "fragment" : "table other\n|> extend sum(a) as total_a" + } ] +} + + +-- !query +table other +|> where sum(a) > 5 +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "INVALID_WHERE_CONDITION", + "sqlState" : "42903", + "messageParameters" : { + "condition" : "\"(sum(a) > 5)\"", + "expressionList" : "sum(spark_catalog.default.other.a)" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 31, + "fragment" : "table other\n|> where sum(a) > 5" + } ] +} + + +-- !query +table other +|> aggregate sum(a) as total_a +-- !query schema +struct +-- !query output +4 + + +-- !query +set spark.sql.pipeOperator.allowAggregateInSelect=false +-- !query schema +struct +-- !query output +spark.sql.pipeOperator.allowAggregateInSelect false + + +-- !query +table other +|> select sum(a) as result +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", + "sqlState" : "0A000", + "messageParameters" : { + "clause" : "SELECT", + "expr" : "sum(a#x)" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 23, + "stopIndex" : 28, + "fragment" : "sum(a)" + } ] +} + + +-- !query +table other +|> aggregate sum(a) as total_a +-- !query schema +struct +-- !query output +4 + + +-- !query +set spark.sql.pipeOperator.allowAggregateInSelect=true +-- !query schema +struct +-- !query output +spark.sql.pipeOperator.allowAggregateInSelect true + + +-- !query +table other +|> select sum(a) as result +-- !query schema +struct +-- !query output +4 + + -- !query drop table t -- !query schema From f796bf6b2641c428d3e99d0447b9da991632e469 Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Wed, 12 Nov 2025 10:32:46 -0800 Subject: [PATCH 2/8] respond to code review comments --- .../apache/spark/sql/internal/SQLConf.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d694c7062358..491e77cb1a74 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3831,15 +3831,6 @@ object SQLConf { .booleanConf .createWithDefault(false) - val PIPE_OPERATOR_ALLOW_AGGREGATE_IN_SELECT = - buildConf("spark.sql.pipeOperator.allowAggregateInSelect") - .doc("When true, aggregate functions can be used in |> SELECT and other pipe operator " + - "clauses without requiring the |> AGGREGATE keyword. When false, aggregate functions " + - "must be used exclusively with the |> AGGREGATE clause for proper aggregation semantics.") - .version("4.1.0") - .booleanConf - .createWithDefault(true) - val TVF_ALLOW_MULTIPLE_TABLE_ARGUMENTS_ENABLED = buildConf("spark.sql.tvf.allowMultipleTableArguments.enabled") .doc("When true, allows multiple table arguments for table-valued functions, " + @@ -6201,6 +6192,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val PIPE_OPERATOR_ALLOW_AGGREGATE_IN_SELECT = + buildConf("spark.sql.allowAggregateInSelectWithPipeOperator") + .doc("When true, aggregate functions can be used in |> SELECT and other pipe operator " + + "clauses without requiring the |> AGGREGATE keyword. When false, aggregate functions " + + "must be used exclusively with the |> AGGREGATE clause for proper aggregation semantics.") + .version("4.2.0") + .booleanConf + .createWithDefault(true) + val LEGACY_PERCENTILE_DISC_CALCULATION = buildConf("spark.sql.legacy.percentileDiscCalculation") .internal() .doc("If true, the old bogus percentile_disc calculation is used. The old calculation " + From ded5b4b03cbc173fa8c42ed843f5ec0b10bb468a Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Wed, 12 Nov 2025 10:35:32 -0800 Subject: [PATCH 3/8] respond to code review comments --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 491e77cb1a74..46bd4afe0267 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -6192,7 +6192,7 @@ object SQLConf { .booleanConf .createWithDefault(true) - val PIPE_OPERATOR_ALLOW_AGGREGATE_IN_SELECT = + val OPERATOR_PIPE_ALLOW_AGGREGATE_IN_SELECT = buildConf("spark.sql.allowAggregateInSelectWithPipeOperator") .doc("When true, aggregate functions can be used in |> SELECT and other pipe operator " + "clauses without requiring the |> AGGREGATE keyword. When false, aggregate functions " + @@ -7594,7 +7594,7 @@ class SQLConf extends Serializable with Logging with SqlApiConf { getConf(SQLConf.LEGACY_EXPONENT_LITERAL_AS_DECIMAL_ENABLED) def pipeOperatorAllowAggregateInSelect: Boolean = - getConf(SQLConf.PIPE_OPERATOR_ALLOW_AGGREGATE_IN_SELECT) + getConf(SQLConf.OPERATOR_PIPE_ALLOW_AGGREGATE_IN_SELECT) def allowNegativeScaleOfDecimalEnabled: Boolean = getConf(SQLConf.LEGACY_ALLOW_NEGATIVE_SCALE_OF_DECIMAL_ENABLED) From 35294f80258fe47ef4c8fde50ca35ede375b2f83 Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Wed, 12 Nov 2025 10:39:45 -0800 Subject: [PATCH 4/8] respond to code review comments --- .../analyzer-results/pipe-operators.sql.out | 19 +++---------------- .../sql-tests/results/pipe-operators.sql.out | 19 ++----------------- 2 files changed, 5 insertions(+), 33 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out index 6bc6aa34b9ef..d58ea1aa326b 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out @@ -4793,22 +4793,9 @@ SetCommand (spark.sql.pipeOperator.allowAggregateInSelect,Some(false)) table other |> select sum(a) as result -- !query analysis -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", - "sqlState" : "0A000", - "messageParameters" : { - "clause" : "SELECT", - "expr" : "sum(a#x)" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 23, - "stopIndex" : 28, - "fragment" : "sum(a)" - } ] -} +Aggregate [sum(a#x) AS result#xL] ++- SubqueryAlias spark_catalog.default.other + +- Relation spark_catalog.default.other[a#x,b#x] json -- !query diff --git a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out index f0a9848a89fe..365d9e45b243 100644 --- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out @@ -4306,24 +4306,9 @@ spark.sql.pipeOperator.allowAggregateInSelect false table other |> select sum(a) as result -- !query schema -struct<> +struct -- !query output -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", - "sqlState" : "0A000", - "messageParameters" : { - "clause" : "SELECT", - "expr" : "sum(a#x)" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 23, - "stopIndex" : 28, - "fragment" : "sum(a)" - } ] -} +4 -- !query From 7d87676bb39525a5feac2901e01f8f913956926c Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Thu, 13 Nov 2025 14:35:48 -0800 Subject: [PATCH 5/8] support |> SELECT with GROUP BY and fix |> EXTEND with aggregates --- .../sql/catalyst/parser/SqlBaseParser.g4 | 2 +- .../catalyst/expressions/pipeOperators.scala | 11 +- .../sql/catalyst/parser/AstBuilder.scala | 2 +- .../analyzer-results/pipe-operators.sql.out | 159 ++++++++--------- .../sql-tests/inputs/pipe-operators.sql | 42 ++--- .../sql-tests/results/pipe-operators.sql.out | 166 ++++++++---------- 6 files changed, 179 insertions(+), 203 deletions(-) diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 8ccac6a39d2c..c9ca385c78e8 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -1739,7 +1739,7 @@ version ; operatorPipeRightSide - : selectClause windowClause? + : selectClause aggregationClause? windowClause? | EXTEND extendList=namedExpressionSeq | SET operatorPipeSetAssignmentSeq | DROP identifierSeq diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala index 2c9d97e1a9b5..22fe10de6e55 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala @@ -81,9 +81,14 @@ case class ValidateAndStripPipeExpressions(allowAggregateInSelect: Boolean) if (p.isAggregate && firstAggregateFunction.isEmpty) { throw QueryCompilationErrors .pipeOperatorAggregateExpressionContainsNoAggregateFunction(p.child) - } else if (!p.isAggregate && !allowAggregateInSelect) { - firstAggregateFunction.foreach { a => - throw QueryCompilationErrors.pipeOperatorContainsAggregateFunction(a, p.clause) + } else if (!p.isAggregate) { + // For non-aggregate clauses, only allow aggregate functions in SELECT when the + // configuration is enabled. All other clauses (EXTEND, SET, etc.) disallow aggregates. + val aggregateAllowed = allowAggregateInSelect && p.clause == PipeOperators.selectClause + if (!aggregateAllowed) { + firstAggregateFunction.foreach { a => + throw QueryCompilationErrors.pipeOperatorContainsAggregateFunction(a, p.clause) + } } } p.child diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 971633b9a46a..ed2f6cdc6b10 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -6586,7 +6586,7 @@ class AstBuilder extends DataTypeAstBuilder selectClause = c, lateralView = new java.util.ArrayList[LateralViewContext](), whereClause = null, - aggregationClause = null, + aggregationClause = ctx.aggregationClause, havingClause = null, windowClause = ctx.windowClause, relation = left, diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out index d58ea1aa326b..dc569cfaef6c 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out @@ -632,75 +632,6 @@ Repartition 3, true +- Relation spark_catalog.default.t[x#x,y#x] csv --- !query -table t -|> select sum(x) as result --- !query analysis -Aggregate [sum(x#x) AS result#xL] -+- SubqueryAlias spark_catalog.default.t - +- Relation spark_catalog.default.t[x#x,y#x] csv - - --- !query -table t -|> select y, length(y) + sum(x) as result --- !query analysis -org.apache.spark.sql.catalyst.ExtendedAnalysisException -{ - "errorClass" : "MISSING_GROUP_BY", - "sqlState" : "42803", - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 12, - "stopIndex" : 49, - "fragment" : "select y, length(y) + sum(x) as result" - } ] -} - - --- !query -from t -|> select sum(x) --- !query analysis -Aggregate [sum(x#x) AS sum(x)#xL] -+- SubqueryAlias spark_catalog.default.t - +- Relation spark_catalog.default.t[x#x,y#x] csv - - --- !query -from t as t_alias -|> select y, sum(x) --- !query analysis -org.apache.spark.sql.catalyst.ExtendedAnalysisException -{ - "errorClass" : "MISSING_GROUP_BY", - "sqlState" : "42803", - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 22, - "stopIndex" : 37, - "fragment" : "select y, sum(x)" - } ] -} - - --- !query -from t as t_alias -|> select y, sum(x) group by y --- !query analysis -org.apache.spark.sql.catalyst.parser.ParseException -{ - "errorClass" : "PARSE_SYNTAX_ERROR", - "sqlState" : "42601", - "messageParameters" : { - "error" : "'group'", - "hint" : "" - } -} - - -- !query table t |> extend 1 as z @@ -834,16 +765,20 @@ Project [x#x, y#x, z#x, (z#x + 1) AS plus_one#x] table t |> extend sum(x) as z -- !query analysis -org.apache.spark.sql.catalyst.ExtendedAnalysisException +org.apache.spark.sql.AnalysisException { - "errorClass" : "MISSING_GROUP_BY", - "sqlState" : "42803", + "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", + "sqlState" : "0A000", + "messageParameters" : { + "clause" : "EXTEND", + "expr" : "sum(x#x)" + }, "queryContext" : [ { "objectType" : "", "objectName" : "", - "startIndex" : 1, - "stopIndex" : 29, - "fragment" : "table t\n|> extend sum(x) as z" + "startIndex" : 19, + "stopIndex" : 24, + "fragment" : "sum(x)" } ] } @@ -4723,13 +4658,61 @@ Project [(total_a#xL * cast(2 as bigint)) AS doubled#xL] table other |> select a, sum(b) as sum_b group by a -- !query analysis -org.apache.spark.sql.catalyst.parser.ParseException +Aggregate [a#x], [a#x, sum(b#x) AS sum_b#xL] ++- SubqueryAlias spark_catalog.default.other + +- Relation spark_catalog.default.other[a#x,b#x] json + + +-- !query +select 1 as x, 2 as y, 3 as z +|> select x, y, sum(z) as total group by x, y +-- !query analysis +Aggregate [x#x, y#x], [x#x, y#x, sum(z#x) AS total#xL] ++- Project [1 AS x#x, 2 AS y#x, 3 AS z#x] + +- OneRowRelation + + +-- !query +table other +|> select a, sum(b) as sum_b group by 1 +-- !query analysis +Aggregate [a#x], [a#x, sum(b#x) AS sum_b#xL] ++- SubqueryAlias spark_catalog.default.other + +- Relation spark_catalog.default.other[a#x,b#x] json + + +-- !query +table other +|> select a, sum(b) as sum_b group by a +|> where sum_b > 1 +-- !query analysis +Filter (sum_b#xL > cast(1 as bigint)) ++- PipeOperator + +- Aggregate [a#x], [a#x, sum(b#x) AS sum_b#xL] + +- SubqueryAlias spark_catalog.default.other + +- Relation spark_catalog.default.other[a#x,b#x] json + + +-- !query +select 1 as x, 2 as y +|> select x + 1 as x_plus_one, sum(y) as sum_y group by x + 1 +-- !query analysis +Aggregate [(x#x + 1)], [(x#x + 1) AS x_plus_one#x, sum(y#x) AS sum_y#xL] ++- Project [1 AS x#x, 2 AS y#x] + +- OneRowRelation + + +-- !query +table other +|> select a, sum(b) as sum_b group by b +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException { - "errorClass" : "PARSE_SYNTAX_ERROR", - "sqlState" : "42601", + "errorClass" : "MISSING_AGGREGATION", + "sqlState" : "42803", "messageParameters" : { - "error" : "'group'", - "hint" : "" + "expression" : "\"a\"", + "expressionAnyValue" : "\"any_value(a)\"" } } @@ -4738,16 +4721,20 @@ org.apache.spark.sql.catalyst.parser.ParseException table other |> extend sum(a) as total_a -- !query analysis -org.apache.spark.sql.catalyst.ExtendedAnalysisException +org.apache.spark.sql.AnalysisException { - "errorClass" : "MISSING_GROUP_BY", - "sqlState" : "42803", + "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", + "sqlState" : "0A000", + "messageParameters" : { + "clause" : "EXTEND", + "expr" : "sum(a#x)" + }, "queryContext" : [ { "objectType" : "", "objectName" : "", - "startIndex" : 1, - "stopIndex" : 39, - "fragment" : "table other\n|> extend sum(a) as total_a" + "startIndex" : 23, + "stopIndex" : 28, + "fragment" : "sum(a)" } ] } diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql index 326a4c24eb71..387074ec4a86 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql @@ -215,25 +215,6 @@ table t table t |> select /*+ repartition(3) */ all x; --- SELECT operators: negative tests. ---------------------------------------- - --- Aggregate functions are not allowed in the pipe operator SELECT list. -table t -|> select sum(x) as result; - -table t -|> select y, length(y) + sum(x) as result; - -from t -|> select sum(x); - -from t as t_alias -|> select y, sum(x); - -from t as t_alias -|> select y, sum(x) group by y; - -- EXTEND operators: positive tests. ------------------------------------ @@ -1779,7 +1760,28 @@ table other table other |> select a, sum(b) as sum_b group by a; --- Aggregates in EXTEND. +-- Multiple grouping columns in SELECT. +select 1 as x, 2 as y, 3 as z +|> select x, y, sum(z) as total group by x, y; + +-- GROUP BY with ordinal position referring to input column. +table other +|> select a, sum(b) as sum_b group by 1; + +-- Chaining: GROUP BY followed by WHERE on aggregated result. +table other +|> select a, sum(b) as sum_b group by a +|> where sum_b > 1; + +-- GROUP BY with expression and alias. +select 1 as x, 2 as y +|> select x + 1 as x_plus_one, sum(y) as sum_y group by x + 1; + +-- Non-aggregated column without being in GROUP BY should fail. +table other +|> select a, sum(b) as sum_b group by b; + +-- Aggregates in EXTEND. These are not allowed. table other |> extend sum(a) as total_a; diff --git a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out index 365d9e45b243..e4fefa373442 100644 --- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out @@ -579,81 +579,6 @@ struct 1 --- !query -table t -|> select sum(x) as result --- !query schema -struct --- !query output -1 - - --- !query -table t -|> select y, length(y) + sum(x) as result --- !query schema -struct<> --- !query output -org.apache.spark.sql.catalyst.ExtendedAnalysisException -{ - "errorClass" : "MISSING_GROUP_BY", - "sqlState" : "42803", - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 12, - "stopIndex" : 49, - "fragment" : "select y, length(y) + sum(x) as result" - } ] -} - - --- !query -from t -|> select sum(x) --- !query schema -struct --- !query output -1 - - --- !query -from t as t_alias -|> select y, sum(x) --- !query schema -struct<> --- !query output -org.apache.spark.sql.catalyst.ExtendedAnalysisException -{ - "errorClass" : "MISSING_GROUP_BY", - "sqlState" : "42803", - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 22, - "stopIndex" : 37, - "fragment" : "select y, sum(x)" - } ] -} - - --- !query -from t as t_alias -|> select y, sum(x) group by y --- !query schema -struct<> --- !query output -org.apache.spark.sql.catalyst.parser.ParseException -{ - "errorClass" : "PARSE_SYNTAX_ERROR", - "sqlState" : "42601", - "messageParameters" : { - "error" : "'group'", - "hint" : "" - } -} - - -- !query table t |> extend 1 as z @@ -783,16 +708,20 @@ table t -- !query schema struct<> -- !query output -org.apache.spark.sql.catalyst.ExtendedAnalysisException +org.apache.spark.sql.AnalysisException { - "errorClass" : "MISSING_GROUP_BY", - "sqlState" : "42803", + "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", + "sqlState" : "0A000", + "messageParameters" : { + "clause" : "EXTEND", + "expr" : "sum(x#x)" + }, "queryContext" : [ { "objectType" : "", "objectName" : "", - "startIndex" : 1, - "stopIndex" : 29, - "fragment" : "table t\n|> extend sum(x) as z" + "startIndex" : 19, + "stopIndex" : 24, + "fragment" : "sum(x)" } ] } @@ -4228,15 +4157,64 @@ struct table other |> select a, sum(b) as sum_b group by a -- !query schema +struct +-- !query output +1 3 +2 4 + + +-- !query +select 1 as x, 2 as y, 3 as z +|> select x, y, sum(z) as total group by x, y +-- !query schema +struct +-- !query output +1 2 3 + + +-- !query +table other +|> select a, sum(b) as sum_b group by 1 +-- !query schema +struct +-- !query output +1 3 +2 4 + + +-- !query +table other +|> select a, sum(b) as sum_b group by a +|> where sum_b > 1 +-- !query schema +struct +-- !query output +1 3 +2 4 + + +-- !query +select 1 as x, 2 as y +|> select x + 1 as x_plus_one, sum(y) as sum_y group by x + 1 +-- !query schema +struct +-- !query output +2 2 + + +-- !query +table other +|> select a, sum(b) as sum_b group by b +-- !query schema struct<> -- !query output -org.apache.spark.sql.catalyst.parser.ParseException +org.apache.spark.sql.catalyst.ExtendedAnalysisException { - "errorClass" : "PARSE_SYNTAX_ERROR", - "sqlState" : "42601", + "errorClass" : "MISSING_AGGREGATION", + "sqlState" : "42803", "messageParameters" : { - "error" : "'group'", - "hint" : "" + "expression" : "\"a\"", + "expressionAnyValue" : "\"any_value(a)\"" } } @@ -4247,16 +4225,20 @@ table other -- !query schema struct<> -- !query output -org.apache.spark.sql.catalyst.ExtendedAnalysisException +org.apache.spark.sql.AnalysisException { - "errorClass" : "MISSING_GROUP_BY", - "sqlState" : "42803", + "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", + "sqlState" : "0A000", + "messageParameters" : { + "clause" : "EXTEND", + "expr" : "sum(a#x)" + }, "queryContext" : [ { "objectType" : "", "objectName" : "", - "startIndex" : 1, - "stopIndex" : 39, - "fragment" : "table other\n|> extend sum(a) as total_a" + "startIndex" : 23, + "stopIndex" : 28, + "fragment" : "sum(a)" } ] } From b56ce1767285087f81a1df90c2123d2d2ffde083 Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Thu, 13 Nov 2025 15:30:40 -0800 Subject: [PATCH 6/8] fix check for |> SELECT with GROUP BY --- .../sql/catalyst/parser/AstBuilder.scala | 8 ++- .../analyzer-results/pipe-operators.sql.out | 47 +++++++++++++++--- .../sql-tests/inputs/pipe-operators.sql | 8 ++- .../sql-tests/results/pipe-operators.sql.out | 49 ++++++++++++++++--- 4 files changed, 96 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index ed2f6cdc6b10..7ca8de59fe2e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -6577,10 +6577,16 @@ class AstBuilder extends DataTypeAstBuilder private def visitOperatorPipeRightSide( ctx: OperatorPipeRightSideContext, left: LogicalPlan): LogicalPlan = { - if (!SQLConf.get.getConf(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED)) { + if (!conf.getConf(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED)) { operationNotAllowed("Operator pipe SQL syntax using |>", ctx) } Option(ctx.selectClause).map { c => + // Check if GROUP BY is used with pipe SELECT when the config is disabled + if (ctx.aggregationClause != null && !conf.pipeOperatorAllowAggregateInSelect) { + operationNotAllowed( + "|> SELECT with a GROUP BY clause is not allowed when " + + "spark.sql.allowAggregateInSelectWithPipeOperator is disabled", ctx) + } withSelectQuerySpecification( ctx = ctx, selectClause = c, diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out index dc569cfaef6c..60e69b5c0d8e 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out @@ -4771,18 +4771,51 @@ Aggregate [sum(a#x) AS total_a#xL] -- !query -set spark.sql.pipeOperator.allowAggregateInSelect=false +set spark.sql.allowAggregateInSelectWithPipeOperator=false -- !query analysis -SetCommand (spark.sql.pipeOperator.allowAggregateInSelect,Some(false)) +SetCommand (spark.sql.allowAggregateInSelectWithPipeOperator,Some(false)) -- !query table other |> select sum(a) as result -- !query analysis -Aggregate [sum(a#x) AS result#xL] -+- SubqueryAlias spark_catalog.default.other - +- Relation spark_catalog.default.other[a#x,b#x] json +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", + "sqlState" : "0A000", + "messageParameters" : { + "clause" : "SELECT", + "expr" : "sum(a#x)" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 23, + "stopIndex" : 28, + "fragment" : "sum(a)" + } ] +} + + +-- !query +table other +|> select a, sum(b) as sum_b group by a +-- !query analysis +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_0035", + "messageParameters" : { + "message" : "|> SELECT with a GROUP BY clause is not allowed when spark.sql.allowAggregateInSelectWithPipeOperator is disabled" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 51, + "fragment" : "table other\n|> select a, sum(b) as sum_b group by a" + } ] +} -- !query @@ -4795,9 +4828,9 @@ Aggregate [sum(a#x) AS total_a#xL] -- !query -set spark.sql.pipeOperator.allowAggregateInSelect=true +set spark.sql.allowAggregateInSelectWithPipeOperator=true -- !query analysis -SetCommand (spark.sql.pipeOperator.allowAggregateInSelect,Some(true)) +SetCommand (spark.sql.allowAggregateInSelectWithPipeOperator,Some(true)) -- !query diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql index 387074ec4a86..0a2478a219cc 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql @@ -1794,18 +1794,22 @@ table other |> aggregate sum(a) as total_a; -- Disable the configuration to test the legacy behavior. -set spark.sql.pipeOperator.allowAggregateInSelect=false; +set spark.sql.allowAggregateInSelectWithPipeOperator=false; -- With configuration disabled, aggregates in SELECT now fail (requires |> AGGREGATE). table other |> select sum(a) as result; +-- With configuration disabled, GROUP BY in SELECT also fails. +table other +|> select a, sum(b) as sum_b group by a; + -- The |> AGGREGATE keyword is required when configuration is disabled. table other |> aggregate sum(a) as total_a; -- Re-enable the configuration. -set spark.sql.pipeOperator.allowAggregateInSelect=true; +set spark.sql.allowAggregateInSelectWithPipeOperator=true; -- Verify that aggregates in SELECT work again. table other diff --git a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out index e4fefa373442..768611b67463 100644 --- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out @@ -4277,20 +4277,57 @@ struct -- !query -set spark.sql.pipeOperator.allowAggregateInSelect=false +set spark.sql.allowAggregateInSelectWithPipeOperator=false -- !query schema struct -- !query output -spark.sql.pipeOperator.allowAggregateInSelect false +spark.sql.allowAggregateInSelectWithPipeOperator false -- !query table other |> select sum(a) as result -- !query schema -struct +struct<> -- !query output -4 +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", + "sqlState" : "0A000", + "messageParameters" : { + "clause" : "SELECT", + "expr" : "sum(a#x)" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 23, + "stopIndex" : 28, + "fragment" : "sum(a)" + } ] +} + + +-- !query +table other +|> select a, sum(b) as sum_b group by a +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_0035", + "messageParameters" : { + "message" : "|> SELECT with a GROUP BY clause is not allowed when spark.sql.allowAggregateInSelectWithPipeOperator is disabled" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 51, + "fragment" : "table other\n|> select a, sum(b) as sum_b group by a" + } ] +} -- !query @@ -4303,11 +4340,11 @@ struct -- !query -set spark.sql.pipeOperator.allowAggregateInSelect=true +set spark.sql.allowAggregateInSelectWithPipeOperator=true -- !query schema struct -- !query output -spark.sql.pipeOperator.allowAggregateInSelect true +spark.sql.allowAggregateInSelectWithPipeOperator true -- !query From 188f77890717a7fe07a4d386911a6a0009ba365e Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Fri, 14 Nov 2025 10:41:52 -0800 Subject: [PATCH 7/8] remove the config per discussion --- .../sql/catalyst/analysis/Analyzer.scala | 2 +- .../catalyst/expressions/pipeOperators.scala | 12 +-- .../sql/catalyst/parser/AstBuilder.scala | 6 -- .../analyzer-results/pipe-operators.sql.out | 72 ----------------- .../sql-tests/inputs/pipe-operators.sql | 40 +++------- .../sql-tests/results/pipe-operators.sql.out | 80 ------------------- 6 files changed, 15 insertions(+), 197 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 4250f701b6d3..f9e14cb0daf8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -463,7 +463,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor ResolveProcedures :: BindProcedures :: ResolveTableSpec :: - ValidateAndStripPipeExpressions(conf.pipeOperatorAllowAggregateInSelect) :: + ValidateAndStripPipeExpressions :: ResolveSQLFunctions :: ResolveSQLTableFunctions :: ResolveAliases :: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala index 22fe10de6e55..b2bb949c9e5e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala @@ -64,12 +64,8 @@ object EliminatePipeOperators extends Rule[LogicalPlan] { /** * Validates and strips PipeExpression nodes from a logical plan once the child expressions are * resolved. - * @param allowAggregateInSelect When true, aggregate functions are allowed in non-AGGREGATE - * pipe operator clauses. When false, aggregate functions must be - * used exclusively with the AGGREGATE clause. */ -case class ValidateAndStripPipeExpressions(allowAggregateInSelect: Boolean) - extends Rule[LogicalPlan] { +case object ValidateAndStripPipeExpressions extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( _.containsPattern(PIPE_EXPRESSION), ruleId) { case node: LogicalPlan => @@ -82,9 +78,9 @@ case class ValidateAndStripPipeExpressions(allowAggregateInSelect: Boolean) throw QueryCompilationErrors .pipeOperatorAggregateExpressionContainsNoAggregateFunction(p.child) } else if (!p.isAggregate) { - // For non-aggregate clauses, only allow aggregate functions in SELECT when the - // configuration is enabled. All other clauses (EXTEND, SET, etc.) disallow aggregates. - val aggregateAllowed = allowAggregateInSelect && p.clause == PipeOperators.selectClause + // For non-aggregate clauses, only allow aggregate functions in SELECT. + // All other clauses (EXTEND, SET, etc.) disallow aggregates. + val aggregateAllowed = p.clause == PipeOperators.selectClause if (!aggregateAllowed) { firstAggregateFunction.foreach { a => throw QueryCompilationErrors.pipeOperatorContainsAggregateFunction(a, p.clause) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 925ab385139b..abc282e9c488 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -6609,12 +6609,6 @@ class AstBuilder extends DataTypeAstBuilder operationNotAllowed("Operator pipe SQL syntax using |>", ctx) } Option(ctx.selectClause).map { c => - // Check if GROUP BY is used with pipe SELECT when the config is disabled - if (ctx.aggregationClause != null && !conf.pipeOperatorAllowAggregateInSelect) { - operationNotAllowed( - "|> SELECT with a GROUP BY clause is not allowed when " + - "spark.sql.allowAggregateInSelectWithPipeOperator is disabled", ctx) - } withSelectQuerySpecification( ctx = ctx, selectClause = c, diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out index 60e69b5c0d8e..52f81703075d 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out @@ -4770,78 +4770,6 @@ Aggregate [sum(a#x) AS total_a#xL] +- Relation spark_catalog.default.other[a#x,b#x] json --- !query -set spark.sql.allowAggregateInSelectWithPipeOperator=false --- !query analysis -SetCommand (spark.sql.allowAggregateInSelectWithPipeOperator,Some(false)) - - --- !query -table other -|> select sum(a) as result --- !query analysis -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", - "sqlState" : "0A000", - "messageParameters" : { - "clause" : "SELECT", - "expr" : "sum(a#x)" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 23, - "stopIndex" : 28, - "fragment" : "sum(a)" - } ] -} - - --- !query -table other -|> select a, sum(b) as sum_b group by a --- !query analysis -org.apache.spark.sql.catalyst.parser.ParseException -{ - "errorClass" : "_LEGACY_ERROR_TEMP_0035", - "messageParameters" : { - "message" : "|> SELECT with a GROUP BY clause is not allowed when spark.sql.allowAggregateInSelectWithPipeOperator is disabled" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 1, - "stopIndex" : 51, - "fragment" : "table other\n|> select a, sum(b) as sum_b group by a" - } ] -} - - --- !query -table other -|> aggregate sum(a) as total_a --- !query analysis -Aggregate [sum(a#x) AS total_a#xL] -+- SubqueryAlias spark_catalog.default.other - +- Relation spark_catalog.default.other[a#x,b#x] json - - --- !query -set spark.sql.allowAggregateInSelectWithPipeOperator=true --- !query analysis -SetCommand (spark.sql.allowAggregateInSelectWithPipeOperator,Some(true)) - - --- !query -table other -|> select sum(a) as result --- !query analysis -Aggregate [sum(a#x) AS result#xL] -+- SubqueryAlias spark_catalog.default.other - +- Relation spark_catalog.default.other[a#x,b#x] json - - -- !query drop table t -- !query analysis diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql index 0a2478a219cc..24e8da720348 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql @@ -1733,12 +1733,11 @@ table web_v1 |> order by item_sk, d_date |> limit 100; --- Configuration test: allowing aggregates in SELECT with spark.sql.pipeOperator.allowAggregateInSelect. -------------------------------------------------------------------------------------------------------- --- Test the configuration that controls whether aggregate functions can be used in |> SELECT --- and other pipe operator clauses without requiring the |> AGGREGATE keyword. +-- Aggregates in SELECT: positive tests. +------------------------------------------ +-- Aggregate functions can be used in |> SELECT without requiring the |> AGGREGATE keyword. --- Verify that the default behavior (enabled) allows aggregates in SELECT. +-- Aggregates in SELECT. table other |> select sum(a) as result; @@ -1781,40 +1780,21 @@ select 1 as x, 2 as y table other |> select a, sum(b) as sum_b group by b; --- Aggregates in EXTEND. These are not allowed. +-- Aggregates in SELECT: negative tests. +------------------------------------------ + +-- Aggregates in EXTEND are not allowed. table other |> extend sum(a) as total_a; --- Aggregates in WHERE should still fail (aggregates not allowed in WHERE generally). +-- Aggregates in WHERE are not allowed. table other |> where sum(a) > 5; --- The |> AGGREGATE keyword should still work with the configuration enabled. +-- The |> AGGREGATE keyword also works for aggregation. table other |> aggregate sum(a) as total_a; --- Disable the configuration to test the legacy behavior. -set spark.sql.allowAggregateInSelectWithPipeOperator=false; - --- With configuration disabled, aggregates in SELECT now fail (requires |> AGGREGATE). -table other -|> select sum(a) as result; - --- With configuration disabled, GROUP BY in SELECT also fails. -table other -|> select a, sum(b) as sum_b group by a; - --- The |> AGGREGATE keyword is required when configuration is disabled. -table other -|> aggregate sum(a) as total_a; - --- Re-enable the configuration. -set spark.sql.allowAggregateInSelectWithPipeOperator=true; - --- Verify that aggregates in SELECT work again. -table other -|> select sum(a) as result; - -- Cleanup. ----------- drop table t; diff --git a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out index 768611b67463..8ec7253816b6 100644 --- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out @@ -4276,86 +4276,6 @@ struct 4 --- !query -set spark.sql.allowAggregateInSelectWithPipeOperator=false --- !query schema -struct --- !query output -spark.sql.allowAggregateInSelectWithPipeOperator false - - --- !query -table other -|> select sum(a) as result --- !query schema -struct<> --- !query output -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION", - "sqlState" : "0A000", - "messageParameters" : { - "clause" : "SELECT", - "expr" : "sum(a#x)" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 23, - "stopIndex" : 28, - "fragment" : "sum(a)" - } ] -} - - --- !query -table other -|> select a, sum(b) as sum_b group by a --- !query schema -struct<> --- !query output -org.apache.spark.sql.catalyst.parser.ParseException -{ - "errorClass" : "_LEGACY_ERROR_TEMP_0035", - "messageParameters" : { - "message" : "|> SELECT with a GROUP BY clause is not allowed when spark.sql.allowAggregateInSelectWithPipeOperator is disabled" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 1, - "stopIndex" : 51, - "fragment" : "table other\n|> select a, sum(b) as sum_b group by a" - } ] -} - - --- !query -table other -|> aggregate sum(a) as total_a --- !query schema -struct --- !query output -4 - - --- !query -set spark.sql.allowAggregateInSelectWithPipeOperator=true --- !query schema -struct --- !query output -spark.sql.allowAggregateInSelectWithPipeOperator true - - --- !query -table other -|> select sum(a) as result --- !query schema -struct --- !query output -4 - - -- !query drop table t -- !query schema From 30ce9b777e807b0ee8e8a3f23fad2d5bed1dcb9d Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Fri, 14 Nov 2025 11:26:05 -0800 Subject: [PATCH 8/8] remove unused config --- .../org/apache/spark/sql/internal/SQLConf.scala | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 181dd9dbcbb2..f3951d307918 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -6248,15 +6248,6 @@ object SQLConf { .booleanConf .createWithDefault(true) - val OPERATOR_PIPE_ALLOW_AGGREGATE_IN_SELECT = - buildConf("spark.sql.allowAggregateInSelectWithPipeOperator") - .doc("When true, aggregate functions can be used in |> SELECT and other pipe operator " + - "clauses without requiring the |> AGGREGATE keyword. When false, aggregate functions " + - "must be used exclusively with the |> AGGREGATE clause for proper aggregation semantics.") - .version("4.2.0") - .booleanConf - .createWithDefault(true) - val LEGACY_PERCENTILE_DISC_CALCULATION = buildConf("spark.sql.legacy.percentileDiscCalculation") .internal() .doc("If true, the old bogus percentile_disc calculation is used. The old calculation " + @@ -7655,9 +7646,6 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def singleCharacterPipeOperatorEnabled: Boolean = getConf(SQLConf.SINGLE_CHARACTER_PIPE_OPERATOR_ENABLED) - def pipeOperatorAllowAggregateInSelect: Boolean = - getConf(SQLConf.OPERATOR_PIPE_ALLOW_AGGREGATE_IN_SELECT) - def allowNegativeScaleOfDecimalEnabled: Boolean = getConf(SQLConf.LEGACY_ALLOW_NEGATIVE_SCALE_OF_DECIMAL_ENABLED)