
[SPARK-48054][PYTHON][CONNECT][INFRA] Backward compatibility test for Spark Connect #46298

Closed
wants to merge 12 commits

Conversation

HyukjinKwon
Member

What changes were proposed in this pull request?

This PR is a tentative attempt to run the Spark 3.5 tests, using the 3.5 Python client, against a Spark Connect server from 4.0.
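
For illustration only, a minimal sketch (not part of this PR) of the scenario under test: a Spark 3.5 Python Connect client connecting to a separately started Spark Connect server built from 4.0. The address below is an assumption; 15002 is the default Connect port.

# Sketch only: assumes a Spark Connect server built from master (4.0) is
# already running and listening on the default port 15002.
from pyspark.sql import SparkSession

# The 3.5 Python client connects to the newer server over gRPC.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# Existing 3.5 client APIs are expected to keep working against the 4.0 server.
spark.range(10).selectExpr("id * 2 AS doubled").show()
spark.stop()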

Why are the changes needed?

To verify compatibility between 3.5 and 4.0.

Does this PR introduce any user-facing change?

No, testing-only.

How was this patch tested?

Within my own fork.

Was this patch authored or co-authored using generative AI tooling?

No.

@HyukjinKwon HyukjinKwon marked this pull request as draft April 30, 2024 06:30
@github-actions github-actions bot added the INFRA label Apr 30, 2024
Review threads on .github/workflows/build_python_connect35.yml (resolved; some outdated)
@pan3793
Member

pan3793 commented Apr 30, 2024

A basic question about the "backward compatibility" policy: does it mean that the user can

  • use a lower version client to connect to a higher version server? or
  • use a higher version client to connect to a lower version server? or
  • use an arbitrary version client to connect to an arbitrary version server?

@HyukjinKwon
Member Author

For now, I am testing only the old client -> newer server case.

@nija-at
Contributor

nija-at commented Apr 30, 2024

@pan3793 - user must use a lower (or same) version client to connect to a server ONLY.

Contributor

@nija-at nija-at left a comment


Overall LGTM. Consider the minor comments above.

@pan3793
Member

pan3793 commented Apr 30, 2024

@pan3793 - user must use a lower (or same) version client to connect to a server ONLY.

Makes sense, would be great to clarify that in the docs :)

@HyukjinKwon HyukjinKwon force-pushed the SPARK-48054 branch 2 times, most recently from b9498ff to ac4d2d1 Compare April 30, 2024 11:08
@HyukjinKwon
Member Author

HyukjinKwon commented May 1, 2024

For notes to myself:

Tests being skipped for now:

======================================================================
ERROR [1.120s]: test_save_load (pyspark.ml.tests.connect.test_connect_classification.ClassificationTestsOnConnect.test_save_load)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py", line 144, in test_save_load
    estimator.save(fs_path)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/io_utils.py", line 248, in save
    _copy_dir_from_local_to_fs(tmp_local_dir, path)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/io_utils.py", line 57, in _copy_dir_from_local_to_fs
    _copy_file_from_local_to_fs(file_path, dest_file_path)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/io_utils.py", line 39, in _copy_file_from_local_to_fs
    session.copyFromLocalToFs(local_path, dest_path)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/session.py", line 756, in copyFromLocalToFs
    self._client.copy_from_local_to_fs(local_path, dest_path)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1549, in copy_from_local_to_fs
    self._artifact_manager._add_forward_to_fs_artifacts(local_path, dest_path)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/artifact.py", line 280, in _add_forward_to_fs_artifacts
    self._request_add_artifacts(requests)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/artifact.py", line 259, in _request_add_artifacts
    response: proto.AddArtifactsResponse = self._retrieve_responses(requests)
                                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/artifact.py", line 256, in _retrieve_responses
    return self._stub.AddArtifacts(requests, metadata=self._metadata)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/grpc/_channel.py", line 1536, in __call__
    return _end_unary_response_blocking(state, call, False, None)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/grpc/_channel.py", line 1006, in _end_unary_response_blocking
    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.INTERNAL
	details = "Uploading artifact file to local file system destination path is not supported."
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Uploading artifact file to local file system destination path is not supported.", grpc_status:13, created_time:"2024-05-01T03:01:32.558489983+00:00"}"
>
======================================================================
FAIL [24.378s]: test_binary_classes_logistic_regression (pyspark.ml.tests.connect.test_connect_classification.ClassificationTestsOnConnect.test_binary_classes_logistic_regression)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py", line 81, in test_binary_classes_logistic_regression
    self._check_result(result, expected_predictions, expected_probabilities)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py", line 41, in _check_result
    np.testing.assert_allclose(
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/numpy/testing/_private/utils.py", line 1504, in assert_allclose
    assert_array_compare(compare, actual, desired, err_msg=str(err_msg),
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/contextlib.py", line 81, in inner
    return func(*args, **kwds)
           ^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/numpy/testing/_private/utils.py", line 797, in assert_array_compare
    raise AssertionError(msg)
AssertionError: 
Not equal to tolerance rtol=0.1, atol=0
Mismatched elements: 1 / 4 (25%)
Max absolute difference: 0.03528662
Max relative difference: 0.22001187
 x: array([[0.217866, 0.782134],
       [0.804328, 0.195672]])
 y: array([[0.217875, 0.782125],
       [0.839615, 0.160385]])
----------------------------------------------------------------------
======================================================================
ERROR [3.966s]: test_regressor_evaluator (pyspark.ml.tests.connect.test_connect_evaluation.EvaluationTestsOnConnect.test_regressor_evaluator)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/tests/connect/test_legacy_mode_evaluation.py", line 69, in test_regressor_evaluator
    rmse = rmse_evaluator.evaluate(df1)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/base.py", line 255, in evaluate
    return self._evaluate(dataset)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/evaluation.py", line 70, in _evaluate
    return aggregate_dataframe(
           ^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/util.py", line 93, in aggregate_dataframe
    state = cloudpickle.loads(state)
            ^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute '_class_setstate' on <module 'pyspark.cloudpickle.cloudpickle' from '/home/runner/work/spark/spark-3.5/python/pyspark/cloudpickle/cloudpickle.py'>
----------------------------------------------------------------------
======================================================================
ERROR [4.664s]: test_copy (pyspark.ml.tests.connect.test_connect_tuning.CrossValidatorTestsOnConnect.test_copy)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py", line 115, in test_copy
    cvModel = cv.fit(dataset)
              ^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/base.py", line 106, in fit
    return self._fit(dataset)
           ^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/tuning.py", line 437, in _fit
    for j, metric in pool.imap_unordered(lambda f: f(), tasks):
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/multiprocessing/pool.py", line 873, in next
    raise value
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
                    ^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/tuning.py", line 437, in <lambda>
    for j, metric in pool.imap_unordered(lambda f: f(), tasks):
                                                   ^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/tuning.py", line 188, in single_task
    metric = evaluator.evaluate(
             ^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/base.py", line 255, in evaluate
    return self._evaluate(dataset)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/evaluation.py", line 70, in _evaluate
    return aggregate_dataframe(
           ^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/util.py", line 93, in aggregate_dataframe
    state = cloudpickle.loads(state)
            ^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute '_class_setstate' on <module 'pyspark.cloudpickle.cloudpickle' from '/home/runner/work/spark/spark-3.5/python/pyspark/cloudpickle/cloudpickle.py'>

======================================================================
ERROR [3.938s]: test_fit_minimize_metric (pyspark.ml.tests.connect.test_connect_tuning.CrossValidatorTestsOnConnect.test_fit_minimize_metric)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py", line 149, in test_fit_minimize_metric
    cvModel = cv.fit(dataset)
              ^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/base.py", line 106, in fit
    return self._fit(dataset)
           ^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/tuning.py", line 437, in _fit
    for j, metric in pool.imap_unordered(lambda f: f(), tasks):
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/multiprocessing/pool.py", line 873, in next
    raise value
  File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
                    ^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/tuning.py", line 437, in <lambda>
    for j, metric in pool.imap_unordered(lambda f: f(), tasks):
                                                   ^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/tuning.py", line 188, in single_task
    metric = evaluator.evaluate(
             ^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/base.py", line 255, in evaluate
    return self._evaluate(dataset)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/evaluation.py", line 70, in _evaluate
    return aggregate_dataframe(
           ^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/util.py", line 93, in aggregate_dataframe
    state = cloudpickle.loads(state)
            ^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute '_class_setstate' on <module 'pyspark.cloudpickle.cloudpickle' from '/home/runner/work/spark/spark-3.5/python/pyspark/cloudpickle/cloudpickle.py'>
======================================================================
ERROR [1.488s]: test_listener_events (pyspark.sql.tests.connect.streaming.test_parity_listener.StreamingListenerParityTests.test_listener_events)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/connect/streaming/test_parity_listener.py", line 53, in test_listener_events
    self.spark.streams.addListener(test_listener)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py", line 244, in addListener
    self._execute_streaming_query_manager_cmd(cmd)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py", line 260, in _execute_streaming_query_manager_cmd
    (_, properties) = self._session.client.execute_command(exec_cmd)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 982, in execute_command
    data, _, _, _, properties = self._execute_and_fetch(req)
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1283, in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(req):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1264, in _execute_and_fetch_as_iterator
    self._handle_error(error)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1503, in _handle_error
    self._handle_rpc_error(error)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1539, in _handle_rpc_error
    raise convert_exception(info, status.message) from None
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.EOFException) 
----------------------------------------------------------------------
======================================================================
FAIL [1.975s]: test_stream_exception (pyspark.sql.tests.connect.streaming.test_parity_streaming.StreamingParityTests.test_stream_exception)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 287, in test_stream_exception
    sq.processAllAvailable()
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py", line 129, in processAllAvailable
    self._execute_streaming_query_cmd(cmd)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py", line 177, in _execute_streaming_query_cmd
    (_, properties) = self._session.client.execute_command(exec_cmd)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 982, in execute_command
    data, _, _, _, properties = self._execute_and_fetch(req)
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1283, in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(req):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1264, in _execute_and_fetch_as_iterator
    self._handle_error(error)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line [150](https://github.com/HyukjinKwon/spark/actions/runs/8907172876/job/24460568471#step:9:151)3, in _handle_error
    self._handle_rpc_error(error)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1539, in _handle_rpc_error
    raise convert_exception(info, status.message) from None
pyspark.errors.exceptions.connect.StreamingQueryException: [STREAM_FAILED] Query [id = 38d0d145-1f57-4b92-b317-d9de727d9468, runId = 2b963119-d391-4c62-abea-970274859b80] terminated with exception: Job aborted due to stage failure: Task 0 in stage 79.0 failed 1 times, most recent failure: Lost task 0.0 in stage 79.0 (TID 116) (fv-az1144-341.tm43j05r3bqe3lauap1nzddazg.ex.internal.cloudapp.net executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1834, in main
    process()
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1826, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 224, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 145, in dump_stream
    for obj in iterator:
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 213, in _batched
    for item in iterator:
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1734, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1734, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 112, in <lambda>
    return args_kwargs_offsets, lambda *a: func(*a)
                                           ^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/util.py", line 118, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3....
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 291, in test_stream_exception
    self._assert_exception_tree_contains_msg(e, "ZeroDivisionError")
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 300, in _assert_exception_tree_contains_msg
    self._assert_exception_tree_contains_msg_connect(exception, msg)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 305, in _assert_exception_tree_contains_msg_connect
    self.assertTrue(
AssertionError: False is not true : Exception tree doesn't contain the expected message: ZeroDivisionError
----------------------------------------------------------------------
======================================================================
FAIL [0.169s]: test_checking_csv_header (pyspark.sql.tests.connect.test_parity_datasources.DataSourcesParityTests.test_checking_csv_header)
----------------------------------------------------------------------
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) [FAILED_READ_FILE.NO_HINT] Encountered error while reading file file:///home/runner/work/spark/spark-3.5/python/target/38acabf5-710b-4c21-b359-f61619e2adc7/tmpm7qyq23g/part-00000-d6c8793b-772d-44e7-bcca-6eeae9cc0ec7-c000.csv.  SQLSTATE: KD001
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/test_datasources.py", line [167](https://github.com/HyukjinKwon/spark/actions/runs/8908464265/job/24464135564#step:9:168), in test_checking_csv_header
    self.assertRaisesRegex(
AssertionError: "CSV header does not conform to the schema" does not match "(org.apache.spark.SparkException) [FAILED_READ_FILE.NO_HINT] Encountered error while reading file file:///home/runner/work/spark/spark-3.5/python/target/38acabf5-710b-4c21-b359-f61619e2adc7/tmpm7qyq23g/part-00000-d6c8793b-772d-44e7-bcca-6eeae9cc0ec7-c000.csv.  SQLSTATE: KD001"
----------------------------------------------------------------------
======================================================================
ERROR [0.059s]: test_large_variable_types (pyspark.sql.tests.connect.test_parity_pandas_map.MapInPandasParityTests.test_large_variable_types)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/pandas/test_pandas_map.py", line 115, in test_large_variable_types
    actual = df.mapInPandas(func, "str string, bin binary").collect()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/dataframe.py", line 1645, in collect
    table, schema = self._session.client.to_table(query)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 858, in to_table
    table, schema, _, _, _ = self._execute_and_fetch(req)
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1283, in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(req):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1264, in _execute_and_fetch_as_iterator
    self._handle_error(error)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1503, in _handle_error
    self._handle_rpc_error(error)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1539, in _handle_rpc_error
    raise convert_exception(info, status.message) from None
pyspark.errors.exceptions.connect.IllegalArgumentException: [INVALID_PARAMETER_VALUE.CHARSET] The value of parameter(s) `charset` in `encode` is invalid: expects one of the charsets 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16', but got utf8. SQLSTATE: 22023

----------------------------------------------------------------------
======================================================================
FAIL [1.071s]: test_pandas_udf_arrow_overflow (pyspark.sql.tests.connect.test_parity_pandas_udf.PandasUDFParityTests.test_pandas_udf_arrow_overflow)
----------------------------------------------------------------------
pyspark.errors.exceptions.connect.PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 302, in _create_array
    return pa.Array.from_pandas(
           ^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/array.pxi", line 1054, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 323, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 83, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Integer value 128 not in range: -128 to 127

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1834, in main
    process()
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1826, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 531, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 104, in dump_stream
    for batch in iterator:
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 525, in init_stream_yield_batches
    batch = self._create_batch(series)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 511, in _create_batch
    arrs.append(self._create_array(s, t, arrow_cast=self._arrow_cast))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 330, in _create_array
    raise PySparkValueError(error_msg % (series.dtype, series.na...

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/pandas/test_pandas_udf.py", line 299, in test_pandas_udf_arrow_overflow
    with self.assertRaisesRegex(
AssertionError: "Exception thrown when converting pandas.Series" does not match "
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 302, in _create_array
    return pa.Array.from_pandas(
           ^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/array.pxi", line 1054, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 323, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 83, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Integer value 128 not in range: -128 to 127

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1834, in main
    process()
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1826, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 531, in dump_stream

Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/pandas/test_pandas_udf.py", line 279, in test_pandas_udf_detect_unsafe_type_conversion
    with self.assertRaisesRegex(
AssertionError: "Exception thrown when converting pandas.Series" does not match "
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line [302](https://github.com/HyukjinKwon/spark/actions/runs/8916220872/job/24487232590#step:9:303), in _create_array
    return pa.Array.from_pandas(
           ^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/array.pxi", line 1054, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 323, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 83, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Float value 0.5 was truncated converting to int32

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1834, in main
    process()
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1826, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 531, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 104, in dump_stream
    for batch in iterator:
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 525, in init_stream_yield_batches
    batch = self._create_batch(series)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 511, in _create_batch
    arrs.append(self._create_array(s, t, arrow_cast=self._arrow_cast))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 330, in _create_array
    raise PySparkValueError(error_msg % (series.dtype, ser..."

----------------------------------------------------------------------
======================================================================
FAIL [0.162s]: test_vectorized_udf_exception (pyspark.sql.tests.connect.test_parity_pandas_udf_scalar.PandasUDFScalarParityTests.test_vectorized_udf_exception)
----------------------------------------------------------------------
pyspark.errors.exceptions.connect.PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1834, in main
    process()
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1826, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 531, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 104, in dump_stream
    for batch in iterator:
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 524, in init_stream_yield_batches
    for series in iterator:
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1734, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1734, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 146, in <lambda>
    verify_result_length(verify_result_type(func(*a)), len(a[0])),
                                            ^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/util.py", line 118, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py", line 650, in <lambda>
    scalar_raise_exception = pandas_udf(lambda x: x * (1 / 0), LongType())
                                                       ~~^~...
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/connect/test_parity_pandas_udf_scalar.py", line 35, in test_vectorized_udf_exception
    self.check_vectorized_udf_exception()
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py", line 658, in check_vectorized_udf_exception
    with self.assertRaisesRegex(Exception, "division( or modulo)? by zero"):
AssertionError: "division( or modulo)? by zero" does not match "
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1834, in main
    process()
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1826, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 531, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 104, in dump_stream
    for batch in iterator:
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 524, in init_stream_yield_batches
    for series in iterator:
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1734, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1734, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 146, in <lambda>
    verify_result_length(verify_result_type(func(*a)), len(a[0])),
                                            ^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/util.py", line 118, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py", line 650, in <lambda>
    scalar_raise_exception = pandas_udf(lambda x: x * (1 / 0), LongType())
                                                       ~~^~..."
----------------------------------------------------------------------
======================================================================
FAIL [0.103s]: test_udtf_init_with_additional_args (pyspark.sql.tests.connect.test_parity_udtf.ArrowUDTFParityTests.test_udtf_init_with_additional_args)
----------------------------------------------------------------------
pyspark.errors.exceptions.connect.PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1816, in main
    func, profiler, deserializer, serializer = read_udtf(pickleSer, infile, eval_type)
    self._check_result_or_exception(TestUDTF, ret_type, expected)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/test_udtf.py", line 598, in _check_result_or_exception
    with self.assertRaisesRegex(err_type, expected):
AssertionError: "AttributeError" does not match "
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1834, in main
    process()
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1826, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 224, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 145, in dump_stream
    for obj in iterator:
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 213, in _batched
    for item in iterator:
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1391, in mapper
    yield eval(*[a[o] for o in args_kwargs_offsets])
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1371, in evaluate
    return tuple(map(verify_and_convert_result, res))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1340, in verify_and_convert_result
    return toInternal(result)
           ^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1291, in toInternal
    return tuple(
           ^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1292, in <genexpr>
    f.toInternal(v) if c else v
    ^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 907, in toInternal
    return self.dataType.toInternal(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 372, in toInternal
    calendar.timegm(dt.utctimetuple()) if dt.tzinfo else time.mktime(dt.timetuple())
            ..."
======================================================================
FAIL [0.096s]: test_udtf_init_with_additional_args (pyspark.sql.tests.connect.test_parity_udtf.UDTFParityTests.test_udtf_init_with_additional_args)
----------------------------------------------------------------------
pyspark.errors.exceptions.connect.PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1816, in main
    func, profiler, deserializer, serializer = read_udtf(pickleSer, infile, eval_type)
                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 946, in read_udtf
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [UDTF_CONSTRUCTOR_INVALID_NO_ANALYZE_METHOD] Failed to evaluate the user-defined table function 'TestUDTF' because its constructor is invalid: the function does not implement the 'analyze' method, and its constructor has more than one argument (including the 'self' reference). Please update the table function so that its constructor accepts exactly one 'self' argument, and try the query again.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/test_udtf.py", line 274, in test_udtf_init_with_additional_args
    with self.assertRaisesRegex(
AssertionError: "__init__\(\) missing 1 required positional argument: 'a'" does not match "
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1816, in main
    func, profiler, deserializer, serializer = read_udtf(pickleSer, infile, eval_type)
                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 946, in read_udtf
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [UDTF_CONSTRUCTOR_INVALID_NO_ANALYZE_METHOD] Failed to evaluate the user-defined table function 'TestUDTF' because its constructor is invalid: the function does not implement the 'analyze' method, and its constructor has more than one argument (including the 'self' reference). Please update the table function so that its constructor accepts exactly one 'self' argument, and try the query again.
"
======================================================================
FAIL [0.087s]: test_udtf_with_wrong_num_input (pyspark.sql.tests.connect.test_parity_udtf.UDTFParityTests.test_udtf_with_wrong_num_input)
----------------------------------------------------------------------
pyspark.errors.exceptions.connect.PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1816, in main
    func, profiler, deserializer, serializer = read_udtf(pickleSer, infile, eval_type)
                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1082, in read_udtf
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [UDTF_EVAL_METHOD_ARGUMENTS_DO_NOT_MATCH_SIGNATURE] Failed to evaluate the user-defined table function 'TestUDTF' because the function arguments did not match the expected signature of the 'eval' method (missing a required argument: 'a'). Please update the query so that this table function call provides arguments matching the expected signature, or else update the table function so that its 'eval' method accepts the provided arguments, and then try the query again.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/test_udtf.py", line 255, in test_udtf_with_wrong_num_input
    with self.assertRaisesRegex(
AssertionError: "eval\(\) missing 1 required positional argument: 'a'" does not match "
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1816, in main
    func, profiler, deserializer, serializer = read_udtf(pickleSer, infile, eval_type)
                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1082, in read_udtf
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [UDTF_EVAL_METHOD_ARGUMENTS_DO_NOT_MATCH_SIGNATURE] Failed to evaluate the user-defined table function 'TestUDTF' because the function arguments did not match the expected signature of the 'eval' method (missing a required argument: 'a'). Please update the query so that this table function call provides arguments matching the expected signature, or else update the table function so that its 'eval' method accepts the provided arguments, and then try the query again.
"
----------------------------------------------------------------------
======================================================================
ERROR [0.023s]: test_assert_approx_equal_decimaltype_custom_rtol_pass (pyspark.sql.tests.connect.test_utils.ConnectUtilsTests.test_assert_approx_equal_decimaltype_custom_rtol_pass)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/test_utils.py", line 279, in test_assert_approx_equal_decimaltype_custom_rtol_pass
    assertDataFrameEqual(df1, df2, rtol=1e-1)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/testing/utils.py", line 595, in assertDataFrameEqual
    actual_list = actual.collect()
                  ^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/dataframe.py", line 1645, in collect
    table, schema = self._session.client.to_table(query)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 858, in to_table
    table, schema, _, _, _ = self._execute_and_fetch(req)
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1283, in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(req):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1264, in _execute_and_fetch_as_iterator
    self._handle_error(error)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1503, in _handle_error
    self._handle_rpc_error(error)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1539, in _handle_rpc_error
    raise convert_exception(info, status.message) from None
pyspark.errors.exceptions.connect.ArithmeticException: [NUMERIC_VALUE_OUT_OF_RANGE.WITH_SUGGESTION]  83.14 cannot be represented as Decimal(4, 3). If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error, and return NULL instead. SQLSTATE: 22003
----------------------------------------------------------------------
======================================================================
ERROR [0.024s]: test_assert_approx_equal_decimaltype_custom_rtol_pass (pyspark.sql.tests.connect.test_utils.ConnectUtilsTests.test_assert_approx_equal_decimaltype_custom_rtol_pass)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/test_utils.py", line 279, in test_assert_approx_equal_decimaltype_custom_rtol_pass
    assertDataFrameEqual(df1, df2, rtol=1e-1)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/testing/utils.py", line 595, in assertDataFrameEqual
    actual_list = actual.collect()
                  ^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/dataframe.py", line 1645, in collect
    table, schema = self._session.client.to_table(query)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 858, in to_table
    table, schema, _, _, _ = self._execute_and_fetch(req)
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1283, in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(req):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1264, in _execute_and_fetch_as_iterator
    self._handle_error(error)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1503, in _handle_error
    self._handle_rpc_error(error)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1539, in _handle_rpc_error
    raise convert_exception(info, status.message) from None
pyspark.errors.exceptions.connect.ArithmeticException: [NUMERIC_VALUE_OUT_OF_RANGE.WITH_SUGGESTION]  83.14 cannot be represented as Decimal(4, 3). If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error, and return NULL instead. SQLSTATE: 22003
----------------------------------------------------------------------
======================================================================
ERROR [1.591s]: test_apply_batch_with_type (pyspark.pandas.tests.connect.computation.test_parity_apply_func.FrameParityApplyFunctionTests.test_apply_batch_with_type)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-35/python/pyspark/pandas/tests/computation/test_apply_func.py", line 249, in test_apply_batch_with_type
    def identify3(x) -> ps.DataFrame[float, [int, List[int]]]:
                        ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-35/python/pyspark/pandas/frame.py", line 13647, in __class_getitem__
    return create_tuple_for_frame_type(params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-35/python/pyspark/pandas/typedef/typehints.py", line 717, in create_tuple_for_frame_type
    return Tuple[_to_type_holders(params)]
                 ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-35/python/pyspark/pandas/typedef/typehints.py", line 762, in _to_type_holders
    data_types = _new_type_holders(data_types, NameTypeHolder)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-35/python/pyspark/pandas/typedef/typehints.py", line 828, in _new_type_holders
    raise TypeError(
TypeError: Type hints should be specified as one of:
  - DataFrame[type, type, ...]
  - DataFrame[name: type, name: type, ...]
  - DataFrame[dtypes instance]
  - DataFrame[zip(names, types)]
  - DataFrame[index_type, [type, ...]]
  - DataFrame[(index_name, index_type), [(name, type), ...]]
  - DataFrame[dtype instance, dtypes instance]
  - DataFrame[(index_name, index_type), zip(names, types)]
  - DataFrame[[index_type, ...], [type, ...]]
  - DataFrame[[(index_name, index_type), ...], [(name, type), ...]]
  - DataFrame[dtypes instance, dtypes instance]
  - DataFrame[zip(index_names, index_types), zip(names, types)]
However, got (<class 'int'>, typing.List[int]).
----------------------------------------------------------------------
======================================================================
ERROR [1.619s]: test_apply_batch_with_type (pyspark.pandas.tests.connect.computation.test_parity_apply_func.FrameParityApplyFunctionTests.test_apply_batch_with_type)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-35/python/pyspark/pandas/tests/computation/test_apply_func.py", line 265, in test_apply_batch_with_type
    ) -> ps.DataFrame[float, [int, ntp.NDArray[int]]]:
         ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-35/python/pyspark/pandas/frame.py", line 13647, in __class_getitem__
    return create_tuple_for_frame_type(params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-35/python/pyspark/pandas/typedef/typehints.py", line 717, in create_tuple_for_frame_type
    return Tuple[_to_type_holders(params)]
                 ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-35/python/pyspark/pandas/typedef/typehints.py", line 762, in _to_type_holders
    data_types = _new_type_holders(data_types, NameTypeHolder)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-35/python/pyspark/pandas/typedef/typehints.py", line 828, in _new_type_holders
    raise TypeError(
TypeError: Type hints should be specified as one of:
  - DataFrame[type, type, ...]
  - DataFrame[name: type, name: type, ...]
  - DataFrame[dtypes instance]
  - DataFrame[zip(names, types)]
  - DataFrame[index_type, [type, ...]]
  - DataFrame[(index_name, index_type), [(name, type), ...]]
  - DataFrame[dtype instance, dtypes instance]
  - DataFrame[(index_name, index_type), zip(names, types)]
  - DataFrame[[index_type, ...], [type, ...]]
  - DataFrame[[(index_name, index_type), ...], [(name, type), ...]]
  - DataFrame[dtypes instance, dtypes instance]
  - DataFrame[zip(index_names, index_types), zip(names, types)]
However, got (<class 'int'>, numpy.ndarray[typing.Any, numpy.dtype[int]]).
----------------------------------------------------------------------
======================================================================
ERROR [1.897s]: test_apply_batch_with_type (pyspark.pandas.tests.connect.computation.test_parity_apply_func.FrameParityApplyFunctionTests.test_apply_batch_with_type)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-35/python/pyspark/pandas/tests/computation/test_apply_func.py", line 281, in test_apply_batch_with_type
    def identify4(x) -> ps.DataFrame[[int, str], [int, List[int]]]:
                        ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-35/python/pyspark/pandas/frame.py", line 13647, in __class_getitem__
    return create_tuple_for_frame_type(params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-35/python/pyspark/pandas/typedef/typehints.py", line 717, in create_tuple_for_frame_type
    return Tuple[_to_type_holders(params)]
                 ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-35/python/pyspark/pandas/typedef/typehints.py", line 762, in _to_type_holders
    data_types = _new_type_holders(data_types, NameTypeHolder)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-35/python/pyspark/pandas/typedef/typehints.py", line 828, in _new_type_holders
    raise TypeError(
TypeError: Type hints should be specified as one of:
  - DataFrame[type, type, ...]
  - DataFrame[name: type, name: type, ...]
  - DataFrame[dtypes instance]
  - DataFrame[zip(names, types)]
  - DataFrame[index_type, [type, ...]]
  - DataFrame[(index_name, index_type), [(name, type), ...]]
  - DataFrame[dtype instance, dtypes instance]
  - DataFrame[(index_name, index_type), zip(names, types)]
  - DataFrame[[index_type, ...], [type, ...]]
  - DataFrame[[(index_name, index_type), ...], [(name, type), ...]]
  - DataFrame[dtypes instance, dtypes instance]
  - DataFrame[zip(index_names, index_types), zip(names, types)]
However, got (<class 'int'>, typing.List[int]).
----------------------------------------------------------------------
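
A hedged sketch (not the mechanism used in this PR) of how cross-version-incompatible tests like those listed above could be guarded in this job; the environment variable name below is hypothetical and purely for illustration.

# Hypothetical illustration: guard tests that behave differently across versions
# with an environment variable that only this 3.5-client-vs-4.0-server job sets.
import os
import unittest

# "SPARK_CONNECT_COMPAT_TEST" is an assumed name, not an existing setting.
_compat_run = os.environ.get("SPARK_CONNECT_COMPAT_TEST") == "1"

class ExampleCompatTests(unittest.TestCase):
    @unittest.skipIf(_compat_run, "behavior differs between the 3.5 client and the 4.0 server")
    def test_save_load(self):
        ...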

@HyukjinKwon HyukjinKwon force-pushed the SPARK-48054 branch 5 times, most recently from 2036352 to 1ee91df Compare May 1, 2024 07:48
@HyukjinKwon
Member Author

HyukjinKwon commented May 2, 2024

Doctest:

File "/home/runner/work/spark/spark-35/python/pyspark/sql/connect/dataframe.py", line 1057, in pyspark.sql.connect.dataframe.DataFrame.union
Failed example:
    df3.show()
Exception raised:
    Traceback (most recent call last):
      File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/doctest.py", line 1355, in __run
        exec(compile(example.source, filename, "single",
      File "<doctest pyspark.sql.connect.dataframe.DataFrame.union[10]>", line 1, in <module>
        df3.show()
      File "/home/runner/work/spark/spark-35/python/pyspark/sql/connect/dataframe.py", line 996, in show
        print(self._show_string(n, truncate, vertical))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/runner/work/spark/spark-35/python/pyspark/sql/connect/dataframe.py", line 753, in _show_string
        ).toPandas()
          ^^^^^^^^^^
      File "/home/runner/work/spark/spark-35/python/pyspark/sql/connect/dataframe.py", line 1663, in toPandas
        return self._session.client.to_pandas(query)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/runner/work/spark/spark-35/python/pyspark/sql/connect/client/core.py", line 873, in to_pandas
        table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(
                                                      ^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/runner/work/spark/spark-35/python/pyspark/sql/connect/client/core.py", line 1283, in _execute_and_fetch
        for response in self._execute_and_fetch_as_iterator(req):
      File "/home/runner/work/spark/spark-35/python/pyspark/sql/connect/client/core.py", line 1264, in _execute_and_fetch_as_iterator
        self._handle_error(error)
      File "/home/runner/work/spark/spark-35/python/pyspark/sql/connect/client/core.py", line 1503, in _handle_error
        self._handle_rpc_error(error)
      File "/home/runner/work/spark/spark-35/python/pyspark/sql/connect/client/core.py", line 1539, in _handle_rpc_error
        raise convert_exception(info, status.message) from None
    pyspark.errors.exceptions.connect.NumberFormatException: [CAST_INVALID_INPUT] The value 'Alice' of the type "STRING" cannot be cast to "BIGINT" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22018
    JVM stacktrace:
    org.apache.spark.SparkNumberFormatException: [CAST_INVALID_INPUT] The value 'Alice' of the type "STRING" cannot be cast to "BIGINT" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22018
    	at org.apache.spark.sql.errors.QueryExecutionErrors$.invalidInputInCastToNumberError(QueryExecutionErrors.scala:145)
    	at org.apache.spark.sql.catalyst.util.UTF8StringUtils$.withException(UTF8StringUtils.scala:51)
    	at org.apache.spark.sql.catalyst.util.UTF8StringUtils$.toLongExact(UTF8StringUtils.scala:31)
    	at org.apache.spark.sql.catalyst.expressions.Cast.$anonfun$castToLong$2(Cast.scala:770)
    	at org.apache.spark.sql.catalyst.expressions.Cast.$anonfun$castToLong$2$adapted(Cast.scala:770)
    	at org.apache.spark.sql.catalyst.expressions.Cast.buildCast(Cast.scala:565)
    	at org.apache.spark.sql.catalyst.expressions.Cast.$anonfun$castToLong...
**********************************************************************
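This union doctest fails only because the 4.0 server enables ANSI mode by default, so the implicit string-to-bigint cast raises instead of producing NULL. Following the suggestions embedded in the error message, a hedged sketch of two possible workarounds (the dataframe below is illustrative, not the doctest's exact data; `spark` is an existing session):

```python
from pyspark.sql import functions as sf

df = spark.createDataFrame([("Alice", 1)], ["name", "age"])

# Option 1 (from the error message): relax ANSI mode for this session.
spark.conf.set("spark.sql.ansi.enabled", "false")

# Option 2: keep ANSI mode and tolerate malformed input explicitly via try_cast.
df.select(sf.expr("try_cast(name AS BIGINT)").alias("name_as_long")).show()
```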
File "/home/runner/work/spark/spark-35/python/pyspark/sql/connect/functions.py", line 3546, in pyspark.sql.connect.functions.current_database
Failed example:
    spark.range(1).select(current_database()).show()
Expected:
    +------------------+
    |current_database()|
    +------------------+
    |           default|
    +------------------+
Got:
    +----------------+
    |current_schema()|
    +----------------+
    |         default|
    +----------------+
    <BLANKLINE>
**********************************************************************
File "/home/runner/work/spark/spark-35/python/pyspark/sql/connect/functions.py", line 3547, in pyspark.sql.connect.functions.current_schema
Failed example:
    spark.range(1).select(sf.current_schema()).show()
Expected:
    +------------------+
    |current_database()|
    +------------------+
    |           default|
    +------------------+
Got:
    +----------------+
    |current_schema()|
    +----------------+
    |         default|
    +----------------+
    <BLANKLINE>
**********************************************************************
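The two failures above are cosmetic: the 4.0 server names the auto-generated column `current_schema()` where the 3.5 doctests expect `current_database()`, while the value itself still matches. A hedged sketch of how such a query can be made version-agnostic by pinning the column name:

```python
from pyspark.sql import functions as sf

# Aliasing fixes the rendered header regardless of the server-side function name.
spark.range(1).select(sf.current_database().alias("db")).show()
# +-------+
# |     db|
# +-------+
# |default|
# +-------+
```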
File "/home/runner/work/spark/spark-35/python/pyspark/sql/connect/functions.py", line 3310, in pyspark.sql.connect.functions.to_unix_timestamp
Failed example:
    df.select(to_unix_timestamp(df.e).alias('r')).collect()
Exception raised:
    Traceback (most recent call last):
      File "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/doctest.py", line 1355, in __run
        exec(compile(example.source, filename, "single",
      File "<doctest pyspark.sql.connect.functions.to_unix_timestamp[6]>", line 1, in <module>
        df.select(to_unix_timestamp(df.e).alias('r')).collect()
      File "/home/runner/work/spark/spark-35/python/pyspark/sql/connect/dataframe.py", line 1645, in collect
        table, schema = self._session.client.to_table(query)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/runner/work/spark/spark-35/python/pyspark/sql/connect/client/core.py", line 858, in to_table
        table, schema, _, _, _ = self._execute_and_fetch(req)
                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/runner/work/spark/spark-35/python/pyspark/sql/connect/client/core.py", line 1283, in _execute_and_fetch
        for response in self._execute_and_fetch_as_iterator(req):
      File "/home/runner/work/spark/spark-35/python/pyspark/sql/connect/client/core.py", line 1264, in _execute_and_fetch_as_iterator
        self._handle_error(error)
      File "/home/runner/work/spark/spark-35/python/pyspark/sql/connect/client/core.py", line 1503, in _handle_error
        self._handle_rpc_error(error)
      File "/home/runner/work/spark/spark-35/python/pyspark/sql/connect/client/core.py", line 1539, in _handle_rpc_error
        raise convert_exception(info, status.message) from None
    pyspark.errors.exceptions.connect.DateTimeException: [CANNOT_PARSE_TIMESTAMP] Text '2016-04-08' could not be parsed at index 10. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22007
    JVM stacktrace:
    org.apache.spark.SparkDateTimeException: [CANNOT_PARSE_TIMESTAMP] Text '2016-04-08' could not be parsed at index 10. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22007
    	at org.apache.spark.sql.errors.QueryExecutionErrors$.ansiDateTimeParseError(QueryExecutionErrors.scala:271)
    	at org.apache.spark.sql.catalyst.expressions.ToTimestamp.eval(datetimeExpressions.scala:1300)
    	at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:159)
    	at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:89)
    	at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$48.$anonfun$applyOrElse$82(Optimizer.scala:2208)
    	at scala.collection.immutable.List.map(List.scala:247)
    	at scala.collection.immutable.List.map(List.scala:79)
    	at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$48.applyOrElse(Optimizer.scala:2208)
    	at org.apache.spark.sql.catalyst.optimizer...
**********************************************************************
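This one is again an ANSI-mode difference: '2016-04-08' cannot be parsed with the default 'yyyy-MM-dd HH:mm:ss' format once the server raises on malformed datetime input. A hedged sketch of sidestepping it by passing the format explicitly (illustrative data, `spark` assumed available):

```python
from pyspark.sql import functions as sf

df = spark.createDataFrame([("2016-04-08",)], ["e"])

# Supplying the format avoids the ANSI parse error for date-only strings.
df.select(sf.to_unix_timestamp(df.e, sf.lit("yyyy-MM-dd")).alias("r")).collect()
```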

@HyukjinKwon HyukjinKwon force-pushed the SPARK-48054 branch 6 times, most recently from aa0bd35 to e8c0bba Compare May 2, 2024 07:23
@HyukjinKwon
Copy link
Member Author

@HyukjinKwon HyukjinKwon marked this pull request as ready for review May 3, 2024 01:30

on:
  schedule:
    - cron: '0 21 * * *'
Copy link
Member


Thank you. Yes, +1 for adding this, and it does not look heavy.

After adding this, please monitor once whether it works as expected within the total budget.

Copy link
Member Author


Thanks for the pointer!

@HyukjinKwon
Copy link
Member Author

Merged to master.

HyukjinKwon added a commit that referenced this pull request May 3, 2024
…5 client <> 4.0 server

### What changes were proposed in this pull request?

This PR proposes to skip the tests that fail with 3.5 client and 4.0 server in Spark Connect (by adding `SPARK_SKIP_CONNECT_COMPAT_TESTS`). This is a base work for #46298. This partially backports #45870

This PR also adds `SPARK_CONNECT_TESTING_REMOTE` environment variable so developers can run PySpark unittests against a Spark Connect server.

### Why are the changes needed?

In order to set up the CI that tests 3.5 client and 4.0 server in Spark Connect.

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Tested it in my fork, see #46298

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46334 from HyukjinKwon/SPARK-48088.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
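For anyone reproducing this locally: per the commit message above, `SPARK_CONNECT_TESTING_REMOTE` points the PySpark test session at an already-running Spark Connect server, and `SPARK_SKIP_CONNECT_COMPAT_TESTS` skips the tests listed in this PR as incompatible. A rough sketch of the idea, with an assumed remote address (the actual wiring lives in the CI workflow):

```python
import os
from pyspark.sql import SparkSession

# Assumed values for illustration only.
os.environ.setdefault("SPARK_SKIP_CONNECT_COMPAT_TESTS", "1")
remote = os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "sc://localhost:15002")

# A 3.5 Python client talking to the (4.0) Spark Connect server under test.
spark = SparkSession.builder.remote(remote).getOrCreate()
print(spark.range(3).count())
```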
HyukjinKwon added a commit that referenced this pull request May 6, 2024
…to SPARK_SKIP_CONNECT_COMPAT_TESTS

### What changes were proposed in this pull request?

This PR is a followup of #46298 that properly uses the environment variable SPARK_SKIP_CONNECT_COMPAT_TESTS added at #46334

### Why are the changes needed?

To recover the compatibility test (https://github.com/apache/spark/actions/runs/8961109352/job/24608366499).

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

Manually

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46394 from HyukjinKwon/SPARK-48054-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
JacobZheng0927 pushed a commit to JacobZheng0927/spark that referenced this pull request May 11, 2024
… Spark Connect

### What changes were proposed in this pull request?

This PR is a tentative try to run Spark 3.5 tests with Python Client 3.5 against Spark Connect server 4.0.

### Why are the changes needed?

In order to make sure the compatibility between 3.5 and 4.0.

### Does this PR introduce _any_ user-facing change?

No, testing-only.

### How was this patch tested?

Within my own fork.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46298 from HyukjinKwon/SPARK-48054.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
JacobZheng0927 pushed a commit to JacobZheng0927/spark that referenced this pull request May 11, 2024
…to SPARK_SKIP_CONNECT_COMPAT_TESTS

### What changes were proposed in this pull request?

This PR is a followup of apache#46298 that properly uses the environment variable SPARK_SKIP_CONNECT_COMPAT_TESTS added at apache#46334

### Why are the changes needed?

To recover the compatibility test (https://github.com/apache/spark/actions/runs/8961109352/job/24608366499).

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

Manually

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46394 from HyukjinKwon/SPARK-48054-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>