From a3498e72a281095101b56df37b1e8eb85c14a3af Mon Sep 17 00:00:00 2001 From: Renjie Cai Date: Wed, 8 May 2019 20:13:30 +0800 Subject: [PATCH] Issue 71 fix lease util keep (#90) * clean code * fix keepalive; and use lock instead of time.sleep() * fix test of py2 * fix typo --- etcd3/stateful/lease.py | 68 ++++++++++++++++++++++++++++++---------- tests/test_lease_apis.py | 2 +- tests/test_lease_util.py | 11 +++++-- 3 files changed, 60 insertions(+), 21 deletions(-) diff --git a/etcd3/stateful/lease.py b/etcd3/stateful/lease.py index 4debdc4..a4e6a9f 100644 --- a/etcd3/stateful/lease.py +++ b/etcd3/stateful/lease.py @@ -4,10 +4,35 @@ import threading import time +import six + from ..errors import ErrLeaseNotFound from ..utils import log from ..utils import retry +if six.PY2: # pragma: no cover + def wait_lock(lock, timeout): + """ + Hack for python2.7 since it's lock not support timeout + + :param lock: threading.Lock + :param timeout: seconds of timeout + :return: bool + """ + cond = threading.Condition(threading.Lock()) + with cond: + current_time = start_time = time.time() + while current_time < start_time + timeout: + if lock.acquire(False): + return True + else: + cond.wait(timeout - current_time + start_time) + current_time = time.time() + return False +else: + def wait_lock(lock, timeout): + return lock.acquire(True, timeout=timeout) + class Lease(object): def __init__(self, client, ttl, ID=0, new=True): @@ -31,6 +56,7 @@ def __init__(self, client, ttl, ID=0, new=True): self.keeping = False self.last_keep = None self._thread = None + self._lock = threading.Lock() @property def ID(self): @@ -109,28 +135,32 @@ def keepalive(self, keep_cb=None, cancel_cb=None): :type cancel_cb: callable :param cancel_cb: callback function that will be called after cancel keepalive """ + if self.keeping: + raise RuntimeError("already keeping") self.keeping = True def keepalived(): - while self.keeping: - retry(self.keepalive_once, max_tries=3, log=log) - self.last_keep = time.time() - log.debug("keeping lease %d" % self.ID) - if keep_cb: + with self._lock: + while self.keeping: + retry(self.keepalive_once, max_tries=3, log=log) + self.last_keep = time.time() + log.debug("keeping lease %d" % self.ID) + if keep_cb: + try: + keep_cb() + except Exception: + log.exception("keep_cb() raised an error") + for _ in range(int(self.grantedTTL / 2.0)): # keep per grantedTTL/4 seconds + if not self.keeping: + break + # self._lock.acquire(True, timeout=0.5) + wait_lock(self._lock, timeout=0.5) + log.debug("canceled keeping lease %d" % self.ID) + if cancel_cb: try: - keep_cb() + cancel_cb() except Exception: - log.exception("stream_cb() raised an error") - for _ in range(int(self.grantedTTL / 2.0)): # keep per grantedTTL/4 seconds - if self.keeping: - break - time.sleep(0.5) - log.debug("canceled keeping lease %d" % self.ID) - if cancel_cb: - try: - cancel_cb() - except Exception: - log.exception("cancel_cb() raised an error") + log.exception("cancel_cb() raised an error") t = self._thread = threading.Thread(target=keepalived) t.setDaemon(True) @@ -144,6 +174,10 @@ def cancel_keepalive(self, join=True): :param join: whether to wait the keepalive thread to exit """ self.keeping = False + try: + self._lock.acquire(False) + finally: + self._lock.release() if join and self._thread and self._thread.is_alive(): self._thread.join() diff --git a/tests/test_lease_apis.py b/tests/test_lease_apis.py index 43ed222..a8748ed 100644 --- a/tests/test_lease_apis.py +++ b/tests/test_lease_apis.py @@ -5,7 +5,7 @@ from etcd3.client import Client from tests.docker_cli import docker_run_etcd_main -from .envs import protocol, host, port +from .envs import protocol, host from .etcd_go_cli import NO_ETCD_SERVICE, etcdctl diff --git a/tests/test_lease_util.py b/tests/test_lease_util.py index 733e0e5..e463dde 100644 --- a/tests/test_lease_util.py +++ b/tests/test_lease_util.py @@ -1,13 +1,13 @@ +import random import time import mock import pytest -import random from etcd3.client import Client from tests.docker_cli import docker_run_etcd_main from .envs import protocol, host -from .etcd_go_cli import NO_ETCD_SERVICE, etcdctl +from .etcd_go_cli import etcdctl @pytest.fixture(scope='module') @@ -21,7 +21,6 @@ def client(): c.close() -@pytest.mark.skipif(NO_ETCD_SERVICE, reason="no etcd service available") def test_lease_util(client): ID = random.randint(10000, 100000) TTL = 2 # min is 2sec @@ -46,6 +45,8 @@ def test_lease_util(client): assert not lease.keeping assert not lease._thread.is_alive() + +def test_lease_keep(client): ID = random.randint(10000, 100000) TTL = 5 # min is 2sec keep_cb = mock.Mock() @@ -54,8 +55,12 @@ def test_lease_util(client): lease = client.Lease(ttl=TTL, ID=ID) lease.grant() lease.keepalive(keep_cb=keep_cb, cancel_cb=cancel_cb) + with pytest.raises(RuntimeError): + lease.keepalive() + time.sleep(1) lease.cancel_keepalive() assert keep_cb.called + assert keep_cb.call_count < 2 # or it keep too fast assert cancel_cb.called lease.keepalive_once()