Skip to content
This repository has been archived by the owner on Apr 28, 2023. It is now read-only.

Commit

Permalink
Use in-process quitter socket instead of exit msg
Browse files Browse the repository at this point in the history
This solves issue #24.
  • Loading branch information
JensRantil committed Nov 4, 2012
1 parent c933c38 commit 3b4ca7b
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 61 deletions.
132 changes: 96 additions & 36 deletions rewind/server/rewind.py
Expand Up @@ -18,11 +18,13 @@
from __future__ import print_function
from __future__ import absolute_import
import argparse
import atexit
import contextlib
import hashlib
import itertools
import logging
import os
import signal
import sys
import types
import uuid
Expand Down Expand Up @@ -67,14 +69,28 @@ class _RewindRunner(object):
"""

def __init__(self, eventstore, incoming_socket, query_socket,
streaming_socket, exit_message=None):
"""Constructor."""
streaming_socket, quitter_socket):
"""Constructor.
Parameters:
eventstore -- the event store to use.
incoming_socket -- the socket where incoming events are received. Only
recv() calls are being made on this socket.
query_socket -- the socket where event queries are coming in on. It
is expected to be of type zmq.REQ.
streaming_socket -- the socket where all incoming events are being
published to event handlers. Only send() calls are
being made on this socket.
quitter_socket -- socket used to signal that `_RewindRunner` shall
quit its main loop. Only recv() calls are being
made on this socket.
"""
self.eventstore = eventstore
self.incoming_socket = incoming_socket
self.query_socket = query_socket
self.streaming_socket = streaming_socket
assert exit_message is None or isinstance(exit_message, bytes)
self.exit_message = exit_message
self.quitter_socket = quitter_socket
self.oldid = ''

self.id_generator = _IdGenerator(key_exists=lambda key:
Expand All @@ -84,11 +100,13 @@ def __init__(self, eventstore, incoming_socket, query_socket,
self.poller = zmq.Poller()
self.poller.register(incoming_socket, zmq.POLLIN)
self.poller.register(query_socket, zmq.POLLIN)
self.poller.register(quitter_socket, zmq.POLLIN)

def run(self):
"""Main loop for `_RewindRunner`.
"""Main loop that does all the message passing.
Runs the program infinitely, or until an exit message is received.
Runs the program infinitely, or until a message is received on the
quitter socket.
"""
while self._handle_one_message():
Expand All @@ -107,23 +125,23 @@ def _handle_one_message(self):

if (self.incoming_socket in socks and
socks[self.incoming_socket] == zmq.POLLIN):
return self._handle_incoming_event()
self._handle_incoming_event()
elif (self.query_socket in socks
and socks[self.query_socket] == zmq.POLLIN):
return self._handle_query()

def _handle_incoming_event(self):
"""Handle an incoming event.
self._handle_query()
elif (self.quitter_socket in socks
and socks[self.quitter_socket] == zmq.POLLIN):
# We received a quit message and will now die
_logger.info("Received quit message. Request end of main loop.")
return False

Returns True if further messages should be received, False otherwise
(it should quit, that is).
# We want to continue receiving more events/requests.
return True

"""
def _handle_incoming_event(self):
"""Handle an incoming event."""
eventstr = self.incoming_socket.recv()

if self.exit_message and eventstr == self.exit_message:
return False

newid = self.id_generator.generate()

# Make sure newid is not part of our request vocabulary
Expand All @@ -141,15 +159,8 @@ def _handle_incoming_event(self):

self.oldid = newid

return True

def _handle_query(self):
"""Handle an event query.
Returns True if further messages should be received, False otherwise
(it should quit, that is).
"""
"""Handle an event query."""
requesttype = self.query_socket.recv()
if requesttype == b"QUERY":
assert self.query_socket.getsockopt(zmq.RCVMORE)
Expand All @@ -167,7 +178,7 @@ def _handle_query(self):
_logger.exception("A client requested a key that does not"
" exist:")
self.query_socket.send(b"ERROR Key did not exist")
return True
return

