Skip to content

Commit

Permalink
Merge branch 'release/2.2.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
wolph committed Jan 30, 2021
2 parents 921b8a6 + 34a8c1b commit c23f874
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 46 deletions.
7 changes: 5 additions & 2 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[report]
ignore_errors = True
fail_under = 100
exclude_lines =
pragma: no cover
Expand All @@ -10,8 +11,10 @@ exclude_lines =
if 0:
if __name__ == .__main__.:

omit =
portalocker/redis.py

[run]
source = src
branch = True
omit =
portalocker/redis.py

2 changes: 1 addition & 1 deletion portalocker/__about__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__package_name__ = 'portalocker'
__author__ = 'Rick van Hattem'
__email__ = 'wolph@wol.ph'
__version__ = '2.1.0'
__version__ = '2.2.0'
__description__ = '''Wraps the portalocker recipe for easy usage'''
__url__ = 'https://github.com/WoLpH/portalocker'

2 changes: 1 addition & 1 deletion portalocker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#: Current author's email address
__email__ = __about__.__email__
#: Version number
__version__ = '2.1.0'
__version__ = '2.2.0'
#: Package description for Pypi
__description__ = __about__.__description__
#: Package homepage
Expand Down
145 changes: 138 additions & 7 deletions portalocker/redis.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,47 @@
import _thread
import json
import logging
import random
import time
import typing
from typing import Any
from typing import Dict

from redis import client

from . import exceptions
from . import utils

logger = logging.getLogger(__name__)

DEFAULT_UNAVAILABLE_TIMEOUT = 1
DEFAULT_THREAD_SLEEP_TIME = 0.1


class PubSubWorkerThread(client.PubSubWorkerThread):
    '''PubSubWorkerThread that propagates fatal errors to the main thread.

    If the pubsub worker loop dies, the lock's keep-alive handling stops
    with it; interrupting the main thread makes that failure immediately
    visible instead of letting the process continue with a dead worker.
    '''

    def run(self):
        try:
            super().run()
        except Exception:  # pragma: no cover
            # Raise KeyboardInterrupt in the main thread, then re-raise
            # here so the worker's own traceback is not lost
            _thread.interrupt_main()
            raise


class RedisLock(utils.LockBase):
'''
An extremely reliable Redis lock based on pubsub with a keep-alive thread
As opposed to most Redis locking systems based on key/value pairs,
this locking method is based on the pubsub system. The big advantage is
that if the connection gets killed due to network issues, crashing
processes or otherwise, it will still immediately unlock instead of
waiting for a lock timeout.
To make sure both sides of the lock know about the connection state it is
recommended to set the `health_check_interval` when creating the redis
connection.
Args:
channel: the redis channel to use as locking key.
connection: an optional redis connection if you already have one
Expand All @@ -24,42 +50,91 @@ class RedisLock(utils.LockBase):
check_interval: check interval while waiting
fail_when_locked: after the initial lock failed, return an error
or lock the file. This does not wait for the timeout.
thread_sleep_time: sleep time between fetching messages from redis to
prevent a busy/wait loop. In the case of lock conflicts this
increases the time it takes to resolve the conflict. This should
be smaller than the `check_interval` to be useful.
unavailable_timeout: If the conflicting lock is properly connected
this should never exceed twice your redis latency. Note that this
will increase the wait time possibly beyond your `timeout` and is
always executed if a conflict arises.
redis_kwargs: The redis connection arguments if no connection is
given. The `DEFAULT_REDIS_KWARGS` are used as default, if you want
to override these you need to explicitly specify a value (e.g.
`health_check_interval=0`)
'''

redis_kwargs: Dict[str, Any]
thread: typing.Optional[PubSubWorkerThread]
channel: str
timeout: float
connection: typing.Optional[client.Redis]
pubsub: typing.Optional[client.PubSub] = None
close_connection: bool

DEFAULT_REDIS_KWARGS = dict(
health_check_interval=10,
)

def __init__(
self,
channel: str,
connection: typing.Optional[client.Redis] = None,
timeout: typing.Optional[float] = None,
check_interval: typing.Optional[float] = None,
fail_when_locked: typing.Optional[bool] = False,
thread_sleep_time: float = DEFAULT_THREAD_SLEEP_TIME,
unavailable_timeout: float = DEFAULT_UNAVAILABLE_TIMEOUT,
redis_kwargs: typing.Optional[typing.Dict] = None,
):
# We don't want to close connections given as an argument
self.close_connection = not connection

