Skip to content

Commit

Permalink
Merge "Fix the multi-backend storge issue for ZMQ."
Browse files Browse the repository at this point in the history
  • Loading branch information
Jenkins authored and openstack-gerrit committed Aug 10, 2013
2 parents 94c720c + a4f6ab0 commit b28e706
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 96 deletions.
2 changes: 1 addition & 1 deletion cinder/openstack/common/rpc/__init__.py
Expand Up @@ -287,7 +287,7 @@ def queue_get_for(context, topic, host):
Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on
<host>.
"""
return '%s.%s' % (topic, host) if host else topic
return '%s:%s' % (topic, host) if host else topic


_RPCIMPL = None
Expand Down
152 changes: 60 additions & 92 deletions cinder/openstack/common/rpc/impl_zmq.py
Expand Up @@ -30,7 +30,6 @@
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import importutils
from cinder.openstack.common import jsonutils
from cinder.openstack.common import processutils as utils
from cinder.openstack.common.rpc import common as rpc_common

zmq = importutils.try_import('eventlet.green.zmq')
Expand Down Expand Up @@ -85,8 +84,8 @@


def _serialize(data):
"""
Serialization wrapper
"""Serialization wrapper.
We prefer using JSON, but it cannot encode all types.
Error if a developer passes us bad data.
"""
Expand All @@ -98,18 +97,15 @@ def _serialize(data):


def _deserialize(data):
"""
Deserialization wrapper
"""
"""Deserialization wrapper."""
LOG.debug(_("Deserializing: %s"), data)
return jsonutils.loads(data)


class ZmqSocket(object):
"""
A tiny wrapper around ZeroMQ to simplify the send/recv protocol
and connection management.
"""A tiny wrapper around ZeroMQ.
Simplifies the send/recv protocol and connection management.
Can be used as a Context (supports the 'with' statement).
"""

Expand Down Expand Up @@ -180,7 +176,7 @@ def close(self):
return

# We must unsubscribe, or we'll leak descriptors.
if len(self.subscriptions) > 0:
if self.subscriptions:
for f in self.subscriptions:
try:
self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
Expand All @@ -199,26 +195,24 @@ def close(self):
LOG.error("ZeroMQ socket could not be closed.")
self.sock = None

def recv(self):
def recv(self, **kwargs):
if not self.can_recv:
raise RPCException(_("You cannot recv on this socket."))
return self.sock.recv_multipart()
return self.sock.recv_multipart(**kwargs)

def send(self, data):
def send(self, data, **kwargs):
if not self.can_send:
raise RPCException(_("You cannot send on this socket."))
self.sock.send_multipart(data)
self.sock.send_multipart(data, **kwargs)


class ZmqClient(object):
"""Client for ZMQ sockets."""

def __init__(self, addr, socket_type=None, bind=False):
if socket_type is None:
socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def __init__(self, addr):
self.outq = ZmqSocket(addr, zmq.PUSH, bind=False)

def cast(self, msg_id, topic, data, envelope=False):
def cast(self, msg_id, topic, data, envelope):
msg_id = msg_id or 0

if not envelope:
Expand Down Expand Up @@ -282,7 +276,7 @@ def _get_response(self, ctx, proxy, topic, data):
except greenlet.GreenletExit:
# ignore these since they are just from shutdowns
pass
except rpc_common.ClientException, e:
except rpc_common.ClientException as e:
LOG.debug(_("Expected exception during message handling (%s)") %
e._exc_info[1])
return {'exc':
Expand Down Expand Up @@ -356,26 +350,23 @@ def process(self, proxy, ctx, data):


class ZmqBaseReactor(ConsumerBase):
"""
A consumer class implementing a
centralized casting broker (PULL-PUSH)
for RoundRobin requests.
"""A consumer class implementing a centralized casting broker (PULL-PUSH).
Used for RoundRobin requests.
"""

def __init__(self, conf):
super(ZmqBaseReactor, self).__init__()

self.mapping = {}
self.proxies = {}
self.threads = []
self.sockets = []
self.subscribe = {}