# Since we are using ZeroMQ enveloping we want to cap the
# maximum number of messages that are send for each request.
Expand Down Expand Up @@ -198,8 +209,6 @@ def _handle_query(self):
self.query_socket.recv()
self.query_socket.send(b"ERROR Unknown request type")

return True


@contextlib.contextmanager
def _zmq_context_context(*args):
Expand All @@ -223,15 +232,71 @@ def _zmq_socket_context(context, socket_type, bind_endpoints):
socket.close()


QUITTER_SOCKET_ADDRESS = 'inproc://rewind-quitter'
quitter_context = zmq.Context(1)
# Not technically needed since ZeroMQ shutdown will close all sockets
atexit.register(lambda: quitter_context.term())


def _stop_mainloop():
"""Stop a currently running mainloop."""
quitter_commander_socket = quitter_context.socket(zmq.PUB)
with contextlib.closing(quitter_commander_socket):
quitter_commander_socket.connect(QUITTER_SOCKET_ADDRESS)
quitter_commander_socket.send(b'QUIT, PLEASE')


def run(args):
"""Run Rewind and make sure we exit cleanly on SIGINT.
Parameters:
args -- a list of command line parameters (omitting the initial
program list item given in `sys.argv`.
"""
quitter_socket = quitter_context.socket(zmq.SUB)
with contextlib.closing(quitter_socket):
# Socket that will receive a message when asked to quit
quitter_socket.setsockopt(zmq.SUBSCRIBE, b'')
quitter_socket.bind(QUITTER_SOCKET_ADDRESS)

# Registering SIGINT system event handler that send a quit message
# to the quitter_socket.
try:
# Making sure that we never ever get terminated while writing to
# file etcetera. When this signal is received there are two cases:
# 1. If we are in a ZeroMQ blocking call (poll or recv), ZeroMQ
# will throw an exception (this we will not catch and Rewind
# will exit). This is the most likely scenario. See
# http://www.zeromq.org/docs:2-1-upgrade for some info about
# this.
# 2. If we are in Rewind code, the signal will be caught and will
# ask Rewind to shut down cleanly by sending a message to the
# quitter socket.
signal.signal(signal.SIGINT, lambda _x, _y: _stop_mainloop())
except ValueError as e:
# This happens if we are not in mainloop, which is the case
# when running tests.
_logger.warning('Not registering SIGINT handler: ' + str(e),
exc_info=True)
pass
else:
_logger.info('Succesfully registered SIGINT handler.')

_run_quittable(args, quitter_socket)


def _run_quittable(args, quitter_socket):
"""Actually execute the program.
Calling this method can be done from tests to simulate executing the
application from command line.
Parameters:
args -- a list of command line parameters (omitting the initial program
list item given in `sys.argv`.
args -- a list of command line parameters (omitting the initial
program list item given in `sys.argv`.
context -- the ZeroMQ context to use for sockets.
quitter_socket -- socket used to signal that Rewind should quit.
returns -- a proposed exit code for the application.
Expand Down Expand Up @@ -279,9 +344,7 @@ def log_creator(filename):
# exception or similar.

runner = _RewindRunner(eventstore, incoming_socket, query_socket,
streaming_socket, (args.exit_message.encode()
if args.exit_message
else None))
streaming_socket, quitter_socket)
runner.run()

return 0
Expand All @@ -301,9 +364,6 @@ def main(argv=None):
parser = argparse.ArgumentParser(
description='Event storage and event proxy.'
)
parser.add_argument('--exit-codeword', metavar="MSG", dest="exit_message",
help="An incoming message that makes Rewind quit."
" Used for testing.")
parser.add_argument('--datadir', '-D', metavar="DIR",
help="The directory where events will be persisted."
" Will be created if non-existent. Without this"
Expand Down
35 changes: 10 additions & 25 deletions rewind/server/test/test_rewind.py
Expand Up @@ -20,6 +20,7 @@
import itertools
import re
import shutil
import signal
import sys
import tempfile
import threading
Expand Down Expand Up @@ -100,7 +101,7 @@ def testStartingWithPersistence(self):
'--streaming-bind-endpoint', 'tcp://127.0.0.1:8091',
'--datadir', datapath]
print(" ".join(args))
self.rewind = _RewindRunnerThread(args, 'tcp://127.0.0.1:8090')
self.rewind = _RewindRunnerThread(args)
self.rewind.start()

time.sleep(3)
Expand All @@ -123,9 +124,7 @@ class _RewindRunnerThread(threading.Thread):
"""

