From def68f3fc38a68be964088b61530316cc81b2f5b Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 26 Mar 2026 13:51:21 +0800 Subject: [PATCH] [SPARK-55579][PYTHON] Rename PySpark error classes to be eval-type-agnostic Rename six PySpark error conditions to be generic and not tied to specific UDF eval types: | Old Name | New Name | |---|---| | `PANDAS_UDF_OUTPUT_EXCEEDS_INPUT_ROWS` | `OUTPUT_EXCEEDS_INPUT_ROWS` | | `RESULT_LENGTH_MISMATCH_FOR_SCALAR_ITER_PANDAS_UDF` | `RESULT_ROWS_MISMATCH` | | `STOP_ITERATION_OCCURRED_FROM_SCALAR_ITER_PANDAS_UDF` | `INPUT_NOT_FULLY_CONSUMED` | | `RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF` | `RESULT_COLUMN_SCHEMA_MISMATCH` | | `RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF` | `RESULT_COLUMN_NAMES_MISMATCH` | | `RESULT_COLUMNS_MISMATCH_FOR_ARROW_UDF` | `RESULT_COLUMN_NAMES_MISMATCH` (merged) | Also updated error messages to not reference specific eval types or data structures (e.g., "pandas.DataFrame", "pyarrow.Table" -> "data"). These error conditions were originally created for Pandas UDFs, but are now shared by Arrow UDFs as well. The names and messages should be generic so they can be reused across different eval types without confusion. Part of [SPARK-55388](https://issues.apache.org/jira/browse/SPARK-55388). Yes. Error condition names and messages are updated. Users who catch specific error conditions by name will need to update their references. Existing tests updated to match new error condition names and messages. No Closes #54996 from Yicong-Huang/SPARK-55579/rename-error-classes. 
Authored-by: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
---
 python/pyspark/errors/error-conditions.json   | 37 ++++++++-----------
 .../tests/arrow/test_arrow_cogrouped_map.py   |  7 ++--
 .../sql/tests/arrow/test_arrow_grouped_map.py |  7 ++--
 .../tests/pandas/test_pandas_cogrouped_map.py |  8 ++--
 .../tests/pandas/test_pandas_grouped_map.py   | 12 +++---
 .../sql/tests/pandas/test_pandas_map.py       | 36 +++++++++---------
 .../tests/pandas/test_pandas_udf_scalar.py    |  4 +-
 python/pyspark/worker.py                      | 14 +++----
 8 files changed, 59 insertions(+), 66 deletions(-)

diff --git a/python/pyspark/errors/error-conditions.json b/python/pyspark/errors/error-conditions.json
index 511e81cc08b63..49c5856934d3c 100644
--- a/python/pyspark/errors/error-conditions.json
+++ b/python/pyspark/errors/error-conditions.json
@@ -342,6 +342,11 @@
       "<index_type> index out of range, got '<index>'."
     ]
   },
+  "INPUT_NOT_FULLY_CONSUMED": {
+    "message": [
+      "The input iterator must be fully consumed."
+    ]
+  },
   "INVALID_ARROW_UDTF_RETURN_TYPE": {
     "message": [
       "The return type of the arrow-optimized Python UDTF should be of type 'pandas.DataFrame', but the '<method_name>' method returned a value of type <return_type> with value: <value>."
@@ -807,6 +812,11 @@
       "<feature> is only supported with Spark Connect; however, the current Spark session does not use Spark Connect."
     ]
   },
+  "OUTPUT_EXCEEDS_INPUT_ROWS": {
+    "message": [
+      "The number of output rows must not exceed the number of input rows."
+    ]
+  },
   "PACKAGE_NOT_INSTALLED": {
     "message": [
       "<package_name> >= <minimum_version> must be installed; however, it was not found."
     ]
   },
       "Alternatively set a pandas-on-spark option 'compute.fail_on_ansi_mode' to `False` to force it to work, although it can cause unexpected behavior."
     ]
   },
-  "PANDAS_UDF_OUTPUT_EXCEEDS_INPUT_ROWS" : {
-    "message": [
-      "The Pandas SCALAR_ITER UDF outputs more rows than input rows."
-    ]
-  },
   "PIPE_FUNCTION_EXITED": {
     "message": [
       "Pipe function `<func_name>` exited with error code <error_code>."
@@ -875,24 +880,19 @@
       "OPERATION_NOT_FOUND on the server but responses were already received from it."
     ]
   },
