Skip to content
Permalink
Browse files
将Registry代码合并为一个Class
  • Loading branch information
JoeCao committed Apr 9, 2015
1 parent 5dcf7c9 commit 24f113fd4307d44aad04cb390d6e2e5a1c9e77e2
Showing 6 changed files with 117 additions and 130 deletions.
@@ -3,4 +3,9 @@
from rpc import (
DubboClient,
)
from rpcerror import *
from rpcerror import *

from registry import (
Registry,
ZookeeperRegistry,
)
@@ -25,5 +25,5 @@ def __init__(self, url):
pos = key.find('.')
if pos > -1:
key = key[pos + 1:]
print key
# print key
self.__dict__[key] = value
@@ -1,6 +1,4 @@
# coding=utf-8
import Queue
from threading import Thread
import urllib

from kazoo.protocol.states import KazooState
@@ -10,96 +8,105 @@

__author__ = 'caozupeng'
from kazoo.client import KazooClient
import logging

logging.basicConfig()
zk = KazooClient(hosts='172.19.65.33:2181', read_only=True)
"""
所有注册过的服务端将在这里
格式为{providername:{ip+port:service}}
providername = group_version_servicename
"""
service_provides = {}

class Registry(object):
def add_provider_listener(self, provide_name):
"""
监听注册中心的服务上下线
:param provide_name: 类似com.ofpay.demo.api.UserProvider这样的服务名
:return: 无返回
"""
pass

def get_provides(self, provide_name, default=None):
"""
获取已经注册的服务URL对象
:param provide_name: com.ofpay.demo.api.UserProvider
:param default:
:return: 返回一个dict的服务集合
"""
pass


class ZookeeperRegistry(Registry):
"""
所有注册过的服务端将在这里
格式为{providername:{ip+port:service}}
providername = group_version_servicename
"""
__service_provides = {}
__connect_state = 'UNCONNECT'

def __init__(self, hosts):
self.__zk = KazooClient(hosts=hosts, read_only=True)
self.__zk.add_listener(self.__state_listener)
self.__zk.start()

def __state_listener(self, state):
if state == KazooState.LOST:
# Register somewhere that the session was lost
self.__connect_state = state
elif state == KazooState.SUSPENDED:
# Handle being disconnected from Zookeeper
print 'disconnect from zookeeper'
self.__connect_state = state
else:
# Handle being connected/reconnected to Zookeeper
print 'connected'
self.__connect_state = state

def __event_listener(self, event):
self.__do_event(event)

def __handler_urls(self, urls):
for child_node in urls:
url = urllib.unquote(child_node).decode('utf8')
if url.startswith('jsonrpc'):
provide = ServiceProvider(url)
service_key = self.__service_provides.get(provide.interface)
if service_key:
service_key[provide.location] = provide
else:
self.__service_provides[provide.interface] = {provide.location: provide}

def __do_event(self, event):
# event.path 是类似/dubbo/com.ofpay.demo.api.UserProvider/providers 这样的
# 如果要删除,必须先把/dubbo/和最后的/providers去掉
provide_name = event.path[7:event.path.rfind('/')]
if provide_name in self.__service_provides:
del self.__service_provides[provide_name]
if event.state == 'CONNECTED':
children = self.__zk.get_children(event.path, watch=self.__event_listener)
self.__handler_urls(children)
if event.state == 'DELETED':
children = self.__zk.get_children(event.path, watch=self.__event_listener)
self.__handler_urls(children)

def add_provider_listener(self, provide_name):
"""
监听注册中心的服务上下线
:param provide_name: 类似com.ofpay.demo.api.UserProvider这样的服务名
:return: 无返回
"""
# 如果已经存在,首先删除原有的服务的集合
if provide_name in self.__service_provides:
del self.__service_provides[provide_name]
children = self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', provide_name, 'providers'),
watch=self.__event_listener)
# 全部重新添加
self.__handler_urls(children)

def get_provides(self, provide_name, default=None):
"""
获取已经注册的服务URL对象
:param provide_name: com.ofpay.demo.api.UserProvider
:param default:
:return: 返回一个dict的服务集合
"""
return self.__service_provides.get(provide_name, default)

def state_listener(state):
if state == KazooState.LOST:
# Register somewhere that the session was lost
print 'session lost'
elif state == KazooState.SUSPENDED:
# Handle being disconnected from Zookeeper
print 'disconnect from zookeeper'
else:
# Handle being connected/reconnected to Zookeeper
print 'connected'


zk.add_listener(state_listener)


def node_listener(event):
# print event
event_queue.put(event)


