Close channel on grpc TRANSIENT_FAILURE #75
It appears the channel should be closed on |
Doing code review. When running
|
I checked out the branch to reproduce the error described in #74, and it still kept the thread open.
After experimenting with a few things (in an area I don't understand well), I found that the following change:
@ pyzeebe/grpc_internals/zeebe_adapter_base.py:86 @ class ZeebeAdapterBase(object):
         return self._max_connection_retries == -1 or self._current_connection_retries < self._max_connection_retries

     def _common_zeebe_grpc_errors(self, rpc_error: grpc.RpcError):
+        try:
+            self._channel.close()
+        except Exception as err:
+            logger.exception(f"Failed to close channel, {type(err).__name__} exception was raised")
         if self.is_error_status(rpc_error, grpc.StatusCode.RESOURCE_EXHAUSTED):
             raise ZeebeBackPressure()
         elif self.is_error_status(rpc_error, grpc.StatusCode.UNAVAILABLE):
would resolve the issue.
I've put the self._channel.close() call in a try/except block because I'm not sure whether it can throw an exception. If it does, it should at least be logged (logging.error or logging.warning would also work, but this is an exceptional situation, so logger.exception seems to make sense).
Here's the output with this in place:
❯ python main.py
[2020-11-20 17:57:27,295] [18550 - MainThread] [DEBUG] asyncio - Using selector: KqueueSelector
[2020-11-20 17:57:27,323] [18550 - MainThread] [INFO] uvicorn.error - Started server process [18550]
[2020-11-20 17:57:27,323] [18550 - MainThread] [INFO] uvicorn.error - Waiting for application startup.
[2020-11-20 17:57:27,323] [18550 - MainThread] [INFO] uvicorn.error - Application startup complete.
[2020-11-20 17:57:27,324] [18550 - MainThread] [INFO] uvicorn.error - Uvicorn running on http://0.0.0.0:5001 (Press CTRL+C to quit)
[2020-11-20 17:57:38,519] [18550 - Thread-2] [DEBUG] pyzeebe.grpc_internals.zeebe_adapter_base - Grpc channel connectivity changed to: ChannelConnectivity.IDLE
[2020-11-20 17:57:38,520] [18550 - Thread-2] [DEBUG] pyzeebe.grpc_internals.zeebe_adapter_base - Connected to localhost:26599
[2020-11-20 17:57:38,521] [18550 - MainThread] [INFO] uvicorn.access - 127.0.0.1:59761 - "GET / HTTP/1.1" 500
[2020-11-20 17:57:38,521] [18550 - MainThread] [ERROR] uvicorn.error - Exception in ASGI application
Traceback (most recent call last):
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/uvicorn/protocols/http/h11_impl.py", line 389, in run_asgi
result = await app(self.scope, self.receive, self.send)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
return await self.app(scope, receive, send)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/uvicorn/middleware/message_logger.py", line 65, in __call__
raise exc from None
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/uvicorn/middleware/message_logger.py", line 61, in __call__
await self.app(scope, inner_receive, inner_send)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/fastapi/applications.py", line 179, in __call__
await super().__call__(scope, receive, send)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/starlette/applications.py", line 111, in __call__
await self.middleware_stack(scope, receive, send)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/starlette/middleware/errors.py", line 181, in __call__
raise exc from None
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/starlette/middleware/errors.py", line 159, in __call__
await self.app(scope, receive, _send)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/starlette/exceptions.py", line 82, in __call__
raise exc from None
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/starlette/exceptions.py", line 71, in __call__
await self.app(scope, receive, sender)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/starlette/routing.py", line 566, in __call__
await route.handle(scope, receive, send)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/starlette/routing.py", line 227, in handle
await self.app(scope, receive, send)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/starlette/routing.py", line 41, in app
response = await func(request)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/fastapi/routing.py", line 183, in app
dependant=dependant, values=values, is_coroutine=is_coroutine
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/fastapi/routing.py", line 135, in run_endpoint_function
return await run_in_threadpool(dependant.call, **values)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/starlette/concurrency.py", line 34, in run_in_threadpool
return await loop.run_in_executor(None, func, *args)
File "/Users/kristofferb/.pyenv/versions/3.7.6/lib/python3.7/concurrent/futures/thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "main.py", line 14, in read_root
queued = client.run_workflow("test-workflow", {})
File "/Users/kristofferb/Code/fork/pyzeebe/pyzeebe/client/client.py", line 47, in run_workflow
version=version)
File "/Users/kristofferb/Code/fork/pyzeebe/pyzeebe/grpc_internals/zeebe_workflow_adapter.py", line 22, in create_workflow_instance
self._create_workflow_errors(rpc_error, bpmn_process_id, version, variables)
File "/Users/kristofferb/Code/fork/pyzeebe/pyzeebe/grpc_internals/zeebe_workflow_adapter.py", line 46, in _create_workflow_errors
self._common_zeebe_grpc_errors(rpc_error)
File "/Users/kristofferb/Code/fork/pyzeebe/pyzeebe/grpc_internals/zeebe_adapter_base.py", line 93, in _common_zeebe_grpc_errors
raise ZeebeGatewayUnavailable()
pyzeebe.exceptions.zeebe_exceptions.ZeebeGatewayUnavailable
^C[2020-11-20 17:57:45,904] [18550 - MainThread] [INFO] uvicorn.error - Shutting down
[2020-11-20 17:57:46,011] [18550 - MainThread] [INFO] uvicorn.error - Waiting for application shutdown.
[2020-11-20 17:57:46,012] [18550 - MainThread] [INFO] uvicorn.error - Application shutdown complete.
[2020-11-20 17:57:46,012] [18550 - MainThread] [INFO] uvicorn.error - Finished server process [18550]
Might a context manager be needed to ensure that the connection is closed on an exception or when the client instance is deleted? https://stackoverflow.com/questions/865115/how-do-i-correctly-clean-up-a-python-object
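To make the context manager idea concrete, here is a minimal sketch of what it might look like. It is not pyzeebe's API: ManagedClient and FakeChannel are hypothetical names, and the fake channel only models close() so that the example runs without a gateway or the grpc package.

```python
import logging

logger = logging.getLogger(__name__)


class FakeChannel:
    """Hypothetical stand-in for a gRPC channel; only close() is modelled."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class ManagedClient:
    """Hypothetical wrapper that owns a channel and closes it on exit."""

    def __init__(self, channel):
        self._channel = channel

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Always try to close the channel, whether or not the body raised.
        try:
            self._channel.close()
        except Exception:
            logger.exception("Failed to close channel")
        # Returning False lets the original exception (if any) propagate.
        return False


channel = FakeChannel()
try:
    with ManagedClient(channel):
        # Simulate a failing call inside the managed block.
        raise RuntimeError("gateway unavailable")
except RuntimeError:
    pass
```

With this shape the caller still sees the original exception, and the channel is closed either way. Relying on __del__ instead would be less predictable, since Python does not guarantee when (or whether) it runs.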
I guess PyCharm doesn't use pylint even with the plugin installed :/
Could you take another look at this, @kbakk?
Well done, thanks!
When trying to reproduce with the following changes
 @app.get("/")
 def read_root():
-    client = pyzeebe.ZeebeClient(hostname="localhost", port=NON_LISTENING_PORT)
+    client = pyzeebe.ZeebeClient(
+        hostname="localhost", port=NON_LISTENING_PORT, max_connection_retries=0
+    )
     queued = client.run_workflow("test-workflow", {})
     return {"queued": queued}
it works as desired:
python main.py
[2021-01-12 11:13:11,030] [67550 - MainThread] [DEBUG] asyncio - Using selector: KqueueSelector
[2021-01-12 11:13:11,071] [67550 - MainThread] [INFO] uvicorn.error - Started server process [67550]
[2021-01-12 11:13:11,071] [67550 - MainThread] [INFO] uvicorn.error - Waiting for application startup.
[2021-01-12 11:13:11,072] [67550 - MainThread] [INFO] uvicorn.error - Application startup complete.
[2021-01-12 11:13:11,072] [67550 - MainThread] [INFO] uvicorn.error - Uvicorn running on http://0.0.0.0:5001 (Press CTRL+C to quit)
[2021-01-12 11:13:18,977] [67550 - Thread-2] [DEBUG] pyzeebe.grpc_internals.zeebe_adapter_base - Grpc channel connectivity changed to: ChannelConnectivity.IDLE
[2021-01-12 11:13:18,977] [67550 - Thread-2] [DEBUG] pyzeebe.grpc_internals.zeebe_adapter_base - Connected to localhost:26599
[2021-01-12 11:13:19,187] [67550 - MainThread] [INFO] uvicorn.access - 127.0.0.1:51637 - "GET / HTTP/1.1" 500
[2021-01-12 11:13:19,188] [67550 - MainThread] [ERROR] uvicorn.error - Exception in ASGI application
Traceback (most recent call last):
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/uvicorn/protocols/http/h11_impl.py", line 389, in run_asgi
result = await app(self.scope, self.receive, self.send)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
return await self.app(scope, receive, send)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/uvicorn/middleware/message_logger.py", line 65, in __call__
raise exc from None
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/uvicorn/middleware/message_logger.py", line 61, in __call__
await self.app(scope, inner_receive, inner_send)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/fastapi/applications.py", line 179, in __call__
await super().__call__(scope, receive, send)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/starlette/applications.py", line 111, in __call__
await self.middleware_stack(scope, receive, send)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/starlette/middleware/errors.py", line 181, in __call__
raise exc from None
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/starlette/middleware/errors.py", line 159, in __call__
await self.app(scope, receive, _send)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/starlette/exceptions.py", line 82, in __call__
raise exc from None
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/starlette/exceptions.py", line 71, in __call__
await self.app(scope, receive, sender)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/starlette/routing.py", line 566, in __call__
await route.handle(scope, receive, send)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/starlette/routing.py", line 227, in handle
await self.app(scope, receive, send)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/starlette/routing.py", line 41, in app
response = await func(request)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/fastapi/routing.py", line 183, in app
dependant=dependant, values=values, is_coroutine=is_coroutine
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/fastapi/routing.py", line 135, in run_endpoint_function
return await run_in_threadpool(dependant.call, **values)
File "/Users/kristofferb/Code/temp/pyzeebe-fastapi/.venv/lib/python3.7/site-packages/starlette/concurrency.py", line 34, in run_in_threadpool
return await loop.run_in_executor(None, func, *args)
File "/Users/kristofferb/.pyenv/versions/3.7.6/lib/python3.7/concurrent/futures/thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "main.py", line 16, in read_root
queued = client.run_workflow("test-workflow", {})
File "/Users/kristofferb/Code/fork/pyzeebe/pyzeebe/client/client.py", line 47, in run_workflow
version=version)
File "/Users/kristofferb/Code/fork/pyzeebe/pyzeebe/grpc_internals/zeebe_workflow_adapter.py", line 22, in create_workflow_instance
self._create_workflow_errors(rpc_error, bpmn_process_id, version, variables)
File "/Users/kristofferb/Code/fork/pyzeebe/pyzeebe/grpc_internals/zeebe_workflow_adapter.py", line 46, in _create_workflow_errors
self._common_zeebe_grpc_errors(rpc_error)
File "/Users/kristofferb/Code/fork/pyzeebe/pyzeebe/grpc_internals/zeebe_adapter_base.py", line 89, in _common_zeebe_grpc_errors
raise ZeebeGatewayUnavailable()
pyzeebe.exceptions.zeebe_exceptions.ZeebeGatewayUnavailable
^C[2021-01-12 11:38:43,072] [67550 - MainThread] [INFO] uvicorn.error - Shutting down
[2021-01-12 11:38:43,181] [67550 - MainThread] [INFO] uvicorn.error - Waiting for application shutdown.
[2021-01-12 11:38:43,183] [67550 - MainThread] [INFO] uvicorn.error - Application shutdown complete.
[2021-01-12 11:38:43,184] [67550 - MainThread] [INFO] uvicorn.error - Finished server process [67550]
Close grpc channel to Zeebe on TRANSIENT_FAILURE status.
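As a rough illustration of the idea (not the code in this PR), the sketch below registers a connectivity callback that closes the channel once a transient-failure state is reported. Connectivity and FakeChannel are hypothetical stand-ins so the example runs without grpc installed. With the real library the callback would presumably be registered through the channel's subscribe method and compared against grpc.ChannelConnectivity.TRANSIENT_FAILURE.

```python
import enum


class Connectivity(enum.Enum):
    """Local stand-in mirroring a few grpc.ChannelConnectivity members."""

    IDLE = "idle"
    READY = "ready"
    TRANSIENT_FAILURE = "transient_failure"


class FakeChannel:
    """Hypothetical channel that lets the example emit state changes by hand."""

    def __init__(self):
        self.closed = False
        self._callbacks = []

    def subscribe(self, callback):
        self._callbacks.append(callback)

    def emit(self, state):
        # Notify every subscriber of the new connectivity state.
        for callback in list(self._callbacks):
            callback(state)

    def close(self):
        self.closed = True


def close_on_transient_failure(channel):
    """Subscribe a callback that closes the channel on TRANSIENT_FAILURE."""

    def on_state_change(state):
        if state is Connectivity.TRANSIENT_FAILURE:
            channel.close()

    channel.subscribe(on_state_change)


channel = FakeChannel()
close_on_transient_failure(channel)

channel.emit(Connectivity.IDLE)
closed_after_idle = channel.closed

channel.emit(Connectivity.TRANSIENT_FAILURE)
closed_after_failure = channel.closed
```

Closing the channel stops further reconnection attempts on it, which is the behaviour the description above is after.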
Changes
API Updates
New Features (required)
None.
Deprecations (required)
None.
Enhancements (optional)
Close the grpc channel on the grpc.ChannelConnectivity.TRANSIENT_FAILURE signal. This fixes an issue where pyzeebe would try to reconnect over and over.
Checklist
References
Fixes #74