self.thread = None
self.channel = channel
self.connection = connection
self.thread_sleep_time = thread_sleep_time
self.unavailable_timeout = unavailable_timeout
self.redis_kwargs = redis_kwargs or dict()

for key, value in self.DEFAULT_REDIS_KWARGS.items():
self.redis_kwargs.setdefault(key, value)

super(RedisLock, self).__init__(timeout=timeout,
check_interval=check_interval,
fail_when_locked=fail_when_locked)

def get_connection(self) -> client.Redis:
if not self.connection:
self.connection = client.Redis()
self.connection = client.Redis(**self.redis_kwargs)

return self.connection

    def channel_handler(self, message):
        '''Answer a ping received on the lock channel.

        Publishes the current timestamp on the requester's
        `response_channel` to show that this lock holder is still alive.
        '''
        # Only react to real messages -- presumably other types are
        # subscribe/unsubscribe confirmations; confirm against redis-py docs
        if message.get('type') != 'message':  # pragma: no cover
            return

        try:
            data = json.loads(message.get('data'))
        except TypeError:  # pragma: no cover
            # `data` was not str/bytes (e.g. `None`); nothing to answer
            logger.debug('TypeError while parsing: %r', message)
            return

        self.connection.publish(data['response_channel'], str(time.time()))

@property
def client_name(self):
return self.channel + '-lock'

def acquire(
self, timeout: float = None, check_interval: float = None,
fail_when_locked: typing.Optional[bool] = True):
fail_when_locked: typing.Optional[bool] = None):

timeout = utils.coalesce(timeout, self.timeout, 0.0)
check_interval = utils.coalesce(check_interval, self.check_interval,
0.0)
fail_when_locked = utils.coalesce(fail_when_locked,
self.fail_when_locked)

