Skip to content

Commit

Permalink
Add a method for signalling unexpected termination of greenlets
Browse files Browse the repository at this point in the history
Poller/watcher greenlets can crash unexpectedly, causing the system to function incorrectly.
Previously users had no way of discovering such a condition.
This PR adds an optional event object that is signaled when a poller or watcher greenlet terminates because of an unexpected exception.
Users can then handle the issue appropriately for their application.
  • Loading branch information
nonirosenfeldredis committed Mar 4, 2024
1 parent e60dfa0 commit a175693
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 26 deletions.
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ jobs:
- run:
name: Install dependencies
command: |
sudo apt-get update
sudo apt-get install -y python-dev
- run:
name: Install pyenv
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ pip install .
python setup.py test
```

### Packaging Source Distribution

```
python setup.py sdist
```

## Usage Examples

The framework comes with sample client/server applications to demonstrate its use. They use Python's included
Expand Down
12 changes: 10 additions & 2 deletions examples/info_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

from __future__ import print_function
import gevent.hub
from gevent.event import AsyncResult

import redis_info_provider
import argparse
from threading import Thread
Expand Down Expand Up @@ -46,12 +48,18 @@ def main():
Thread(target=server.serve_forever).start()

try:
with redis_info_provider.LocalShardWatcher(), redis_info_provider.InfoPoller():
with redis_info_provider.LocalShardWatcher() as watcher , redis_info_provider.InfoPoller() as poller:
wait_event = AsyncResult()
watcher.set_exception_event(wait_event)
poller.set_exception_event(wait_event)
print('INFO server running.')
gevent.sleep(float('inf'))
wait_event.get()
except KeyboardInterrupt:
print('INFO server stopping.')
except Exception as e:
print(f'INFO server caught exception:{e}')
finally:
print('INFO server shutting.')
server.shutdown()
server.server_close()

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

setup(
name='redis-info-provider',
version='0.13.0',
version='0.14.0',
python_requires='>=2.7',
package_dir={'': 'src'},
packages=find_packages('src'),
Expand Down
49 changes: 40 additions & 9 deletions src/redis_info_provider/info_poller.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from gevent.event import AsyncResult
from greenlet import GreenletExit

from .shard_pub import ShardPublisher
import gevent
import gevent.socket
Expand All @@ -21,8 +24,8 @@ class InfoPoller(object):
and made available via the ShardPublisher.
"""

def __init__(self, grace=3):
# type: (int) -> None
def __init__(self, grace=3, logger=None):
# type: (int,Logger) -> None

"""
:param grace: Number of consecutive times a shard polling needs to fail before
Expand All @@ -31,8 +34,9 @@ def __init__(self, grace=3):
errors being logged unnecessarily.
"""
self._greenlets = {} # type: Dict[str, gevent.Greenlet]
self.logger = logging.getLogger(__name__)
self.logger = logger or logging.getLogger(__name__)
self._grace = grace
self.exception_event = None

# Subscribe to receive shard event notifications
ShardPublisher.subscribe_shard_event(ShardPublisher.ShardEvent.ADDED, self._add_shard)
Expand All @@ -42,6 +46,18 @@ def __init__(self, grace=3):
for shard in ShardPublisher.get_live_shards():
self._add_shard(shard)

def set_exception_event(self, exception_evt):
    # type: (AsyncResult) -> None

    """
    Register the event object used to report a fatal failure of a shard-polling greenlet.

    The object is only stored here; a later call replaces whatever was stored before.
    When a polling greenlet gives up after repeated unexpected (non-Redis) exceptions,
    the stored object's `set_exception` is invoked with the exception that caused it.
    :param exception_evt: The event object (e.g. a gevent `AsyncResult`) to signal.
    """
    self.exception_event = exception_evt

def stop(self):
# type: () -> None

Expand Down Expand Up @@ -101,7 +117,8 @@ def _poll_shard(self, shard):
Shard-polling greenlet main().
"""

consecutive_failures = 0
consecutive_redis_failures = 0
consecutive_general_failures = 0

# Retry loop. Redis errors (disconnects etc.) shouldn't stop us from polling as
# long as the shard lives. However, other unexpected problems should at the
Expand All @@ -118,19 +135,33 @@ def _poll_shard(self, shard):
self.logger.debug('Polled shard %s', shard.id)
info['meta'] = {}
shard.info = info
consecutive_failures = 0
consecutive_redis_failures = 0; consecutive_general_failures = 0
gevent.sleep(shard.polling_interval())
except redis.RedisError:
consecutive_failures += 1
if consecutive_failures < self._grace:
consecutive_redis_failures += 1
if consecutive_redis_failures < self._grace:
self.logger.debug(
'Redis error polling shard %s for %d consecutive times; still within grace period; will retry',
shard.id, consecutive_failures
shard.id, consecutive_redis_failures
)
else:
self.logger.warning(
'Redis error polling shard %s for %d consecutive times; will retry',
shard.id, consecutive_failures
shard.id, consecutive_redis_failures
)
gevent.sleep(1) # Cool-off period
continue # Retry
except GreenletExit:
self.logger.info("poller %s exiting..", shard.id)
raise
except Exception as e:
consecutive_general_failures += 1
self.logger.error(" info_poller shard %s caught exception: %s",shard.id,e)
if consecutive_general_failures < self._grace:
gevent.sleep(2)
else:
self.logger.error("general exception caught more than %s consecutive times; poller %s exiting..",
consecutive_general_failures, shard.id)
if self.exception_event:
self.exception_event.set_exception(e)
raise
51 changes: 37 additions & 14 deletions src/redis_info_provider/local_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
import psutil
import logging
import redis
from greenlet import GreenletExit

from .shard_pub import ShardPublisher
from .redis_shard import RedisShard
from typing import Mapping, Iterator, Any
from gevent.event import AsyncResult


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -36,8 +39,21 @@ def __init__(self, update_frequency=1.0):
"""
self._update_freq = update_frequency
self._should_stop = gevent.event.Event()
self.exception_event = None
self._greenlet = gevent.spawn(self._greenlet_main)

def set_exception_event(self, exception_evt):
    # type: (AsyncResult) -> None

    """
    Register the event object used to report an unexpected failure of the watcher greenlet.

    The object is only stored here; a later call replaces whatever was stored before.
    If the watcher greenlet's main loop is ended by an unexpected exception, the stored
    object's `set_exception` is invoked with that exception.
    :param exception_evt: The event object (e.g. a gevent `AsyncResult`) to signal.
    """
    self.exception_event = exception_evt

def stop(self):
# type: () -> None

Expand All @@ -59,20 +75,27 @@ def __exit__(self, exception_type, exception_value, traceback):
def _greenlet_main(self):
    # type: () -> None

    """
    Watcher greenlet main(): keep the ShardPublisher in sync with the live local shards.

    Until `self._should_stop` is set, each iteration compares the shard ids currently
    published by the ShardPublisher with the ids returned by `self._get_live_shards()`,
    publishes the shards that appeared, removes the ones that disappeared, and then
    sleeps for `self._update_freq` seconds.

    A GreenletExit is logged and re-raised. Any other exception ends the loop: it is
    logged with its traceback and, if an exception event has been configured, handed
    to that event's `set_exception` so the application can react to the watcher dying.
    """
    try:
        while not self._should_stop.is_set():
            published_shard_ids = ShardPublisher.get_live_shard_ids()
            live_shards = self._get_live_shards()
            live_shard_ids = set(live_shards.keys())

            logger.info('Updated Redis shards: %s', live_shard_ids)

            for shard_id in live_shard_ids - published_shard_ids:
                # New shard
                ShardPublisher.add_shard(live_shards[shard_id])
            for shard_id in published_shard_ids - live_shard_ids:
                # Removed shard
                ShardPublisher.del_shard(shard_id)
            gevent.sleep(self._update_freq)
    except GreenletExit:
        # Use the module-level logger: this class does not set a `logger` attribute,
        # so `self.logger` would fail here and mask the GreenletExit.
        logger.info("local watcher exiting..")
        raise
    except Exception as e:
        # Always record the failure, even when nobody registered an event, so the
        # watcher never dies silently.
        logger.exception("local watcher terminated by unexpected exception")
        if self.exception_event is not None:
            self.exception_event.set_exception(e)

@classmethod
def _get_live_shards(cls):
Expand Down

0 comments on commit a175693

Please sign in to comment.