From cb520b0947faf8be2678bc0f1cb7f1a8656beb03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E7=A5=96=E9=B9=8F?= Date: Tue, 7 Apr 2015 19:37:42 +0800 Subject: [PATCH] first upload --- README.md | 8 ++++ dubbo_client/__init__.py | 1 + dubbo_client/config.py | 1 + dubbo_client/registry.py | 85 +++++++++++++++++++++++++++++++++++ dubbo_client/rpc.py | 35 +++++++++++++++ dubbo_client/tools.py | 2 + test_dubbo_client/__init__.py | 1 + 7 files changed, 133 insertions(+) create mode 100644 README.md create mode 100644 dubbo_client/__init__.py create mode 100644 dubbo_client/config.py create mode 100644 dubbo_client/registry.py create mode 100644 dubbo_client/rpc.py create mode 100644 dubbo_client/tools.py create mode 100644 test_dubbo_client/__init__.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..9b0c6d6 --- /dev/null +++ b/README.md @@ -0,0 +1,8 @@ +### Python Dubbo Client + +##Python调用Dubbo接口的jsonrpc协议 +请使用dubbo-rpc-jsonrpc + +##在客户端实现负载均衡,服务发现 +通过注册中心的zookeeper,获取服务的注册信息 +然后通过代理实现负载均衡算法,调用服务端 \ No newline at end of file diff --git a/dubbo_client/__init__.py b/dubbo_client/__init__.py new file mode 100644 index 0000000..681fccd --- /dev/null +++ b/dubbo_client/__init__.py @@ -0,0 +1 @@ +__author__ = 'caozupeng' diff --git a/dubbo_client/config.py b/dubbo_client/config.py new file mode 100644 index 0000000..681fccd --- /dev/null +++ b/dubbo_client/config.py @@ -0,0 +1 @@ +__author__ = 'caozupeng' diff --git a/dubbo_client/registry.py b/dubbo_client/registry.py new file mode 100644 index 0000000..ae98d80 --- /dev/null +++ b/dubbo_client/registry.py @@ -0,0 +1,85 @@ +# encoding=utf-8 +import urllib +from urlparse import urlparse, parse_qsl + +from kazoo.protocol.states import KazooState + + +__author__ = 'caozupeng' +from kazoo.client import KazooClient +import logging + +logging.basicConfig() +zk = KazooClient(hosts='172.19.65.33:2181', read_only=True) +service_provides = {} + + +def state_listener(state): + if state == KazooState.LOST: + # Register somewhere that the session was lost + print 'session lost' + elif state == KazooState.SUSPENDED: + # Handle being disconnected from Zookeeper + print 'disconnect from zookeeper' + else: + # Handle being connected/reconnected to Zookeeper + print 'connected' + + +zk.add_listener(state_listener) + + +class JsonProvide(object): + protocol = 'jsonrpc' + location = '' + path = '' + ip = '127.0.0.1' + port = '9090' + + def __init__(self, url): + result = urlparse(url) + self.protocol = result[0] + self.location = result[1] + self.path = result[2] + if self.location.find(':') > -1: + self.ip, self.port = result[1].split(':') + params = parse_qsl(result[4]) + for key, value in params: + # url has a default.timeout property, but it can not add in python object + # so keep the last one + pos = key.find('.') + if pos > -1: + key = key[pos + 1:] + print key + self.__dict__[key] = value + + +def node_listener(event): + print event + + +def add_provider_listener(provide_name): + children = zk.get_children('{0}/{1}/{2}'.format('dubbo', provide_name, 'providers'), watch=node_listener) + for child_node in children: + url = urllib.unquote(child_node).decode('utf8') + if url.startswith('jsonrpc'): + provide = JsonProvide(url) + service_key = service_provides.get(provide.interface) + if service_key: + service_key[provide.location] = provide + else: + service_provides[provide.interface] = {provide.location: provide} + + +zk.start() + +if __name__ == '__main__': + zk.start() + if zk.exists("/dubbo"): + # Print the version of a node and its data + # children = zk.get_children("/dubbo") + # print "There are {0} children".format(len(children)) + # for node in children: + # print node + add_provider_listener('com.ofpay.demo.api.UserProvider') + diff --git a/dubbo_client/rpc.py b/dubbo_client/rpc.py new file mode 100644 index 0000000..fb2b7fa --- /dev/null +++ b/dubbo_client/rpc.py @@ -0,0 +1,35 @@ +import httplib +import json + +from dubbo_client.registry import service_provides, add_provider_listener + + +__author__ = 'caozupeng' + + +def raw_client(service_interface, app_params): + headers = {"Content-type": "application/json-rpc", + "Accept": "text/json"} + provides = service_provides.get(service_interface, ()) + if len(provides) > 0: + location, first = provides.items().pop() + h1 = httplib.HTTPConnection(first.ip, port=int(first.port)) + h1.request("POST", first.path, json.dumps(app_params), headers) + response = h1.getresponse() + return response.read(), None + else: + return None, 'can not find the provide of {0}'.format(service_interface) + + +if __name__ == '__main__': + app_params = { + "jsonrpc": "2.0", + "method": "getUser", + "params": ["A001"], + "id": 1 + } + service_interface = 'com.ofpay.demo.api.UserProvider' + add_provider_listener(service_interface) + ret, error = raw_client(service_interface, app_params) + if not error: + print json.loads(ret, encoding='utf-8') \ No newline at end of file diff --git a/dubbo_client/tools.py b/dubbo_client/tools.py new file mode 100644 index 0000000..bfc1c30 --- /dev/null +++ b/dubbo_client/tools.py @@ -0,0 +1,2 @@ +#encoding=utf-8 +__author__ = 'caozupeng' diff --git a/test_dubbo_client/__init__.py b/test_dubbo_client/__init__.py new file mode 100644 index 0000000..681fccd --- /dev/null +++ b/test_dubbo_client/__init__.py @@ -0,0 +1 @@ +__author__ = 'caozupeng'