self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)

def register(self, proxy, in_addr, zmq_type_in, out_addr=None,
zmq_type_out=None, in_bind=True, out_bind=True,
subscribe=None):
def register(self, proxy, in_addr, zmq_type_in,
in_bind=True, subscribe=None):

LOG.info(_("Registering reactor"))

Expand All @@ -391,21 +382,6 @@ def register(self, proxy, in_addr, zmq_type_in, out_addr=None,

LOG.info(_("In reactor registered"))

if not out_addr:
return

if zmq_type_out not in (zmq.PUSH, zmq.PUB):
raise RPCException("Bad output socktype")

# Items push out.
outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)

self.mapping[inq] = outq
self.mapping[outq] = inq
self.sockets.append(outq)

LOG.info(_("Out reactor registered"))

def consume_in_thread(self):
def _consume(sock):
LOG.info(_("Consuming socket"))
Expand All @@ -430,10 +406,9 @@ def close(self):


class ZmqProxy(ZmqBaseReactor):
"""
A consumer class implementing a
topic-based proxy, forwarding to
IPC sockets.
"""A consumer class implementing a topic-based proxy.
Forwards to IPC sockets.
"""

def __init__(self, conf):
Expand All @@ -446,11 +421,8 @@ def __init__(self, conf):
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir

#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
topic = data[1]

LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
data = sock.recv(copy=False)
topic = data[1].bytes

if topic.startswith('fanout~'):
sock_type = zmq.PUB
Expand Down Expand Up @@ -492,9 +464,7 @@ def publisher(waiter):

while(True):
data = self.topic_proxy[topic].get()
out_sock.send(data)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
{'data': data})
out_sock.send(data, copy=False)

wait_sock_creation = eventlet.event.Event()
eventlet.spawn(publisher, wait_sock_creation)
Expand All @@ -507,37 +477,34 @@ def publisher(waiter):

try:
self.topic_proxy[topic].put_nowait(data)
LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
{'data': data})
except eventlet.queue.Full:
LOG.error(_("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})

def consume_in_thread(self):
"""Runs the ZmqProxy service"""
"""Runs the ZmqProxy service."""
ipc_dir = CONF.rpc_zmq_ipc_dir
consume_in = "tcp://%s:%s" % \
(CONF.rpc_zmq_bind_address,
CONF.rpc_zmq_port)
consumption_proxy = InternalContext(None)

if not os.path.isdir(ipc_dir):
try:
utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
ipc_dir, run_as_root=True)
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
except utils.ProcessExecutionError:
try:
os.makedirs(ipc_dir)
except os.error:
if not os.path.isdir(ipc_dir):
with excutils.save_and_reraise_exception():
LOG.error(_("Could not create IPC directory %s") %
(ipc_dir, ))

LOG.error(_("Required IPC directory does not exist at"
" %s") % (ipc_dir, ))
try:
self.register(consumption_proxy,
consume_in,
zmq.PULL,
out_bind=True)
zmq.PULL)
except zmq.ZMQError:
if os.access(ipc_dir, os.X_OK):
with excutils.save_and_reraise_exception():
LOG.error(_("Permission denied to IPC directory at"
" %s") % (ipc_dir, ))
with excutils.save_and_reraise_exception():
LOG.error(_("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use."))
Expand All @@ -547,8 +514,9 @@ def consume_in_thread(self):

def unflatten_envelope(packenv):
"""Unflattens the RPC envelope.
Takes a list and returns a dictionary.
i.e. [1,2,3,4] => {1: 2, 3: 4}
Takes a list and returns a dictionary.
i.e. [1,2,3,4] => {1: 2, 3: 4}
"""
i = iter(packenv)
h = {}
Expand All @@ -561,10 +529,9 @@ def unflatten_envelope(packenv):


class ZmqReactor(ZmqBaseReactor):
"""
A consumer class implementing a
consumer for messages. Can also be
used as a 1:1 proxy
"""A consumer class implementing a consumer for messages.
Can also be used as a 1:1 proxy
"""