assert not self.pubsub, 'This lock is already active'
connection = self.get_connection()
Expand All @@ -68,9 +143,26 @@ def acquire(
for _ in timeout_generator: # pragma: no branch
subscribers = connection.pubsub_numsub(self.channel)[0][1]

if subscribers:
logger.debug('Found %d lock subscribers for %s',
subscribers, self.channel)

if self.check_or_kill_lock(
connection,
self.unavailable_timeout): # pragma: no branch
continue
else: # pragma: no cover
subscribers = None

# Note: this should not be changed to an elif because the if
# above can still end up here
if not subscribers:
connection.client_setname(self.client_name)
self.pubsub = connection.pubsub()
self.pubsub.subscribe(self.channel)
self.pubsub.subscribe(**{self.channel: self.channel_handler})
self.thread = PubSubWorkerThread(
self.pubsub, sleep_time=self.thread_sleep_time)
self.thread.start()

subscribers = connection.pubsub_numsub(self.channel)[0][1]
if subscribers == 1: # pragma: no branch
Expand All @@ -79,10 +171,49 @@ def acquire(
# Race condition, let's try again
self.release()

if fail_when_locked: # pragma: no branch
if fail_when_locked: # pragma: no cover
raise exceptions.AlreadyLocked(exceptions)

raise exceptions.AlreadyLocked(exceptions)

    def check_or_kill_lock(self, connection, timeout):
        '''Ping the current lock holder; kill its connection if it is dead.

        Sends a ping on the lock channel with a freshly generated response
        channel and waits up to `timeout` seconds for any reply.

        Returns:
            True when the holder replied in time; `None` otherwise, after
            killing the matching redis client so its subscription (and
            therefore the lock) is dropped.
        '''
        # Random channel name to get messages back from the lock
        response_channel = f'{self.channel}-{random.random()}'

        pubsub = connection.pubsub()
        pubsub.subscribe(response_channel)
        connection.publish(self.channel, json.dumps(dict(
            response_channel=response_channel,
            message='ping',
        )))

        # Poll at least 10 times within `timeout`, but no slower than the
        # holder's own sleep time, so a prompt reply is not missed
        check_interval = min(self.thread_sleep_time, timeout / 10)
        for _ in self._timeout_generator(
                timeout, check_interval):  # pragma: no branch
            message = pubsub.get_message(timeout=check_interval)
            if message:  # pragma: no branch
                # Any reply proves the holder is alive and connected
                pubsub.close()
                return True

        # No reply within `timeout`: kill the unresponsive holder's client
        for client_ in connection.client_list('pubsub'):  # pragma: no cover
            if client_.get('name') == self.client_name:
                logger.warning(
                    'Killing unavailable redis client: %r', client_)
                connection.client_kill_filter(client_.get('id'))

def release(self):
if self.pubsub:
logger.error('releasing: %r', self.thread)
if self.thread: # pragma: no branch
self.thread.stop()
self.thread.join()
self.thread = None
time.sleep(0.01)

if self.pubsub: # pragma: no branch
self.pubsub.unsubscribe(self.channel)
self.pubsub.close()
self.pubsub = None

    def __del__(self):
        # Best-effort cleanup: a garbage-collected lock releases itself
        self.release()

67 changes: 46 additions & 21 deletions portalocker/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,33 @@
Filename = typing.Union[str, pathlib.Path]


def coalesce(*args, test_value=None):
    '''Return the first argument that is not identical to `test_value`.

    Returns `None` when no argument qualifies (or none are given), so the
    last given value usually acts as the default.

    Note that the comparison is an identity check
    (i.e. `value is not test_value`), so a freshly created container passed
    as `test_value` will not filter out equal-but-distinct containers.

    >>> coalesce(None, 1)
    1
    >>> coalesce()
    >>> coalesce(0, False, True)
    0
    >>> coalesce(0, False, True, test_value=0)
    False

    # This won't work because of the `is not test_value` identity testing:
    >>> coalesce([], dict(spam='eggs'), test_value=[])
    []
    '''
    return next((arg for arg in args if arg is not test_value), None)


@contextlib.contextmanager
def open_atomic(filename: Filename, binary: bool = True):
'''Open a file for atomic writing. Instead of locking this method allows
Expand Down Expand Up @@ -79,19 +106,19 @@ def open_atomic(filename: Filename, binary: bool = True):

class LockBase(abc.ABC): # pragma: no cover
#: timeout when trying to acquire a lock
timeout: typing.Optional[float] = DEFAULT_TIMEOUT
timeout: float
#: check interval while waiting for `timeout`
check_interval: typing.Optional[float] = DEFAULT_CHECK_INTERVAL
check_interval: float
#: skip the timeout and immediately fail if the initial lock fails
fail_when_locked: typing.Optional[bool] = DEFAULT_FAIL_WHEN_LOCKED
fail_when_locked: bool

def __init__(self, timeout: typing.Optional[float] = None,
check_interval: typing.Optional[float] = None,
fail_when_locked: typing.Optional[bool] = None):
if timeout is not None:
self.timeout = timeout
if check_interval is not None:
self.check_interval: float = check_interval
self.timeout = coalesce(timeout, DEFAULT_TIMEOUT)
self.check_interval = coalesce(check_interval, DEFAULT_CHECK_INTERVAL)
self.fail_when_locked = coalesce(fail_when_locked,
DEFAULT_FAIL_WHEN_LOCKED)

@abc.abstractmethod
def acquire(
Expand All @@ -100,21 +127,20 @@ def acquire(
return NotImplemented

def _timeout_generator(self, timeout, check_interval):
if timeout is None:
timeout = self.timeout

if timeout is None:
timeout = 0
timeout = coalesce(timeout, self.timeout, 0.0)
check_interval = coalesce(check_interval, self.check_interval, 0.0)

if check_interval is None:
check_interval = self.check_interval
yield 0
i = 0

yield
start_time = time.perf_counter()
while start_time + timeout > time.perf_counter():
i += 1
yield i

timeout_end = time.perf_counter() + timeout
while timeout_end > time.perf_counter():
yield
time.sleep(check_interval)
# Take low lock checks into account to stay within the interval
since_start_time = time.perf_counter() - start_time
time.sleep(max(0.001, (i * check_interval) - since_start_time))

@abc.abstractmethod
def release(self):
Expand Down Expand Up @@ -181,8 +207,7 @@ def acquire(
fail_when_locked: bool = None) -> typing.IO:
'''Acquire the locked filehandle'''

if fail_when_locked is None:
fail_when_locked = self.fail_when_locked
fail_when_locked = coalesce(fail_when_locked, self.fail_when_locked)

# If we already have a filehandle, return it
fh = self.fh
Expand Down
5 changes: 4 additions & 1 deletion portalocker_tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import py
import logging
import pytest


logger = logging.getLogger(__name__)


@pytest.fixture
def tmpfile(tmpdir_factory):
tmpdir = tmpdir_factory.mktemp('temp')
Expand All @@ -11,4 +15,3 @@ def tmpfile(tmpdir_factory):
filename.remove(ignore_errors=True)
except (py.error.EBUSY, py.error.ENOENT):
pass

0 comments on commit c23f874

Please sign in to comment.