Skip to content
Permalink
Browse files
实现multicast的注册。还差一个下线处理
  • Loading branch information
JoeCao committed Apr 15, 2015
1 parent bacf949 commit 244c71fd4e1b32d4bbada63a423e3dd6b2c03a92
Showing 4 changed files with 117 additions and 74 deletions.
@@ -8,6 +8,7 @@
from registry import (
Registry,
ZookeeperRegistry,
MulticastRegistry
)
from config import (
ApplicationConfig,
@@ -1,10 +1,12 @@
# coding=utf-8
__author__ = 'caozupeng'
import os
import socket
import struct
from threading import Thread

from dubbo_client.config import ApplicationConfig


__author__ = 'caozupeng'
import urllib

from kazoo.protocol.states import KazooState
@@ -14,6 +16,11 @@


class Registry(object):
_service_provides = {}

def _do_event(self, event):
pass

def register(self, interface, **kwargs):
"""
客户端注册到注册中心,亮出自己的身份
@@ -32,62 +39,31 @@ def subscribe(self, interface, **kwargs):
"""
pass

def get_provides(self, provide_name, default=None, **kwargs):
def get_provides(self, interface, **kwargs):
"""
获取已经注册的服务URL对象
:param provide_name: com.ofpay.demo.api.UserProvider
:param interface: com.ofpay.demo.api.UserProvider
:param default:
:return: 返回一个dict的服务集合
"""
pass


class ZookeeperRegistry(Registry):
"""
所有注册过的服务端将在这里
interface=com.ofpay.demo.DemoService
location = ip:port/url 比如 172.19.20.111:38080/com.ofpay.demo.DemoService2
providername = servicename|version|group
dict 格式为{interface:{providername:{ip+port:service_url}}}
"""
_service_provides = {}
_app_config = ApplicationConfig('default_app')
_connect_state = 'UNCONNECT'


def __init__(self, zk_hosts, application_config=None):
if application_config:
self._app_config = application_config
self.__zk = KazooClient(hosts=zk_hosts)
self.__zk.add_listener(self.__state_listener)
self.__zk.start()

def __state_listener(self, state):
if state == KazooState.LOST:
# Register somewhere that the session was lost
self._connect_state = state
elif state == KazooState.SUSPENDED:
# Handle being disconnected from Zookeeper
# print 'disconnect from zookeeper'
self._connect_state = state
else:
# Handle being connected/reconnected to Zookeeper
# print 'connected'
self._connect_state = state
group = kwargs.get('group', '')
version = kwargs.get('version', '')
key = self._to_key(interface, version, group)
second = self._service_provides.get(interface, {})
return second.get(key, {})

def __event_listener(self, event):
def event_listener(self, event):
"""
node provides上下线的监听回调函数
:param event:
:return:
"""
self._do_event(event)

def __to_key(self, interface, versioin, group):
def _to_key(self, interface, versioin, group):
return '{0}|{1}|{2}'.format(interface, versioin, group)

def __handler_nodes(self, interface, nodes):
def _handler_nodes(self, interface, nodes):
"""
将zookeeper中查询到的服务节点列表加入到一个dict中
zookeeper中保持的节点url类似如下
@@ -107,7 +83,7 @@ def __handler_nodes(self, interface, nodes):
node = urllib.unquote(child_node).decode('utf8')
if node.startswith('jsonrpc'):
service_url = ServiceURL(node)
key = self.__to_key(service_url.interface, service_url.version, service_url.group)
key = self._to_key(service_url.interface, service_url.version, service_url.group)
second_dict = self._service_provides.get(interface)
if second_dict:
# 获取最内层的nest的dict
@@ -120,16 +96,49 @@ def __handler_nodes(self, interface, nodes):
# create the second dict
self._service_provides[interface] = {key: {service_url.location: service_url}}


class ZookeeperRegistry(Registry):
"""
所有注册过的服务端将在这里
interface=com.ofpay.demo.DemoService
location = ip:port/url 比如 172.19.20.111:38080/com.ofpay.demo.DemoService2
providername = servicename|version|group
dict 格式为{interface:{providername:{ip+port:service_url}}}
"""
_app_config = ApplicationConfig('default_app')
_connect_state = 'UNCONNECT'

def __init__(self, zk_hosts, application_config=None):
if application_config:
self._app_config = application_config
self.__zk = KazooClient(hosts=zk_hosts)
self.__zk.add_listener(self.__state_listener)
self.__zk.start()

def __state_listener(self, state):
if state == KazooState.LOST:
# Register somewhere that the session was lost
self._connect_state = state
elif state == KazooState.SUSPENDED:
# Handle being disconnected from Zookeeper
# print 'disconnect from zookeeper'
self._connect_state = state
else:
# Handle being connected/reconnected to Zookeeper
# print 'connected'
self._connect_state = state

def _do_event(self, event):
# event.path 是类似/dubbo/com.ofpay.demo.api.UserProvider/providers 这样的
# 如果要删除,必须先把/dubbo/和最后的/providers去掉
provide_name = event.path[7:event.path.rfind('/')]
if event.state == 'CONNECTED':
children = self.__zk.get_children(event.path, watch=self.__event_listener)
self.__handler_nodes(provide_name, children)
children = self.__zk.get_children(event.path, watch=self.event_listener)
self._handler_nodes(provide_name, children)
if event.state == 'DELETED':
children = self.__zk.get_children(event.path, watch=self.__event_listener)
self.__handler_nodes(provide_name, children)
children = self.__zk.get_children(event.path, watch=self.event_listener)
self._handler_nodes(provide_name, children)

def register(self, interface, **kwargs):
ip = self.__zk._connection._socket.getsockname()[0]
@@ -162,30 +171,53 @@ def subscribe(self, interface, **kwargs):
version = kwargs.get('version', '')
group = kwargs.get('group', '')
children = self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', interface, 'providers'),
watch=self.__event_listener)
watch=self.event_listener)
# 全部重新添加
self.__handler_nodes(interface, children)
self._handler_nodes(interface, children)


class MulticastRegistry(Registry):

class _Loop(Thread):
def __init__(self, address, callback):
Thread.__init__(self)
multicast_group, multicast_port = address.split(':')
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
self.sock.bind(('', int(multicast_port)))
mreq = struct.pack("4sl", socket.inet_aton(multicast_group), socket.INADDR_ANY)
self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
self.callback = callback

def run(self):
while True:
event = self.sock.recv(10240)
# print event
self.callback(event.rstrip())

def __init__(self, address, application_config=None):
if application_config:
self._app_config = application_config
self._Loop(address, self.event_listener).start()

def _do_event(self, event):
if event.startswith('register'):
url = event[9:]
# print url
service_provide = ServiceURL(url)
self._handler_nodes(service_provide.interface, (url,))
# print self._service_provides

def get_provides(self, interface, **kwargs):
"""
获取已经注册的服务URL对象
:param interface: com.ofpay.demo.api.UserProvider
:param default:
:return: 返回一个dict的服务集合
"""
group = kwargs.get('group', '')
version = kwargs.get('version', '')
key = self.__to_key(interface, version, group)
second = self._service_provides.get(interface, {})
return second.get(key, {})


if __name__ == '__main__':
zk = KazooClient(hosts='192.168.59.103:2181')
zk.start()
parent_node = '{0}/{1}/{2}'.format('dubbo', 'com.ofpay.demo.api.UserProvider', 'consumers')
nodes = zk.get_children(parent_node)
for child_node in nodes:
node = urllib.unquote(child_node).decode('utf8')
print node
# zk = KazooClient(hosts='192.168.59.103:2181')
# zk.start()
# parent_node = '{0}/{1}/{2}'.format('dubbo', 'com.ofpay.demo.api.UserProvider', 'consumers')
# nodes = zk.get_children(parent_node)
# for child_node in nodes:
# node = urllib.unquote(child_node).decode('utf8')
# print node
# zk.delete(parent_node+'/'+child_node, recursive=True)
# registry = MulticastRegistry('224.5.6.7:1234')
pass
@@ -1,8 +1,16 @@
from dubbo_client import ZookeeperRegistry
from dubbo_client import ZookeeperRegistry, MulticastRegistry

__author__ = 'caozupeng'

if __name__ == '__main__':
def multicat():
registry = MulticastRegistry('224.5.6.7:1234')
registry.subscribe('com.ofpay.demo.api.UserProvider')
print registry.get_provides('com.ofpay.demo.api.UserProvider')

def zookeeper():
registry = ZookeeperRegistry('172.19.65.33:2181')
registry.subscribe('com.ofpay.demo.api.UserProvider')
print registry.get_provides('com.ofpay.demo.api.UserProvider')
print registry.get_provides('com.ofpay.demo.api.UserProvider')

if __name__ == '__main__':
multicat()
@@ -1,16 +1,18 @@
# coding=utf-8
import time

from dubbo_client import ZookeeperRegistry, DubboClient, DubboClientError, ApplicationConfig
from dubbo_client import ZookeeperRegistry, DubboClient, DubboClientError, ApplicationConfig, MulticastRegistry


__author__ = 'caozupeng'


if __name__ == '__main__':
config = ApplicationConfig('test_rpclib')
service_interface = 'com.ofpay.demo.api.UserProvider'
# 该对象较重,有zookeeper的连接,需要保存使用
registry = ZookeeperRegistry('192.168.59.103:2181', config)
# registry = ZookeeperRegistry('192.168.59.103:2181', config)
registry = MulticastRegistry('224.5.6.7:1234', config)
user_provider = DubboClient(service_interface, registry, version='2.0')
for i in range(1000):
try:

0 comments on commit 244c71f

Please sign in to comment.