-  "RESULT_COLUMNS_MISMATCH_FOR_ARROW_UDF": {
-    "message": [
-      "Column names of the returned pyarrow.Table do not match specified schema.<missing><extra>"
-    ]
-  },
-  "RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF": {
+  "RESULT_COLUMN_NAMES_MISMATCH": {
     "message": [
-      "Column names of the returned pandas.DataFrame do not match specified schema.<missing><extra>"
+      "Column names of the returned data do not match specified schema.<missing><extra>"
     ]
   },
-  "RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF": {
+  "RESULT_COLUMN_SCHEMA_MISMATCH": {
     "message": [
-      "Number of columns of the returned pandas.DataFrame doesn't match specified schema. Expected: <expected> Actual: <actual>"
+      "Number of columns of the returned data doesn't match specified schema. Expected: <expected> Actual: <actual>"
     ]
   },
-  "RESULT_LENGTH_MISMATCH_FOR_SCALAR_ITER_PANDAS_UDF": {
+  "RESULT_ROWS_MISMATCH": {
     "message": [
-      "The length of output in Scalar iterator pandas UDF should be the same with the input's; however, the length of output was <output_length> and the length of input was <input_length>."
+      "The number of output rows (<output_length>) must match the number of input rows (<input_length>)."
     ]
   },
   "RESULT_TYPE_MISMATCH_FOR_ARROW_UDF": {
@@ -960,11 +960,6 @@
       "Caught StopIteration thrown from user's code; failing the task: <exc>"
     ]
   },
-  "STOP_ITERATION_OCCURRED_FROM_SCALAR_ITER_PANDAS_UDF": {
-    "message": [
-      "pandas iterator UDF should exhaust the input iterator."
-    ]
-  },
   "STREAMING_CONNECT_SERIALIZATION_ERROR": {
     "message": [
       "Cannot serialize the function `<name>`. If you accessed the Spark session, or a DataFrame defined outside of the function, or any object that contains a Spark session, please be aware that they are not allowed in Spark Connect. For `foreachBatch`, please access the Spark session using `df.sparkSession`, where `df` is the first parameter in your `foreachBatch` function. For `StreamingQueryListener`, please access the Spark session using `self.spark`.
For details please check out the PySpark doc for `foreachBatch` and `StreamingQueryListener`." diff --git a/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py b/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py index 80b12d3a7798b..bc45e59639d1f 100644 --- a/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py +++ b/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py @@ -192,8 +192,8 @@ def stats(key, left, right): with self.quiet(): with self.assertRaisesRegex( PythonException, - "Column names of the returned pyarrow.Table do not match specified schema. " - "Missing: m. Unexpected: v, v2.\n", + "Column names of the returned data do not match specified schema. " + "Missing: m. Unexpected: v, v2.", ): # stats returns three columns while here we set schema with two columns self.cogrouped.applyInArrow(stats, schema="id long, m double").collect() @@ -228,8 +228,7 @@ def odd_means(key, left, _): with self.quiet(): with self.assertRaisesRegex( PythonException, - "Column names of the returned pyarrow.Table do not match specified schema. " - "Missing: m.\n", + "Column names of the returned data do not match specified schema. Missing: m.", ): # stats returns one column for even keys while here we set schema with two columns self.cogrouped.applyInArrow(odd_means, schema="id long, m double").collect() diff --git a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py index c9ad602edfd27..251c60a27f227 100644 --- a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py +++ b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py @@ -179,8 +179,8 @@ def stats(key, table): with self.quiet(): with self.assertRaisesRegex( PythonException, - "Column names of the returned pyarrow.Table do not match specified schema. " - "Missing: m. Unexpected: v, v2.\n", + "Column names of the returned data do not match specified schema. " + "Missing: m. 
Unexpected: v, v2.", ): # stats returns three columns while here we set schema with two columns df.groupby("id").applyInArrow(stats, schema="id long, m double").collect() @@ -215,8 +215,7 @@ def odd_means(key, table): with self.quiet(): with self.assertRaisesRegex( PythonException, - "Column names of the returned pyarrow.Table do not match specified schema. " - "Missing: m.\n", + "Column names of the returned data do not match specified schema. Missing: m.", ): # stats returns one column for even keys while here we set schema with two columns df.groupby("id").applyInArrow(odd_means, schema="id long, m double").collect() diff --git a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py index 61000762e75c6..d072b1243fb01 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py @@ -204,8 +204,8 @@ def merge_pandas(lft, rgt): self._test_merge_error( fn=merge_pandas, errorClass=PythonException, - error_message_regex="Column names of the returned pandas.DataFrame " - "do not match specified schema. Unexpected: add, more.\n", + error_message_regex="Column names of the returned data " + "do not match specified schema. Unexpected: add, more.", ) def test_apply_in_pandas_returning_no_column_names_and_wrong_amount(self): @@ -225,8 +225,8 @@ def merge_pandas(lft, rgt): self._test_merge_error( fn=merge_pandas, errorClass=PythonException, - error_message_regex="Number of columns of the returned pandas.DataFrame " - "doesn't match specified schema. Expected: 4 Actual: 6\n", + error_message_regex="Number of columns of the returned data " + "doesn't match specified schema. 
Expected: 4 Actual: 6", ) def test_apply_in_pandas_returning_empty_dataframe(self): diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py index c55eae0b2d62f..17bd2a3377337 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py @@ -313,8 +313,8 @@ def test_apply_in_pandas_returning_wrong_column_names(self): def check_apply_in_pandas_returning_wrong_column_names(self): with self.assertRaisesRegex( PythonException, - "Column names of the returned pandas.DataFrame do not match specified schema. " - "Missing: mean. Unexpected: median, std.\n", + "Column names of the returned data do not match specified schema. " + "Missing: mean. Unexpected: median, std.", ): self._test_apply_in_pandas( lambda key, pdf: pd.DataFrame( @@ -329,8 +329,8 @@ def test_apply_in_pandas_returning_no_column_names_and_wrong_amount(self): def check_apply_in_pandas_returning_no_column_names_and_wrong_amount(self): with self.assertRaisesRegex( PythonException, - "Number of columns of the returned pandas.DataFrame doesn't match " - "specified schema. Expected: 2 Actual: 3\n", + "Number of columns of the returned data doesn't match " + "specified schema. Expected: 2 Actual: 3", ): self._test_apply_in_pandas( lambda key, pdf: pd.DataFrame([key + (pdf.v.mean(), pdf.v.std())]) @@ -644,8 +644,8 @@ def invalid_positional_types(pdf): with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": False}): with self.assertRaisesRegex( PythonException, - "Column names of the returned pandas.DataFrame do not match " - "specified schema. Missing: id. Unexpected: iid.\n", + "Column names of the returned data do not match " + "specified schema. Missing: id. 
Unexpected: iid.", ): grouped_df.apply(column_name_typo).collect() with self.assertRaisesRegex(Exception, "[D|d]ecimal.*got.*date"): diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py b/python/pyspark/sql/tests/pandas/test_pandas_map.py index aa7ca70833e00..777d6d167048e 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py @@ -173,9 +173,9 @@ def dataframes_with_other_column_names(iterator): with self.assertRaisesRegex( PythonException, - "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] " - "Column names of the returned pandas.DataFrame do not match " - "specified schema. Missing: id. Unexpected: iid.\n", + "PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] " + "Column names of the returned data do not match " + "specified schema. Missing: id. Unexpected: iid.", ): ( self.spark.range(10, numPartitions=3) @@ -195,9 +195,9 @@ def dataframes_with_other_column_names(iterator): with self.assertRaisesRegex( PythonException, - "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] " - "Column names of the returned pandas.DataFrame do not match " - "specified schema. Missing: id2.\n", + "PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] " + "Column names of the returned data do not match " + "specified schema. Missing: id2.", ): ( self.spark.range(10, numPartitions=3) @@ -216,18 +216,18 @@ def check_dataframes_with_less_columns(self): with self.assertRaisesRegex( PythonException, - "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] " - "Column names of the returned pandas.DataFrame do not match " - "specified schema. Missing: id2.\n", + "PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] " + "Column names of the returned data do not match " + "specified schema. 
Missing: id2.", ): f = self.identity_dataframes_iter("id", "value") (df.mapInPandas(f, "id int, id2 long, value int").collect()) with self.assertRaisesRegex( PythonException, - "PySparkRuntimeError: \\[RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF\\] " - "Number of columns of the returned pandas.DataFrame doesn't match " - "specified schema. Expected: 3 Actual: 2\n", + "PySparkRuntimeError: \\[RESULT_COLUMN_SCHEMA_MISMATCH\\] " + "Number of columns of the returned data doesn't match " + "specified schema. Expected: 3 Actual: 2", ): f = self.identity_dataframes_wo_column_names_iter("id", "value") (df.mapInPandas(f, "id int, id2 long, value int").collect()) @@ -334,9 +334,9 @@ def test_empty_dataframes_with_less_columns(self): def check_empty_dataframes_with_less_columns(self): with self.assertRaisesRegex( PythonException, - "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] " - "Column names of the returned pandas.DataFrame do not match " - "specified schema. Missing: value.\n", + "PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] " + "Column names of the returned data do not match " + "specified schema. Missing: value.", ): f = self.dataframes_and_empty_dataframe_iter("id") ( @@ -363,9 +363,9 @@ def empty_dataframes_with_other_columns(iterator): with self.assertRaisesRegex( PythonException, - "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] " - "Column names of the returned pandas.DataFrame do not match " - "specified schema. Missing: id. Unexpected: iid.\n", + "PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] " + "Column names of the returned data do not match " + "specified schema. Missing: id. 
Unexpected: iid.", ): ( self.spark.range(10, numPartitions=3) diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py index 0128ae4840703..d7af5dd146327 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py @@ -678,7 +678,7 @@ def iter_udf_wong_output_size(it): yield pd.Series(1) with self.assertRaisesRegex( - Exception, "The length of output in Scalar iterator.*" "the length of output was 1" + Exception, "The number of output rows.*must match the number of input rows" ): df.select(iter_udf_wong_output_size(col("id"))).collect() @@ -691,7 +691,7 @@ def iter_udf_not_reading_all_input(it): with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 3}): df1 = self.spark.range(10).repartition(1) - with self.assertRaisesRegex(Exception, "pandas iterator UDF should exhaust"): + with self.assertRaisesRegex(Exception, "The input iterator must be fully consumed"): df1.select(iter_udf_not_reading_all_input(col("id"))).collect() def test_vectorized_udf_chained(self): diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 29dfd65c0e2b8..6162854799cda 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -302,7 +302,7 @@ def verify_pandas_result(result, return_type, assign_cols_by_name, truncate_retu extra = f" Unexpected: {', '.join(extra)}." 
if extra else "" raise PySparkRuntimeError( - errorClass="RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF", + errorClass="RESULT_COLUMN_NAMES_MISMATCH", messageParameters={ "missing": missing, "extra": extra, @@ -311,7 +311,7 @@ def verify_pandas_result(result, return_type, assign_cols_by_name, truncate_retu # otherwise the number of columns of result have to match the return type elif len(result_columns) != len(return_type): raise PySparkRuntimeError( - errorClass="RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF", + errorClass="RESULT_COLUMN_SCHEMA_MISMATCH", messageParameters={ "expected": str(len(return_type)), "actual": str(len(result.columns)), @@ -445,7 +445,7 @@ def verify_arrow_result(table, assign_cols_by_name, expected_cols_and_types): extra = f" Unexpected: {', '.join(extra)}." if extra else "" raise PySparkRuntimeError( - errorClass="RESULT_COLUMNS_MISMATCH_FOR_ARROW_UDF", + errorClass="RESULT_COLUMN_NAMES_MISMATCH", messageParameters={ "missing": missing, "extra": extra, @@ -626,7 +626,7 @@ def verify_element(result): or (len(result.columns) == 0 and result.empty) ): raise PySparkRuntimeError( - errorClass="RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF", + errorClass="RESULT_COLUMN_SCHEMA_MISMATCH", messageParameters={ "expected": str(len(return_type)), "actual": str(len(result.columns)), @@ -1675,7 +1675,7 @@ def map_batch(batch): # input length. 
if is_scalar_iter and num_output_rows > num_input_rows: raise PySparkRuntimeError( - errorClass="PANDAS_UDF_OUTPUT_EXCEEDS_INPUT_ROWS", messageParameters={} + errorClass="OUTPUT_EXCEEDS_INPUT_ROWS", messageParameters={} ) yield (result_batch, result_type) @@ -1686,13 +1686,13 @@ def map_batch(batch): pass else: raise PySparkRuntimeError( - errorClass="STOP_ITERATION_OCCURRED_FROM_SCALAR_ITER_PANDAS_UDF", + errorClass="INPUT_NOT_FULLY_CONSUMED", messageParameters={}, ) if num_output_rows != num_input_rows: raise PySparkRuntimeError( - errorClass="RESULT_LENGTH_MISMATCH_FOR_SCALAR_ITER_PANDAS_UDF", + errorClass="RESULT_ROWS_MISMATCH", messageParameters={ "output_length": str(num_output_rows), "input_length": str(num_input_rows),