_EXIT_CODE = b'EXIT'

def __init__(self, cmdline_args, exit_addr=None):
def __init__(self, cmdline_args):
"""Constructor.
Parameters:
Expand All @@ -135,12 +134,6 @@ def __init__(self, cmdline_args, exit_addr=None):
"""
thread = self

assert '--exit-codeword' not in cmdline_args, \
"'--exit-codeword' is added by _RewindRunnerThread. Not elsewhere"
cmdline_args = (['--exit-codeword',
_RewindRunnerThread._EXIT_CODE.decode()] +
cmdline_args)

def exitcode_runner(*args, **kwargs):
try:
thread.exit_code = rewind.main(*args, **kwargs)
Expand All @@ -153,21 +146,13 @@ def exitcode_runner(*args, **kwargs):
super(_RewindRunnerThread, self).__init__(target=exitcode_runner,
name="test-rewind",
args=(cmdline_args,))
self._exit_addr = exit_addr

def stop(self, context=None):
def stop(self):
"""Send a stop message to the event thread."""
assert self._exit_addr is not None

if context is None:
context = zmq.Context(1)
socket = context.socket(zmq.PUSH)
socket.setsockopt(zmq.LINGER, 1000)
socket.connect(self._exit_addr)
socket.send(_RewindRunnerThread._EXIT_CODE)
rewind._stop_mainloop()

time.sleep(0.5) # Acceptable exit time
assert not self.isAlive()
socket.close()


class TestReplication(unittest.TestCase):
Expand All @@ -181,7 +166,7 @@ def setUp(self):
"""Starting a Rewind instance to test replication."""
args = ['--incoming-bind-endpoint', 'tcp://127.0.0.1:8090',
'--streaming-bind-endpoint', 'tcp://127.0.0.1:8091']
self.rewind = _RewindRunnerThread(args, 'tcp://127.0.0.1:8090')
self.rewind = _RewindRunnerThread(args)
self.rewind.start()

self.context = zmq.Context(3)
Expand Down Expand Up @@ -263,7 +248,7 @@ def tearDown(self):

self.assertTrue(self.rewind.isAlive(),
"Did rewind crash? Not running.")
self.rewind.stop(self.context)
self.rewind.stop()
self.assertFalse(self.rewind.isAlive(),
"Rewind should not have been running. It was.")

Expand All @@ -289,7 +274,7 @@ def setUp(self):

args = ['--incoming-bind-endpoint', 'tcp://127.0.0.1:8090',
'--query-bind-endpoint', 'tcp://127.0.0.1:8091']
self.rewind = _RewindRunnerThread(args, 'tcp://127.0.0.1:8090')
self.rewind = _RewindRunnerThread(args)
self.rewind.start()

self.context = zmq.Context(3)
Expand Down Expand Up @@ -492,7 +477,7 @@ def tearDown(self):

self.assertTrue(self.rewind.isAlive(),
"Did rewind crash? Not running.")
self.rewind.stop(self.context)
self.rewind.stop()
self.assertFalse(self.rewind.isAlive(),
"Rewind should not have been running. It was.")

Expand Down

0 comments on commit 3b4ca7b

Please sign in to comment.