diff --git a/python/pyspark/errors/error-conditions.json b/python/pyspark/errors/error-conditions.json index 511e81cc08b63..49c5856934d3c 100644 --- a/python/pyspark/errors/error-conditions.json +++ b/python/pyspark/errors/error-conditions.json @@ -342,6 +342,11 @@ " index out of range, got ''." ] }, + "INPUT_NOT_FULLY_CONSUMED": { + "message": [ + "The input iterator must be fully consumed." + ] + }, "INVALID_ARROW_UDTF_RETURN_TYPE": { "message": [ "The return type of the arrow-optimized Python UDTF should be of type 'pandas.DataFrame', but the '' method returned a value of type with value: ." @@ -807,6 +812,11 @@ " is only supported with Spark Connect; however, the current Spark session does not use Spark Connect." ] }, + "OUTPUT_EXCEEDS_INPUT_ROWS": { + "message": [ + "The number of output rows must not exceed the number of input rows." + ] + }, "PACKAGE_NOT_INSTALLED": { "message": [ " >= must be installed; however, it was not found." @@ -819,11 +829,6 @@ "Alternatively set a pandas-on-spark option 'compute.fail_on_ansi_mode' to `False` to force it to work, although it can cause unexpected behavior." ] }, - "PANDAS_UDF_OUTPUT_EXCEEDS_INPUT_ROWS" : { - "message": [ - "The Pandas SCALAR_ITER UDF outputs more rows than input rows." - ] - }, "PIPE_FUNCTION_EXITED": { "message": [ "Pipe function `` exited with error code ." @@ -875,24 +880,19 @@ "OPERATION_NOT_FOUND on the server but responses were already received from it." ] }, - "RESULT_COLUMNS_MISMATCH_FOR_ARROW_UDF": { - "message": [ - "Column names of the returned pyarrow.Table do not match specified schema." - ] - }, - "RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF": { + "RESULT_COLUMN_NAMES_MISMATCH": { "message": [ - "Column names of the returned pandas.DataFrame do not match specified schema." + "Column names of the returned data do not match specified schema." 
] }, - "RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF": { + "RESULT_COLUMN_SCHEMA_MISMATCH": { "message": [ - "Number of columns of the returned pandas.DataFrame doesn't match specified schema. Expected: <expected> Actual: <actual>" + "Number of columns of the returned data doesn't match specified schema. Expected: <expected> Actual: <actual>" ] }, - "RESULT_LENGTH_MISMATCH_FOR_SCALAR_ITER_PANDAS_UDF": { + "RESULT_ROWS_MISMATCH": { "message": [ - "The length of output in Scalar iterator pandas UDF should be the same with the input's; however, the length of output was <output_length> and the length of input was <input_length>." + "The number of output rows (<output_length>) must match the number of input rows (<input_length>)." ] }, "RESULT_TYPE_MISMATCH_FOR_ARROW_UDF": { @@ -960,11 +960,6 @@ "Caught StopIteration thrown from user's code; failing the task: " ] }, - "STOP_ITERATION_OCCURRED_FROM_SCALAR_ITER_PANDAS_UDF": { - "message": [ - "pandas iterator UDF should exhaust the input iterator." - ] - }, "STREAMING_CONNECT_SERIALIZATION_ERROR": { "message": [ "Cannot serialize the function ``. If you accessed the Spark session, or a DataFrame defined outside of the function, or any object that contains a Spark session, please be aware that they are not allowed in Spark Connect. For `foreachBatch`, please access the Spark session using `df.sparkSession`, where `df` is the first parameter in your `foreachBatch` function. For `StreamingQueryListener`, please access the Spark session using `self.spark`. For details please check out the PySpark doc for `foreachBatch` and `StreamingQueryListener`." 
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py b/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py index 80b12d3a7798b..bc45e59639d1f 100644 --- a/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py +++ b/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py @@ -192,8 +192,8 @@ def stats(key, left, right): with self.quiet(): with self.assertRaisesRegex( PythonException, - "Column names of the returned pyarrow.Table do not match specified schema. " - "Missing: m. Unexpected: v, v2.\n", + "Column names of the returned data do not match specified schema. " + "Missing: m. Unexpected: v, v2.", ): # stats returns three columns while here we set schema with two columns self.cogrouped.applyInArrow(stats, schema="id long, m double").collect() @@ -228,8 +228,7 @@ def odd_means(key, left, _): with self.quiet(): with self.assertRaisesRegex( PythonException, - "Column names of the returned pyarrow.Table do not match specified schema. " - "Missing: m.\n", + "Column names of the returned data do not match specified schema. Missing: m.", ): # stats returns one column for even keys while here we set schema with two columns self.cogrouped.applyInArrow(odd_means, schema="id long, m double").collect() diff --git a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py index c9ad602edfd27..251c60a27f227 100644 --- a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py +++ b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py @@ -179,8 +179,8 @@ def stats(key, table): with self.quiet(): with self.assertRaisesRegex( PythonException, - "Column names of the returned pyarrow.Table do not match specified schema. " - "Missing: m. Unexpected: v, v2.\n", + "Column names of the returned data do not match specified schema. " + "Missing: m. 
Unexpected: v, v2.", ): # stats returns three columns while here we set schema with two columns df.groupby("id").applyInArrow(stats, schema="id long, m double").collect() @@ -215,8 +215,7 @@ def odd_means(key, table): with self.quiet(): with self.assertRaisesRegex( PythonException, - "Column names of the returned pyarrow.Table do not match specified schema. " - "Missing: m.\n", + "Column names of the returned data do not match specified schema. Missing: m.", ): # stats returns one column for even keys while here we set schema with two columns df.groupby("id").applyInArrow(odd_means, schema="id long, m double").collect() diff --git a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py index 61000762e75c6..d072b1243fb01 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py @@ -204,8 +204,8 @@ def merge_pandas(lft, rgt): self._test_merge_error( fn=merge_pandas, errorClass=PythonException, - error_message_regex="Column names of the returned pandas.DataFrame " - "do not match specified schema. Unexpected: add, more.\n", + error_message_regex="Column names of the returned data " + "do not match specified schema. Unexpected: add, more.", ) def test_apply_in_pandas_returning_no_column_names_and_wrong_amount(self): @@ -225,8 +225,8 @@ def merge_pandas(lft, rgt): self._test_merge_error( fn=merge_pandas, errorClass=PythonException, - error_message_regex="Number of columns of the returned pandas.DataFrame " - "doesn't match specified schema. Expected: 4 Actual: 6\n", + error_message_regex="Number of columns of the returned data " + "doesn't match specified schema. 
Expected: 4 Actual: 6", ) def test_apply_in_pandas_returning_empty_dataframe(self): diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py index c55eae0b2d62f..17bd2a3377337 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py @@ -313,8 +313,8 @@ def test_apply_in_pandas_returning_wrong_column_names(self): def check_apply_in_pandas_returning_wrong_column_names(self): with self.assertRaisesRegex( PythonException, - "Column names of the returned pandas.DataFrame do not match specified schema. " - "Missing: mean. Unexpected: median, std.\n", + "Column names of the returned data do not match specified schema. " + "Missing: mean. Unexpected: median, std.", ): self._test_apply_in_pandas( lambda key, pdf: pd.DataFrame( @@ -329,8 +329,8 @@ def test_apply_in_pandas_returning_no_column_names_and_wrong_amount(self): def check_apply_in_pandas_returning_no_column_names_and_wrong_amount(self): with self.assertRaisesRegex( PythonException, - "Number of columns of the returned pandas.DataFrame doesn't match " - "specified schema. Expected: 2 Actual: 3\n", + "Number of columns of the returned data doesn't match " + "specified schema. Expected: 2 Actual: 3", ): self._test_apply_in_pandas( lambda key, pdf: pd.DataFrame([key + (pdf.v.mean(), pdf.v.std())]) @@ -644,8 +644,8 @@ def invalid_positional_types(pdf): with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": False}): with self.assertRaisesRegex( PythonException, - "Column names of the returned pandas.DataFrame do not match " - "specified schema. Missing: id. Unexpected: iid.\n", + "Column names of the returned data do not match " + "specified schema. Missing: id. 
Unexpected: iid.", ): grouped_df.apply(column_name_typo).collect() with self.assertRaisesRegex(Exception, "[D|d]ecimal.*got.*date"): diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py b/python/pyspark/sql/tests/pandas/test_pandas_map.py index aa7ca70833e00..777d6d167048e 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py @@ -173,9 +173,9 @@ def dataframes_with_other_column_names(iterator): with self.assertRaisesRegex( PythonException, - "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] " - "Column names of the returned pandas.DataFrame do not match " - "specified schema. Missing: id. Unexpected: iid.\n", + "PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] " + "Column names of the returned data do not match " + "specified schema. Missing: id. Unexpected: iid.", ): ( self.spark.range(10, numPartitions=3) @@ -195,9 +195,9 @@ def dataframes_with_other_column_names(iterator): with self.assertRaisesRegex( PythonException, - "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] " - "Column names of the returned pandas.DataFrame do not match " - "specified schema. Missing: id2.\n", + "PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] " + "Column names of the returned data do not match " + "specified schema. Missing: id2.", ): ( self.spark.range(10, numPartitions=3) @@ -216,18 +216,18 @@ def check_dataframes_with_less_columns(self): with self.assertRaisesRegex( PythonException, - "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] " - "Column names of the returned pandas.DataFrame do not match " - "specified schema. Missing: id2.\n", + "PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] " + "Column names of the returned data do not match " + "specified schema. 
Missing: id2.", ): f = self.identity_dataframes_iter("id", "value") (df.mapInPandas(f, "id int, id2 long, value int").collect()) with self.assertRaisesRegex( PythonException, - "PySparkRuntimeError: \\[RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF\\] " - "Number of columns of the returned pandas.DataFrame doesn't match " - "specified schema. Expected: 3 Actual: 2\n", + "PySparkRuntimeError: \\[RESULT_COLUMN_SCHEMA_MISMATCH\\] " + "Number of columns of the returned data doesn't match " + "specified schema. Expected: 3 Actual: 2", ): f = self.identity_dataframes_wo_column_names_iter("id", "value") (df.mapInPandas(f, "id int, id2 long, value int").collect()) @@ -334,9 +334,9 @@ def test_empty_dataframes_with_less_columns(self): def check_empty_dataframes_with_less_columns(self): with self.assertRaisesRegex( PythonException, - "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] " - "Column names of the returned pandas.DataFrame do not match " - "specified schema. Missing: value.\n", + "PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] " + "Column names of the returned data do not match " + "specified schema. Missing: value.", ): f = self.dataframes_and_empty_dataframe_iter("id") ( @@ -363,9 +363,9 @@ def empty_dataframes_with_other_columns(iterator): with self.assertRaisesRegex( PythonException, - "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] " - "Column names of the returned pandas.DataFrame do not match " - "specified schema. Missing: id. Unexpected: iid.\n", + "PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] " + "Column names of the returned data do not match " + "specified schema. Missing: id. 
Unexpected: iid.", ): ( self.spark.range(10, numPartitions=3) diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py index 0128ae4840703..d7af5dd146327 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py @@ -678,7 +678,7 @@ def iter_udf_wong_output_size(it): yield pd.Series(1) with self.assertRaisesRegex( - Exception, "The length of output in Scalar iterator.*" "the length of output was 1" + Exception, "The number of output rows.*must match the number of input rows" ): df.select(iter_udf_wong_output_size(col("id"))).collect() @@ -691,7 +691,7 @@ def iter_udf_not_reading_all_input(it): with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 3}): df1 = self.spark.range(10).repartition(1) - with self.assertRaisesRegex(Exception, "pandas iterator UDF should exhaust"): + with self.assertRaisesRegex(Exception, "The input iterator must be fully consumed"): df1.select(iter_udf_not_reading_all_input(col("id"))).collect() def test_vectorized_udf_chained(self): diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 29dfd65c0e2b8..6162854799cda 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -302,7 +302,7 @@ def verify_pandas_result(result, return_type, assign_cols_by_name, truncate_retu extra = f" Unexpected: {', '.join(extra)}." 
if extra else "" raise PySparkRuntimeError( - errorClass="RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF", + errorClass="RESULT_COLUMN_NAMES_MISMATCH", messageParameters={ "missing": missing, "extra": extra, @@ -311,7 +311,7 @@ def verify_pandas_result(result, return_type, assign_cols_by_name, truncate_retu # otherwise the number of columns of result have to match the return type elif len(result_columns) != len(return_type): raise PySparkRuntimeError( - errorClass="RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF", + errorClass="RESULT_COLUMN_SCHEMA_MISMATCH", messageParameters={ "expected": str(len(return_type)), "actual": str(len(result.columns)), @@ -445,7 +445,7 @@ def verify_arrow_result(table, assign_cols_by_name, expected_cols_and_types): extra = f" Unexpected: {', '.join(extra)}." if extra else "" raise PySparkRuntimeError( - errorClass="RESULT_COLUMNS_MISMATCH_FOR_ARROW_UDF", + errorClass="RESULT_COLUMN_NAMES_MISMATCH", messageParameters={ "missing": missing, "extra": extra, @@ -626,7 +626,7 @@ def verify_element(result): or (len(result.columns) == 0 and result.empty) ): raise PySparkRuntimeError( - errorClass="RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF", + errorClass="RESULT_COLUMN_SCHEMA_MISMATCH", messageParameters={ "expected": str(len(return_type)), "actual": str(len(result.columns)), @@ -1675,7 +1675,7 @@ def map_batch(batch): # input length. 
if is_scalar_iter and num_output_rows > num_input_rows: raise PySparkRuntimeError( - errorClass="PANDAS_UDF_OUTPUT_EXCEEDS_INPUT_ROWS", messageParameters={} + errorClass="OUTPUT_EXCEEDS_INPUT_ROWS", messageParameters={} ) yield (result_batch, result_type) @@ -1686,13 +1686,13 @@ def map_batch(batch): pass else: raise PySparkRuntimeError( - errorClass="STOP_ITERATION_OCCURRED_FROM_SCALAR_ITER_PANDAS_UDF", + errorClass="INPUT_NOT_FULLY_CONSUMED", messageParameters={}, ) if num_output_rows != num_input_rows: raise PySparkRuntimeError( - errorClass="RESULT_LENGTH_MISMATCH_FOR_SCALAR_ITER_PANDAS_UDF", + errorClass="RESULT_ROWS_MISMATCH", messageParameters={ "output_length": str(num_output_rows), "input_length": str(num_input_rows),