Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
"CANNOT_UP_CAST_DATATYPE" : {
"message" : [ "Cannot up cast %s from %s to %s.\n%s" ]
},
"CANNOT_USE_MIXTURE" : {
"message" : [ "Cannot use a mixture of aggregate function and group aggregate pandas UDF" ]
},
"CAST_CAUSES_OVERFLOW" : {
"message" : [ "Casting %s to %s causes overflow. To return NULL instead, use 'try_cast'. If necessary set %s to false to bypass this error." ],
"sqlState" : "22005"
Expand Down
3 changes: 2 additions & 1 deletion python/pyspark/sql/tests/test_udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ def test_udf_not_supported_in_join_condition(self):

def runWithJoinType(join_type, type_string):
with self.assertRaisesRegex(
AnalysisException, "Using PythonUDF.*%s is not supported." % type_string
AnalysisException,
"Using PythonUDF in join condition of join type %s is not supported" % type_string,
):
left.join(right, [f("a", "b"), left.a1 == right.b1], join_type).collect()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ object QueryCompilationErrors {
}

def pandasUDFAggregateNotSupportedInPivotError(): Throwable = {
new AnalysisException("Pandas UDF aggregate expressions are currently not supported in pivot.")
Copy link
Contributor Author

@itholic itholic Feb 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This message is changed from "Pandas UDF aggregate expressions are currently not supported in pivot." to "Pivot does not support Pandas UDF aggregate expressions." as suggested in Error Message Guidelines.

new AnalysisException(
errorClass = "UNSUPPORTED_FEATURE",
messageParameters = Array("Pandas UDF aggregate expressions don't support pivot."))
}

def aggregateExpressionRequiredForPivotError(sql: String): Throwable = {
Expand Down Expand Up @@ -1330,18 +1332,15 @@ object QueryCompilationErrors {
s"Expected: ${dataType.typeName}; Found: ${expression.dataType.typeName}")
}

def groupAggPandasUDFUnsupportedByStreamingAggError(): Throwable = {
new AnalysisException("Streaming aggregation doesn't support group aggregate pandas UDF")
}

def streamJoinStreamWithoutEqualityPredicateUnsupportedError(plan: LogicalPlan): Throwable = {
new AnalysisException(
"Stream-stream join without equality predicate is not supported", plan = Some(plan))
}

def cannotUseMixtureOfAggFunctionAndGroupAggPandasUDFError(): Throwable = {
new AnalysisException(
"Cannot use a mixture of aggregate function and group aggregate pandas UDF")
errorClass = "CANNOT_USE_MIXTURE",
messageParameters = Array.empty)
}

def ambiguousAttributesInSelfJoinError(
Expand Down Expand Up @@ -1570,8 +1569,10 @@ object QueryCompilationErrors {
}

def usePythonUDFInJoinConditionUnsupportedError(joinType: JoinType): Throwable = {
new AnalysisException("Using PythonUDF in join condition of join type" +
s" $joinType is not supported.")
new AnalysisException(
errorClass = "UNSUPPORTED_FEATURE",
messageParameters = Array(
s"Using PythonUDF in join condition of join type $joinType is not supported"))
}

def conflictingAttributesInJoinConditionError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ class ExtractPythonUDFFromJoinConditionSuite extends PlanTest {
Optimize.execute(query.analyze)
}
assert(e.message.contentEquals(
s"Using PythonUDF in join condition of join type $joinType is not supported."))
"The feature is not supported: " +
s"Using PythonUDF in join condition of join type $joinType is not supported"))