def __init__(self, conf):
Expand All @@ -574,11 +541,6 @@ def consume(self, sock):
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
if sock in self.mapping:
LOG.debug(_("ROUTER RELAY-OUT %(data)s") % {
'data': data})
self.mapping[sock].send(data)
return

proxy = self.proxies[sock]

Expand Down Expand Up @@ -622,7 +584,7 @@ def create_consumer(self, topic, proxy, fanout=False):
else:
sock_type = zmq.PULL
subscribe = None
topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
topic = '.'.join((topic, CONF.rpc_zmq_host))

if topic in self.topics:
LOG.info(_("Skipping topic registration. Already registered."))
Expand Down Expand Up @@ -751,10 +713,9 @@ def _call(addr, context, topic, msg, timeout=None,

def _multi_send(method, context, topic, msg, timeout=None,
envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
message to all relevant hosts.
"""Wraps the sending of messages.
Dispatches to the matchmaker and sends message to all relevant hosts.
"""
conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
Expand All @@ -763,7 +724,7 @@ def _multi_send(method, context, topic, msg, timeout=None,
LOG.debug(_("Sending message(s) to: %s"), queues)

# Don't stack if we have no matchmaker results
if len(queues) == 0:
if not queues:
LOG.warn(_("No matchmaker results. Not casting."))
# While not strictly a timeout, callers know how to handle
# this exception and a timeout isn't too big a lie.
Expand Down Expand Up @@ -807,12 +768,14 @@ def fanout_cast(conf, context, topic, msg, **kwargs):
"""Send a message to all listening and expect no reply."""
# NOTE(ewindisch): fanout~ is used because it avoid splitting on .
# and acts as a non-subtle hint to the matchmaker and ZmqProxy.
_multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
LOG.error(_('topic is %s.') % topic)
if topic:
_multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)


def notify(conf, context, topic, msg, envelope):
"""
Send notification event.
"""Send notification event.
Notifications are sent to topic-priority.
This differs from the AMQP drivers which send to topic.priority.
"""
Expand Down Expand Up @@ -846,6 +809,11 @@ def _get_ctxt():
def _get_matchmaker(*args, **kwargs):
global matchmaker
if not matchmaker:
matchmaker = importutils.import_object(
CONF.rpc_zmq_matchmaker, *args, **kwargs)
mm = CONF.rpc_zmq_matchmaker
if mm.endswith('matchmaker.MatchMakerRing'):
mm.replace('matchmaker', 'matchmaker_ring')
LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'
' %(new)s instead') % dict(
orig=CONF.rpc_zmq_matchmaker, new=mm))
matchmaker = importutils.import_object(mm, *args, **kwargs)
return matchmaker
4 changes: 2 additions & 2 deletions cinder/service.py
Expand Up @@ -356,7 +356,6 @@ def start(self):
version_string = version.version_string()
LOG.audit(_('Starting %(topic)s node (version %(version_string)s)'),
{'topic': self.topic, 'version_string': version_string})
self.manager.init_host()
self.model_disconnected = False
ctxt = context.get_admin_context()
try:
Expand All @@ -376,13 +375,14 @@ def start(self):
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)

node_topic = '%s.%s' % (self.topic, self.host)
node_topic = '%s:%s' % (self.topic, self.host)
self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)

self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)

# Consume from all consumers in a thread
self.conn.consume_in_thread()
self.manager.init_host()

if self.report_interval:
pulse = utils.LoopingCall(self.report_state)
Expand Down
2 changes: 1 addition & 1 deletion cinder/tests/test_volume_rpcapi.py
Expand Up @@ -94,7 +94,7 @@ def _test_volume_api(self, method, rpc_method, **kwargs):
host = kwargs['host']
else:
host = kwargs['volume']['host']
expected_topic = '%s.%s' % (CONF.volume_topic, host)
expected_topic = '%s:%s' % (CONF.volume_topic, host)

self.fake_args = None
self.fake_kwargs = None
Expand Down

0 comments on commit b28e706

Please sign in to comment.