Skip to content

Commit

Permalink
Merge pull request #8 from Revolution1/feature/stateful-utils
Browse files Browse the repository at this point in the history
Stateful Utilities
  • Loading branch information
Revolution1 committed Mar 20, 2018
2 parents 5241406 + 5a933c9 commit 3f7cd72
Show file tree
Hide file tree
Showing 41 changed files with 1,642 additions and 133 deletions.
3 changes: 2 additions & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ ignore_errors = True
source =
etcd3/
omit =
etcd3/errors
etcd3/errors/
tests/
etcd3/utils.py

[html]
directory = coverage_html_report
3 changes: 0 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ services:
services:
- docker

after_success:
- codecov

before_install:
- docker pull quay.io/coreos/etcd:v3.3
- docker run -d -p 2379:2379 -p 2380:2380 --name etcd3 quay.io/coreos/etcd:v3.3 etcd --name node1 --initial-advertise-peer-urls http://0.0.0.0:2380 --listen-peer-urls http://0.0.0.0:2380 --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls http://0.0.0.0:2379 --initial-cluster node1=http://0.0.0.0:2380
Expand Down
55 changes: 44 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
etcd3-py
---------------------
# etcd3-py

[![pypi](https://img.shields.io/pypi/v/etcd3-py.svg)](https://pypi.python.org/pypi/etcd3-py)
[![travis](https://travis-ci.org/Revolution1/etcd3-py.svg?branch=master)](https://travis-ci.org/Revolution1/etcd3-py)
Expand All @@ -16,8 +15,7 @@ Python client for etcd v3 (Using gRPC-JSON-Gateway)

Notice: The authentication header through gRPC-JSON-Gateway only supported in [etcd v3.3+](https://github.com/coreos/etcd/pull/7999)

Features
========
## Features

* [x] Support python2.7 and python3.5+
* [x] Sync client based on requests
Expand All @@ -35,19 +33,20 @@ Features
* [x] Maintenance
* [x] Extra APIs
* [ ] stateful utilities
* [ ] Watch
* [ ] Lease
* [ ] Transaction
* [x] Watch
* [x] Lease
* [x] Transaction
* [ ] Lock

Quick Start
===========
## Quick Start

**Install**
```bash
$ pip install etcd3-py
```

---

**Sync Client**
```python
>>> from etcd3 import Client
Expand All @@ -74,9 +73,43 @@ etcdserverpbPutResponse(header=etcdserverpbResponseHeader(cluster_id=11588568905
key: b'foo' value: b'bar'
```

**Transaction Util**
```python
>>> from etcd3 import Client
>>> txn = Client().Txn()
>>> txn.compare(txn.key('foo').value == 'bar')
>>> txn.success(txn.put('foo', 'bra'))
>>> txn.commit()
etcdserverpbTxnResponse(header=etcdserverpbResponseHeader(cluster_id=11588568905070377092, member_id=128088275939295631, revision=15656, raft_term=4), succeeded=True, responses=[etcdserverpbResponseOp(response_put=etcdserverpbPutResponse(header=etcdserverpbResponseHeader(revision=15656)))])
```

**Lease Util**
```python
>>> from etcd3 import Client
>>> client = Client()
>>> with client.Lease(ttl=5) as lease:
... client.put('foo', 'bar', lease=lease.ID)
... client.put('fizz', 'buzz', lease=lease.ID)
... r = lease.time_to_live(keys=True)
... assert set(r.keys) == {b'foo', b'fizz'}
... assert lease.alive()
```

**Watch Util**
```python
>>> from etcd3 import Client
>>> client = Client()
>>> watcher=c.Watcher(all=True, progress_notify=True, prev_kv=True)
>>> w.onEvent('f.*', lambda e: print(e.key, e.value))
>>> w.runDaemon()
>>> # etcdctl put foo bar
>>> # etcdctl put foz bar
b'foo' b'bar'
b'foz' b'bar'
>>> w.stop()
```

TODO
====
## TODO

- [ ] benchmark
- [ ] python-etcd(etcd v2) compatible client
Expand Down
8 changes: 7 additions & 1 deletion etcd3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,16 @@
from .client import Client

AioClient = None
if six.PY3:
if six.PY3: # pragma: no cover
from .aio_client import AioClient

__all__.extend([
'Client',
'AioClient'
])

from .stateful import Txn

__all__.extend([
'Txn'
])
13 changes: 8 additions & 5 deletions etcd3/aio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async def __anext__(self):
await AioClient._raise_for_status(self.resp)
data = await self.resp_iter.next()
data = json.loads(str(data, encoding='utf-8'))
if data.get('error'):
if data.get('error'): # pragma: no cover
# {"error":{"grpc_code":14,"http_code":503,"message":"rpc error: code = Unavailable desc = transport is closing","http_status":"Service Unavailable"}}
err = data.get('error')
raise get_client_error(err.get('message'), code=err.get('code'), status=err.get('http_code'))
Expand Down Expand Up @@ -116,7 +116,7 @@ async def next(self):
else:
self.i = 0
self.left_chunk += await self.resp.content.readany()
if not self.left_chunk:
if not self.left_chunk: # pragma: no cover
if self.buf:
raise Etcd3StreamError("Stream decode error", self.buf, self.resp)
raise StopAsyncIteration
Expand Down Expand Up @@ -219,9 +219,12 @@ def call_rpc(self, method, data=None, stream=False, encode=True, raw=False, **kw
kwargs.setdefault('headers', {}).setdefault('authorization', self.token)
kwargs.setdefault('headers', {}).setdefault('user_agent', self.user_agent)
kwargs.setdefault('headers', {}).update(self.headers)
if encode:
data = self._encodeRPCRequest(method, data)
resp = self._post(self._url(method), json=data or {}, **kwargs)
if isinstance(data, dict):
if encode:
data = self._encodeRPCRequest(method, data)
resp = self._post(self._url(method), json=data or {}, **kwargs)
else:
resp = self._post(self._url(method), data=data, **kwargs)
if raw:
return resp
if stream:
Expand Down
4 changes: 2 additions & 2 deletions etcd3/apis/extra.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def version(self):
self._raise_for_status(resp)
return EtcdVersion(**resp.json())

def metrics_raw(self):
def metrics_raw(self): # pragma: no cover
"""
get the raw /metrics text
Expand All @@ -26,7 +26,7 @@ def metrics_raw(self):
self._raise_for_status(resp)
return resp.content

def metrics(self):
def metrics(self): # pragma: no cover
"""
get the modelized metrics parsed by prometheus_client
"""
Expand Down
14 changes: 5 additions & 9 deletions etcd3/apis/lease.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def lease_grant(self, TTL, ID=0):
deleted if the lease expires. Each expired key generates a delete event in the event history.
:type TTL: int
:param TTL: TTL is the advisory time-to-live in seconds.
:param TTL: TTL is the advisory time-to-live in seconds. the minimum value is 2s
:type ID: int
:param ID: ID is the requested ID for the lease. If ID is set to 0, the lessor chooses an ID.
"""
Expand All @@ -51,20 +51,16 @@ def lease_grant(self, TTL, ID=0):

# TODO: stream keepalive with context
# http://docs.python-requests.org/en/master/user/advanced/#chunk-encoded-requests
def lease_keep_alive(self, ID):
def lease_keep_alive(self, data):
"""
PLEASE USE THE Transaction util
PLEASE USE THE TRANSACTION UTIL
LeaseKeepAlive keeps the lease alive by streaming keep alive requests from the client
to the server and streaming keep alive responses from the server to the client.
:type ID: int
:param ID: ID is the lease ID for the lease to keep alive.
:param data: ID stream inputs of the lease to keep alive. which not works for now
"""
method = '/v3alpha/lease/keepalive'
data = {
"ID": ID
}
return self.call_rpc(method, data=data, stream=True)

def lease_keep_alive_once(self, ID):
Expand All @@ -78,5 +74,5 @@ def lease_keep_alive_once(self, ID):
:param ID: ID is the lease ID for the lease to keep alive.
"""

for i in self.lease_keep_alive(ID):
for i in self.lease_keep_alive(b'{"ID":%d}\n' % ID):
return i
95 changes: 78 additions & 17 deletions etcd3/apis/watch.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import warnings

from .base import BaseAPI
from ..utils import check_param
from ..models import WatchCreateRequestFilterType
from ..utils import check_param, incr_last_byte, Etcd3Warning


class WatchAPI(BaseAPI):
Expand All @@ -26,20 +29,78 @@ def watch(self, create_request=None, cancel_request=None):
"cancel_request": cancel_request
}
data = {k: v for k, v in data.items() if v is not None}

# data = {
# "create_request": {
# "key": "string",
# "range_end": "string",
# "start_revision": "string",
# "progress_notify": True,
# "filters": [
# "NOPUT"
# ],
# "prev_kv": True
# },
# "cancel_request": {
# "watch_id": "string"
# }
# }
return self.call_rpc(method, data=data, stream=True)

@check_param(at_least_one_of=['key', 'all'], at_most_one_of=['range_end', 'prefix', 'all'])
def watch_create(self, key=None, range_end=None, start_revision=None, progress_notify=None, prev_kv=None,
prefix=False, all=False, no_put=False, no_delete=False):
"""
WatchCreate creates a watch stream on given key or key_range
:type key: str
:param key: key is the key to register for watching.
:type range_end: str
:param range_end: range_end is the end of the range [key, range_end) to watch. If range_end is not given,
only the key argument is watched. If range_end is equal to '\0', all keys greater than
or equal to the key argument are watched.
If the range_end is one bit larger than the given key,
then all keys with the prefix (the given key) will be watched.
:type start_revision: int
:param start_revision: start_revision is an optional revision to watch from (inclusive). No start_revision is "now".
:type progress_notify: bool
:param progress_notify: progress_notify is set so that the etcd server will periodically send a WatchResponse with
no events to the new watcher if there are no recent events. It is useful when clients
wish to recover a disconnected watcher starting from a recent known revision.
The etcd server may decide how often it will send notifications based on current load.
:type prev_kv: bool
:param prev_kv: If prev_kv is set, created watcher gets the previous KV before the event happens.
If the previous KV is already compacted, nothing will be returned.
:type prefix: bool
:param prefix: if the key is a prefix [default: False]
:type all: bool
:param all: all the keys [default: False]
:type no_put: bool
:param no_put: filter out the put events at server side before it sends back to the watcher. [default: False]
:type no_delete: bool
:param no_delete: filter out the delete events at server side before it sends back to the watcher. [default: False]
"""
if all:
key = range_end = '\0'
if prefix:
range_end = incr_last_byte(key)
filters = []
if no_put:
filters.append(WatchCreateRequestFilterType.NOPUT)
if no_delete:
filters.append(WatchCreateRequestFilterType.NODELETE)
data = {
"key": key,
"range_end": range_end,
"start_revision": start_revision,
"progress_notify": progress_notify,
"filters": filters,
"prev_kv": prev_kv
}
data = {k: v for k, v in data.items() if v is not None}
return self.watch(create_request=data)

def watch_cancel(self, watch_id): # pragma: no cover
"""
NOT SUPPORTED UNDER ETCD 3.3-
https://github.com/coreos/etcd/pull/9065
WatchCancel cancels a watch stream
:type watch_id: int
:param watch_id: watch_id is the watcher id to cancel so that no more events are transmitted.
"""
warnings.warn(
Etcd3Warning("there is no way to cancel a watch request, due to cannot get the watcher id\n"
"but it may be supported in the future: https://github.com/coreos/etcd/pull/9065")
)

data = {
"watch_id": watch_id
}
return self.watch(cancel_request=data)

0 comments on commit 3f7cd72

Please sign in to comment.