Skip to content

Commit

Permalink
Merge pull request #15 from Revolution1/feature/lock
Browse files Browse the repository at this point in the history
Feature/lock
  • Loading branch information
Revolution1 committed Mar 21, 2018
2 parents 8559273 + fd983e6 commit 6635467
Show file tree
Hide file tree
Showing 20 changed files with 540 additions and 133 deletions.
33 changes: 31 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ Notice: The authentication header through gRPC-JSON-Gateway only supported in [e
* [x] Lease
* [x] Maintenance
* [x] Extra APIs
* [ ] stateful utilities
* [x] stateful utilities
* [x] Watch
* [x] Lease
* [x] Transaction
* [ ] Lock
* [x] Lock

## Quick Start

Expand Down Expand Up @@ -107,6 +107,35 @@ b'foz' b'bar'
>>> w.stop()
```

**Lock Util**
```python
>>> import time
>>> from threading import Thread
>>> from etcd3 import Client
>>> client = Client()
>>> name = 'lock_name'
>>> def user1():
... with client.Lock(name, lock_ttl=5):
... print('user1 got the lock')
... time.sleep(5)
... print('user1 releasing the lock')
>>> def user2():
... with client.Lock(name, lock_ttl=5):
... print('user2 got the lock')
... time.sleep(5)
... print('user2 releasing the lock')
>>> t1 = Thread(target=user1, daemon=True)
>>> t2 = Thread(target=user2, daemon=True)
>>> t1.start()
>>> t2.start()
>>> t1.join()
>>> t2.join()
user1 got the lock
user1 releasing the lock
user2 got the lock
user2 releasing the lock
```

## TODO

- [ ] benchmark
Expand Down
2 changes: 2 additions & 0 deletions etcd3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
from .stateful import Txn
from .stateful import Watcher
from .stateful import Lease
from .stateful import Lock

from .stateful.watch import EventType

__all__.extend([
'Txn',
'Watcher',
'Lease',
'Lock',
'EventType'
])
12 changes: 6 additions & 6 deletions etcd3/apis/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
class WatchAPI(BaseAPI):
@check_param(at_least_one_of=['create_request', 'cancel_request'],
at_most_one_of=['create_request', 'cancel_request'])
def watch(self, create_request=None, cancel_request=None):
def watch(self, create_request=None, cancel_request=None, **kwargs):
"""
PLEASE USE THE WATCH UTIL
Expand All @@ -29,11 +29,11 @@ def watch(self, create_request=None, cancel_request=None):
"cancel_request": cancel_request
}
data = {k: v for k, v in data.items() if v is not None}
return self.call_rpc(method, data=data, stream=True)
return self.call_rpc(method, data=data, stream=True, **kwargs)

@check_param(at_least_one_of=['key', 'all'], at_most_one_of=['range_end', 'prefix', 'all'])
def watch_create(self, key=None, range_end=None, start_revision=None, progress_notify=None, prev_kv=None,
prefix=False, all=False, no_put=False, no_delete=False):
prefix=False, all=False, no_put=False, no_delete=False, **kwargs):
"""
WatchCreate creates a watch stream on given key or key_range
Expand Down Expand Up @@ -82,9 +82,9 @@ def watch_create(self, key=None, range_end=None, start_revision=None, progress_n
"prev_kv": prev_kv
}
data = {k: v for k, v in data.items() if v is not None}
return self.watch(create_request=data)
return self.watch(create_request=data, **kwargs)

def watch_cancel(self, watch_id): # pragma: no cover
def watch_cancel(self, watch_id, **kwargs): # pragma: no cover
"""
NOT SUPPORTED UNDER ETCD 3.3-
Expand All @@ -103,4 +103,4 @@ def watch_cancel(self, watch_id): # pragma: no cover
data = {
"watch_id": watch_id
}
return self.watch(cancel_request=data)
return self.watch(cancel_request=data, **kwargs)
8 changes: 6 additions & 2 deletions etcd3/baseclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .apis import MaintenanceAPI
from .apis import WatchAPI
from .stateful import Lease
from .stateful import Lock
from .stateful import Txn
from .stateful import Watcher
from .swagger_helper import SwaggerSpec
Expand Down Expand Up @@ -209,7 +210,7 @@ def Lease(self, ttl, ID=0, new=True):
return Lease(self, ttl=ttl, ID=ID, new=new)

def Watcher(self, key=None, range_end=None, max_retries=-1, start_revision=None, progress_notify=None,
prev_kv=None, prefix=None, all=None, no_put=False, no_delete=False):
prev_kv=None, prefix=None, all=None, no_put=False, no_delete=False, timeout=None):
"""
Initialize a Watcher
Expand Down Expand Up @@ -246,4 +247,7 @@ def Watcher(self, key=None, range_end=None, max_retries=-1, start_revision=None,
return Watcher(client=self, key=key, range_end=range_end, max_retries=max_retries,
start_revision=start_revision,
progress_notify=progress_notify, prev_kv=prev_kv, prefix=prefix, all=all, no_put=no_put,
no_delete=no_delete)
no_delete=no_delete, timeout=timeout)

def Lock(self, lock_name, lock_ttl=Lock.DEFAULT_LOCK_TTL, reentrant=None, lock_prefix='_locks'):
return Lock(self, lock_name=lock_name, lock_ttl=lock_ttl, reentrant=reentrant, lock_prefix=lock_prefix)
4 changes: 2 additions & 2 deletions etcd3/errors/go_etcd_rpctypes_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ class Etcd3Exception(Exception):

def Error(err, name): # pragma: no cover
class ClientError(Etcd3Exception):
def __init__(self, error, code, status, response=None):
def __init__(self, error=None, code=None, status=None, response=None):
self.code = err[0]
self.error = err[1]
self.codeText = codeText[code]
self.codeText = codeText[code] if code else None
self.status = status
self.response = response

Expand Down
3 changes: 2 additions & 1 deletion etcd3/stateful/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# flake8: noqa
from .lease import Lease
from .lock import Lock
from .transaction import Txn
from .watch import Watcher

__all__ = ['Txn', 'Lease', 'Watcher']
__all__ = ['Txn', 'Lease', 'Watcher', 'Lock']
83 changes: 5 additions & 78 deletions etcd3/stateful/lease.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,6 @@ def keepalive_once(self):

refresh = keepalive_once

# def keepalive(self, stream_cb=None, cancel_cb=None):
# self.fifo = produce_stream(b'{"ID":%d}\n' % self.ID, self.grantedTTL / 4.0,
# stream_cb=stream_cb, cancel_cb=cancel_cb)
# self.stream_conn = self.client.lease_keep_alive(self.fifo)
# return self.stream_conn

def keepalive(self, keep_cb=None, cancel_cb=None):
self.keeping = True

Expand Down Expand Up @@ -106,8 +100,10 @@ def keepalived():
t.setDaemon(True)
t.start()

def cancel_keepalive(self):
def cancel_keepalive(self, join=True):
self.keeping = False
if join and self._thread and self._thread.is_alive():
self._thread.join()

def jammed(self):
"""
Expand All @@ -118,6 +114,8 @@ def jammed(self):
return time.time() - self.last_keep > self.grantedTTL / 4.0

def revoke(self):
log.debug("revoking lease %d" % self.ID)
self.cancel_keepalive(False)
return self.client.lease_revoke(self.ID)

def __enter__(self):
Expand All @@ -128,74 +126,3 @@ def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
self.cancel_keepalive()
self.revoke()

# def produce_stream(data, interval, q=None, stream_cb=None, cancel_cb=None):
# """
# :param data: data to put into stream
# :param interval: put interval
# :param q: queue that handles put
# :param stream_cb: callback when put
# :param cancel_cb: callback when stream canceled
# :return: StreamFIFO
# """
# q = q or StreamFIFO(maxsize=128)
#
# def _put():
# while True:
# try:
# q.put(data)
# log.debug("produced a stream chunk")
# if stream_cb:
# stream_cb()
# time.sleep(interval)
# except StreamClosed:
# log.debug("exiting due to stream closed")
# if cancel_cb:
# cancel_cb()
# break
# except Exception as e:
# raise
#
# t = threading.Thread(target=_put)
# t.setDaemon(True)
# t.start()
# return q


# class StreamClosed(ValueError):
# pass


# class StreamFIFO(Queue):
# def __init__(self, maxsize=0):
# super(StreamFIFO, self).__init__(maxsize=maxsize)
# self._closed = False
# self.last_get = None
#
# @property
# def closed(self):
# return self._closed
#
# def put(self, item, block=True, timeout=None):
# if self.closed:
# raise StreamClosed("put on a closed stream")
# return super(StreamFIFO, self).put(item, block, timeout)
#
# def get(self, block=True, timeout=None):
# if self.closed:
# raise StreamClosed("get on a closed stream")
# self.last_get = time.time()
# return super(StreamFIFO, self).get(block, timeout)
#
# def __iter__(self):
# r = self.get()
# if not r:
# raise StopIteration
# yield r
#
# def close(self):
# self.queue.clear()
# self._closed = True

# def read(self, n):
# return self.get()

0 comments on commit 6635467

Please sign in to comment.