val query2 = testRelationLeft.join(
testRelationRight,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import java.util.Locale

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{execution, Strategy}
import org.apache.spark.sql.{execution, AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions._
Expand Down Expand Up @@ -373,7 +373,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
namedGroupingExpressions, aggregateExpressions, rewrittenResultExpressions, child) =>

if (aggregateExpressions.exists(PythonUDF.isGroupedAggPandasUDF)) {
throw QueryCompilationErrors.groupAggPandasUDFUnsupportedByStreamingAggError()
throw new AnalysisException(
"Streaming aggregation doesn't support group aggregate pandas UDF")
Comment on lines +376 to +377
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you clarify why did you make this replacement?

Copy link
Contributor Author

@itholic itholic Mar 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the existing groupAggPandasUDFUnsupportedByStreamingAggError is not user-facing error, so I firstly remove it from QueryCompilationErrors since I think maybe we don't want to reveal the non-user-facing error to the error class.

And, also refer to https://github.com/apache/spark/pull/35656/files#r814517495.

Btw, maybe do we also want to leave some comment to the code base?

}

val sessionWindowOption = namedGroupingExpressions.find { p =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.spark.sql.errors

import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.functions.{grouping, grouping_id}
import org.apache.spark.sql.{AnalysisException, IntegratedUDFTestUtils, QueryTest}
import org.apache.spark.sql.functions.{grouping, grouping_id, sum}
import org.apache.spark.sql.test.SharedSparkSession

case class StringLongClass(a: String, b: Long)
Expand Down Expand Up @@ -101,4 +101,71 @@ class QueryCompilationErrorsSuite extends QueryTest with SharedSparkSession {
assert(e.message ===
"The argument_index of string format cannot contain position 0$.")
}

test("CANNOT_USE_MIXTURE: Using aggregate function with grouped aggregate pandas UDF") {
import IntegratedUDFTestUtils._

val df = Seq(
(536361, "85123A", 2, 17850),
(536362, "85123B", 4, 17850),
(536363, "86123A", 6, 17851)
).toDF("InvoiceNo", "StockCode", "Quantity", "CustomerID")
val e = intercept[AnalysisException] {
val pandasTestUDF = TestGroupedAggPandasUDF(name = "pandas_udf")
df.groupBy("CustomerId")
.agg(pandasTestUDF(df("Quantity")), sum(df("Quantity"))).collect()
}

assert(e.errorClass === Some("CANNOT_USE_MIXTURE"))
assert(e.message ===
"Cannot use a mixture of aggregate function and group aggregate pandas UDF")
}

test("UNSUPPORTED_FEATURE: Using Python UDF with unsupported join condition") {
import IntegratedUDFTestUtils._

val df1 = Seq(
(536361, "85123A", 2, 17850),
(536362, "85123B", 4, 17850),
(536363, "86123A", 6, 17851)
).toDF("InvoiceNo", "StockCode", "Quantity", "CustomerID")
val df2 = Seq(
("Bob", 17850),
("Alice", 17850),
("Tom", 17851)
).toDF("CustomerName", "CustomerID")

val e = intercept[AnalysisException] {
val pythonTestUDF = TestPythonUDF(name = "python_udf")
df1.join(
df2, pythonTestUDF(df1("CustomerID") === df2("CustomerID")), "leftouter").collect()
}

assert(e.errorClass === Some("UNSUPPORTED_FEATURE"))
assert(e.getSqlState === "0A000")
assert(e.message ===
"The feature is not supported: " +
"Using PythonUDF in join condition of join type LeftOuter is not supported")
}

test("UNSUPPORTED_FEATURE: Using pandas UDF aggregate expression with pivot") {
import IntegratedUDFTestUtils._

val df = Seq(
(536361, "85123A", 2, 17850),
(536362, "85123B", 4, 17850),
(536363, "86123A", 6, 17851)
).toDF("InvoiceNo", "StockCode", "Quantity", "CustomerID")

val e = intercept[AnalysisException] {
val pandasTestUDF = TestGroupedAggPandasUDF(name = "pandas_udf")
df.groupBy(df("CustomerID")).pivot(df("CustomerID")).agg(pandasTestUDF(df("Quantity")))
}

assert(e.errorClass === Some("UNSUPPORTED_FEATURE"))
assert(e.getSqlState === "0A000")
assert(e.message ===
"The feature is not supported: " +
"Pandas UDF aggregate expressions don't support pivot.")
}
}