diff --git a/python/pyspark/errors/error-conditions.json b/python/pyspark/errors/error-conditions.json
index ebc4a7f4db38d..4cdddb7da3efe 100644
--- a/python/pyspark/errors/error-conditions.json
+++ b/python/pyspark/errors/error-conditions.json
@@ -174,8 +174,8 @@
       "`<func_name>` does not allow a Column in a list."
     ]
   },
-  "CONFLICTING_PIPELINE_REFRESH_OPTIONS" : {
-    "message" : [
+  "CONFLICTING_PIPELINE_REFRESH_OPTIONS": {
+    "message": [
       "--full-refresh-all option conflicts with ",
       "The --full-refresh-all option performs a full refresh of all datasets, ",
       "so specifying individual datasets with is not allowed."
@@ -379,6 +379,11 @@
       "<arg_name> index out of range, got '<index>'."
     ]
   },
+  "INPUT_NOT_FULLY_CONSUMED": {
+    "message": [
+      "The input iterator must be fully consumed."
+    ]
+  },
   "INVALID_ARROW_UDTF_RETURN_TYPE": {
     "message": [
       "The return type of the arrow-optimized Python UDTF should be of type 'pandas.DataFrame', but the '' method returned a value of type with value: ."
@@ -869,6 +874,11 @@
       "<feature> is only supported with Spark Connect; however, the current Spark session does not use Spark Connect."
     ]
   },
+  "OUTPUT_EXCEEDS_INPUT_ROWS": {
+    "message": [
+      "The number of output rows must not exceed the number of input rows."
+    ]
+  },
   "PACKAGE_NOT_INSTALLED": {
     "message": [
       "<package_name> >= <minimum_version> must be installed; however, it was not found."
@@ -881,11 +891,6 @@
       "Alternatively set a pandas-on-spark option 'compute.fail_on_ansi_mode' to `False` to force it to work, although it can cause unexpected behavior."
     ]
   },
-  "PANDAS_UDF_OUTPUT_EXCEEDS_INPUT_ROWS": {
-    "message": [
-      "The Pandas SCALAR_ITER UDF outputs more rows than input rows."
-    ]
-  },
   "PIPELINE_SPEC_DICT_KEY_NOT_STRING": {
     "message": [
       "For pipeline spec field ``, key should be a string, got ."
@@ -982,29 +987,24 @@
       " on the server but responses were already received from it."
     ]
   },
-  "RESULT_COLUMNS_MISMATCH_FOR_ARROW_UDF": {
-    "message": [
-      "Column names of the returned pyarrow.Table do not match specified schema.<missing><extra>"
-    ]
-  },
   "RESULT_COLUMNS_MISMATCH_FOR_ARROW_UDTF": {
     "message": [
       "Column names of the returned pyarrow.Table or pyarrow.RecordBatch do not match specified schema. Expected: <expected> Actual: <actual>"
     ]
   },
-  "RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF": {
+  "RESULT_COLUMN_NAMES_MISMATCH": {
     "message": [
-      "Column names of the returned pandas.DataFrame do not match specified schema.<missing><extra>"
+      "Column names of the returned data do not match specified schema.<missing><extra>"
     ]
   },
-  "RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF": {
+  "RESULT_COLUMN_SCHEMA_MISMATCH": {
     "message": [
-      "Number of columns of the returned pandas.DataFrame doesn't match specified schema. Expected: <expected> Actual: <actual>"
+      "Number of columns of the returned data doesn't match specified schema. Expected: <expected> Actual: <actual>"
     ]
   },
-  "RESULT_LENGTH_MISMATCH_FOR_SCALAR_ITER_PANDAS_UDF": {
+  "RESULT_ROWS_MISMATCH": {
     "message": [
-      "The length of output in Scalar iterator pandas UDF should be the same with the input's; however, the length of output was <output_length> and the length of input was <input_length>."
+      "The number of output rows (<output_length>) must match the number of input rows (<input_length>)."
     ]
   },
   "RESULT_TYPE_MISMATCH_FOR_ARROW_UDF": {
@@ -1037,24 +1037,24 @@
       "Session mutation is not allowed in declarative pipelines."
     ],
     "sub_class": {
-      "SET_RUNTIME_CONF": {
+      "CREATE_GLOBAL_TEMP_VIEW": {
         "message": [
-          "Instead set configuration via the pipeline spec or use the 'spark_conf' argument in various decorators."
+          "Instead use the @temporary_view decorator to define temporary views."
         ]
       },
-      "SET_CURRENT_CATALOG": {
+      "CREATE_OR_REPLACE_GLOBAL_TEMP_VIEW": {
         "message": [
-          "Instead set catalog via the pipeline spec or the 'name' argument on the dataset decorators."
+          "Instead use the @temporary_view decorator to define temporary views."
         ]
       },
-      "SET_CURRENT_DATABASE": {
+      "CREATE_OR_REPLACE_TEMP_VIEW": {
         "message": [
-          "Instead set database via the pipeline spec or the 'name' argument on the dataset decorators."
+          "Instead use the @temporary_view decorator to define temporary views."
         ]
       },
-      "DROP_TEMP_VIEW": {
+      "CREATE_TEMP_VIEW": {
         "message": [
-          "Instead remove the temporary view definition directly."
+          "Instead use the @temporary_view decorator to define temporary views."
         ]
       },
       "DROP_GLOBAL_TEMP_VIEW": {
@@ -1062,39 +1062,39 @@
           "Instead remove the temporary view definition directly."
         ]
       },
-      "CREATE_TEMP_VIEW": {
+      "DROP_TEMP_VIEW": {
         "message": [
-          "Instead use the @temporary_view decorator to define temporary views."
+          "Instead remove the temporary view definition directly."
         ]
       },
-      "CREATE_OR_REPLACE_TEMP_VIEW": {
+      "REGISTER_JAVA_UDAF": {
         "message": [
-          "Instead use the @temporary_view decorator to define temporary views."
+          ""
         ]
       },
-      "CREATE_GLOBAL_TEMP_VIEW": {
+      "REGISTER_JAVA_UDF": {
         "message": [
-          "Instead use the @temporary_view decorator to define temporary views."
+          ""
         ]
       },
-      "CREATE_OR_REPLACE_GLOBAL_TEMP_VIEW": {
+      "REGISTER_UDF": {
         "message": [
-          "Instead use the @temporary_view decorator to define temporary views."
+          ""
         ]
       },
-      "REGISTER_UDF": {
+      "SET_CURRENT_CATALOG": {
         "message": [
-          ""
+          "Instead set catalog via the pipeline spec or the 'name' argument on the dataset decorators."
         ]
       },
-      "REGISTER_JAVA_UDF": {
+      "SET_CURRENT_DATABASE": {
         "message": [
-          ""
+          "Instead set database via the pipeline spec or the 'name' argument on the dataset decorators."
         ]
       },
-      "REGISTER_JAVA_UDAF": {
+      "SET_RUNTIME_CONF": {
         "message": [
-          ""
+          "Instead set configuration via the pipeline spec or use the 'spark_conf' argument in various decorators."
         ]
       }
     }
@@ -1139,33 +1139,28 @@
       "Caught StopIteration thrown from user's code; failing the task: <exc>"
     ]
   },
-  "STOP_ITERATION_OCCURRED_FROM_SCALAR_ITER_PANDAS_UDF": {
-    "message": [
-      "pandas iterator UDF should exhaust the input iterator."
-    ]
-  },
   "STREAMING_CONNECT_SERIALIZATION_ERROR": {
     "message": [
      "Cannot serialize the function `<name>`. If you accessed the Spark session, or a DataFrame defined outside of the function, or any object that contains a Spark session, please be aware that they are not allowed in Spark Connect. For `foreachBatch`, please access the Spark session using `df.sparkSession`, where `df` is the first parameter in your `foreachBatch` function. For `StreamingQueryListener`, please access the Spark session using `self.spark`. For details please check out the PySpark doc for `foreachBatch` and `StreamingQueryListener`."
     ]
   },
-  "ST_INVALID_ALGORITHM_VALUE" : {
-    "message" : [
+  "ST_INVALID_ALGORITHM_VALUE": {
+    "message": [
       "Invalid or unsupported edge interpolation algorithm value: ''."
     ],
-    "sqlState" : "22023"
+    "sqlState": "22023"
   },
-  "ST_INVALID_CRS_VALUE" : {
-    "message" : [
+  "ST_INVALID_CRS_VALUE": {
+    "message": [
       "Invalid or unsupported CRS (coordinate reference system) value: ''."
     ],
-    "sqlState" : "22023"
+    "sqlState": "22023"
   },
-  "ST_INVALID_SRID_VALUE" : {
-    "message" : [
+  "ST_INVALID_SRID_VALUE": {
+    "message": [
       "Invalid or unsupported SRID (spatial reference identifier) value: ."
     ],
-    "sqlState" : "22023"
+    "sqlState": "22023"
   },
   "TEST_CLASS_NOT_COMPILED": {
     "message": [
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py b/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py
index abfd1af7a741e..5f256ece6e91d 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py
@@ -195,8 +195,8 @@ def stats(key, left, right):
         with self.quiet():
             with self.assertRaisesRegex(
                 PythonException,
-                "Column names of the returned pyarrow.Table do not match specified schema. "
-                "Missing: m. Unexpected: v, v2.\n",
+                "Column names of the returned data do not match specified schema. "
+                "Missing: m. Unexpected: v, v2.",
             ):
                 # stats returns three columns while here we set schema with two columns
                 self.cogrouped.applyInArrow(stats, schema="id long, m double").collect()
@@ -231,8 +231,7 @@ def odd_means(key, left, _):
         with self.quiet():
             with self.assertRaisesRegex(
                 PythonException,
-                "Column names of the returned pyarrow.Table do not match specified schema. "
-                "Missing: m.\n",
+                "Column names of the returned data do not match specified schema. Missing: m.",
             ):
                 # stats returns one column for even keys while here we set schema with two columns
                 self.cogrouped.applyInArrow(odd_means, schema="id long, m double").collect()
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
index cb8d74f724270..09554a5fa9ea9 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
@@ -223,8 +223,8 @@ def stats(key, table):
             for func_variation in function_variations(stats):
                 with self.assertRaisesRegex(
                     PythonException,
-                    "Column names of the returned pyarrow.Table do not match specified schema. "
-                    "Missing: m. Unexpected: v, v2.\n",
+                    "Column names of the returned data do not match specified schema. "
+                    "Missing: m. Unexpected: v, v2.",
                 ):
                     # stats returns three columns while here we set schema with two columns
                     df.groupby("id").applyInArrow(
@@ -264,8 +264,7 @@ def odd_means(key, table):
         with self.quiet():
             with self.assertRaisesRegex(
                 PythonException,
-                "Column names of the returned pyarrow.Table do not match specified schema. "
-                "Missing: m.\n",
+                "Column names of the returned data do not match specified schema. Missing: m.",
             ):
                 # stats returns one column for even keys while here we set schema with two columns
                 df.groupby("id").applyInArrow(odd_means, schema="id long, m double").collect()
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
index 31080cbe6e0c8..b946bc62a4922 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
@@ -208,8 +208,8 @@ def merge_pandas(lft, rgt):
         self._test_merge_error(
             fn=merge_pandas,
             errorClass=PythonException,
-            error_message_regex="Column names of the returned pandas.DataFrame "
-            "do not match specified schema. Unexpected: add, more.\n",
+            error_message_regex="Column names of the returned data "
+            "do not match specified schema. Unexpected: add, more.",
         )

     def test_apply_in_pandas_returning_no_column_names_and_wrong_amount(self):
@@ -229,8 +229,8 @@ def merge_pandas(lft, rgt):
         self._test_merge_error(
             fn=merge_pandas,
             errorClass=PythonException,
-            error_message_regex="Number of columns of the returned pandas.DataFrame "
-            "doesn't match specified schema. Expected: 4 Actual: 6\n",
+            error_message_regex="Number of columns of the returned data "
+            "doesn't match specified schema. Expected: 4 Actual: 6",
         )

     def test_apply_in_pandas_returning_empty_dataframe(self):
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
index 7b35714b3528c..9b9425e9fd182 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
@@ -302,8 +302,8 @@ def test_apply_in_pandas_returning_wrong_column_names(self):
     def check_apply_in_pandas_returning_wrong_column_names(self):
         with self.assertRaisesRegex(
             PythonException,
-            "Column names of the returned pandas.DataFrame do not match specified schema. "
-            "Missing: mean. Unexpected: median, std.\n",
+            "Column names of the returned data do not match specified schema. "
+            "Missing: mean. Unexpected: median, std.",
         ):
             self._test_apply_in_pandas(
                 lambda key, pdf: pd.DataFrame(
@@ -318,8 +318,8 @@ def test_apply_in_pandas_returning_no_column_names_and_wrong_amount(self):
     def check_apply_in_pandas_returning_no_column_names_and_wrong_amount(self):
         with self.assertRaisesRegex(
             PythonException,
-            "Number of columns of the returned pandas.DataFrame doesn't match "
-            "specified schema. Expected: 2 Actual: 3\n",
+            "Number of columns of the returned data doesn't match "
+            "specified schema. Expected: 2 Actual: 3",
         ):
             self._test_apply_in_pandas(
                 lambda key, pdf: pd.DataFrame([key + (pdf.v.mean(), pdf.v.std())])
@@ -664,8 +664,8 @@ def invalid_positional_types(pdf):
         with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": False}):
             with self.assertRaisesRegex(
                 PythonException,
-                "Column names of the returned pandas.DataFrame do not match "
-                "specified schema. Missing: id. Unexpected: iid.\n",
+                "Column names of the returned data do not match "
+                "specified schema. Missing: id. Unexpected: iid.",
             ):
                 grouped_df.apply(column_name_typo).collect()
             with self.assertRaisesRegex(Exception, "[D|d]ecimal.*got.*date"):
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py b/python/pyspark/sql/tests/pandas/test_pandas_map.py
index fd3eb7b7dc7a1..6142ee03eea4f 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py
@@ -200,9 +200,9 @@ def dataframes_with_other_column_names(iterator):

         with self.assertRaisesRegex(
             PythonException,
-            "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] "
-            "Column names of the returned pandas.DataFrame do not match "
-            "specified schema. Missing: id. Unexpected: iid.\n",
+            "PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] "
+            "Column names of the returned data do not match "
+            "specified schema. Missing: id. Unexpected: iid.",
         ):
             (
                 self.spark.range(10, numPartitions=3)
@@ -222,9 +222,9 @@ def dataframes_with_other_column_names(iterator):

         with self.assertRaisesRegex(
             PythonException,
-            "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] "
-            "Column names of the returned pandas.DataFrame do not match "
-            "specified schema. Missing: id2.\n",
+            "PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] "
+            "Column names of the returned data do not match "
+            "specified schema. Missing: id2.",
         ):
             (
                 self.spark.range(10, numPartitions=3)
@@ -243,18 +243,18 @@ def check_dataframes_with_less_columns(self):

         with self.assertRaisesRegex(
             PythonException,
-            "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] "
-            "Column names of the returned pandas.DataFrame do not match "
-            "specified schema. Missing: id2.\n",
+            "PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] "
+            "Column names of the returned data do not match "
+            "specified schema. Missing: id2.",
         ):
             f = self.identity_dataframes_iter("id", "value")
             (df.mapInPandas(f, "id int, id2 long, value int").collect())

         with self.assertRaisesRegex(
             PythonException,
-            "PySparkRuntimeError: \\[RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF\\] "
-            "Number of columns of the returned pandas.DataFrame doesn't match "
-            "specified schema. Expected: 3 Actual: 2\n",
+            "PySparkRuntimeError: \\[RESULT_COLUMN_SCHEMA_MISMATCH\\] "
+            "Number of columns of the returned data doesn't match "
+            "specified schema. Expected: 3 Actual: 2",
         ):
             f = self.identity_dataframes_wo_column_names_iter("id", "value")
             (df.mapInPandas(f, "id int, id2 long, value int").collect())
@@ -361,9 +361,9 @@ def test_empty_dataframes_with_less_columns(self):
     def check_empty_dataframes_with_less_columns(self):
         with self.assertRaisesRegex(
             PythonException,
-            "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] "
-            "Column names of the returned pandas.DataFrame do not match "
-            "specified schema. Missing: value.\n",
+            "PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] "
+            "Column names of the returned data do not match "
+            "specified schema. Missing: value.",
         ):
             f = self.dataframes_and_empty_dataframe_iter("id")
             (
@@ -390,9 +390,9 @@ def empty_dataframes_with_other_columns(iterator):

         with self.assertRaisesRegex(
             PythonException,
-            "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] "
-            "Column names of the returned pandas.DataFrame do not match "
-            "specified schema. Missing: id. Unexpected: iid.\n",
+            "PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] "
+            "Column names of the returned data do not match "
+            "specified schema. Missing: id. Unexpected: iid.",
         ):
             (
                 self.spark.range(10, numPartitions=3)
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
index b936a9240e529..637bc6854a6a1 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
@@ -688,7 +688,7 @@ def iter_udf_wong_output_size(it):
                 yield pd.Series(1)

         with self.assertRaisesRegex(
-            Exception, "The length of output in Scalar iterator.*" "the length of output was 1"
+            Exception, "The number of output rows.*must match the number of input rows"
         ):
             df.select(iter_udf_wong_output_size(col("id"))).collect()

@@ -701,7 +701,7 @@ def iter_udf_not_reading_all_input(it):

         with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 3}):
             df1 = self.spark.range(10).repartition(1)
-            with self.assertRaisesRegex(Exception, "pandas iterator UDF should exhaust"):
+            with self.assertRaisesRegex(Exception, "The input iterator must be fully consumed"):
                 df1.select(iter_udf_not_reading_all_input(col("id"))).collect()

     def test_vectorized_udf_chained(self):
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 4bae9f6dc48f5..f9cc0e68e237a 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -435,7 +435,7 @@ def verify_pandas_result(result, return_type, assign_cols_by_name, truncate_retu
            extra = f" Unexpected: {', '.join(extra)}." if extra else ""

            raise PySparkRuntimeError(
-                errorClass="RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF",
+                errorClass="RESULT_COLUMN_NAMES_MISMATCH",
                messageParameters={
                    "missing": missing,
                    "extra": extra,
@@ -444,7 +444,7 @@ def verify_pandas_result(result, return_type, assign_cols_by_name, truncate_retu
     # otherwise the number of columns of result have to match the return type
     elif len(result_columns) != len(return_type):
         raise PySparkRuntimeError(
-            errorClass="RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF",
+            errorClass="RESULT_COLUMN_SCHEMA_MISMATCH",
             messageParameters={
                 "expected": str(len(return_type)),
                 "actual": str(len(result.columns)),
@@ -602,7 +602,7 @@ def verify_arrow_result(result, assign_cols_by_name, expected_cols_and_types):
             extra = f" Unexpected: {', '.join(extra)}." if extra else ""

             raise PySparkRuntimeError(
-                errorClass="RESULT_COLUMNS_MISMATCH_FOR_ARROW_UDF",
+                errorClass="RESULT_COLUMN_NAMES_MISMATCH",
                 messageParameters={
                     "missing": missing,
                     "extra": extra,
@@ -908,7 +908,7 @@ def verify_element(result):
                 or (len(result.columns) == 0 and result.empty)
             ):
                 raise PySparkRuntimeError(
-                    errorClass="RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF",
+                    errorClass="RESULT_COLUMN_SCHEMA_MISMATCH",
                     messageParameters={
                         "expected": str(len(return_type)),
                         "actual": str(len(result.columns)),
@@ -2915,7 +2915,7 @@ def map_batch(batch):
                # input length.
                if is_scalar_iter and num_output_rows > num_input_rows:
                    raise PySparkRuntimeError(
-                        errorClass="PANDAS_UDF_OUTPUT_EXCEEDS_INPUT_ROWS", messageParameters={}
+                        errorClass="OUTPUT_EXCEEDS_INPUT_ROWS", messageParameters={}
                    )
                yield (result_batch, result_type)

@@ -2926,13 +2926,13 @@
                    pass
                else:
                    raise PySparkRuntimeError(
-                        errorClass="STOP_ITERATION_OCCURRED_FROM_SCALAR_ITER_PANDAS_UDF",
+                        errorClass="INPUT_NOT_FULLY_CONSUMED",
                        messageParameters={},
                    )

                if num_output_rows != num_input_rows:
                    raise PySparkRuntimeError(
-                        errorClass="RESULT_LENGTH_MISMATCH_FOR_SCALAR_ITER_PANDAS_UDF",
+                        errorClass="RESULT_ROWS_MISMATCH",
                        messageParameters={
                            "output_length": str(num_output_rows),
                            "input_length": str(num_input_rows),