Skip to content
Permalink
Browse files
抽离providers到commons对象,注册中心使用线程进行event处理
  • Loading branch information
JoeCao committed Apr 8, 2015
1 parent 4cbf4e6 commit e7a761a7d072b2d5742173f05b508f4b3d0e557b
Showing 5 changed files with 103 additions and 48 deletions.
@@ -0,0 +1,29 @@
#encoding=utf-8
from urlparse import urlparse, parse_qsl

__author__ = 'caozupeng'


class ServiceProvider(object):
protocol = 'jsonrpc'
location = '' # ip+port
path = '' # like /com.qianmi.dubbo.UserProvider
ip = '127.0.0.1'
port = '9090'

def __init__(self, url):
result = urlparse(url)
self.protocol = result[0]
self.location = result[1]
self.path = result[2]
if self.location.find(':') > -1:
self.ip, self.port = result[1].split(':')
params = parse_qsl(result[4])
for key, value in params:
# url has a default.timeout property, but it can not add in python object
# so keep the last one
pos = key.find('.')
if pos > -1:
key = key[pos + 1:]
print key
self.__dict__[key] = value
@@ -1,16 +1,24 @@
# coding=utf-8
import Queue
from threading import Thread
import urllib
from urlparse import urlparse, parse_qsl

from kazoo.protocol.states import KazooState

from dubbo_client.common import ServiceProvider


__author__ = 'caozupeng'
from kazoo.client import KazooClient
import logging

logging.basicConfig()
zk = KazooClient(hosts='172.19.65.33:2181', read_only=True)
"""
所有注册过的服务端将在这里
格式为{providername:{ip+port:service}}
providername = group_version_servicename
"""
service_provides = {}


@@ -29,57 +37,69 @@ def state_listener(state):
zk.add_listener(state_listener)


class JsonProvide(object):
protocol = 'jsonrpc'
location = ''
path = ''
ip = '127.0.0.1'
port = '9090'

def __init__(self, url):
result = urlparse(url)
self.protocol = result[0]
self.location = result[1]
self.path = result[2]
if self.location.find(':') > -1:
self.ip, self.port = result[1].split(':')
params = parse_qsl(result[4])
for key, value in params:
# url has a default.timeout property, but it can not add in python object
# so keep the last one
pos = key.find('.')
if pos > -1:
key = key[pos + 1:]
print key
self.__dict__[key] = value


def node_listener(event):
print event
event_queue.put(event)


def add_provider_listener(provide_name):
children = zk.get_children('{0}/{1}/{2}'.format('dubbo', provide_name, 'providers'), watch=node_listener)
for child_node in children:
def handler_urls(urls):
for child_node in urls:
url = urllib.unquote(child_node).decode('utf8')
if url.startswith('jsonrpc'):
provide = JsonProvide(url)
provide = ServiceProvider(url)
service_key = service_provides.get(provide.interface)
if service_key:
service_key[provide.location] = provide
else:
service_provides[provide.interface] = {provide.location: provide}


def add_provider_listener(provide_name):
#如果已经存在,首先删除原有的服务的集合
if provide_name in service_provides:
del service_provides[provide_name]
children = zk.get_children('{0}/{1}/{2}'.format('dubbo', provide_name, 'providers'), watch=node_listener)
#全部重新添加
handler_urls(children)


num_worker_threads = 2


def worker():
while True:
event = event_queue.get()
do_event(event)
event_queue.task_done()


event_queue = Queue.Queue()


def do_event(event):
# event.path 是类似/dubbo/com.ofpay.demo.api.UserProvider/providers 这样的
# 如果要删除,必须先把/dubbo/和最后的/providers去掉
provide_name = event.path[7:event.path.rfind('/')]
if provide_name in service_provides:
del service_provides[provide_name]
if event.state == 'CONNECTED':
children = zk.get_children(event.path, watch=node_listener)
handler_urls(children)
if event.state == 'DELETED':
children = zk.get_children(event.path, watch=node_listener)
handler_urls(children)

for i in range(num_worker_threads):
t = Thread(target=worker)
t.daemon = False
t.start()


zk.start()
event_queue.join()

if __name__ == '__main__':
zk.start()
if zk.exists("/dubbo"):
# Print the version of a node and its data
# children = zk.get_children("/dubbo")
# print "There are {0} children".format(len(children))
# for node in children:
# print node
add_provider_listener('com.ofpay.demo.api.UserProvider')

@@ -24,7 +24,7 @@ def raw_client(service_interface, app_params):


class DubboClient(object):
clients = []
interface = ''

class _Method(object):

@@ -36,14 +36,16 @@ def __call__(self, *args, **kwargs):
return self.client_instance.call(self.method, *args, **kwargs)

def __init__(self, interface):
self.interface = interface
add_provider_listener(interface)
provides = service_provides.get(interface, ())
if len(provides) > 0:
for location, provide in provides.items():
self.clients.append(HttpClient(url="http://{0}{1}".format(location, provide.path)))

def call(self, method, *args, **kwargs):
client = random.choice(self.clients)
provides = service_provides.get(self.interface, ())
if len(provides) == 0:
return None
location, provide = random.choice(provides.items())
print 'location is {0}'.format(location)
client = HttpClient(url="http://{0}{1}".format(location, provide.path))
return client.call(method, *args, **kwargs)

def __call__(self, method, *args, **kwargs):

This file was deleted.

@@ -1,13 +1,19 @@
# coding=utf-8
import time

from dubbo_client import DubboClient


__author__ = 'caozupeng'

if __name__ == '__main__':
service_interface = 'com.ofpay.demo.api.UserProvider'
dubbo_client = DubboClient(service_interface)
print dubbo_client.getUser('A003')
print dubbo_client.queryUser(
{u'age': 18, u'time': 1428463514153, u'sex': u'MAN', u'id': u'A003', u'name': u'zhangsan'})
print dubbo_client.queryAll()
print dubbo_client.isLimit('MAN', 'Joe')
for i in range(1000):
print dubbo_client.getUser('A003')
print dubbo_client.queryUser(
{u'age': 18, u'time': 1428463514153, u'sex': u'MAN', u'id': u'A003', u'name': u'zhangsan'})
print dubbo_client.queryAll()
print dubbo_client.isLimit('MAN', 'Joe')
print dubbo_client('getUser', 'A005')
time.sleep(5)

0 comments on commit e7a761a

Please sign in to comment.