Skip to content
Permalink
Browse files
添加group和version的支持
  • Loading branch information
JoeCao committed Apr 10, 2015
1 parent d6ee551 commit 6286c955ed387a4dd0966fb5bb979e14be16437b
Showing 6 changed files with 82 additions and 30 deletions.
@@ -10,6 +10,8 @@ class ServiceURL(object):
path = '' # like /com.qianmi.dubbo.UserProvider
ip = '127.0.0.1'
port = '9090'
version = ''
group = ''

def __init__(self, url):
result = urlparse(url)
@@ -1 +1,8 @@
# coding=utf-8
__author__ = 'caozupeng'


class ReferenceConfig(object):
registry = None
interface = ''
version = ''
@@ -2,8 +2,10 @@

__author__ = 'caozupeng'
import urllib

from kazoo.protocol.states import KazooState
from kazoo.client import KazooClient

from dubbo_client.common import ServiceURL


@@ -16,7 +18,7 @@ def add_provider_listener(self, provide_name):
"""
pass

def get_provides(self, provide_name, default=None):
def get_provides(self, provide_name, default=None, **kwargs):
"""
获取已经注册的服务URL对象
:param provide_name: com.ofpay.demo.api.UserProvider
@@ -29,8 +31,11 @@ def get_provides(self, provide_name, default=None):
class ZookeeperRegistry(Registry):
"""
所有注册过的服务端将在这里
格式为{providername:{ip+port:service}}
providername = group_version_servicename
interface=com.ofpay.demo.DemoService
location = ip:port/url 比如 172.19.20.111:38080/com.ofpay.demo.DemoService2
providername = servicename|version|group
dict 格式为{interface:{providername:{ip+port:service_url}}}
"""
__service_provides = {}
__connect_state = 'UNCONNECT'
@@ -61,57 +66,71 @@ def __event_listener(self, event):
"""
self.__do_event(event)

def __handler_nodes(self, nodes):
def __to_key(self, interface, versioin, group):
return '{0}|{1}|{2}'.format(interface, versioin, group)

def __handler_nodes(self, interface, nodes):
"""
将zookeeper中查询到的服务节点列表加入到一个dict中
:param nodes: 节点列表
:return: 不需要返回
"""
# 如果已经存在,首先删除原有的服务的集合
if interface in self.__service_provides:
del self.__service_provides[interface]
for child_node in nodes:
node = urllib.unquote(child_node).decode('utf8')
if node.startswith('jsonrpc'):
service_url = ServiceURL(node)
service_key = self.__service_provides.get(service_url.interface)
if service_key:
service_key[service_url.location] = service_url
key = self.__to_key(service_url.interface, service_url.version, service_url.group)
second_dict = self.__service_provides.get(interface)
if second_dict:
# 获取最内层的nest的dict
inner_dict = second_dict.get(key)
if inner_dict:
inner_dict[service_url.location] = service_url
else:
second_dict[key] = {service_url.location: service_url}
else:
self.__service_provides[service_url.interface] = {service_url.location: service_url}
# create the second dict
self.__service_provides[interface] = {key: {service_url.location: service_url}}

def __do_event(self, event):
# event.path 是类似/dubbo/com.ofpay.demo.api.UserProvider/providers 这样的
# 如果要删除,必须先把/dubbo/和最后的/providers去掉
provide_name = event.path[7:event.path.rfind('/')]
if provide_name in self.__service_provides:
del self.__service_provides[provide_name]
if event.state == 'CONNECTED':
children = self.__zk.get_children(event.path, watch=self.__event_listener)
self.__handler_nodes(children)
self.__handler_nodes(provide_name, children)
if event.state == 'DELETED':
children = self.__zk.get_children(event.path, watch=self.__event_listener)
self.__handler_nodes(children)
self.__handler_nodes(provide_name, children)

def add_provider_listener(self, provide_name):
def add_provider_listener(self, interface, **kwargs):
"""
监听注册中心的服务上下线
:param provide_name: 类似com.ofpay.demo.api.UserProvider这样的服务名
:param interface: 类似com.ofpay.demo.api.UserProvider这样的服务名
:return: 无返回
"""
# 如果已经存在,首先删除原有的服务的集合
if provide_name in self.__service_provides:
del self.__service_provides[provide_name]
children = self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', provide_name, 'providers'),
version = kwargs.get('version', '')
group = kwargs.get('group', '')
children = self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', interface, 'providers'),
watch=self.__event_listener)
# 全部重新添加
self.__handler_nodes(children)
self.__handler_nodes(interface, children)

def get_provides(self, provide_name, default=None):
def get_provides(self, interface, default=None, **kwargs):
"""
获取已经注册的服务URL对象
:param provide_name: com.ofpay.demo.api.UserProvider
:param interface: com.ofpay.demo.api.UserProvider
:param default:
:return: 返回一个dict的服务集合
"""
return self.__service_provides.get(provide_name, default)
group = kwargs.get('group', '')
version = kwargs.get('version', '')
key = self.__to_key(interface, version, group)
second = self.__service_provides.get(interface, {})
return second.get(key, default)


if __name__ == '__main__':
@@ -1,13 +1,19 @@
# coding=utf-8
import random
from urllib2 import HTTPError

from pyjsonrpc import HttpClient, JsonRpcError

from dubbo_client.rpcerror import NoProvider, ConnectionFail, dubbo_client_errors, InternalError


__author__ = 'caozupeng'


class DubboClient(object):
interface = ''
group = ''
version = ''

class _Method(object):

@@ -18,17 +24,20 @@ def __init__(self, client_instance, method):
def __call__(self, *args, **kwargs):
return self.client_instance.call(self.method, *args, **kwargs)

def __init__(self, interface, registry):
def __init__(self, interface, registry, **kwargs):
self.interface = interface
self.registry = registry
self.group = kwargs.get('group', '')
self.version = kwargs.get('version', '')
self.registry.add_provider_listener(interface)

def call(self, method, *args, **kwargs):
provides = self.registry.get_provides(self.interface, {})
provides = self.registry.get_provides(self.interface, {}, version=self.version, group=self.group)
if len(provides) == 0:
raise NoProvider('can not find provide', self.interface)
location, provide = random.choice(provides.items())
client = HttpClient(url="http://{0}{1}".format(location, provide.path))
ip_port, service_url = random.choice(provides.items())
print service_url.location
client = HttpClient(url="http://{0}{1}".format(ip_port, service_url.path))
try:
return client.call(method, *args, **kwargs)
except HTTPError, e:
@@ -48,7 +57,6 @@ def __getattr__(self, method):
"""
Allows the usage of attributes as *method* names.
"""

return self._Method(client_instance=self, method=method)


@@ -0,0 +1,16 @@
from pyjsonrpc import HttpClient

__author__ = 'caozupeng'

def test_client_every_new():
user_provider = HttpClient(url="http://{0}{1}".format('172.19.3.111:38081/', 'com.ofpay.demo.api.UserProvider2'))
print user_provider.getUser('A003')
print user_provider.queryUser(
{u'age': 18, u'time': 1428463514153, u'sex': u'MAN', u'id': u'A003', u'name': u'zhangsan'})
print user_provider.queryAll()
print user_provider.isLimit('MAN', 'Joe')
print user_provider('getUser', 'A005')


if __name__ == '__main__':
test_client_every_new()
@@ -1,16 +1,16 @@
# coding=utf-8
import time

from user_provider import DubboClient, DubboClientError
from user_provider import ZookeeperRegistry
from dubbo_client import ZookeeperRegistry, DubboClient, DubboClientError


__author__ = 'caozupeng'

if __name__ == '__main__':
service_interface = 'com.ofpay.demo.api.UserProvider'
# 该对象较重,有zookeeper的连接,需要保存使用
registry = ZookeeperRegistry('172.19.65.33:2181')
user_provider = DubboClient(service_interface, registry)
user_provider = DubboClient(service_interface, registry, version='3.0')
for i in range(1000):
try:
print user_provider.getUser('A003')

0 comments on commit 6286c95

Please sign in to comment.