[SPARK-45733][CONNECT][PYTHON] Support multiple retry policies #43591
Conversation
meow
@grundprinzip and @nija-at let me know if this is good to go.
Looks quite good overall. Someone who knows Python should also review.
Do you plan to work on a Scala equivalent later as well?
""" | ||
Conceptually the remote spark session that communicates with the server | ||
""" | ||
class DefaultPolicy(RetryPolicy): |
nit: DefaultRetryPolicy
```python
def register_retry_policy(self, policy: RetryPolicy):
    if policy.name in self._known_retry_policies:
        raise ValueError("Already known policy")
    self._known_retry_policies[policy.name] = policy

def set_retry_policies(self, policies: List[str]):
    self._retry_policies = [self._known_retry_policies[name] for name in policies]
```
Do we need the functionality of registering policies that we are not going to be using? Is there a use case for adding and removing policies?
Unless there's a good reason to have a separate `register` and `set`, maybe simplify it so that just `set` is enough?
We can consider more options, but my rationale here was that it makes it easier for users to configure retry policies without needing to toss objects around. A policy consists not just of the class but also of the parameters set on the instance.
For example, suppose you have a client which shipped with policies ["policyA", "policyB", "policyC"], and for some reason you aren't happy with policyB. Then it's easier to configure this in only one call, without having to obtain objects for policyA and policyC. It's also convenient to add your own PolicyD into the mix.
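That one-call, configure-by-name flow can be sketched as follows. This is a hypothetical minimal client following the `register_retry_policy`/`set_retry_policies` shape discussed in this PR; the `Client` class and the `PolicyA`–`PolicyD` names are illustrative stand-ins, not the actual Spark Connect code.

```python
# Hypothetical minimal sketch of the configure-by-name flow; not the actual
# Spark Connect client. PolicyA..PolicyD are illustrative stand-ins.

class RetryPolicy:
    @property
    def name(self):
        return self.__class__.__name__

class PolicyA(RetryPolicy): pass
class PolicyB(RetryPolicy): pass
class PolicyC(RetryPolicy): pass
class PolicyD(RetryPolicy): pass

class Client:
    def __init__(self):
        self._known_retry_policies = {}
        self._retry_policies = []

    def register_retry_policy(self, policy):
        if policy.name in self._known_retry_policies:
            raise ValueError("Already known policy")
        self._known_retry_policies[policy.name] = policy

    def set_retry_policies(self, names):
        # Activate by name only -- no policy objects needed at call time.
        self._retry_policies = [self._known_retry_policies[n] for n in names]

client = Client()
for policy in (PolicyA(), PolicyB(), PolicyC(), PolicyD()):
    client.register_retry_policy(policy)

# Drop PolicyB and mix in PolicyD, all in a single call:
client.set_retry_policies(["PolicyA", "PolicyC", "PolicyD"])
```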
I'm just wondering if any user that is a "power" user enough to work with changing these policies would not be comfortable with tossing these objects around anyway. E.g. if they would want to tweak the parameters that you mention, they would anyway have to deal with the object, instantiate it with different parameters, register it and then set it.
For such a power user, it actually may be cleaner to just work with these objects directly instead of having a two step registration and setting.
But I don't have a strong opinion here, I am also fine with keeping these extra APIs.
> they would anyway have to deal with the object

It probably doesn't really require obtaining the actual object, but rather an import statement from somewhere and a new instance construction.
I don't really object to other ways of doing it, just explaining why I did it that way :)
I made a change in the API which now also allows passing an object directly.
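A sketch of the resulting mixed-form call, assuming `set_retry_policies` accepts either a registered name or a `RetryPolicy` instance. The resolver and registry below are hypothetical stand-ins for the client internals, not the PR's code.

```python
# Illustrative resolver for List[Union[str, RetryPolicy]]; names are looked
# up in a registry, instances pass through unchanged. Hypothetical sketch.
from typing import List, Union

class RetryPolicy:
    @property
    def name(self):
        return self.__class__.__name__

class DefaultPolicy(RetryPolicy): pass
class CustomPolicy(RetryPolicy): pass

known = {"DefaultPolicy": DefaultPolicy()}  # hypothetical registry

def resolve(policies: List[Union[str, RetryPolicy]]) -> List[RetryPolicy]:
    return [p if isinstance(p, RetryPolicy) else known[p] for p in policies]

# Mix a registered name with a direct instance:
active = resolve(["DefaultPolicy", CustomPolicy()])
```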
```python
class RetryException(Exception):
    """
    An exception that can be thrown upstream when inside retry and which is always retryable
    """
```
nit: maybe move to retries.py? I think it shouldn't have landed in this file in the first place.
Moved
```python
self._done = False

def can_retry(self, exception: BaseException) -> bool:
    return any(policy.can_retry(exception) for policy in self._policies)
```
Different policies can have different attempt numbers. By passing only the exception, how can a policy determine if it can retry, without e.g. knowing that this is the 10th attempt while that policy allows only 4 attempts?
edit: I see, this will check if the exception can even possibly be retried, and the
```python
wait_time = policy.next_attempt()
if wait_time is not None:
```
will actually check if the next attempt is allowed.
Maybe add this to the can_retry documentation, and a comment here that it only checks whether the exception can be retried, while whether there are more retry attempts left is checked later.
This is a behavior I've implemented specifically to match the current design -- it also does classify errors into "retryable" and "not retryable", and the logic of spacing retries is separate.
I can add some more comments
Yep, it matches the current design. Just took me a moment to understand so some additional comments might be useful.
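The two-phase behavior discussed in this thread can be illustrated with a small sketch. All names below (`PolicyState`, `wait_time_for`, the parameters) are assumptions for illustration, not the actual implementation: `can_retry()` only classifies the exception as retryable, while `next_attempt()` enforces each policy's attempt budget and spacing.

```python
# Sketch only: error classification (can_retry) is kept separate from retry
# budgeting/spacing (next_attempt), matching the design discussed above.
from typing import Optional

class PolicyState:
    def __init__(self, max_attempts: int, wait_ms: int, retryable_type: type):
        self._max_attempts = max_attempts
        self._wait_ms = wait_ms
        self._retryable_type = retryable_type
        self._attempt = 0

    def can_retry(self, exception: BaseException) -> bool:
        # Phase 1: does this exception belong to a retryable class at all?
        # Note: no attempt counting happens here.
        return isinstance(exception, self._retryable_type)

    def next_attempt(self) -> Optional[int]:
        # Phase 2: is there budget left? None means this policy is exhausted.
        if self._attempt >= self._max_attempts:
            return None
        self._attempt += 1
        return self._wait_ms

def wait_time_for(policies, exception) -> Optional[int]:
    # Loosely mirrors the retry loop: first check retryability, then ask
    # each matching policy whether it still has attempts to spend.
    if not any(p.can_retry(exception) for p in policies):
        return None  # not retryable by any policy
    for p in policies:
        if p.can_retry(exception):
            wait_time = p.next_attempt()
            if wait_time is not None:
                return wait_time
    return None  # retryable in principle, but all matching budgets exhausted
```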
```python
# Exceeded retries
logger.debug(f"Given up on retrying. error: {exception}")
raise exception
```
from offline discussion / unrelated to the core "support multiple retry policies":
maybe it would be nice to throw a specific exception type like "RetryExceeded", with all the previously retried exceptions wrapped inside? A user is unlikely to catch on exceptions that are normally retried, but they may want to catch one special exception type that will give them information what transient errors persisted and for how long, over how many retries etc.
That seems sensible, we can do that
Also yes, a Scala follow-up is definitely planned, as soon as we finalize the design here.
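The wrapper-exception suggestion above could look roughly like this. The `RetriesExceeded` name and its `errors` field are taken from the review suggestion, not from merged code, and the retry loop is simulated for illustration.

```python
# Hypothetical wrapper exception carrying the transient errors that were
# retried before giving up, so callers can catch one well-known type.
class RetriesExceeded(Exception):
    def __init__(self, errors):
        self.errors = list(errors)
        super().__init__(f"retries exceeded after {len(self.errors)} attempts")

# Simulate three failed attempts, then surface them all at once:
seen = []
for attempt in range(3):
    try:
        raise ConnectionError(f"transient failure #{attempt}")
    except ConnectionError as e:
        seen.append(e)

try:
    raise RetriesExceeded(seen)
except RetriesExceeded as exc:
    num_attempts = len(exc.errors)  # all underlying errors are preserved
```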
```python
def name(self):
    return self.__class__.__name__
```
Using `self.__class__.__name__` will prevent us from registering a separate instance of the same class with different parameters, e.g. a different number of retries, or different timing.
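One way to address this concern is to keep the class name as the default but allow a per-instance override; a sketch with hypothetical class and parameter names:

```python
# Sketch: name defaults to the class name but can be overridden per instance,
# so two differently-parameterized instances no longer collide.
from typing import Optional

class RetryPolicy:
    def __init__(self, name: Optional[str] = None):
        self._name = name

    @property
    def name(self) -> str:
        return self._name or self.__class__.__name__

class BackoffPolicy(RetryPolicy):  # hypothetical concrete policy
    def __init__(self, max_retries: int, name: Optional[str] = None):
        super().__init__(name=name)
        self.max_retries = max_retries

# Two instances of the same class, registrable under distinct names:
fast = BackoffPolicy(max_retries=3, name="BackoffFast")
slow = BackoffPolicy(max_retries=10, name="BackoffSlow")
```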
```python
def register_retry_policy(self, policy: RetryPolicy):
    """
    Registers specified policy in the dictionary of known policies.

    To activate it, use set_retry_policies().
    """
    if policy.name in self._known_retry_policies:
        raise ValueError("Already known policy")
    self._known_retry_policies[policy.name] = policy

def set_retry_policies(self, policies: List[Union[str, RetryPolicy]]):
    """
    Sets list of policies to be used for retries.
    I.e. set_retry_policies(["DefaultPolicy", "CustomPolicy"]).

    If policy is given as string, the policy object is sourced from previously
    registered policies.
    Specifying policy directly bypasses the registration mechanism.
    """
    self._retry_policies = [
        policy if isinstance(policy, RetryPolicy) else self._known_retry_policies[policy]
        for policy in policies
    ]

def get_retry_policies(self) -> List[str]:
    """
    Return list of currently used policies, i.e. ["DefaultPolicy", "MyCustomPolicy"].
    """
    return [policy.name for policy in self._retry_policies]
```
In my opinion the registering of policies by name is still superfluous, especially since `policy.name` is currently generated only from the policy classname, so it's currently not possible to use a different instance of the same policy with different parameters (e.g. different sleep timings). So I still think that any power user who wants to customize it would just define it with their own instances of the objects.
I think that now having some policies registered under a name in known_retry_policies, but with the key derived only from the class name (so registering a different instance will overwrite the entry), is confusing. And having set_retry_policies take either a string from known_retry_policies, or an instance (which may not be part of known_retry_policies, so when it's returned by name from get_retry_policies, the user can't do anything with that name), is also confusing.
WDYT @HyukjinKwon @grundprinzip ?
I continue to not see sense in having the `register_retry_policy` API, and I see confusion from `known_retry_policies` being identified either by a string which is just a class name (different instances of the same class being easily confused) or by a specific instance of the object. I see this as unnecessarily convoluted and not intuitive.
I would propose to leave just
```python
def set_retry_policies(self, policies: List[RetryPolicy]):
def get_retry_policies(self) -> List[RetryPolicy]:
```
and get rid of `_known_retry_policies`.
I leave it to other reviewers if they disagree and think that the current API makes sense.
Otherwise, LGTM.
@juliuszsompolski I made a change to make it easier to override while still keeping the class name as the default.
LGTM except for one comment, but someone who knows python like @HyukjinKwon should review.
Ok, I don't object to switching to what's proposed above; that also seems to do what's needed. EDIT: Done
Should be good to go
argh, seems there's a conflict against the latest master branch 😢
Apparently it conflicted with some of @grundprinzip's changes merged just before mine :). I resolved it; should be mergeable after CI.
Ready to merge again
Merged to master.
### What changes were proposed in this pull request?
Support multiple retry policies defined at the same time. Each policy determines which error types it can retry and how exactly those should be spread out. Scala parity for apache#43591

### Why are the changes needed?
Different error types should be treated differently. For instance, networking connectivity errors and remote resources being initialized should be treated separately.

### Does this PR introduce _any_ user-facing change?
No (as long as the user doesn't poke within client internals).

### How was this patch tested?
Unit tests, some hand testing.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#43757 from cdkrot/SPARK-45851-scala-multiple-policies.
Authored-by: Alice Sayutina <alice.sayutina@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…o multiple policies)

### What changes were proposed in this pull request?
Follow-up to #43591. Refactor default policy arguments into being arguments on the class, not within core.py.

### Why are the changes needed?
General refactoring; it also makes it easier for other policies to derive.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing coverage

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43800 from cdkrot/SPARK-45922.
Authored-by: Alice Sayutina <alice.sayutina@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…t.client.test_client` if not should_test_connect

### What changes were proposed in this pull request?
This is a follow-up of the following.
- #43591

### Why are the changes needed?
This test requires `pandas`, which is an optional dependency in Apache Spark.
```
$ python/run-tests --modules=pyspark-connect --parallelism=1 --python-executables=python3.10 --testnames 'pyspark.sql.tests.connect.client.test_client'
Running PySpark tests. Output is in /Users/dongjoon/APACHE/spark-merge/python/unit-tests.log
Will test against the following Python executables: ['python3.10']
Will test the following Python tests: ['pyspark.sql.tests.connect.client.test_client']
python3.10 python_implementation is CPython
python3.10 version is: Python 3.10.13
Starting test(python3.10): pyspark.sql.tests.connect.client.test_client (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/216a8716-3a1f-4cf9-9c7c-63087f29f892/python3.10__pyspark.sql.tests.connect.client.test_client__tydue4ck.log)
Traceback (most recent call last):
  File "/Users/dongjoon/.pyenv/versions/3.10.13/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/Users/dongjoon/.pyenv/versions/3.10.13/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/Users/dongjoon/APACHE/spark-merge/python/pyspark/sql/tests/connect/client/test_client.py", line 137, in <module>
    class TestPolicy(DefaultPolicy):
NameError: name 'DefaultPolicy' is not defined
```

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?
Pass the CIs and manually test without `pandas`.
```
$ pip3 uninstall pandas
$ python/run-tests --modules=pyspark-connect --parallelism=1 --python-executables=python3.10 --testnames 'pyspark.sql.tests.connect.client.test_client'
Running PySpark tests. Output is in /Users/dongjoon/APACHE/spark-merge/python/unit-tests.log
Will test against the following Python executables: ['python3.10']
Will test the following Python tests: ['pyspark.sql.tests.connect.client.test_client']
python3.10 python_implementation is CPython
python3.10 version is: Python 3.10.13
Starting test(python3.10): pyspark.sql.tests.connect.client.test_client (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/acf07ed5-938a-4272-87e1-47e3bf8b988e/python3.10__pyspark.sql.tests.connect.client.test_client__sfdosnek.log)
Finished test(python3.10): pyspark.sql.tests.connect.client.test_client (0s) ... 13 tests were skipped
Tests passed in 0 seconds

Skipped tests in pyspark.sql.tests.connect.client.test_client with python3.10:
      test_basic_flow (pyspark.sql.tests.connect.client.test_client.SparkConnectClientReattachTestCase) ... skip (0.002s)
      test_fail_and_retry_during_execute (pyspark.sql.tests.connect.client.test_client.SparkConnectClientReattachTestCase) ... skip (0.000s)
      test_fail_and_retry_during_reattach (pyspark.sql.tests.connect.client.test_client.SparkConnectClientReattachTestCase) ... skip (0.000s)
      test_fail_during_execute (pyspark.sql.tests.connect.client.test_client.SparkConnectClientReattachTestCase) ... skip (0.000s)
      test_channel_builder (pyspark.sql.tests.connect.client.test_client.SparkConnectClientTestCase) ... skip (0.000s)
      test_channel_builder_with_session (pyspark.sql.tests.connect.client.test_client.SparkConnectClientTestCase) ... skip (0.000s)
      test_interrupt_all (pyspark.sql.tests.connect.client.test_client.SparkConnectClientTestCase) ... skip (0.000s)
      test_is_closed (pyspark.sql.tests.connect.client.test_client.SparkConnectClientTestCase) ... skip (0.000s)
      test_properties (pyspark.sql.tests.connect.client.test_client.SparkConnectClientTestCase) ... skip (0.000s)
      test_retry (pyspark.sql.tests.connect.client.test_client.SparkConnectClientTestCase) ... skip (0.000s)
      test_retry_client_unit (pyspark.sql.tests.connect.client.test_client.SparkConnectClientTestCase) ... skip (0.000s)
      test_user_agent_default (pyspark.sql.tests.connect.client.test_client.SparkConnectClientTestCase) ... skip (0.000s)
      test_user_agent_passthrough (pyspark.sql.tests.connect.client.test_client.SparkConnectClientTestCase) ... skip (0.000s)
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #45830 from dongjoon-hyun/SPARK-45733.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Support multiple retry policies defined at the same time. Each policy determines which error types it can retry and how exactly those should be spread out.
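As a sketch of what "each policy determines which error types it can retry and how those retries are spaced" might look in use (the class names, error-type mapping, and wait values are hypothetical, not from this PR):

```python
# Two policies active at once, each owning a different error class with
# its own wait time; names and values are illustrative only.
from typing import Optional

class NetworkRetryPolicy:
    handles = ConnectionError  # transient networking errors
    wait_ms = 50               # retry quickly

class ResourceInitRetryPolicy:
    handles = RuntimeError     # remote resource still initializing
    wait_ms = 1000             # back off longer

POLICIES = [NetworkRetryPolicy(), ResourceInitRetryPolicy()]

def wait_for(error: BaseException) -> Optional[int]:
    # First policy whose error class matches decides the spacing.
    for policy in POLICIES:
        if isinstance(error, policy.handles):
            return policy.wait_ms
    return None  # not retryable by any active policy
```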
### Why are the changes needed?
Different error types should be treated differently. For instance, networking connectivity errors and remote resources being initialized should be treated separately.
### Does this PR introduce _any_ user-facing change?
No (as long as the user doesn't poke within client internals).

### How was this patch tested?
Unit tests, some hand testing.

### Was this patch authored or co-authored using generative AI tooling?
No