Skip to content

Commit

Permalink
finish base client
Browse files Browse the repository at this point in the history
  • Loading branch information
revol.cai committed Feb 9, 2018
1 parent b778714 commit 3a45cca
Show file tree
Hide file tree
Showing 21 changed files with 3,316 additions and 24 deletions.
3 changes: 3 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ insert_final_newline = true
charset = utf-8
end_of_line = lf

[*.json]
indent_size = 2

[*.bat]
indent_style = tab
end_of_line = crlf
Expand Down
11 changes: 1 addition & 10 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ etcd3-py
:target: https://pyup.io/repos/github/Revolution1/etcd3-py/
:alt: Python 3

Python client for etcd v3 (Using grpc-json-gateway) Edit
Python client for etcd v3 (Using grpc-json-gateway)


* Free software: Apache Software License 2.0
Expand All @@ -32,12 +32,3 @@ Features
--------

* TODO

Credits
---------

This package was created with Cookiecutter_ and the `audreyr/cookiecutter-pypackage`_ project template.

.. _Cookiecutter: https://github.com/audreyr/cookiecutter
.. _`audreyr/cookiecutter-pypackage`: https://github.com/audreyr/cookiecutter-pypackage

5 changes: 1 addition & 4 deletions etcd3/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
# -*- coding: utf-8 -*-

"""Top-level package for etcd3-py."""

__author__ = """Renjie Cai"""
__email__ = 'revol.cai@gmail.com'
__version__ = '0.0.1'
from __version__ import *
3 changes: 3 additions & 0 deletions etcd3/__version__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
__author__ = 'Renjie Cai'
__email__ = 'revol.cai@gmail.com'
__version__ = '0.0.1'
10 changes: 10 additions & 0 deletions etcd3/apis/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from abc import abstractmethod, ABCMeta

import six


@six.add_metaclass(ABCMeta)
class BaseAPI(object):
@abstractmethod
def call_rpc(self, method, data=None, stream=False, **kwargs):
pass
58 changes: 58 additions & 0 deletions etcd3/apis/maintenance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from . import BaseAPI
from ..models import AlarmAction


class MaintenanceAPI(BaseAPI):

def alarm_request(self, alarm_action, member_id, alarm_type):
"""
Alarm activates, deactivates, and queries alarms regarding cluster health.
:param alarm_action: AlarmAction
:param member_id: int
:param alarm_type:
"""
method = '/v3alpha/maintenance/alarm'
data = {
"action": alarm_action,
"memberID": member_id,
"alarm": alarm_type
}
return self.call_rpc(method, data=data)

def get_alarm(self, member_id, alarm_type):
return self.alarm_request(AlarmAction.GET, member_id, alarm_type)

def activate_alarm(self, member_id, alarm_type):
return self.alarm_request(AlarmAction.ACTIVATE, member_id, alarm_type)

def deactivate_alarm(self, member_id, alarm_type):
return self.alarm_request(AlarmAction.DEACTIVATE, member_id, alarm_type)

def defragment(self):
"""
Defragment defragments a member's backend database to recover storage space.
"""
method = '/v3alpha/maintenance/defragment'
return self.call_rpc(method)

def hash(self):
"""
Hash returns the hash of the local KV state for consistency checking purpose.
This is designed for testing; do not use this in production when there are ongoing transactions.
"""
method = '/v3alpha/maintenance/hash'
return self.call_rpc(method)

def snapshot(self):
"""
Snapshot sends a snapshot of the entire backend from a member over a stream to a client.
"""
method = '/v3alpha/maintenance/snapshot'
return self.call_rpc(method)

def status(self):
"""
Status gets the status of the member.
"""
method = '/v3alpha/maintenance/status'
return self.call_rpc(method)
173 changes: 173 additions & 0 deletions etcd3/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import json
import os
import sys
import traceback

import requests
from six.moves import urllib_parse

from __version__ import __version__
from apis import BaseAPI
from errors import Etcd3Exception
from swagger_helper import SwaggerSpec

rpc_swagger_json = os.path.join(os.path.dirname(__file__), 'rpc.swagger.json')

swaggerSpec = SwaggerSpec(rpc_swagger_json)


def iter_response(resp):
"""
yield response content by every json object
we don't yield by line, because the content of gRPC-JSON-Gateway stream response
does not have a delimiter between each object by default.
https://github.com/grpc-ecosystem/grpc-gateway/pull/497/files
:param resp: Response
:return: dict
"""
buf = []
bracket_flag = 0
for c in resp.iter_content():
buf.append(c)
if c == '{':
bracket_flag += 1
elif c == '}':
bracket_flag -= 1
if bracket_flag == 0:
s = ''.join(buf)
buf = []
if s:
yield json.loads(s)


class Etcd3APIClient(BaseAPI):
response_class = requests.models.Response