def handler_urls(urls):
for child_node in urls:
url = urllib.unquote(child_node).decode('utf8')
if url.startswith('jsonrpc'):
provide = ServiceProvider(url)
service_key = service_provides.get(provide.interface)
if service_key:
service_key[provide.location] = provide
else:
service_provides[provide.interface] = {provide.location: provide}


def add_provider_listener(provide_name):
#如果已经存在,首先删除原有的服务的集合
if provide_name in service_provides:
del service_provides[provide_name]
children = zk.get_children('{0}/{1}/{2}'.format('dubbo', provide_name, 'providers'), watch=node_listener)
#全部重新添加
handler_urls(children)


num_worker_threads = 2


def worker():
while True:
event = event_queue.get()
do_event(event)
event_queue.task_done()


event_queue = Queue.Queue()


def do_event(event):
# event.path 是类似/dubbo/com.ofpay.demo.api.UserProvider/providers 这样的
# 如果要删除,必须先把/dubbo/和最后的/providers去掉
provide_name = event.path[7:event.path.rfind('/')]
if provide_name in service_provides:
del service_provides[provide_name]
if event.state == 'CONNECTED':
children = zk.get_children(event.path, watch=node_listener)
handler_urls(children)
if event.state == 'DELETED':
children = zk.get_children(event.path, watch=node_listener)
handler_urls(children)

for i in range(num_worker_threads):
t = Thread(target=worker)
t.daemon = True
t.start()


zk.start()
event_queue.join()

if __name__ == '__main__':
zk.start()
if zk.exists("/dubbo"):
add_provider_listener('com.ofpay.demo.api.UserProvider')
pass

@@ -1,30 +1,15 @@
import httplib
import json
import random
from urllib2 import HTTPError

from pyjsonrpc import HttpClient, JsonRpcError

from dubbo_client.registry import service_provides, add_provider_listener

from dubbo_client.rpcerror import NoProvider, ConnectionFail, dubbo_client_errors


__author__ = 'caozupeng'


def raw_client(service_interface, app_params):
headers = {"Content-type": "application/json-rpc",
"Accept": "text/json"}
provides = service_provides.get(service_interface, ())
if len(provides) > 0:
location, first = provides.items().pop()
h1 = httplib.HTTPConnection(first.ip, port=int(first.port))
h1.request("POST", first.path, json.dumps(app_params), headers)
response = h1.getresponse()
return response.read(), None
else:
return None, 'can not find the provide of {0}'.format(service_interface)


class DubboClient(object):
interface = ''

@@ -37,12 +22,13 @@ def __init__(self, client_instance, method):
def __call__(self, *args, **kwargs):
return self.client_instance.call(self.method, *args, **kwargs)

def __init__(self, interface):
def __init__(self, interface, registry):
self.interface = interface
add_provider_listener(interface)
self.registry = registry
self.registry.add_provider_listener(interface)

def call(self, method, *args, **kwargs):
provides = service_provides.get(self.interface, {})
provides = self.registry.get_provides(self.interface, {})
if len(provides) == 0:
raise NoProvider('can not find provide', self.interface)
location, provide = random.choice(provides.items())
@@ -70,14 +56,4 @@ def __getattr__(self, method):


if __name__ == '__main__':
app_params = {
"jsonrpc": "2.0",
"method": "getUser",
"params": ["A001"],
"id": 1
}
service_interface = 'com.ofpay.demo.api.UserProvider'
add_provider_listener(service_interface)
ret, error = raw_client(service_interface, app_params)
if not error:
print json.loads(ret, encoding='utf-8')
pass
@@ -1,11 +1,8 @@
from dubbo_client.registry import zk
from dubbo_client import ZookeeperRegistry

__author__ = 'caozupeng'

if __name__ == '__main__':
if zk.exists("/dubbo"):
# Print the version of a node and its data
children = zk.get_children("/dubbo")
print "There are {0} children".format(len(children))
for node in children:
print node
registry = ZookeeperRegistry('172.19.65.33:2181')
registry.add_provider_listener('com.ofpay.demo.api.UserProvider')
print registry.get_provides('com.ofpay.demo.api.UserProvider')
@@ -2,13 +2,15 @@
import time

from dubbo_client import DubboClient, DubboClientError
from dubbo_client import ZookeeperRegistry


__author__ = 'caozupeng'

if __name__ == '__main__':
service_interface = 'com.ofpay.demo.api.UserProvider'
dubbo_client = DubboClient(service_interface)
registry = ZookeeperRegistry('172.19.65.33:2181')
dubbo_client = DubboClient(service_interface, registry)
for i in range(1000):
try:
print dubbo_client.getUser('A003')

0 comments on commit 24f113f

Please sign in to comment.