Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 16 additions & 21 deletions python/pyspark/errors/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,11 @@
"<arg_name> index out of range, got '<index>'."
]
},
"INPUT_NOT_FULLY_CONSUMED": {
"message": [
"The input iterator must be fully consumed."
]
},
"INVALID_ARROW_UDTF_RETURN_TYPE": {
"message": [
"The return type of the arrow-optimized Python UDTF should be of type 'pandas.DataFrame', but the '<func>' method returned a value of type <return_type> with value: <value>."
Expand Down Expand Up @@ -807,6 +812,11 @@
"<feature> is only supported with Spark Connect; however, the current Spark session does not use Spark Connect."
]
},
"OUTPUT_EXCEEDS_INPUT_ROWS": {
"message": [
"The number of output rows must not exceed the number of input rows."
]
},
"PACKAGE_NOT_INSTALLED": {
"message": [
"<package_name> >= <minimum_version> must be installed; however, it was not found."
Expand All @@ -819,11 +829,6 @@
"Alternatively set a pandas-on-spark option 'compute.fail_on_ansi_mode' to `False` to force it to work, although it can cause unexpected behavior."
]
},
"PANDAS_UDF_OUTPUT_EXCEEDS_INPUT_ROWS" : {
"message": [
"The Pandas SCALAR_ITER UDF outputs more rows than input rows."
]
},
"PIPE_FUNCTION_EXITED": {
"message": [
"Pipe function `<func_name>` exited with error code <error_code>."
Expand Down Expand Up @@ -875,24 +880,19 @@
"OPERATION_NOT_FOUND on the server but responses were already received from it."
]
},
"RESULT_COLUMNS_MISMATCH_FOR_ARROW_UDF": {
"message": [
"Column names of the returned pyarrow.Table do not match specified schema.<missing><extra>"
]
},
"RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF": {
"RESULT_COLUMN_NAMES_MISMATCH": {
"message": [
"Column names of the returned pandas.DataFrame do not match specified schema.<missing><extra>"
"Column names of the returned data do not match specified schema.<missing><extra>"
]
},
"RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF": {
"RESULT_COLUMN_SCHEMA_MISMATCH": {
"message": [
"Number of columns of the returned pandas.DataFrame doesn't match specified schema. Expected: <expected> Actual: <actual>"
"Number of columns of the returned data doesn't match specified schema. Expected: <expected> Actual: <actual>"
]
},
"RESULT_LENGTH_MISMATCH_FOR_SCALAR_ITER_PANDAS_UDF": {
"RESULT_ROWS_MISMATCH": {
"message": [
"The length of output in Scalar iterator pandas UDF should be the same with the input's; however, the length of output was <output_length> and the length of input was <input_length>."
"The number of output rows (<output_length>) must match the number of input rows (<input_length>)."
]
},
"RESULT_TYPE_MISMATCH_FOR_ARROW_UDF": {
Expand Down Expand Up @@ -960,11 +960,6 @@
"Caught StopIteration thrown from user's code; failing the task: <exc>"
]
},
"STOP_ITERATION_OCCURRED_FROM_SCALAR_ITER_PANDAS_UDF": {
"message": [
"pandas iterator UDF should exhaust the input iterator."
]
},
"STREAMING_CONNECT_SERIALIZATION_ERROR": {
"message": [
"Cannot serialize the function `<name>`. If you accessed the Spark session, or a DataFrame defined outside of the function, or any object that contains a Spark session, please be aware that they are not allowed in Spark Connect. For `foreachBatch`, please access the Spark session using `df.sparkSession`, where `df` is the first parameter in your `foreachBatch` function. For `StreamingQueryListener`, please access the Spark session using `self.spark`. For details please check out the PySpark doc for `foreachBatch` and `StreamingQueryListener`."
Expand Down
7 changes: 3 additions & 4 deletions python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ def stats(key, left, right):
with self.quiet():
with self.assertRaisesRegex(
PythonException,
"Column names of the returned pyarrow.Table do not match specified schema. "
"Missing: m. Unexpected: v, v2.\n",
"Column names of the returned data do not match specified schema. "
"Missing: m. Unexpected: v, v2.",
):
# stats returns three columns while here we set schema with two columns
self.cogrouped.applyInArrow(stats, schema="id long, m double").collect()
Expand Down Expand Up @@ -228,8 +228,7 @@ def odd_means(key, left, _):
with self.quiet():
with self.assertRaisesRegex(
PythonException,
"Column names of the returned pyarrow.Table do not match specified schema. "
"Missing: m.\n",
"Column names of the returned data do not match specified schema. Missing: m.",
):
# stats returns one column for even keys while here we set schema with two columns
self.cogrouped.applyInArrow(odd_means, schema="id long, m double").collect()
Expand Down
7 changes: 3 additions & 4 deletions python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ def stats(key, table):
with self.quiet():
with self.assertRaisesRegex(
PythonException,
"Column names of the returned pyarrow.Table do not match specified schema. "
"Missing: m. Unexpected: v, v2.\n",
"Column names of the returned data do not match specified schema. "
"Missing: m. Unexpected: v, v2.",
):
# stats returns three columns while here we set schema with two columns
df.groupby("id").applyInArrow(stats, schema="id long, m double").collect()
Expand Down Expand Up @@ -215,8 +215,7 @@ def odd_means(key, table):
with self.quiet():
with self.assertRaisesRegex(
PythonException,
"Column names of the returned pyarrow.Table do not match specified schema. "
"Missing: m.\n",
"Column names of the returned data do not match specified schema. Missing: m.",
):
# stats returns one column for even keys while here we set schema with two columns
df.groupby("id").applyInArrow(odd_means, schema="id long, m double").collect()
Expand Down
8 changes: 4 additions & 4 deletions python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ def merge_pandas(lft, rgt):
self._test_merge_error(
fn=merge_pandas,
errorClass=PythonException,
error_message_regex="Column names of the returned pandas.DataFrame "
"do not match specified schema. Unexpected: add, more.\n",
error_message_regex="Column names of the returned data "
"do not match specified schema. Unexpected: add, more.",
)

def test_apply_in_pandas_returning_no_column_names_and_wrong_amount(self):
Expand All @@ -225,8 +225,8 @@ def merge_pandas(lft, rgt):
self._test_merge_error(
fn=merge_pandas,
errorClass=PythonException,
error_message_regex="Number of columns of the returned pandas.DataFrame "
"doesn't match specified schema. Expected: 4 Actual: 6\n",
error_message_regex="Number of columns of the returned data "
"doesn't match specified schema. Expected: 4 Actual: 6",
)

def test_apply_in_pandas_returning_empty_dataframe(self):
Expand Down
12 changes: 6 additions & 6 deletions python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,8 @@ def test_apply_in_pandas_returning_wrong_column_names(self):
def check_apply_in_pandas_returning_wrong_column_names(self):
with self.assertRaisesRegex(
PythonException,
"Column names of the returned pandas.DataFrame do not match specified schema. "
"Missing: mean. Unexpected: median, std.\n",
"Column names of the returned data do not match specified schema. "
"Missing: mean. Unexpected: median, std.",
):
self._test_apply_in_pandas(
lambda key, pdf: pd.DataFrame(
Expand All @@ -329,8 +329,8 @@ def test_apply_in_pandas_returning_no_column_names_and_wrong_amount(self):
def check_apply_in_pandas_returning_no_column_names_and_wrong_amount(self):
with self.assertRaisesRegex(
PythonException,
"Number of columns of the returned pandas.DataFrame doesn't match "
"specified schema. Expected: 2 Actual: 3\n",
"Number of columns of the returned data doesn't match "
"specified schema. Expected: 2 Actual: 3",
):
self._test_apply_in_pandas(
lambda key, pdf: pd.DataFrame([key + (pdf.v.mean(), pdf.v.std())])
Expand Down Expand Up @@ -644,8 +644,8 @@ def invalid_positional_types(pdf):
with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": False}):
with self.assertRaisesRegex(
PythonException,
"Column names of the returned pandas.DataFrame do not match "
"specified schema. Missing: id. Unexpected: iid.\n",
"Column names of the returned data do not match "
"specified schema. Missing: id. Unexpected: iid.",
):
grouped_df.apply(column_name_typo).collect()
with self.assertRaisesRegex(Exception, "[D|d]ecimal.*got.*date"):
Expand Down
36 changes: 18 additions & 18 deletions python/pyspark/sql/tests/pandas/test_pandas_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ def dataframes_with_other_column_names(iterator):

with self.assertRaisesRegex(
PythonException,
"PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] "
"Column names of the returned pandas.DataFrame do not match "
"specified schema. Missing: id. Unexpected: iid.\n",
"PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] "
"Column names of the returned data do not match "
"specified schema. Missing: id. Unexpected: iid.",
):
(
self.spark.range(10, numPartitions=3)
Expand All @@ -195,9 +195,9 @@ def dataframes_with_other_column_names(iterator):

with self.assertRaisesRegex(
PythonException,
"PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] "
"Column names of the returned pandas.DataFrame do not match "
"specified schema. Missing: id2.\n",
"PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] "
"Column names of the returned data do not match "
"specified schema. Missing: id2.",
):
(
self.spark.range(10, numPartitions=3)
Expand All @@ -216,18 +216,18 @@ def check_dataframes_with_less_columns(self):

with self.assertRaisesRegex(
PythonException,
"PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] "
"Column names of the returned pandas.DataFrame do not match "
"specified schema. Missing: id2.\n",
"PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] "
"Column names of the returned data do not match "
"specified schema. Missing: id2.",
):
f = self.identity_dataframes_iter("id", "value")
(df.mapInPandas(f, "id int, id2 long, value int").collect())

with self.assertRaisesRegex(
PythonException,
"PySparkRuntimeError: \\[RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF\\] "
"Number of columns of the returned pandas.DataFrame doesn't match "
"specified schema. Expected: 3 Actual: 2\n",
"PySparkRuntimeError: \\[RESULT_COLUMN_SCHEMA_MISMATCH\\] "
"Number of columns of the returned data doesn't match "
"specified schema. Expected: 3 Actual: 2",
):
f = self.identity_dataframes_wo_column_names_iter("id", "value")
(df.mapInPandas(f, "id int, id2 long, value int").collect())
Expand Down Expand Up @@ -334,9 +334,9 @@ def test_empty_dataframes_with_less_columns(self):
def check_empty_dataframes_with_less_columns(self):
with self.assertRaisesRegex(
PythonException,
"PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] "
"Column names of the returned pandas.DataFrame do not match "
"specified schema. Missing: value.\n",
"PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] "
"Column names of the returned data do not match "
"specified schema. Missing: value.",
):
f = self.dataframes_and_empty_dataframe_iter("id")
(
Expand All @@ -363,9 +363,9 @@ def empty_dataframes_with_other_columns(iterator):

with self.assertRaisesRegex(
PythonException,
"PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] "
"Column names of the returned pandas.DataFrame do not match "
"specified schema. Missing: id. Unexpected: iid.\n",
"PySparkRuntimeError: \\[RESULT_COLUMN_NAMES_MISMATCH\\] "
"Column names of the returned data do not match "
"specified schema. Missing: id. Unexpected: iid.",
):
(
self.spark.range(10, numPartitions=3)
Expand Down
4 changes: 2 additions & 2 deletions python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ def iter_udf_wong_output_size(it):
yield pd.Series(1)

with self.assertRaisesRegex(
Exception, "The length of output in Scalar iterator.*" "the length of output was 1"
Exception, "The number of output rows.*must match the number of input rows"
):
df.select(iter_udf_wong_output_size(col("id"))).collect()

Expand All @@ -691,7 +691,7 @@ def iter_udf_not_reading_all_input(it):

with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 3}):
df1 = self.spark.range(10).repartition(1)
with self.assertRaisesRegex(Exception, "pandas iterator UDF should exhaust"):
with self.assertRaisesRegex(Exception, "The input iterator must be fully consumed"):
df1.select(iter_udf_not_reading_all_input(col("id"))).collect()

def test_vectorized_udf_chained(self):
Expand Down
14 changes: 7 additions & 7 deletions python/pyspark/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ def verify_pandas_result(result, return_type, assign_cols_by_name, truncate_retu
extra = f" Unexpected: {', '.join(extra)}." if extra else ""

raise PySparkRuntimeError(
errorClass="RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF",
errorClass="RESULT_COLUMN_NAMES_MISMATCH",
messageParameters={
"missing": missing,
"extra": extra,
Expand All @@ -311,7 +311,7 @@ def verify_pandas_result(result, return_type, assign_cols_by_name, truncate_retu
# otherwise the number of columns of result have to match the return type
elif len(result_columns) != len(return_type):
raise PySparkRuntimeError(
errorClass="RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF",
errorClass="RESULT_COLUMN_SCHEMA_MISMATCH",
messageParameters={
"expected": str(len(return_type)),
"actual": str(len(result.columns)),
Expand Down Expand Up @@ -445,7 +445,7 @@ def verify_arrow_result(table, assign_cols_by_name, expected_cols_and_types):
extra = f" Unexpected: {', '.join(extra)}." if extra else ""

raise PySparkRuntimeError(
errorClass="RESULT_COLUMNS_MISMATCH_FOR_ARROW_UDF",
errorClass="RESULT_COLUMN_NAMES_MISMATCH",
messageParameters={
"missing": missing,
"extra": extra,
Expand Down Expand Up @@ -626,7 +626,7 @@ def verify_element(result):
or (len(result.columns) == 0 and result.empty)
):
raise PySparkRuntimeError(
errorClass="RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF",
errorClass="RESULT_COLUMN_SCHEMA_MISMATCH",
messageParameters={
"expected": str(len(return_type)),
"actual": str(len(result.columns)),
Expand Down Expand Up @@ -1675,7 +1675,7 @@ def map_batch(batch):
# input length.
if is_scalar_iter and num_output_rows > num_input_rows:
raise PySparkRuntimeError(
errorClass="PANDAS_UDF_OUTPUT_EXCEEDS_INPUT_ROWS", messageParameters={}
errorClass="OUTPUT_EXCEEDS_INPUT_ROWS", messageParameters={}
)
yield (result_batch, result_type)

Expand All @@ -1686,13 +1686,13 @@ def map_batch(batch):
pass
else:
raise PySparkRuntimeError(
errorClass="STOP_ITERATION_OCCURRED_FROM_SCALAR_ITER_PANDAS_UDF",
errorClass="INPUT_NOT_FULLY_CONSUMED",
messageParameters={},
)

if num_output_rows != num_input_rows:
raise PySparkRuntimeError(
errorClass="RESULT_LENGTH_MISMATCH_FOR_SCALAR_ITER_PANDAS_UDF",
errorClass="RESULT_ROWS_MISMATCH",
messageParameters={
"output_length": str(num_output_rows),
"input_length": str(num_input_rows),
Expand Down