def __init__(self, host='localhost', port=2379, protocol='http',
ca_cert=None, cert_key=None, cert_cert=None,
timeout=None, headers=None, user_agent=None, pool_size=30,
user=None, password=None, token=None):
self.session = requests.session()
self.host = host
self.port = port
self.cert = None
self.ca_cert = ca_cert
self.cert_key = cert_key
self.cert_cert = cert_cert
if ca_cert or cert_key and cert_cert:
self.protocol = 'https'
if ca_cert:
self.cert = ca_cert
if cert_cert and cert_key:
self.cert = (cert_cert, cert_key)
self.user_agent = user_agent
if not user_agent:
self.user_agent = 'etcd3-py/' + __version__
self.protocol = protocol
self.timeout = timeout
self.headers = headers or {}
self.user = user
self.password = password
self.token = token

@property
def baseurl(self):
return '{}://{}:{}'.format(self.protocol, self.host, self.port)

def _url(self, method):
return urllib_parse.urljoin(self.baseurl, method)

def encodeRPCRequest(self, method, data):
swpath = swaggerSpec.getPath(method)
if not swpath:
return data
reqSchema = swpath.post.parameters[0].schema
return reqSchema.encode(data)

@classmethod
def decodeRPCResponse(cls, method, data):
if isinstance(data, cls.response_class):
data = data.json()
swpath = swaggerSpec.getPath(method)
if not swpath:
return data
reqSchema = swpath.post.responses._200.schema
return reqSchema.decode(data)

@classmethod
def modelizeStreamResponse(cls, method, resp, decode=True):
swpath = swaggerSpec.getPath(method)
respModel = swpath.post.responses._200.schema.getModel()
for data in iter_response(resp):
data = data.get('result', {}) # the real data is been put under the key: 'result'
if decode:
data = cls.decodeRPCResponse(method, data)
yield respModel(data)

@classmethod
def modelizeResponse(cls, method, resp, decode=True):
if isinstance(resp, cls.response_class):
resp = resp.json()
if decode:
resp = cls.decodeRPCResponse(method, resp)
swpath = swaggerSpec.getPath(method)
respModel = swpath.post.responses._200.schema.getModel()
return respModel(resp)

@staticmethod
def raise_for_status(resp):
status = resp.status_code
if status < 400:
return
try:
data = resp.json()
except:
_, _, tb = sys.exc_info()
error = ''.join(traceback.format_tb(tb))
code = -1
else:
error = data.get('error')
code = data.get('code')
raise Etcd3Exception(error, code, status)

def call_rpc(self, method, data=None, stream=False, encode=True, raw=False, **kwargs):
"""
call ETCDv3 RPC and return response object
:type method: str
:param method: the rpc method, which is a path of RESTful API
:type data: dict
:param data: request payload to be post to ETCD's gRPC-JSON-Gateway default: {}
:type stream: bool
:param stream: whether return a stream response object, default: False
:type encode: bool
:param encode: whether encode the data before post, default: True
:param kwargs: additional params to pass to the http request, like headers, timeout etc.
:return: Etcd3RPCResponseModel or Etcd3StreamingResponse
"""
data = data or {}
kwargs.setdefault('timeout', self.timeout)
kwargs.setdefault('cert', self.cert)
kwargs.setdefault('headers', {}).setdefault('user_agent', self.user_agent)
kwargs.setdefault('headers', {}).update(self.headers)
if encode:
data = self.encodeRPCRequest(method, data)
resp = self.session.post(self._url(method), json=data or {}, stream=stream, **kwargs)
self.raise_for_status(resp)
if raw:
return resp
if stream:
return self.modelizeStreamResponse(method, resp)
return self.modelizeResponse(method, resp)


if __name__ == '__main__':
client = Etcd3APIClient()
# print(client.call_rpc('/v3alpha/maintenance/status').json())
# print(client.call_rpc('/v3alpha/kv/put', {'key': 'foo', 'value': 'bar'}).json())
# print(client.call_rpc('/v3alpha/kv/put', {'key': 'foo', 'value': 'bar', 'prev_kv': True}).json())
# print(client.call_rpc('/v3alpha/kv/range', {'key': 'foo'}).json())
# print(client.call_rpc('/v3alpha/kv/range', {'key': 'foa'}).json())
r = client.call_rpc('/v3alpha/watch', {'create_request': {'key': 'foo'}}, stream=True)
for i in r:
print(i)
print(client.call_rpc('/v3alpha/kv/rang', {'key': 'foa'}).json())
1 change: 1 addition & 0 deletions etcd3/errors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from errors import Etcd3Exception
20 changes: 20 additions & 0 deletions etcd3/errors/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from go_grpc_codes import codeText


class Etcd3Exception(Exception):
def __init__(self, error, code, status):
self.code = code
self.codeText = codeText[code]
self.status = status
self.error = error

def __repr__(self):
return "Etcd3Exception(code='%s', message='%s')" % (self.code, self.error)

def as_dict(self):
return {
'error': self.error,
'code': self.code,
'codeText': self.codeText,
'status': self.status
}

0 comments on commit 3a45cca

Please sign in to comment.