@@ -1793,7 +1793,7 @@ version
 ;
 
 operatorPipeRightSide
-    : selectClause windowClause?
+    : selectClause aggregationClause? windowClause?
     | EXTEND extendList=namedExpressionSeq
     | SET operatorPipeSetAssignmentSeq
     | DROP identifierSeq
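
A quick illustration of what the new optional aggregationClause enables (a sketch, assuming a table t(x, y) like the one used in the golden files below): a pipe SELECT may now carry a trailing GROUP BY instead of requiring a separate AGGREGATE operator.

-- Before this change, parsing failed at 'group'; now this is an ordinary Aggregate.
table t
|> select x, sum(y) as total group by x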
@@ -65,7 +65,7 @@ object EliminatePipeOperators extends Rule[LogicalPlan] {
  * Validates and strips PipeExpression nodes from a logical plan once the child expressions are
  * resolved.
  */
-object ValidateAndStripPipeExpressions extends Rule[LogicalPlan] {
+case object ValidateAndStripPipeExpressions extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
     _.containsPattern(PIPE_EXPRESSION), ruleId) {
     case node: LogicalPlan =>
@@ -78,8 +78,13 @@ object ValidateAndStripPipeExpressions extends Rule[LogicalPlan] {
           throw QueryCompilationErrors
             .pipeOperatorAggregateExpressionContainsNoAggregateFunction(p.child)
         } else if (!p.isAggregate) {
-          firstAggregateFunction.foreach { a =>
-            throw QueryCompilationErrors.pipeOperatorContainsAggregateFunction(a, p.clause)
+          // For non-aggregate clauses, only allow aggregate functions in SELECT.
+          // All other clauses (EXTEND, SET, etc.) disallow aggregates.
+          val aggregateAllowed = p.clause == PipeOperators.selectClause
+          if (!aggregateAllowed) {
+            firstAggregateFunction.foreach { a =>
+              throw QueryCompilationErrors.pipeOperatorContainsAggregateFunction(a, p.clause)
+            }
           }
         }
         p.child
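
In effect, the exemption is scoped to SELECT only: an aggregate under a pipe SELECT now flows through to the analyzer, while EXTEND, SET, and the other clauses still raise PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION. A sketch of both sides, again assuming a table t(x, y):

-- Allowed after this change: analyzed as a full-table Aggregate.
table t
|> select sum(x) as result

-- Still rejected: aggregate functions remain invalid under EXTEND.
table t
|> extend sum(x) as total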
@@ -6605,7 +6605,7 @@ class AstBuilder extends DataTypeAstBuilder

   private def visitOperatorPipeRightSide(
       ctx: OperatorPipeRightSideContext, left: LogicalPlan): LogicalPlan = {
-    if (!SQLConf.get.getConf(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED)) {
+    if (!conf.getConf(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED)) {
       operationNotAllowed("Operator pipe SQL syntax using |>", ctx)
     }
     Option(ctx.selectClause).map { c =>
@@ -6614,7 +6614,7 @@
         selectClause = c,
         lateralView = new java.util.ArrayList[LateralViewContext](),
         whereClause = null,
-        aggregationClause = null,
+        aggregationClause = ctx.aggregationClause,
         havingClause = null,
         windowClause = ctx.windowClause,
         relation = left,
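
With ctx.aggregationClause now forwarded instead of null, a GROUP BY attached to a pipe SELECT is analyzed like one in a regular query, including grouping by ordinal. For example (again a sketch over t(x, y)):

-- The ordinal resolves against the pipe SELECT list, grouping by x.
table t
|> select x, sum(y) as sum_y group by 1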
@@ -632,109 +632,6 @@ Repartition 3, true
+- Relation spark_catalog.default.t[x#x,y#x] csv


--- !query
-table t
-|> select sum(x) as result
--- !query analysis
-org.apache.spark.sql.AnalysisException
-{
-  "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION",
-  "sqlState" : "0A000",
-  "messageParameters" : {
-    "clause" : "SELECT",
-    "expr" : "sum(x#x)"
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 19,
-    "stopIndex" : 24,
-    "fragment" : "sum(x)"
-  } ]
-}
-
-
--- !query
-table t
-|> select y, length(y) + sum(x) as result
--- !query analysis
-org.apache.spark.sql.AnalysisException
-{
-  "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION",
-  "sqlState" : "0A000",
-  "messageParameters" : {
-    "clause" : "SELECT",
-    "expr" : "sum(x#x)"
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 34,
-    "stopIndex" : 39,
-    "fragment" : "sum(x)"
-  } ]
-}
-
-
--- !query
-from t
-|> select sum(x)
--- !query analysis
-org.apache.spark.sql.AnalysisException
-{
-  "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION",
-  "sqlState" : "0A000",
-  "messageParameters" : {
-    "clause" : "SELECT",
-    "expr" : "sum(x#x)"
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 18,
-    "stopIndex" : 23,
-    "fragment" : "sum(x)"
-  } ]
-}
-
-
--- !query
-from t as t_alias
-|> select y, sum(x)
--- !query analysis
-org.apache.spark.sql.AnalysisException
-{
-  "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION",
-  "sqlState" : "0A000",
-  "messageParameters" : {
-    "clause" : "SELECT",
-    "expr" : "sum(x#x)"
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 32,
-    "stopIndex" : 37,
-    "fragment" : "sum(x)"
-  } ]
-}
-
-
--- !query
-from t as t_alias
-|> select y, sum(x) group by y
--- !query analysis
-org.apache.spark.sql.catalyst.parser.ParseException
-{
-  "errorClass" : "PARSE_SYNTAX_ERROR",
-  "sqlState" : "42601",
-  "messageParameters" : {
-    "error" : "'group'",
-    "hint" : ""
-  }
-}
-
-
-- !query
table t
|> extend 1 as z
@@ -3683,28 +3580,6 @@ org.apache.spark.sql.AnalysisException
}


--- !query
-table other
-|> select sum(a) as result
--- !query analysis
-org.apache.spark.sql.AnalysisException
-{
-  "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION",
-  "sqlState" : "0A000",
-  "messageParameters" : {
-    "clause" : "SELECT",
-    "expr" : "sum(a#x)"
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 23,
-    "stopIndex" : 28,
-    "fragment" : "sum(a)"
-  } ]
-}
-
-
-- !query
table other
|> aggregate
@@ -4947,6 +4822,163 @@ Project [x#x, y#x]
+- Relation spark_catalog.default.t[x#x,y#x] csv


+-- !query
+table other
+|> select sum(a) as result
+-- !query analysis
+Aggregate [sum(a#x) AS result#xL]
++- SubqueryAlias spark_catalog.default.other
+   +- Relation spark_catalog.default.other[a#x,b#x] json
+
+
+-- !query
+table other
+|> select sum(a) as total_a, avg(b) as avg_b
+-- !query analysis
+Aggregate [sum(a#x) AS total_a#xL, avg(b#x) AS avg_b#x]
++- SubqueryAlias spark_catalog.default.other
+   +- Relation spark_catalog.default.other[a#x,b#x] json
+
+
+-- !query
+table other
+|> where b > 1
+|> select sum(a) as result
+-- !query analysis
+Aggregate [sum(a#x) AS result#xL]
++- Filter (b#x > 1)
+   +- PipeOperator
+      +- SubqueryAlias spark_catalog.default.other
+         +- Relation spark_catalog.default.other[a#x,b#x] json
+
+
+-- !query
+table other
+|> select sum(a) as total_a
+|> select total_a * 2 as doubled
+-- !query analysis
+Project [(total_a#xL * cast(2 as bigint)) AS doubled#xL]
++- Aggregate [sum(a#x) AS total_a#xL]
+   +- SubqueryAlias spark_catalog.default.other
+      +- Relation spark_catalog.default.other[a#x,b#x] json
+
+
+-- !query
+table other
+|> select a, sum(b) as sum_b group by a
+-- !query analysis
+Aggregate [a#x], [a#x, sum(b#x) AS sum_b#xL]
++- SubqueryAlias spark_catalog.default.other
+   +- Relation spark_catalog.default.other[a#x,b#x] json
+
+
+-- !query
+select 1 as x, 2 as y, 3 as z
+|> select x, y, sum(z) as total group by x, y
+-- !query analysis
+Aggregate [x#x, y#x], [x#x, y#x, sum(z#x) AS total#xL]
++- Project [1 AS x#x, 2 AS y#x, 3 AS z#x]
+   +- OneRowRelation
+
+
+-- !query
+table other
+|> select a, sum(b) as sum_b group by 1
+-- !query analysis
+Aggregate [a#x], [a#x, sum(b#x) AS sum_b#xL]
++- SubqueryAlias spark_catalog.default.other
+   +- Relation spark_catalog.default.other[a#x,b#x] json
+
+
+-- !query
+table other
+|> select a, sum(b) as sum_b group by a
+|> where sum_b > 1
+-- !query analysis
+Filter (sum_b#xL > cast(1 as bigint))
++- PipeOperator
+   +- Aggregate [a#x], [a#x, sum(b#x) AS sum_b#xL]
+      +- SubqueryAlias spark_catalog.default.other
+         +- Relation spark_catalog.default.other[a#x,b#x] json
+
+
+-- !query
+select 1 as x, 2 as y
+|> select x + 1 as x_plus_one, sum(y) as sum_y group by x + 1
+-- !query analysis
+Aggregate [(x#x + 1)], [(x#x + 1) AS x_plus_one#x, sum(y#x) AS sum_y#xL]
++- Project [1 AS x#x, 2 AS y#x]
+   +- OneRowRelation
+
+
+-- !query
+table other
+|> select a, sum(b) as sum_b group by b
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "MISSING_AGGREGATION",
+  "sqlState" : "42803",
+  "messageParameters" : {
+    "expression" : "\"a\"",
+    "expressionAnyValue" : "\"any_value(a)\""
+  }
+}
+
+
+-- !query
+table other
+|> extend sum(a) as total_a
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "clause" : "EXTEND",
+    "expr" : "sum(a#x)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 23,
+    "stopIndex" : 28,
+    "fragment" : "sum(a)"
+  } ]
+}
+
+
+-- !query
+table other
+|> where sum(a) > 5
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "INVALID_WHERE_CONDITION",
+  "sqlState" : "42903",
+  "messageParameters" : {
+    "condition" : "\"(sum(a) > 5)\"",
+    "expressionList" : "sum(spark_catalog.default.other.a)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 31,
+    "fragment" : "table other\n|> where sum(a) > 5"
+  } ]
+}
+
+
+-- !query
+table other
+|> aggregate sum(a) as total_a
+-- !query analysis
+Aggregate [sum(a#x) AS total_a#xL]
++- SubqueryAlias spark_catalog.default.other
+   +- Relation spark_catalog.default.other[a#x,b#x] json
+
+
-- !query
drop table t
-- !query analysis