diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java index aae47aa963201..f12408fb49313 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java @@ -135,27 +135,57 @@ public static byte[] subStringSQL(byte[] bytes, int pos, int len) { return Arrays.copyOfRange(bytes, start, end); } + /** + * Concatenate multiple byte arrays into one. + * If one of the inputs is null then null will be returned. + * + * @param inputs byte arrays to concatenate + * @return the concatenated byte array or null if one of the arguments is null + */ public static byte[] concat(byte[]... inputs) { + return concatWS(EMPTY_BYTE, inputs); + } + + /** + * Concatenate multiple byte arrays with a given delimiter. + * If the delimiter or one of the inputs is null then null will be returned. + * + * @param delimiter byte array to be placed between each input + * @param inputs byte arrays to concatenate + * @return the concatenated byte array or null if one of the arguments is null + */ + public static byte[] concatWS(byte[] delimiter, byte[]... inputs) { + if (delimiter == null) { + return null; + } // Compute the total length of the result long totalLength = 0; for (byte[] input : inputs) { if (input != null) { - totalLength += input.length; + totalLength += input.length + delimiter.length; } else { return null; } } - + if (totalLength > 0) totalLength -= delimiter.length; // Allocate a new byte array, and copy the inputs one by one into it final byte[] result = new byte[Ints.checkedCast(totalLength)]; int offset = 0; - for (byte[] input : inputs) { + for (int i = 0; i < inputs.length; i++) { + byte[] input = inputs[i]; int len = input.length; Platform.copyMemory( input, Platform.BYTE_ARRAY_OFFSET, result, Platform.BYTE_ARRAY_OFFSET + offset, len); offset += len; + if (delimiter.length > 0 && i < inputs.length - 1) { + Platform.copyMemory( + delimiter, Platform.BYTE_ARRAY_OFFSET, + result, Platform.BYTE_ARRAY_OFFSET + offset, + delimiter.length); + offset += delimiter.length; + } } return result; } diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/array/ByteArraySuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/array/ByteArraySuite.java index aff619175ff7b..5e221b4e359d4 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/array/ByteArraySuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/array/ByteArraySuite.java @@ -67,4 +67,59 @@ public void testCompareBinary() { byte[] y4 = new byte[]{(byte) 100, (byte) 200}; Assertions.assertEquals(0, ByteArray.compareBinary(x4, y4)); } + + @Test + public void testConcat() { + byte[] x1 = new byte[]{(byte) 1, (byte) 2, (byte) 3}; + byte[] y1 = new byte[]{(byte) 4, (byte) 5, (byte) 6}; + byte[] result1 = ByteArray.concat(x1, y1); + byte[] expected1 = new byte[]{(byte) 1, (byte) 2, (byte) 3, (byte) 4, (byte) 5, (byte) 6}; + Assertions.assertArrayEquals(expected1, result1); + + byte[] x2 = new byte[]{(byte) 1, (byte) 2, (byte) 3}; + byte[] y2 = new byte[0]; + byte[] result2 = ByteArray.concat(x2, y2); + byte[] expected2 = new byte[]{(byte) 1, (byte) 2, (byte) 3}; + Assertions.assertArrayEquals(expected2, result2); + + byte[] x3 = new byte[0]; + byte[] y3 = new byte[]{(byte) 4, (byte) 5, (byte) 6}; + byte[] result3 = ByteArray.concat(x3, y3); + byte[] expected3 = new byte[]{(byte) 4, (byte) 5, (byte) 6}; + 
Assertions.assertArrayEquals(expected3, result3); + + byte[] x4 = new byte[]{(byte) 1, (byte) 2, (byte) 3}; + byte[] y4 = null; + byte[] result4 = ByteArray.concat(x4, y4); + Assertions.assertArrayEquals(null, result4); + } + + @Test + public void testConcatWS() { + byte[] separator = new byte[]{(byte) 42}; + + byte[] x1 = new byte[]{(byte) 1, (byte) 2, (byte) 3}; + byte[] y1 = new byte[]{(byte) 4, (byte) 5, (byte) 6}; + byte[] result1 = ByteArray.concatWS(separator, x1, y1); + byte[] expected1 = new byte[]{(byte) 1, (byte) 2, (byte) 3, (byte) 42, + (byte) 4, (byte) 5, (byte) 6}; + Assertions.assertArrayEquals(expected1, result1); + + byte[] x2 = new byte[]{(byte) 1, (byte) 2, (byte) 3}; + byte[] y2 = new byte[0]; + byte[] result2 = ByteArray.concatWS(separator, x2, y2); + byte[] expected2 = new byte[]{(byte) 1, (byte) 2, (byte) 3, (byte) 42}; + Assertions.assertArrayEquals(expected2, result2); + + byte[] x3 = new byte[0]; + byte[] y3 = new byte[]{(byte) 4, (byte) 5, (byte) 6}; + byte[] result3 = ByteArray.concatWS(separator, x3, y3); + byte[] expected3 = new byte[]{(byte) 42, (byte) 4, (byte) 5, (byte) 6}; + Assertions.assertArrayEquals(expected3, result3); + + byte[] x4 = new byte[]{(byte) 1, (byte) 2, (byte) 3}; + byte[] y4 = null; + byte[] result4 = ByteArray.concatWS(separator, x4, y4); + Assertions.assertArrayEquals(null, result4); + } } diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 58718cb99f551..9d0f04ed2e17e 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -2627,29 +2627,6 @@ ], "sqlState" : "22006" }, - "INVALID_INVERSE_DISTRIBUTION_FUNCTION" : { - "message" : [ - "Invalid inverse distribution function <funcName>." - ], - "subClass" : { - "DISTINCT_UNSUPPORTED" : { - "message" : [ - "Cannot use DISTINCT with WITHIN GROUP." - ] - }, - "WITHIN_GROUP_MISSING" : { - "message" : [ - "WITHIN GROUP is required for inverse distribution function." - ] - }, - "WRONG_NUM_ORDERINGS" : { - "message" : [ - "Requires <expectedNum> orderings in WITHIN GROUP but got <actualNum>." - ] - } - }, - "sqlState" : "42K0K" - }, "INVALID_JAVA_IDENTIFIER_AS_FIELD_NAME" : { "message" : [ "<fieldName> is not a valid identifier of Java and cannot be used as field name", "<walkedTypePath>" ], @@ -3364,6 +3341,34 @@ ], "sqlState" : "42601" }, + "INVALID_WITHIN_GROUP_EXPRESSION" : { + "message" : [ + "Invalid function <funcName> with WITHIN GROUP." + ], + "subClass" : { + "DISTINCT_UNSUPPORTED" : { + "message" : [ + "The function <funcName> does not support DISTINCT with WITHIN GROUP." + ] + }, + "MISMATCH_WITH_DISTINCT_INPUT" : { + "message" : [ + "The function <funcName> is invoked with DISTINCT and WITHIN GROUP but expressions <funcArg> and <orderingExpr> do not match. The WITHIN GROUP ordering expression must be picked from the function inputs." + ] + }, + "WITHIN_GROUP_MISSING" : { + "message" : [ + "WITHIN GROUP is required for the function <funcName>." + ] + }, + "WRONG_NUM_ORDERINGS" : { + "message" : [ + "The function <funcName> requires <expectedNum> orderings in WITHIN GROUP but got <actualNum>." + ] + } + }, + "sqlState" : "42K0K" + }, "INVALID_WRITER_COMMIT_MESSAGE" : { "message" : [ "The data source writer has generated an invalid number of commit messages. Expected exactly one writer commit message from each task, but received <detail>."
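Reviewer note: a minimal, illustrative sketch of the concatenation semantics introduced in the ByteArray changes above. The demo class and values here are hypothetical and not part of the patch; the authoritative cases live in ByteArraySuite.

    import org.apache.spark.unsafe.types.ByteArray;

    // Hypothetical demo, not part of the patch.
    public class ConcatWSDemo {
      public static void main(String[] args) {
        byte[] sep = new byte[]{(byte) 42};
        byte[] a = new byte[]{(byte) 1, (byte) 2};
        byte[] b = new byte[]{(byte) 3};

        // The delimiter is placed only between inputs: {1, 2, 42, 3}
        byte[] joined = ByteArray.concatWS(sep, a, b);

        // concat(...) now delegates to concatWS with an empty delimiter: {1, 2, 3}
        byte[] plain = ByteArray.concat(a, b);

        // Per the new Javadoc contract, a null delimiter or any null input yields null.
        byte[] nothing = ByteArray.concatWS(sep, a, null);
      }
    }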
diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py index cf8f685ea4499..6c7ce80072923 100644 --- a/python/pyspark/sql/tests/test_functions.py +++ b/python/pyspark/sql/tests/test_functions.py @@ -83,7 +83,13 @@ def test_function_parity(self): missing_in_py = jvm_fn_set.difference(py_fn_set) # Functions that we expect to be missing in python until they are added to pyspark - expected_missing_in_py = set() + expected_missing_in_py = { + # TODO(SPARK-50220): listagg functions will soon be added to PySpark and removed from this list + "listagg_distinct", + "listagg", + "string_agg", + "string_agg_distinct", + } self.assertEqual( expected_missing_in_py, missing_in_py, "Missing functions in pyspark not as expected" diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala index 2a04212ee2585..9f509fa843a2b 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala @@ -1147,6 +1147,77 @@ object functions { */ def sum_distinct(e: Column): Column = Column.fn("sum", isDistinct = true, e) + /** + * Aggregate function: returns the concatenation of non-null input values. + * + * @group agg_funcs + * @since 4.0.0 + */ + def listagg(e: Column): Column = Column.fn("listagg", e) + + /** + * Aggregate function: returns the concatenation of non-null input values, separated by the + * delimiter. + * + * @group agg_funcs + * @since 4.0.0 + */ + def listagg(e: Column, delimiter: Column): Column = Column.fn("listagg", e, delimiter) + + /** + * Aggregate function: returns the concatenation of distinct non-null input values. + * + * @group agg_funcs + * @since 4.0.0 + */ + def listagg_distinct(e: Column): Column = Column.fn("listagg", isDistinct = true, e) + + /** + * Aggregate function: returns the concatenation of distinct non-null input values, separated by + * the delimiter. + * + * @group agg_funcs + * @since 4.0.0 + */ + def listagg_distinct(e: Column, delimiter: Column): Column = + Column.fn("listagg", isDistinct = true, e, delimiter) + + /** + * Aggregate function: returns the concatenation of non-null input values. Alias for `listagg`. + * + * @group agg_funcs + * @since 4.0.0 + */ + def string_agg(e: Column): Column = Column.fn("string_agg", e) + + /** + * Aggregate function: returns the concatenation of non-null input values, separated by the + * delimiter. Alias for `listagg`. + * + * @group agg_funcs + * @since 4.0.0 + */ + def string_agg(e: Column, delimiter: Column): Column = Column.fn("string_agg", e, delimiter) + + /** + * Aggregate function: returns the concatenation of distinct non-null input values. Alias for + * `listagg`. + * + * @group agg_funcs + * @since 4.0.0 + */ + def string_agg_distinct(e: Column): Column = Column.fn("string_agg", isDistinct = true, e) + + /** + * Aggregate function: returns the concatenation of distinct non-null input values, separated by + * the delimiter. Alias for `listagg`. + * + * @group agg_funcs + * @since 4.0.0 + */ + def string_agg_distinct(e: Column, delimiter: Column): Column = + Column.fn("string_agg", isDistinct = true, e, delimiter) + /** * Aggregate function: alias for `var_samp`.
* diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index bed7bea61597f..81c7c75f47c61 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2782,6 +2782,9 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor ne case e: Expression if e.foldable => e // No need to create an attribute reference if it will be evaluated as a Literal. + case e: SortOrder => + // For SortOrder, just recursively extract from the child expression. + e.copy(child = extractExpr(e.child)) case e: NamedArgumentExpression => // For NamedArgumentExpression, we extract the value and replace it with // an AttributeReference (with an internal column name, e.g. "_w0"). diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 586a0312e1507..4506e5bdaf2dd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.ExtendedAnalysisException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.SubExprUtils._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Median, PercentileCont, PercentileDisc} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, ListAgg, Median, PercentileCont, PercentileDisc} import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, DecorrelateInnerQuery, InlineCTE} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -423,10 +423,23 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB "funcName" -> toSQLExpr(wf), "windowExpr" -> toSQLExpr(w))) + case agg @ AggregateExpression(listAgg: ListAgg, _, _, _, _) + if agg.isDistinct && listAgg.needSaveOrderValue => + throw QueryCompilationErrors.functionAndOrderExpressionMismatchError( + listAgg.prettyName, listAgg.child, listAgg.orderExpressions) + case w: WindowExpression => // Only allow window functions with an aggregate expression or an offset window // function or a Pandas window UDF. w.windowFunction match { + case agg @ AggregateExpression(fun: ListAgg, _, _, _, _) + // listagg(...) WITHIN GROUP (ORDER BY ...) OVER (ORDER BY ...)
 is unsupported + if fun.orderingFilled && (w.windowSpec.orderSpec.nonEmpty || + w.windowSpec.frameSpecification != + SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)) => + agg.failAnalysis( + errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", + messageParameters = Map("aggFunc" -> toSQLExpr(agg.aggregateFunction))) case agg @ AggregateExpression( _: PercentileCont | _: PercentileDisc | _: Median, _, _, _, _) if w.windowSpec.orderSpec.nonEmpty || w.windowSpec.frameSpecification != diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 5103f8048856a..d9e9f49ce065e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -506,6 +506,8 @@ object FunctionRegistry { expression[CollectList]("collect_list"), expression[CollectList]("array_agg", true, Some("3.3.0")), expression[CollectSet]("collect_set"), + expression[ListAgg]("listagg"), + expression[ListAgg]("string_agg", setAlias = true), expressionBuilder("count_min_sketch", CountMinSketchAggExpressionBuilder), expression[BoolAnd]("every", true), expression[BoolAnd]("bool_and"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala index 5a27a72190325..800126e0030e8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala @@ -128,18 +128,15 @@ class FunctionResolution( numArgs: Int, u: UnresolvedFunction): Expression = { func match { - case owg: SupportsOrderingWithinGroup if u.isDistinct => - throw QueryCompilationErrors.distinctInverseDistributionFunctionUnsupportedError( - owg.prettyName - ) + case owg: SupportsOrderingWithinGroup if !owg.isDistinctSupported && u.isDistinct => + throw QueryCompilationErrors.distinctWithOrderingFunctionUnsupportedError(owg.prettyName) case owg: SupportsOrderingWithinGroup - if !owg.orderingFilled && u.orderingWithinGroup.isEmpty => - throw QueryCompilationErrors.inverseDistributionFunctionMissingWithinGroupError( - owg.prettyName - ) + if owg.isOrderingMandatory && !owg.orderingFilled && u.orderingWithinGroup.isEmpty => + throw QueryCompilationErrors.functionMissingWithinGroupError(owg.prettyName) case owg: SupportsOrderingWithinGroup if owg.orderingFilled && u.orderingWithinGroup.nonEmpty => - throw QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError( + // e.g. mode(expr1) within group (order by expr2) is not supported + throw QueryCompilationErrors.wrongNumOrderingsForFunctionError( owg.prettyName, 0, u.orderingWithinGroup.length @@ -198,7 +195,7 @@ class FunctionResolution( case agg: AggregateFunction => // Note: PythonUDAF does not support these advanced clauses. if (agg.isInstanceOf[PythonUDAF]) checkUnsupportedAggregateClause(agg, u) - // After parse, the inverse distribution functions not set the ordering within group yet. + // After parsing, the functions have not set the ordering within group yet.
val newAgg = agg match { case owg: SupportsOrderingWithinGroup if !owg.orderingFilled && u.orderingWithinGroup.nonEmpty => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala index 97add0b8e45bc..f3eeaa96b3d46 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala @@ -183,6 +183,8 @@ case class Mode( } override def orderingFilled: Boolean = child != UnresolvedWithinGroup + override def isOrderingMandatory: Boolean = true + override def isDistinctSupported: Boolean = false assert(orderingFilled || (!orderingFilled && reverseOpt.isEmpty)) @@ -190,7 +192,7 @@ case class Mode( child match { case UnresolvedWithinGroup => if (orderingWithinGroup.length != 1) { - throw QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError( + throw QueryCompilationErrors.wrongNumOrderingsForFunctionError( nodeName, 1, orderingWithinGroup.length) } orderingWithinGroup.head match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala index 9c0502a2c1fcf..453251ac61cde 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala @@ -20,9 +20,26 @@ package org.apache.spark.sql.catalyst.expressions.aggregate import org.apache.spark.sql.catalyst.expressions.SortOrder /** - * The trait used to set the [[SortOrder]] after inverse distribution functions parsed. + * The trait used to set the [[SortOrder]] for functions that support WITHIN GROUP ordering. */ trait SupportsOrderingWithinGroup { self: AggregateFunction => - def orderingFilled: Boolean = false def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): AggregateFunction + + /** Indicator that ordering was set. */ + def orderingFilled: Boolean + + /** + * Tells Analyzer that WITHIN GROUP (ORDER BY ...) is mandatory for the function. + * + * @see [[QueryCompilationErrors.functionMissingWithinGroupError]] + */ + def isOrderingMandatory: Boolean + + /** + * Tells Analyzer that DISTINCT is supported. + * DISTINCT can conflict with the ordering, so some functions may disallow it.
+ * + * @see [[QueryCompilationErrors.distinctWithOrderingFunctionUnsupportedError]] + */ + def isDistinctSupported: Boolean } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala index 3aaf353043a9a..7789c23b50a48 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala @@ -18,16 +18,22 @@ package org.apache.spark.sql.catalyst.expressions.aggregate import scala.collection.mutable -import scala.collection.mutable.Growable +import scala.collection.mutable.{ArrayBuffer, Growable} +import scala.util.{Left, Right} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult -import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.types.PhysicalDataType import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, TypeUtils, UnsafeRowUtils} +import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase} +import org.apache.spark.sql.errors.DataTypeErrors.{toSQLId, toSQLType} +import org.apache.spark.sql.internal.types.StringTypeWithCollation import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.{ByteArray, UTF8String} import org.apache.spark.util.BoundedPriorityQueue /** @@ -36,8 +42,7 @@ import org.apache.spark.util.BoundedPriorityQueue * We have to store all the collected elements in memory, and so notice that too many elements * can cause GC paused and eventually OutOfMemory Errors.
*/ -abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T] - with UnaryLike[Expression] { +abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T] { val child: Expression @@ -102,7 +107,8 @@ abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImper case class CollectList( child: Expression, mutableAggBufferOffset: Int = 0, - inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] { + inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] + with UnaryLike[Expression] { def this(child: Expression) = this(child, 0, 0) @@ -149,7 +155,7 @@ case class CollectSet( child: Expression, mutableAggBufferOffset: Int = 0, inputAggBufferOffset: Int = 0) - extends Collect[mutable.HashSet[Any]] with QueryErrorsBase { + extends Collect[mutable.HashSet[Any]] with QueryErrorsBase with UnaryLike[Expression] { def this(child: Expression) = this(child, 0, 0) @@ -215,7 +221,8 @@ case class CollectTopK( num: Int, reverse: Boolean = false, mutableAggBufferOffset: Int = 0, - inputAggBufferOffset: Int = 0) extends Collect[BoundedPriorityQueue[Any]] { + inputAggBufferOffset: Int = 0) extends Collect[BoundedPriorityQueue[Any]] + with UnaryLike[Expression] { assert(num > 0) def this(child: Expression, num: Int) = this(child, num, false, 0, 0) @@ -265,3 +272,280 @@ private[aggregate] object CollectTopK { case _ => throw QueryCompilationErrors.invalidNumParameter(e) } } + +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = """ + _FUNC_(expr[, delimiter])[ WITHIN GROUP (ORDER BY key [ASC | DESC] [,...])] - Returns + the concatenation of non-null input values, separated by the delimiter ordered by key. + If all values are null, null is returned. + """, + arguments = """ + Arguments: + * expr - a string or binary expression to be concatenated. + * delimiter - an optional string or binary foldable expression used to separate the input values. + If null, the concatenation will be performed without a delimiter. Default is null. + * key - an optional expression for ordering the input values. Multiple keys can be specified. + If none are specified, the order of the rows in the result is non-deterministic. + """, + examples = """ + Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + abc + > SELECT _FUNC_(col) WITHIN GROUP (ORDER BY col DESC) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + cba + > SELECT _FUNC_(col) FROM VALUES ('a'), (NULL), ('b') AS tab(col); + ab + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + aa + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + ab + > SELECT _FUNC_(col, ', ') FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a, b, c + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + note = """ + * If the order is not specified, the function is non-deterministic because + the order of the rows may be non-deterministic after a shuffle. + * If DISTINCT is specified, then expr and key must be the same expression. 
+ """, + group = "agg_funcs", + since = "4.0.0" +) +// scalastyle:on line.size.limit +case class ListAgg( + child: Expression, + delimiter: Expression = Literal(null), + orderExpressions: Seq[SortOrder] = Nil, + mutableAggBufferOffset: Int = 0, + inputAggBufferOffset: Int = 0) + extends Collect[mutable.ArrayBuffer[Any]] + with SupportsOrderingWithinGroup + with ImplicitCastInputTypes { + + override def orderingFilled: Boolean = orderExpressions.nonEmpty + + override def isOrderingMandatory: Boolean = false + + override def isDistinctSupported: Boolean = true + + override def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): AggregateFunction = + copy(orderExpressions = orderingWithinGroup) + + override protected lazy val bufferElementType: DataType = { + if (!needSaveOrderValue) { + child.dataType + } else { + StructType( + StructField("value", child.dataType) + +: orderValuesField + ) + } + } + /** Indicates that the result of [[child]] is not enough for evaluation */ + lazy val needSaveOrderValue: Boolean = !isOrderCompatible(orderExpressions) + + def this(child: Expression) = + this(child, Literal(null), Nil, 0, 0) + + def this(child: Expression, delimiter: Expression) = + this(child, delimiter, Nil, 0, 0) + + override def nullable: Boolean = true + + override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty + + override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate = + copy(mutableAggBufferOffset = newMutableAggBufferOffset) + + override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = + copy(inputAggBufferOffset = newInputAggBufferOffset) + + override def defaultResult: Option[Literal] = Option(Literal.create(null, dataType)) + + override def sql(isDistinct: Boolean): String = { + val distinct = if (isDistinct) "DISTINCT " else "" + val withinGroup = if (orderingFilled) { + s" WITHIN GROUP (ORDER BY ${orderExpressions.map(_.sql).mkString(", ")})" + } else { + "" + } + s"$prettyName($distinct${child.sql}, ${delimiter.sql})$withinGroup" + } + + override def inputTypes: Seq[AbstractDataType] = + TypeCollection( + StringTypeWithCollation(supportsTrimCollation = true), + BinaryType + ) +: + TypeCollection( + StringTypeWithCollation(supportsTrimCollation = true), + BinaryType, + NullType + ) +: + orderExpressions.map(_ => AnyDataType) + + override def checkInputDataTypes(): TypeCheckResult = { + val matchInputTypes = super.checkInputDataTypes() + if (matchInputTypes.isFailure) { + matchInputTypes + } else if (!delimiter.foldable) { + DataTypeMismatch( + errorSubClass = "NON_FOLDABLE_INPUT", + messageParameters = Map( + "inputName" -> toSQLId("delimiter"), + "inputType" -> toSQLType(delimiter.dataType), + "inputExpr" -> toSQLExpr(delimiter) + ) + ) + } else if (delimiter.dataType == NullType) { + // null is the default empty delimiter so type is not important + TypeCheckSuccess + } else { + TypeUtils.checkForSameTypeInputExpr(child.dataType :: delimiter.dataType :: Nil, prettyName) + } + } + + override def eval(buffer: mutable.ArrayBuffer[Any]): Any = { + if (buffer.nonEmpty) { + val sortedBufferWithoutNulls = sortBuffer(buffer) + concatSkippingNulls(sortedBufferWithoutNulls) + } else { + null + } + } + + /** + * Sort buffer according orderExpressions. + * If orderExpressions is empty then returns buffer as is. 
+ * The format of buffer is determined by [[needSaveOrderValue]] + * @return sorted buffer containing only child's values + */ + private[this] def sortBuffer(buffer: mutable.ArrayBuffer[Any]): mutable.ArrayBuffer[Any] = { + if (!orderingFilled) { + // without order return as is. + return buffer + } + if (!needSaveOrderValue) { + // Here the buffer has structure [childValue0, childValue1, ...] + // and we want to sort it by childValues + val sortOrderExpression = orderExpressions.head + val ascendingOrdering = PhysicalDataType.ordering(sortOrderExpression.dataType) + val ordering = + if (sortOrderExpression.direction == Ascending) ascendingOrdering + else ascendingOrdering.reverse + buffer.sorted(ordering) + } else { + // Here the buffer has structure + // [[childValue, orderValue0, orderValue1, ...], + // [childValue, orderValue0, orderValue1, ...], + // ...] + // and we want to sort it by tuples (orderValue0, orderValue1, ...) + buffer + .asInstanceOf[mutable.ArrayBuffer[InternalRow]] + .sorted(bufferOrdering) + // drop orderValues after sort + .map(_.get(0, child.dataType)) + } + } + + /** + * @return ordering by (orderValue0, orderValue1, ...) + * for InternalRow with format [childValue, orderValue0, orderValue1, ...] + */ + private[this] def bufferOrdering: Ordering[InternalRow] = { + val bufferSortOrder = orderExpressions.zipWithIndex.map { + case (originalOrder, i) => + originalOrder.copy( + // first value is the evaluated child so add +1 for order's values + child = BoundReference(i + 1, originalOrder.dataType, originalOrder.child.nullable) + ) + } + new InterpretedOrdering(bufferSortOrder) + } + + private[this] def concatSkippingNulls(buffer: mutable.ArrayBuffer[Any]): Any = { + getDelimiterValue match { + case Right(delimiterValue: Array[Byte]) => + val inputs = buffer.filter(_ != null).map(_.asInstanceOf[Array[Byte]]) + ByteArray.concatWS(delimiterValue, inputs.toSeq: _*) + case Left(delimiterValue: UTF8String) => + val inputs = buffer.filter(_ != null).map(_.asInstanceOf[UTF8String]) + UTF8String.concatWs(delimiterValue, inputs.toSeq: _*) + } + } + + /** + * @return delimiter value or default empty value if delimiter is null. Type respects [[dataType]] + */ + private[this] def getDelimiterValue: Either[UTF8String, Array[Byte]] = { + val delimiterValue = delimiter.eval() + dataType match { + case _: StringType => + Left( + if (delimiterValue == null) UTF8String.fromString("") + else delimiterValue.asInstanceOf[UTF8String] + ) + case _: BinaryType => + Right( + if (delimiterValue == null) ByteArray.EMPTY_BYTE + else delimiterValue.asInstanceOf[Array[Byte]] + ) + } + } + + override def dataType: DataType = child.dataType + + override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = { + val value = child.eval(input) + if (value != null) { + val v = if (!needSaveOrderValue) { + convertToBufferElement(value) + } else { + InternalRow.fromSeq(convertToBufferElement(value) +: evalOrderValues(input)) + } + buffer += v + } + buffer + } + + private[this] def evalOrderValues(internalRow: InternalRow): Seq[Any] = { + orderExpressions.map(order => convertToBufferElement(order.child.eval(internalRow))) + } + + override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value) + + override def children: Seq[Expression] = child +: delimiter +: orderExpressions + + /** + * Utility func to check if given order is defined and different from [[child]]. 
+ * + * @see [[QueryCompilationErrors.functionAndOrderExpressionMismatchError]] + * @see [[needSaveOrderValue]] + */ + private[this] def isOrderCompatible(someOrder: Seq[SortOrder]): Boolean = { + if (someOrder.isEmpty) { + return true + } + if (someOrder.size == 1 && someOrder.head.child.semanticEquals(child)) { + return true + } + false + } + + override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = + copy( + child = newChildren.head, + delimiter = newChildren(1), + orderExpressions = newChildren + .drop(2) + .map(_.asInstanceOf[SortOrder]) + ) + + private[this] def orderValuesField: Seq[StructField] = { + orderExpressions.zipWithIndex.map { + case (order, i) => StructField(s"sortOrderValue[$i]", order.dataType) + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala index 89a6984b80852..6dfa1b499df23 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala @@ -378,7 +378,7 @@ case class PercentileCont(left: Expression, right: Expression, reverse: Boolean override def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): AggregateFunction = { if (orderingWithinGroup.length != 1) { - throw QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError( + throw QueryCompilationErrors.wrongNumOrderingsForFunctionError( nodeName, 1, orderingWithinGroup.length) } orderingWithinGroup.head match { @@ -390,6 +390,10 @@ case class PercentileCont(left: Expression, right: Expression, reverse: Boolean override protected def withNewChildrenInternal( newLeft: Expression, newRight: Expression): PercentileCont = this.copy(left = newLeft, right = newRight) + + override def orderingFilled: Boolean = left != UnresolvedWithinGroup + override def isOrderingMandatory: Boolean = true + override def isDistinctSupported: Boolean = false } /** @@ -432,7 +436,7 @@ case class PercentileDisc( override def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): AggregateFunction = { if (orderingWithinGroup.length != 1) { - throw QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError( + throw QueryCompilationErrors.wrongNumOrderingsForFunctionError( nodeName, 1, orderingWithinGroup.length) } orderingWithinGroup.head match { @@ -467,6 +471,10 @@ case class PercentileDisc( toDoubleValue(higherKey) } } + + override def orderingFilled: Boolean = left != UnresolvedWithinGroup + override def isOrderingMandatory: Boolean = true + override def isDistinctSupported: Boolean = false } // scalastyle:off line.size.limit diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index b628412929e37..4c970d066d31e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, FunctionIdentif import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, FunctionAlreadyExistsException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, 
NoSuchPartitionException, NoSuchTableException, Star, TableAlreadyExistsException, UnresolvedRegex} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, InvalidUDFClassException} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CreateMap, CreateStruct, Expression, GroupingID, NamedExpression, SpecifiedWindowFrame, WindowFrame, WindowFunction, WindowSpecDefinition} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CreateMap, CreateStruct, Expression, GroupingID, NamedExpression, SortOrder, SpecifiedWindowFrame, WindowFrame, WindowFunction, WindowSpecDefinition} import org.apache.spark.sql.catalyst.expressions.aggregate.AnyValue import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{Assignment, InputParameter, Join, LogicalPlan, SerdeInfo, Window} @@ -725,28 +725,32 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat "windowExpr" -> toSQLExpr(windowExpr))) } - def distinctInverseDistributionFunctionUnsupportedError(funcName: String): Throwable = { + def distinctWithOrderingFunctionUnsupportedError(funcName: String): Throwable = { new AnalysisException( - errorClass = "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED", - messageParameters = Map("funcName" -> toSQLId(funcName))) + errorClass = "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED", + messageParameters = Map("funcName" -> toSQLId(funcName)) + ) } - def inverseDistributionFunctionMissingWithinGroupError(funcName: String): Throwable = { + def functionMissingWithinGroupError(funcName: String): Throwable = { new AnalysisException( - errorClass = "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING", - messageParameters = Map("funcName" -> toSQLId(funcName))) + errorClass = "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING", + messageParameters = Map("funcName" -> toSQLId(funcName)) + ) } - def wrongNumOrderingsForInverseDistributionFunctionError( + def wrongNumOrderingsForFunctionError( funcName: String, validOrderingsNumber: Int, actualOrderingsNumber: Int): Throwable = { new AnalysisException( - errorClass = "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS", + errorClass = "INVALID_WITHIN_GROUP_EXPRESSION.WRONG_NUM_ORDERINGS", messageParameters = Map( "funcName" -> toSQLId(funcName), "expectedNum" -> validOrderingsNumber.toString, - "actualNum" -> actualOrderingsNumber.toString)) + "actualNum" -> actualOrderingsNumber.toString + ) + ) } def aliasNumberNotMatchColumnNumberError( @@ -1049,6 +1053,18 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat "operation" -> operation)) } + def functionAndOrderExpressionMismatchError( + functionName: String, + functionArg: Expression, + orderExpr: Seq[SortOrder]): Throwable = { + new AnalysisException( + errorClass = "INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT", + messageParameters = Map( + "funcName" -> toSQLId(functionName), + "funcArg" -> toSQLExpr(functionArg), + "orderingExpr" -> orderExpr.map(order => toSQLExpr(order.child)).mkString(", "))) + } + def wrongCommandForObjectTypeError( operation: String, requiredType: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 22082aca81a22..e4b89b4f4de88 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -607,7 +607,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // [COUNT(DISTINCT bar), COUNT(DISTINCT foo)] is disallowed because those two distinct // aggregates have different column expressions. val distinctExpressions = - functionsWithDistinct.head.aggregateFunction.children.filterNot(_.foldable) + functionsWithDistinct.head.aggregateFunction.children + .filterNot(_.foldable) + .map { + case s: SortOrder => s.child + case e => e + } val normalizedNamedDistinctExpressions = distinctExpressions.map { e => // Ideally this should be done in `NormalizeFloatingNumbers`, but we do it here // because `distinctExpressions` is not extracted during logical phase. diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md index c54e09735a9be..39cefdaa892b2 100644 --- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md +++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md @@ -425,6 +425,8 @@ | org.apache.spark.sql.catalyst.expressions.aggregate.Kurtosis | kurtosis | SELECT kurtosis(col) FROM VALUES (-10), (-20), (100), (1000) AS tab(col) | struct<kurtosis(col):double> | | org.apache.spark.sql.catalyst.expressions.aggregate.Last | last | SELECT last(col) FROM VALUES (10), (5), (20) AS tab(col) | struct<last(col):int> | | org.apache.spark.sql.catalyst.expressions.aggregate.Last | last_value | SELECT last_value(col) FROM VALUES (10), (5), (20) AS tab(col) | struct<last_value(col):int> | +| org.apache.spark.sql.catalyst.expressions.aggregate.ListAgg | listagg | SELECT listagg(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col) | struct<listagg(col, NULL):string> | +| org.apache.spark.sql.catalyst.expressions.aggregate.ListAgg | string_agg | SELECT string_agg(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col) | struct<string_agg(col, NULL):string> | | org.apache.spark.sql.catalyst.expressions.aggregate.Max | max | SELECT max(col) FROM VALUES (10), (50), (20) AS tab(col) | struct<max(col):int> | | org.apache.spark.sql.catalyst.expressions.aggregate.MaxBy | max_by | SELECT max_by(x, y) FROM VALUES ('a', 10), ('b', 50), ('c', 20) AS tab(x, y) | struct<max_by(x, y):string> | | org.apache.spark.sql.catalyst.expressions.aggregate.Median | median | SELECT median(col) FROM VALUES (0), (10) AS tab(col) | struct<median(col):double> | diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/listagg-collations.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/listagg-collations.sql.out new file mode 100644 index 0000000000000..ca471858a5416 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/listagg-collations.sql.out @@ -0,0 +1,86 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE utf8_binary) FROM (VALUES ('a'), ('A'), ('b'), ('B')) AS t(c1) +-- !query analysis +Aggregate [listagg(c1#x, null, collate(c1#x, utf8_binary) ASC NULLS FIRST, 0, 0) AS listagg(c1, NULL) WITHIN GROUP (ORDER BY collate(c1, utf8_binary) ASC NULLS FIRST)#x] ++- SubqueryAlias t + +- Project [col1#x AS c1#x] + +- LocalRelation [col1#x] + + +-- !query +SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE utf8_lcase) FROM (VALUES ('a'), ('A'), ('b'), ('B')) AS t(c1) +-- !query analysis +Aggregate [listagg(c1#x, null, collate(c1#x, utf8_lcase) ASC NULLS FIRST, 0, 0) AS listagg(c1, NULL) WITHIN GROUP (ORDER BY collate(c1, utf8_lcase) ASC NULLS FIRST)#x] ++- SubqueryAlias t + +- Project [col1#x AS 
c1#x] + +- LocalRelation [col1#x] + + +-- !query +SELECT listagg(DISTINCT c1 COLLATE utf8_binary) FROM (VALUES ('a'), ('A'), ('b'), ('B')) AS t(c1) +-- !query analysis +Aggregate [listagg(distinct collate(c1#x, utf8_binary), null, 0, 0) AS listagg(DISTINCT collate(c1, utf8_binary), NULL)#x] ++- SubqueryAlias t + +- Project [col1#x AS c1#x] + +- LocalRelation [col1#x] + + +-- !query +SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) FROM (VALUES ('a'), ('A'), ('b'), ('B')) AS t(c1) +-- !query analysis +Aggregate [listagg(distinct collate(c1#x, utf8_lcase), null, 0, 0) AS listagg(DISTINCT collate(c1, utf8_lcase), NULL)#x] ++- SubqueryAlias t + +- Project [col1#x AS c1#x] + +- LocalRelation [col1#x] + + +-- !query +SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) WITHIN GROUP (ORDER BY c1 COLLATE utf8_lcase) FROM (VALUES ('a'), ('B'), ('b'), ('A')) AS t(c1) +-- !query analysis +Aggregate [listagg(distinct collate(c1#x, utf8_lcase), null, collate(c1#x, utf8_lcase) ASC NULLS FIRST, 0, 0) AS listagg(DISTINCT collate(c1, utf8_lcase), NULL) WITHIN GROUP (ORDER BY collate(c1, utf8_lcase) ASC NULLS FIRST)#x] ++- SubqueryAlias t + +- Project [col1#x AS c1#x] + +- LocalRelation [col1#x] + + +-- !query +SELECT listagg(DISTINCT c1 COLLATE unicode_rtrim) FROM (VALUES ('abc '), ('abc '), ('x'), ('abc')) AS t(c1) +-- !query analysis +Aggregate [listagg(distinct collate(c1#x, unicode_rtrim), null, 0, 0) AS listagg(DISTINCT collate(c1, unicode_rtrim), NULL)#x] ++- SubqueryAlias t + +- Project [col1#x AS c1#x] + +- LocalRelation [col1#x] + + +-- !query +SELECT listagg(c1) WITHIN GROUP (ORDER BY c1) FROM (VALUES ('abc '), ('abc '), ('abc\n'), ('abc'), ('x')) AS t(c1) +-- !query analysis +Aggregate [listagg(c1#x, null, c1#x ASC NULLS FIRST, 0, 0) AS listagg(c1, NULL) WITHIN GROUP (ORDER BY c1 ASC NULLS FIRST)#x] ++- SubqueryAlias t + +- Project [col1#x AS c1#x] + +- LocalRelation [col1#x] + + +-- !query +SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE unicode_rtrim) FROM (VALUES ('abc '), ('abc '), ('abc\n'), ('abc'), ('x')) AS t(c1) +-- !query analysis +Aggregate [listagg(c1#x, null, collate(c1#x, unicode_rtrim) ASC NULLS FIRST, 0, 0) AS listagg(c1, NULL) WITHIN GROUP (ORDER BY collate(c1, unicode_rtrim) ASC NULLS FIRST)#x] ++- SubqueryAlias t + +- Project [col1#x AS c1#x] + +- LocalRelation [col1#x] + + +-- !query +SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) WITHIN GROUP (ORDER BY c1 COLLATE utf8_binary) FROM (VALUES ('a'), ('b'), ('A'), ('B')) AS t(c1) +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT", + "sqlState" : "42K0K", + "messageParameters" : { + "funcArg" : "\"collate(c1, utf8_lcase)\"", + "funcName" : "`listagg`", + "orderingExpr" : "\"collate(c1, utf8_binary)\"" + } +} diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/listagg.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/listagg.sql.out new file mode 100644 index 0000000000000..71eb3f8ca76b3 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/listagg.sql.out @@ -0,0 +1,435 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE TEMP VIEW df AS +SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, NULL)) AS t(a, b) +-- !query analysis +CreateViewCommand `df`, SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, NULL)) AS t(a, b), false, false, LocalTempView, UNSUPPORTED, true + +- Project [a#x, b#x] + +- SubqueryAlias 
t + +- Project [col1#x AS a#x, col2#x AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +CREATE TEMP VIEW df2 AS +SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b) +-- !query analysis +CreateViewCommand `df2`, SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b), false, false, LocalTempView, UNSUPPORTED, true + +- Project [a#x, b#x] + +- SubqueryAlias t + +- Project [col1#x AS a#x, col2#x AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT listagg(b) FROM df GROUP BY a +-- !query analysis +Aggregate [a#x], [listagg(b#x, null, 0, 0) AS listagg(b, NULL)#x] ++- SubqueryAlias df + +- View (`df`, [a#x, b#x]) + +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x] + +- Project [a#x, b#x] + +- SubqueryAlias t + +- Project [col1#x AS a#x, col2#x AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT string_agg(b) FROM df GROUP BY a +-- !query analysis +Aggregate [a#x], [string_agg(b#x, null, 0, 0) AS string_agg(b, NULL)#x] ++- SubqueryAlias df + +- View (`df`, [a#x, b#x]) + +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x] + +- Project [a#x, b#x] + +- SubqueryAlias t + +- Project [col1#x AS a#x, col2#x AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT listagg(b, NULL) FROM df GROUP BY a +-- !query analysis +Aggregate [a#x], [listagg(b#x, null, 0, 0) AS listagg(b, NULL)#x] ++- SubqueryAlias df + +- View (`df`, [a#x, b#x]) + +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x] + +- Project [a#x, b#x] + +- SubqueryAlias t + +- Project [col1#x AS a#x, col2#x AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT listagg(b) FROM df WHERE 1 != 1 +-- !query analysis +Aggregate [listagg(b#x, null, 0, 0) AS listagg(b, NULL)#x] ++- Filter NOT (1 = 1) + +- SubqueryAlias df + +- View (`df`, [a#x, b#x]) + +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x] + +- Project [a#x, b#x] + +- SubqueryAlias t + +- Project [col1#x AS a#x, col2#x AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT listagg(b, '|') FROM df GROUP BY a +-- !query analysis +Aggregate [a#x], [listagg(b#x, |, 0, 0) AS listagg(b, |)#x] ++- SubqueryAlias df + +- View (`df`, [a#x, b#x]) + +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x] + +- Project [a#x, b#x] + +- SubqueryAlias t + +- Project [col1#x AS a#x, col2#x AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT listagg(a) FROM df +-- !query analysis +Aggregate [listagg(a#x, null, 0, 0) AS listagg(a, NULL)#x] ++- SubqueryAlias df + +- View (`df`, [a#x, b#x]) + +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x] + +- Project [a#x, b#x] + +- SubqueryAlias t + +- Project [col1#x AS a#x, col2#x AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT listagg(DISTINCT a) FROM df +-- !query analysis +Aggregate [listagg(distinct a#x, null, 0, 0) AS listagg(DISTINCT a, NULL)#x] ++- SubqueryAlias df + +- View (`df`, [a#x, b#x]) + +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x] + +- Project [a#x, b#x] + +- SubqueryAlias t + +- Project [col1#x AS a#x, col2#x AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df +-- !query analysis +Aggregate [listagg(a#x, null, a#x ASC NULLS FIRST, 0, 0) AS listagg(a, NULL) WITHIN GROUP (ORDER BY a ASC NULLS FIRST)#x] ++- SubqueryAlias df + +- View (`df`, [a#x, b#x]) + +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x] + +- Project [a#x, b#x] + 
+- SubqueryAlias t + +- Project [col1#x AS a#x, col2#x AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df +-- !query analysis +Aggregate [listagg(a#x, null, a#x DESC NULLS LAST, 0, 0) AS listagg(a, NULL) WITHIN GROUP (ORDER BY a DESC NULLS LAST)#x] ++- SubqueryAlias df + +- View (`df`, [a#x, b#x]) + +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x] + +- Project [a#x, b#x] + +- SubqueryAlias t + +- Project [col1#x AS a#x, col2#x AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df +-- !query analysis +Project [listagg(a, NULL) WITHIN GROUP (ORDER BY a DESC NULLS LAST) OVER (PARTITION BY b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x] ++- Project [a#x, b#x, listagg(a, NULL) WITHIN GROUP (ORDER BY a DESC NULLS LAST) OVER (PARTITION BY b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x, listagg(a, NULL) WITHIN GROUP (ORDER BY a DESC NULLS LAST) OVER (PARTITION BY b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x] + +- Window [listagg(a#x, null, a#x DESC NULLS LAST, 0, 0) windowspecdefinition(b#x, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS listagg(a, NULL) WITHIN GROUP (ORDER BY a DESC NULLS LAST) OVER (PARTITION BY b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x], [b#x] + +- Project [a#x, b#x] + +- SubqueryAlias df + +- View (`df`, [a#x, b#x]) + +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x] + +- Project [a#x, b#x] + +- SubqueryAlias t + +- Project [col1#x AS a#x, col2#x AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df +-- !query analysis +Aggregate [listagg(a#x, null, b#x ASC NULLS FIRST, 0, 0) AS listagg(a, NULL) WITHIN GROUP (ORDER BY b ASC NULLS FIRST)#x] ++- SubqueryAlias df + +- View (`df`, [a#x, b#x]) + +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x] + +- Project [a#x, b#x] + +- SubqueryAlias t + +- Project [col1#x AS a#x, col2#x AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df +-- !query analysis +Aggregate [listagg(a#x, null, b#x DESC NULLS LAST, 0, 0) AS listagg(a, NULL) WITHIN GROUP (ORDER BY b DESC NULLS LAST)#x] ++- SubqueryAlias df + +- View (`df`, [a#x, b#x]) + +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x] + +- Project [a#x, b#x] + +- SubqueryAlias t + +- Project [col1#x AS a#x, col2#x AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df +-- !query analysis +Aggregate [listagg(a#x, |, b#x DESC NULLS LAST, 0, 0) AS listagg(a, |) WITHIN GROUP (ORDER BY b DESC NULLS LAST)#x] ++- SubqueryAlias df + +- View (`df`, [a#x, b#x]) + +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x] + +- Project [a#x, b#x] + +- SubqueryAlias t + +- Project [col1#x AS a#x, col2#x AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df +-- !query analysis +Aggregate [listagg(a#x, null, b#x DESC NULLS LAST, a#x ASC NULLS FIRST, 0, 0) AS listagg(a, NULL) WITHIN GROUP (ORDER BY b DESC NULLS LAST, a ASC NULLS FIRST)#x] ++- SubqueryAlias df + +- View (`df`, [a#x, b#x]) + +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x] + +- Project [a#x, b#x] + +- SubqueryAlias t + +- Project [col1#x AS a#x, 
col2#x AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df +-- !query analysis +Aggregate [listagg(a#x, null, b#x DESC NULLS LAST, a#x DESC NULLS LAST, 0, 0) AS listagg(a, NULL) WITHIN GROUP (ORDER BY b DESC NULLS LAST, a DESC NULLS LAST)#x] ++- SubqueryAlias df + +- View (`df`, [a#x, b#x]) + +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x] + +- Project [a#x, b#x] + +- SubqueryAlias t + +- Project [col1#x AS a#x, col2#x AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query analysis +Aggregate [listagg(c1#x, null, 0, 0) AS listagg(c1, NULL)#x] ++- SubqueryAlias t + +- Project [col1#x AS c1#x] + +- LocalRelation [col1#x] + + +-- !query +SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query analysis +Aggregate [listagg(c1#x, null, 0, 0) AS listagg(c1, NULL)#x] ++- SubqueryAlias t + +- Project [col1#x AS c1#x] + +- LocalRelation [col1#x] + + +-- !query +SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query analysis +Aggregate [listagg(c1#x, 0x42, 0, 0) AS listagg(c1, X'42')#x] ++- SubqueryAlias t + +- Project [col1#x AS c1#x] + +- LocalRelation [col1#x] + + +-- !query +SELECT listagg(a), listagg(b, ',') FROM df2 +-- !query analysis +Aggregate [listagg(cast(a#x as string), null, 0, 0) AS listagg(a, NULL)#x, listagg(cast(b#x as string), ,, 0, 0) AS listagg(b, ,)#x] ++- SubqueryAlias df2 + +- View (`df2`, [a#x, b#x]) + +- Project [cast(a#x as int) AS a#x, cast(b#x as boolean) AS b#x] + +- Project [a#x, b#x] + +- SubqueryAlias t + +- Project [col1#x AS a#x, col2#x AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT listagg(c1) FROM (VALUES (ARRAY('a', 'b'))) AS t(c1) +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"c1\"", + "inputType" : "\"ARRAY<STRING>\"", + "paramIndex" : "first", + "requiredType" : "(\"STRING\" or \"BINARY\")", + "sqlExpr" : "\"listagg(c1, NULL)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 18, + "fragment" : "listagg(c1)" + } ] +} + + +-- !query +SELECT listagg(c1, ', ') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.DATA_DIFF_TYPES", + "sqlState" : "42K09", + "messageParameters" : { + "dataType" : "(\"BINARY\" or \"STRING\")", + "functionName" : "`listagg`", + "sqlExpr" : "\"listagg(c1, , )\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 24, + "fragment" : "listagg(c1, ', ')" + } ] +} + + +-- !query +SELECT listagg(b, a) FROM df GROUP BY a +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT", + "sqlState" : "42K09", + "messageParameters" : { + "inputExpr" : "\"a\"", + "inputName" : "`delimiter`", + "inputType" : "\"STRING\"", + "sqlExpr" : "\"listagg(b, a)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 20, + "fragment" : "listagg(b, a)" + } ] +} + + +-- !query +SELECT listagg(a) OVER (ORDER BY a) FROM df +-- !query analysis +Project [listagg(a, NULL) OVER (ORDER BY a ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW)#x] ++- Project [a#x, listagg(a, NULL) OVER (ORDER BY a ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#x, listagg(a, NULL) OVER (ORDER BY a ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#x] + +- Window [listagg(a#x, null, 0, 0) windowspecdefinition(a#x ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS listagg(a, NULL) OVER (ORDER BY a ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#x], [a#x ASC NULLS FIRST] + +- Project [a#x] + +- SubqueryAlias df + +- View (`df`, [a#x, b#x]) + +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x] + +- Project [a#x, b#x] + +- SubqueryAlias t + +- Project [col1#x AS a#x, col2#x AS b#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", + "sqlState" : "42601", + "messageParameters" : { + "aggFunc" : "\"listagg(a, NULL, a ASC NULLS FIRST)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 61, + "fragment" : "listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a)" + } ] +} + + +-- !query +SELECT string_agg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", + "sqlState" : "42601", + "messageParameters" : { + "aggFunc" : "\"listagg(a, NULL, a ASC NULLS FIRST)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 64, + "fragment" : "string_agg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a)" + } ] +} + + +-- !query +SELECT listagg(DISTINCT a) OVER (ORDER BY a) FROM df +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DISTINCT_WINDOW_FUNCTION_UNSUPPORTED", + "sqlState" : "0A000", + "messageParameters" : { + "windowExpr" : "\"listagg(DISTINCT a, NULL) OVER (ORDER BY a ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 44, + "fragment" : "listagg(DISTINCT a) OVER (ORDER BY a)" + } ] +} + + +-- !query +SELECT listagg(DISTINCT a) WITHIN GROUP (ORDER BY b) FROM df +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT", + "sqlState" : "42K0K", + "messageParameters" : { + "funcArg" : "\"a\"", + "funcName" : "`listagg`", + "orderingExpr" : "\"b\"" + } +} + + +-- !query +SELECT listagg(DISTINCT a) WITHIN GROUP (ORDER BY a, b) FROM df +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT", + "sqlState" : "42K0K", + "messageParameters" : { + "funcArg" : "\"a\"", + "funcName" : "`listagg`", + "orderingExpr" : "\"a\", \"b\"" + } +} diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/mode.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/mode.sql.out index d6ecbc72a7178..8028c344140f5 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/mode.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/mode.sql.out @@ -74,7 +74,7 @@ SELECT department, 
mode(DISTINCT salary) FROM basic_pays GROUP BY department ORD -- !query analysis org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED", + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED", "sqlState" : "42K0K", "messageParameters" : { "funcName" : "`mode`" @@ -379,7 +379,7 @@ FROM basic_pays -- !query analysis org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED", + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED", "sqlState" : "42K0K", "messageParameters" : { "funcName" : "`mode`" @@ -401,7 +401,7 @@ FROM basic_pays -- !query analysis org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING", + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING", "sqlState" : "42K0K", "messageParameters" : { "funcName" : "`mode`" @@ -423,7 +423,7 @@ FROM basic_pays -- !query analysis org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS", + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WRONG_NUM_ORDERINGS", "sqlState" : "42K0K", "messageParameters" : { "actualNum" : "1", diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/percentiles.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/percentiles.sql.out index 4a31cff8c7d0f..eb8102afa47ef 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/percentiles.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/percentiles.sql.out @@ -248,7 +248,7 @@ FROM aggr -- !query analysis org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED", + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED", "sqlState" : "42K0K", "messageParameters" : { "funcName" : "`percentile_cont`" @@ -270,7 +270,7 @@ FROM aggr -- !query analysis org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED", + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED", "sqlState" : "42K0K", "messageParameters" : { "funcName" : "`percentile_cont`" @@ -342,7 +342,7 @@ FROM aggr -- !query analysis org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING", + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING", "sqlState" : "42K0K", "messageParameters" : { "funcName" : "`percentile_cont`" @@ -364,7 +364,7 @@ FROM aggr -- !query analysis org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING", + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING", "sqlState" : "42K0K", "messageParameters" : { "funcName" : "`percentile_cont`" @@ -386,7 +386,7 @@ FROM aggr -- !query analysis org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS", + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WRONG_NUM_ORDERINGS", "sqlState" : "42K0K", "messageParameters" : { "actualNum" : "2", diff --git a/sql/core/src/test/resources/sql-tests/inputs/listagg-collations.sql b/sql/core/src/test/resources/sql-tests/inputs/listagg-collations.sql new file mode 100644 index 0000000000000..35f86183c37b3 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/listagg-collations.sql @@ -0,0 +1,12 @@ 
+-- Test cases with collations +SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE utf8_binary) FROM (VALUES ('a'), ('A'), ('b'), ('B')) AS t(c1); +SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE utf8_lcase) FROM (VALUES ('a'), ('A'), ('b'), ('B')) AS t(c1); +SELECT listagg(DISTINCT c1 COLLATE utf8_binary) FROM (VALUES ('a'), ('A'), ('b'), ('B')) AS t(c1); +SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) FROM (VALUES ('a'), ('A'), ('b'), ('B')) AS t(c1); +SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) WITHIN GROUP (ORDER BY c1 COLLATE utf8_lcase) FROM (VALUES ('a'), ('B'), ('b'), ('A')) AS t(c1); +SELECT listagg(DISTINCT c1 COLLATE unicode_rtrim) FROM (VALUES ('abc '), ('abc '), ('x'), ('abc')) AS t(c1); +SELECT listagg(c1) WITHIN GROUP (ORDER BY c1) FROM (VALUES ('abc '), ('abc '), ('abc\n'), ('abc'), ('x')) AS t(c1); +SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE unicode_rtrim) FROM (VALUES ('abc '), ('abc '), ('abc\n'), ('abc'), ('x')) AS t(c1); + +-- Error case with collations +SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) WITHIN GROUP (ORDER BY c1 COLLATE utf8_binary) FROM (VALUES ('a'), ('b'), ('A'), ('B')) AS t(c1); \ No newline at end of file diff --git a/sql/core/src/test/resources/sql-tests/inputs/listagg.sql b/sql/core/src/test/resources/sql-tests/inputs/listagg.sql new file mode 100644 index 0000000000000..15c8cfa823e9b --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/listagg.sql @@ -0,0 +1,38 @@ +-- Create temporary views +CREATE TEMP VIEW df AS +SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, NULL)) AS t(a, b); + +CREATE TEMP VIEW df2 AS +SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b); + +-- Test cases for listagg function +SELECT listagg(b) FROM df GROUP BY a; +SELECT string_agg(b) FROM df GROUP BY a; +SELECT listagg(b, NULL) FROM df GROUP BY a; +SELECT listagg(b) FROM df WHERE 1 != 1; +SELECT listagg(b, '|') FROM df GROUP BY a; +SELECT listagg(a) FROM df; +SELECT listagg(DISTINCT a) FROM df; +SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df; +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df; +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df; +SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df; +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df; +SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df; +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df; +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df; +SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1); +SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1); +SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1); +SELECT listagg(a), listagg(b, ',') FROM df2; + +-- Error cases +SELECT listagg(c1) FROM (VALUES (ARRAY('a', 'b'))) AS t(c1); +SELECT listagg(c1, ', ') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1); +SELECT listagg(b, a) FROM df GROUP BY a; +SELECT listagg(a) OVER (ORDER BY a) FROM df; +SELECT listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df; +SELECT string_agg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df; +SELECT listagg(DISTINCT a) OVER (ORDER BY a) FROM df; +SELECT listagg(DISTINCT a) WITHIN GROUP (ORDER BY b) FROM df; +SELECT listagg(DISTINCT a) WITHIN GROUP (ORDER BY a, b) FROM df; \ No newline at end of file diff --git a/sql/core/src/test/resources/sql-tests/results/listagg-collations.sql.out b/sql/core/src/test/resources/sql-tests/results/listagg-collations.sql.out new file mode 
100644 index 0000000000000..cf3bac04f09ca --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/listagg-collations.sql.out @@ -0,0 +1,82 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE utf8_binary) FROM (VALUES ('a'), ('A'), ('b'), ('B')) AS t(c1) +-- !query schema +struct +-- !query output +ABab + + +-- !query +SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE utf8_lcase) FROM (VALUES ('a'), ('A'), ('b'), ('B')) AS t(c1) +-- !query schema +struct +-- !query output +aAbB + + +-- !query +SELECT listagg(DISTINCT c1 COLLATE utf8_binary) FROM (VALUES ('a'), ('A'), ('b'), ('B')) AS t(c1) +-- !query schema +struct +-- !query output +aAbB + + +-- !query +SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) FROM (VALUES ('a'), ('A'), ('b'), ('B')) AS t(c1) +-- !query schema +struct +-- !query output +ab + + +-- !query +SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) WITHIN GROUP (ORDER BY c1 COLLATE utf8_lcase) FROM (VALUES ('a'), ('B'), ('b'), ('A')) AS t(c1) +-- !query schema +struct +-- !query output +aB + + +-- !query +SELECT listagg(DISTINCT c1 COLLATE unicode_rtrim) FROM (VALUES ('abc '), ('abc '), ('x'), ('abc')) AS t(c1) +-- !query schema +struct +-- !query output +abc x + + +-- !query +SELECT listagg(c1) WITHIN GROUP (ORDER BY c1) FROM (VALUES ('abc '), ('abc '), ('abc\n'), ('abc'), ('x')) AS t(c1) +-- !query schema +struct +-- !query output +abcabc +abc abc x + + +-- !query +SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE unicode_rtrim) FROM (VALUES ('abc '), ('abc '), ('abc\n'), ('abc'), ('x')) AS t(c1) +-- !query schema +struct +-- !query output +abc abc abcabc +x + + +-- !query +SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) WITHIN GROUP (ORDER BY c1 COLLATE utf8_binary) FROM (VALUES ('a'), ('b'), ('A'), ('B')) AS t(c1) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT", + "sqlState" : "42K0K", + "messageParameters" : { + "funcArg" : "\"collate(c1, utf8_lcase)\"", + "funcName" : "`listagg`", + "orderingExpr" : "\"collate(c1, utf8_binary)\"" + } +} diff --git a/sql/core/src/test/resources/sql-tests/results/listagg.sql.out b/sql/core/src/test/resources/sql-tests/results/listagg.sql.out new file mode 100644 index 0000000000000..ef580704992ce --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/listagg.sql.out @@ -0,0 +1,368 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE TEMP VIEW df AS +SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, NULL)) AS t(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE TEMP VIEW df2 AS +SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT listagg(b) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT string_agg(b) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT listagg(b, NULL) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT listagg(b) FROM df WHERE 1 != 1 +-- !query schema +struct +-- !query output +NULL + + +-- !query +SELECT listagg(b, '|') FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +b|c +c|d + + +-- !query +SELECT listagg(a) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- 
!query +SELECT listagg(DISTINCT a) FROM df +-- !query schema +struct +-- !query output +ab + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df +-- !query schema +struct +-- !query output +bbaa + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df +-- !query schema +struct +-- !query output +NULL +a +b +ba +ba + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df +-- !query schema +struct +-- !query output +baba + + +-- !query +SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df +-- !query schema +struct +-- !query output +b|a|b|a + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df +-- !query schema +struct +-- !query output +baba + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df +-- !query schema +struct +-- !query output +bbaa + + +-- !query +SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭ�� + + +-- !query +SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭ�� + + +-- !query +SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭB�� + + +-- !query +SELECT listagg(a), listagg(b, ',') FROM df2 +-- !query schema +struct +-- !query output +123 true,false,false + + +-- !query +SELECT listagg(c1) FROM (VALUES (ARRAY('a', 'b'))) AS t(c1) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"c1\"", + "inputType" : "\"ARRAY\"", + "paramIndex" : "first", + "requiredType" : "(\"STRING\" or \"BINARY\")", + "sqlExpr" : "\"listagg(c1, NULL)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 18, + "fragment" : "listagg(c1)" + } ] +} + + +-- !query +SELECT listagg(c1, ', ') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.DATA_DIFF_TYPES", + "sqlState" : "42K09", + "messageParameters" : { + "dataType" : "(\"BINARY\" or \"STRING\")", + "functionName" : "`listagg`", + "sqlExpr" : "\"listagg(c1, , )\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 24, + "fragment" : "listagg(c1, ', ')" + } ] +} + + +-- !query +SELECT listagg(b, a) FROM df GROUP BY a +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT", + "sqlState" : "42K09", + "messageParameters" : { + "inputExpr" : "\"a\"", + "inputName" : "`delimiter`", + "inputType" : "\"STRING\"", + "sqlExpr" : "\"listagg(b, a)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 20, + "fragment" : "listagg(b, a)" + } ] +} + + +-- !query +SELECT listagg(a) OVER (ORDER BY a) FROM df +-- !query schema +struct +-- !query output +NULL +aa +aa +aabb +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a) OVER 
(ORDER BY a) FROM df +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", + "sqlState" : "42601", + "messageParameters" : { + "aggFunc" : "\"listagg(a, NULL, a ASC NULLS FIRST)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 61, + "fragment" : "listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a)" + } ] +} + + +-- !query +SELECT string_agg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", + "sqlState" : "42601", + "messageParameters" : { + "aggFunc" : "\"listagg(a, NULL, a ASC NULLS FIRST)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 64, + "fragment" : "string_agg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a)" + } ] +} + + +-- !query +SELECT listagg(DISTINCT a) OVER (ORDER BY a) FROM df +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DISTINCT_WINDOW_FUNCTION_UNSUPPORTED", + "sqlState" : "0A000", + "messageParameters" : { + "windowExpr" : "\"listagg(DISTINCT a, NULL) OVER (ORDER BY a ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 44, + "fragment" : "listagg(DISTINCT a) OVER (ORDER BY a)" + } ] +} + + +-- !query +SELECT listagg(DISTINCT a) WITHIN GROUP (ORDER BY b) FROM df +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT", + "sqlState" : "42K0K", + "messageParameters" : { + "funcArg" : "\"a\"", + "funcName" : "`listagg`", + "orderingExpr" : "\"b\"" + } +} + + +-- !query +SELECT listagg(DISTINCT a) WITHIN GROUP (ORDER BY a, b) FROM df +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT", + "sqlState" : "42K0K", + "messageParameters" : { + "funcArg" : "\"a\"", + "funcName" : "`listagg`", + "orderingExpr" : "\"a\", \"b\"" + } +} diff --git a/sql/core/src/test/resources/sql-tests/results/mode.sql.out b/sql/core/src/test/resources/sql-tests/results/mode.sql.out index ad7d59eeb1634..70f253066d4f9 100644 --- a/sql/core/src/test/resources/sql-tests/results/mode.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/mode.sql.out @@ -51,7 +51,7 @@ struct<> -- !query output org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED", + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED", "sqlState" : "42K0K", "messageParameters" : { "funcName" : "`mode`" @@ -373,7 +373,7 @@ struct<> -- !query output org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED", + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED", "sqlState" : "42K0K", "messageParameters" : { "funcName" : "`mode`" @@ -397,7 +397,7 @@ struct<> -- !query output org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING", + "errorClass" : 
"INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING", "sqlState" : "42K0K", "messageParameters" : { "funcName" : "`mode`" @@ -421,7 +421,7 @@ struct<> -- !query output org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS", + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WRONG_NUM_ORDERINGS", "sqlState" : "42K0K", "messageParameters" : { "actualNum" : "1", diff --git a/sql/core/src/test/resources/sql-tests/results/percentiles.sql.out b/sql/core/src/test/resources/sql-tests/results/percentiles.sql.out index cd95eee186e12..55aaa8ee7378e 100644 --- a/sql/core/src/test/resources/sql-tests/results/percentiles.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/percentiles.sql.out @@ -222,7 +222,7 @@ struct<> -- !query output org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED", + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED", "sqlState" : "42K0K", "messageParameters" : { "funcName" : "`percentile_cont`" @@ -246,7 +246,7 @@ struct<> -- !query output org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED", + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED", "sqlState" : "42K0K", "messageParameters" : { "funcName" : "`percentile_cont`" @@ -324,7 +324,7 @@ struct<> -- !query output org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING", + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING", "sqlState" : "42K0K", "messageParameters" : { "funcName" : "`percentile_cont`" @@ -348,7 +348,7 @@ struct<> -- !query output org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING", + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING", "sqlState" : "42K0K", "messageParameters" : { "funcName" : "`percentile_cont`" @@ -372,7 +372,7 @@ struct<> -- !query output org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS", + "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WRONG_NUM_ORDERINGS", "sqlState" : "42K0K", "messageParameters" : { "actualNum" : "2", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index 6348e5f315395..ad80dc65926bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -621,6 +621,41 @@ class DataFrameAggregateSuite extends QueryTest ) } + test("listagg function") { + // normal case + val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b") + checkAnswer( + df.selectExpr("listagg(a)", "listagg(b)"), + Seq(Row("abc", "bcd")) + ) + checkAnswer( + df.select(listagg($"a"), listagg($"b")), + Seq(Row("abc", "bcd")) + ) + + // distinct case + val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b") + checkAnswer( + df2.select(listagg_distinct($"a"), listagg_distinct($"b")), + Seq(Row("ab", "bd")) + ) + + // null case + val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null, null)).toDF("a", "b", "c") + checkAnswer( + df3.select(listagg_distinct($"a"), listagg($"a"), listagg_distinct($"b"), listagg($"b"), + listagg($"c")), + Seq(Row("a", "aa", "b", "bb", null)) + ) + + // custom delimiter 
+ val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b") + checkAnswer( + df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"), + Seq(Row("a|b|c", "b|c|d")) + ) + } + test("SPARK-31500: collect_set() of BinaryType returns duplicate elements") { val bytesTest1 = "test1".getBytes val bytesTest2 = "test2".getBytes diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index 975a82e26f4eb..4494057b1eefe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -73,7 +73,9 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { "sum_distinct", // equivalent to sum(distinct foo) "typedLit", "typedlit", // Scala only "udaf", "udf", // create function statement in sql - "call_function" // moot in SQL as you just call the function directly + "call_function", // moot in SQL as you just call the function directly + "listagg_distinct", // equivalent to listagg(distinct foo) + "string_agg_distinct" // equivalent to string_agg(distinct foo) ) val excludedSqlFunctions = Set.empty[String] diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala index 662f43fc00399..283454ad273ed 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala @@ -104,6 +104,7 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite with SharedThriftServ "timestampNTZ/datetime-special-ansi.sql", // SPARK-47264 "collations.sql", + "listagg-collations.sql", "pipe-operators.sql", // VARIANT type "variant/named-function-arguments.sql"
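Taken together, the golden files above pin down the listagg semantics: NULL inputs are skipped, the default delimiter is NULL (treated as empty on concatenation), WITHIN GROUP fixes the concatenation order, and with DISTINCT every WITHIN GROUP ordering expression must be one of the function inputs. Below is a minimal sketch of the same behavior through the new API, assuming a local SparkSession and that listagg/listagg_distinct are exposed in org.apache.spark.sql.functions as the DataFrameAggregateSuite changes above suggest; the delimiter form is exercised via SQL, exactly as in the test.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{listagg, listagg_distinct}

object ListaggSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative local session; not part of the patch.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("listagg-sketch")
      .getOrCreate()
    import spark.implicits._

    // Mirrors the `df` view from inputs/listagg.sql, including the all-NULL row.
    val df = Seq(("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), (null, null)).toDF("a", "b")

    // Default delimiter is NULL, i.e. plain concatenation; NULL values are skipped.
    df.select(listagg($"a")).show()            // aabb (order not guaranteed without WITHIN GROUP)

    // DISTINCT variant, equivalent to listagg(DISTINCT a) in SQL.
    df.select(listagg_distinct($"a")).show()   // ab

    // Custom delimiter plus a deterministic WITHIN GROUP ordering, via the SQL form.
    df.createOrReplaceTempView("df")
    spark.sql("SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df").show()
    // b|a|b|a, matching the golden output in results/listagg.sql.out

    spark.stop()
  }
}

Per the error cases above, ordering by a column that is not the DISTINCT input (for example listagg(DISTINCT a) WITHIN GROUP (ORDER BY b)) raises INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT rather than silently producing a nondeterministic result.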