Skip to content

Commit

Permalink
Issue 71 fix lease util keep (#90)
Browse files Browse the repository at this point in the history
* clean code

* fix keepalive; and use lock instead of time.sleep()

* fix test of py2

* fix typo
  • Loading branch information
Revolution1 committed May 8, 2019
1 parent 658478b commit a3498e7
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 21 deletions.
68 changes: 51 additions & 17 deletions etcd3/stateful/lease.py
Expand Up @@ -4,10 +4,35 @@
import threading
import time

import six

from ..errors import ErrLeaseNotFound
from ..utils import log
from ..utils import retry

if six.PY2: # pragma: no cover
def wait_lock(lock, timeout):
"""
Hack for python2.7 since it's lock not support timeout
:param lock: threading.Lock
:param timeout: seconds of timeout
:return: bool
"""
cond = threading.Condition(threading.Lock())
with cond:
current_time = start_time = time.time()
while current_time < start_time + timeout:
if lock.acquire(False):
return True
else:
cond.wait(timeout - current_time + start_time)
current_time = time.time()
return False
else:
def wait_lock(lock, timeout):
return lock.acquire(True, timeout=timeout)


class Lease(object):
def __init__(self, client, ttl, ID=0, new=True):
Expand All @@ -31,6 +56,7 @@ def __init__(self, client, ttl, ID=0, new=True):
self.keeping = False
self.last_keep = None
self._thread = None
self._lock = threading.Lock()

@property
def ID(self):
Expand Down Expand Up @@ -109,28 +135,32 @@ def keepalive(self, keep_cb=None, cancel_cb=None):
:type cancel_cb: callable
:param cancel_cb: callback function that will be called after cancel keepalive
"""
if self.keeping:
raise RuntimeError("already keeping")
self.keeping = True

def keepalived():
while self.keeping:
retry(self.keepalive_once, max_tries=3, log=log)
self.last_keep = time.time()
log.debug("keeping lease %d" % self.ID)
if keep_cb:
with self._lock:
while self.keeping:
retry(self.keepalive_once, max_tries=3, log=log)
self.last_keep = time.time()
log.debug("keeping lease %d" % self.ID)
if keep_cb:
try:
keep_cb()
except Exception:
log.exception("keep_cb() raised an error")
for _ in range(int(self.grantedTTL / 2.0)): # keep per grantedTTL/4 seconds
if not self.keeping:
break
# self._lock.acquire(True, timeout=0.5)
wait_lock(self._lock, timeout=0.5)
log.debug("canceled keeping lease %d" % self.ID)
if cancel_cb:
try:
keep_cb()
cancel_cb()
except Exception:
log.exception("stream_cb() raised an error")
for _ in range(int(self.grantedTTL / 2.0)): # keep per grantedTTL/4 seconds
if self.keeping:
break
time.sleep(0.5)
log.debug("canceled keeping lease %d" % self.ID)
if cancel_cb:
try:
cancel_cb()
except Exception:
log.exception("cancel_cb() raised an error")
log.exception("cancel_cb() raised an error")

t = self._thread = threading.Thread(target=keepalived)
t.setDaemon(True)
Expand All @@ -144,6 +174,10 @@ def cancel_keepalive(self, join=True):
:param join: whether to wait the keepalive thread to exit
"""
self.keeping = False
try:
self._lock.acquire(False)
finally:
self._lock.release()
if join and self._thread and self._thread.is_alive():
self._thread.join()

Expand Down
2 changes: 1 addition & 1 deletion tests/test_lease_apis.py
Expand Up @@ -5,7 +5,7 @@

from etcd3.client import Client
from tests.docker_cli import docker_run_etcd_main
from .envs import protocol, host, port
from .envs import protocol, host
from .etcd_go_cli import NO_ETCD_SERVICE, etcdctl


Expand Down
11 changes: 8 additions & 3 deletions tests/test_lease_util.py
@@ -1,13 +1,13 @@
import random
import time

import mock
import pytest
import random

from etcd3.client import Client
from tests.docker_cli import docker_run_etcd_main
from .envs import protocol, host
from .etcd_go_cli import NO_ETCD_SERVICE, etcdctl
from .etcd_go_cli import etcdctl


@pytest.fixture(scope='module')
Expand All @@ -21,7 +21,6 @@ def client():
c.close()


@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available")
def test_lease_util(client):
ID = random.randint(10000, 100000)
TTL = 2 # min is 2sec
Expand All @@ -46,6 +45,8 @@ def test_lease_util(client):
assert not lease.keeping
assert not lease._thread.is_alive()


def test_lease_keep(client):
ID = random.randint(10000, 100000)
TTL = 5 # min is 2sec
keep_cb = mock.Mock()
Expand All @@ -54,8 +55,12 @@ def test_lease_util(client):
lease = client.Lease(ttl=TTL, ID=ID)
lease.grant()
lease.keepalive(keep_cb=keep_cb, cancel_cb=cancel_cb)
with pytest.raises(RuntimeError):
lease.keepalive()
time.sleep(1)
lease.cancel_keepalive()
assert keep_cb.called
assert keep_cb.call_count < 2 # or it keep too fast
assert cancel_cb.called

lease.keepalive_once()
Expand Down

0 comments on commit a3498e7

Please sign in to comment.