Skip to content

Commit

Permalink
Improve logging for distributed algorithms
Browse files Browse the repository at this point in the history
Wrap each `future.result()` call in a `try`/`except`.  Then in the
`except` block, log the exception but don't re-raise it.

This keeps our distributed algorithms robust, but gives us exception
logging.
  • Loading branch information
brainix committed Dec 4, 2020
1 parent fb29409 commit 04a7495
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 20 deletions.
22 changes: 12 additions & 10 deletions pottery/nextid.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@

from redis import Redis
from redis.client import Script
from redis.exceptions import ConnectionError
from redis.exceptions import TimeoutError
from typing_extensions import Final

from .base import Primitive
Expand Down Expand Up @@ -77,7 +75,8 @@ class NextId(Primitive):
_set_id_script: ClassVar[Optional[Script]] = None

def __init__(self,
*, key: str = KEY,
*,
key: str = KEY,
num_tries: int = NUM_TRIES,
masters: Iterable[Redis] = frozenset(),
) -> None:
Expand Down Expand Up @@ -132,18 +131,19 @@ def __repr__(self) -> str:

@property
def __current_id(self) -> int:
futures, current_id, num_masters_gotten = set(), 0, 0
futures, current_ids = set(), []
with concurrent.futures.ThreadPoolExecutor() as executor:
for master in self.masters:
futures.add(executor.submit(master.get, self.key))
for future in concurrent.futures.as_completed(futures):
with contextlib.suppress(TimeoutError, ConnectionError):
current_id = max(current_id, int(future.result()))
num_masters_gotten += 1
if num_masters_gotten < len(self.masters) // 2 + 1:
try:
current_ids.append(int(future.result()))
except Exception as error:
_logger.error(error, exc_info=True)
if len(current_ids) < len(self.masters) // 2 + 1:
raise QuorumNotAchieved(self.masters, self.key)
else:
return current_id
return max(current_ids)

@__current_id.setter
def __current_id(self, value: int) -> None:
Expand All @@ -158,8 +158,10 @@ def __current_id(self, value: int) -> None:
)
futures.add(future)
for future in concurrent.futures.as_completed(futures):
with contextlib.suppress(TimeoutError, ConnectionError):
try:
num_masters_set += future.result() == value
except Exception as error:
_logger.error(error, exc_info=True)
if num_masters_set < len(self.masters) // 2 + 1:
raise QuorumNotAchieved(self.masters, self.key)

Expand Down
25 changes: 15 additions & 10 deletions pottery/redlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@

from redis import Redis
from redis.client import Script
from redis.exceptions import ConnectionError
from redis.exceptions import TimeoutError
from typing_extensions import Final

from .base import Primitive
Expand Down Expand Up @@ -254,8 +252,10 @@ def __acquire_masters(self) -> bool:
for master in self.masters:
futures.add(executor.submit(self.__acquire_master, master))
for future in concurrent.futures.as_completed(futures):
with contextlib.suppress(TimeoutError, ConnectionError):
try:
num_masters_acquired += future.result()
except Exception as error: # pragma: no cover
_logger.error(error, exc_info=True)
quorum = num_masters_acquired >= len(self.masters) // 2 + 1
elapsed = timer.elapsed() - self.__drift()
validity_time = self.auto_release_time - elapsed
Expand Down Expand Up @@ -351,16 +351,17 @@ def locked(self) -> int:
>>> printer_lock_1.release()
'''
futures, num_masters_acquired, ttls = set(), 0, []
futures, ttls = set(), []
with ContextTimer() as timer, \
concurrent.futures.ThreadPoolExecutor() as executor:
for master in self.masters:
futures.add(executor.submit(self.__acquired_master, master))
for future in concurrent.futures.as_completed(futures):
with contextlib.suppress(TimeoutError, ConnectionError):
ttl = future.result()
num_masters_acquired += ttl > 0
ttls.append(ttl)
try:
ttls.append(future.result())
except Exception as error: # pragma: no cover
_logger.error(error, exc_info=True)
num_masters_acquired = sum(1 for ttl in ttls if ttl > 0)
quorum = num_masters_acquired >= len(self.masters) // 2 + 1
if quorum:
ttls = sorted(ttls, reverse=True)
Expand Down Expand Up @@ -398,8 +399,10 @@ def extend(self) -> None:
for master in self.masters:
futures.add(executor.submit(self.__extend_master, master))
for future in concurrent.futures.as_completed(futures):
with contextlib.suppress(TimeoutError, ConnectionError):
try:
num_masters_extended += future.result()
except Exception as error: # pragma: no cover
_logger.error(error, exc_info=True)
quorum = num_masters_extended >= len(self.masters) // 2 + 1
self._extension_num += quorum
if not quorum:
Expand All @@ -426,8 +429,10 @@ def release(self) -> None:
for master in self.masters:
futures.add(executor.submit(self.__release_master, master))
for future in concurrent.futures.as_completed(futures):
with contextlib.suppress(TimeoutError, ConnectionError):
try:
num_masters_released += future.result()
except Exception as error: # pragma: no cover
_logger.error(error, exc_info=True)
quorum = num_masters_released >= len(self.masters) // 2 + 1
if not quorum:
raise ReleaseUnlockedLock(self.masters, self.key)
Expand Down

0 comments on commit 04a7495

Please sign in to comment.