diff --git a/rewind/server/rewind.py b/rewind/server/rewind.py index a8ea6e3..f83ba37 100644 --- a/rewind/server/rewind.py +++ b/rewind/server/rewind.py @@ -18,11 +18,13 @@ from __future__ import print_function from __future__ import absolute_import import argparse +import atexit import contextlib import hashlib import itertools import logging import os +import signal import sys import types import uuid @@ -67,14 +69,28 @@ class _RewindRunner(object): """ def __init__(self, eventstore, incoming_socket, query_socket, - streaming_socket, exit_message=None): - """Constructor.""" + streaming_socket, quitter_socket): + """Constructor. + + Parameters: + eventstore -- the event store to use. + incoming_socket -- the socket where incoming events are received. Only + recv() calls are being made on this socket. + query_socket -- the socket where event queries are coming in on. It + is expected to be of type zmq.REQ. + streaming_socket -- the socket where all incoming events are being + published to event handlers. Only send() calls are + being made on this socket. + quitter_socket -- socket used to signal that `_RewindRunner` shall + quit its main loop. Only recv() calls are being + made on this socket. + + """ self.eventstore = eventstore self.incoming_socket = incoming_socket self.query_socket = query_socket self.streaming_socket = streaming_socket - assert exit_message is None or isinstance(exit_message, bytes) - self.exit_message = exit_message + self.quitter_socket = quitter_socket self.oldid = '' self.id_generator = _IdGenerator(key_exists=lambda key: @@ -84,11 +100,13 @@ def __init__(self, eventstore, incoming_socket, query_socket, self.poller = zmq.Poller() self.poller.register(incoming_socket, zmq.POLLIN) self.poller.register(query_socket, zmq.POLLIN) + self.poller.register(quitter_socket, zmq.POLLIN) def run(self): - """Main loop for `_RewindRunner`. + """Main loop that does all the message passing. - Runs the program infinitely, or until an exit message is received. + Runs the program infinitely, or until a message is received on the + quitter socket. """ while self._handle_one_message(): @@ -107,23 +125,23 @@ def _handle_one_message(self): if (self.incoming_socket in socks and socks[self.incoming_socket] == zmq.POLLIN): - return self._handle_incoming_event() + self._handle_incoming_event() elif (self.query_socket in socks and socks[self.query_socket] == zmq.POLLIN): - return self._handle_query() - - def _handle_incoming_event(self): - """Handle an incoming event. + self._handle_query() + elif (self.quitter_socket in socks + and socks[self.quitter_socket] == zmq.POLLIN): + # We received a quit message and will now die + _logger.info("Received quit message. Request end of main loop.") + return False - Returns True if further messages should be received, False otherwise - (it should quit, that is). + # We want to continue receiving more events/requests. + return True - """ + def _handle_incoming_event(self): + """Handle an incoming event.""" eventstr = self.incoming_socket.recv() - if self.exit_message and eventstr == self.exit_message: - return False - newid = self.id_generator.generate() # Make sure newid is not part of our request vocabulary @@ -141,15 +159,8 @@ def _handle_incoming_event(self): self.oldid = newid - return True - def _handle_query(self): - """Handle an event query. - - Returns True if further messages should be received, False otherwise - (it should quit, that is). - - """ + """Handle an event query.""" requesttype = self.query_socket.recv() if requesttype == b"QUERY": assert self.query_socket.getsockopt(zmq.RCVMORE) @@ -167,7 +178,7 @@ def _handle_query(self): _logger.exception("A client requested a key that does not" " exist:") self.query_socket.send(b"ERROR Key did not exist") - return True + return # Since we are using ZeroMQ enveloping we want to cap the # maximum number of messages that are send for each request. @@ -198,8 +209,6 @@ def _handle_query(self): self.query_socket.recv() self.query_socket.send(b"ERROR Unknown request type") - return True - @contextlib.contextmanager def _zmq_context_context(*args): @@ -223,15 +232,71 @@ def _zmq_socket_context(context, socket_type, bind_endpoints): socket.close() +QUITTER_SOCKET_ADDRESS = 'inproc://rewind-quitter' +quitter_context = zmq.Context(1) +# Not technically needed since ZeroMQ shutdown will close all sockets +atexit.register(lambda: quitter_context.term()) + + +def _stop_mainloop(): + """Stop a currently running mainloop.""" + quitter_commander_socket = quitter_context.socket(zmq.PUB) + with contextlib.closing(quitter_commander_socket): + quitter_commander_socket.connect(QUITTER_SOCKET_ADDRESS) + quitter_commander_socket.send(b'QUIT, PLEASE') + + def run(args): + """Run Rewind and make sure we exit cleanly on SIGINT. + + Parameters: + args -- a list of command line parameters (omitting the initial + program list item given in `sys.argv`. + + """ + quitter_socket = quitter_context.socket(zmq.SUB) + with contextlib.closing(quitter_socket): + # Socket that will receive a message when asked to quit + quitter_socket.setsockopt(zmq.SUBSCRIBE, b'') + quitter_socket.bind(QUITTER_SOCKET_ADDRESS) + + # Registering SIGINT system event handler that send a quit message + # to the quitter_socket. + try: + # Making sure that we never ever get terminated while writing to + # file etcetera. When this signal is received there are two cases: + # 1. If we are in a ZeroMQ blocking call (poll or recv), ZeroMQ + # will throw an exception (this we will not catch and Rewind + # will exit). This is the most likely scenario. See + # http://www.zeromq.org/docs:2-1-upgrade for some info about + # this. + # 2. If we are in Rewind code, the signal will be caught and will + # ask Rewind to shut down cleanly by sending a message to the + # quitter socket. + signal.signal(signal.SIGINT, lambda _x, _y: _stop_mainloop()) + except ValueError as e: + # This happens if we are not in mainloop, which is the case + # when running tests. + _logger.warning('Not registering SIGINT handler: ' + str(e), + exc_info=True) + pass + else: + _logger.info('Succesfully registered SIGINT handler.') + + _run_quittable(args, quitter_socket) + + +def _run_quittable(args, quitter_socket): """Actually execute the program. Calling this method can be done from tests to simulate executing the application from command line. Parameters: - args -- a list of command line parameters (omitting the initial program - list item given in `sys.argv`. + args -- a list of command line parameters (omitting the initial + program list item given in `sys.argv`. + context -- the ZeroMQ context to use for sockets. + quitter_socket -- socket used to signal that Rewind should quit. returns -- a proposed exit code for the application. @@ -279,9 +344,7 @@ def log_creator(filename): # exception or similar. runner = _RewindRunner(eventstore, incoming_socket, query_socket, - streaming_socket, (args.exit_message.encode() - if args.exit_message - else None)) + streaming_socket, quitter_socket) runner.run() return 0 @@ -301,9 +364,6 @@ def main(argv=None): parser = argparse.ArgumentParser( description='Event storage and event proxy.' ) - parser.add_argument('--exit-codeword', metavar="MSG", dest="exit_message", - help="An incoming message that makes Rewind quit." - " Used for testing.") parser.add_argument('--datadir', '-D', metavar="DIR", help="The directory where events will be persisted." " Will be created if non-existent. Without this" diff --git a/rewind/server/test/test_rewind.py b/rewind/server/test/test_rewind.py index 53e67d6..c695fa3 100644 --- a/rewind/server/test/test_rewind.py +++ b/rewind/server/test/test_rewind.py @@ -20,6 +20,7 @@ import itertools import re import shutil +import signal import sys import tempfile import threading @@ -100,7 +101,7 @@ def testStartingWithPersistence(self): '--streaming-bind-endpoint', 'tcp://127.0.0.1:8091', '--datadir', datapath] print(" ".join(args)) - self.rewind = _RewindRunnerThread(args, 'tcp://127.0.0.1:8090') + self.rewind = _RewindRunnerThread(args) self.rewind.start() time.sleep(3) @@ -123,9 +124,7 @@ class _RewindRunnerThread(threading.Thread): """ - _EXIT_CODE = b'EXIT' - - def __init__(self, cmdline_args, exit_addr=None): + def __init__(self, cmdline_args): """Constructor. Parameters: @@ -135,12 +134,6 @@ def __init__(self, cmdline_args, exit_addr=None): """ thread = self - assert '--exit-codeword' not in cmdline_args, \ - "'--exit-codeword' is added by _RewindRunnerThread. Not elsewhere" - cmdline_args = (['--exit-codeword', - _RewindRunnerThread._EXIT_CODE.decode()] + - cmdline_args) - def exitcode_runner(*args, **kwargs): try: thread.exit_code = rewind.main(*args, **kwargs) @@ -153,21 +146,13 @@ def exitcode_runner(*args, **kwargs): super(_RewindRunnerThread, self).__init__(target=exitcode_runner, name="test-rewind", args=(cmdline_args,)) - self._exit_addr = exit_addr - def stop(self, context=None): + def stop(self): """Send a stop message to the event thread.""" - assert self._exit_addr is not None - - if context is None: - context = zmq.Context(1) - socket = context.socket(zmq.PUSH) - socket.setsockopt(zmq.LINGER, 1000) - socket.connect(self._exit_addr) - socket.send(_RewindRunnerThread._EXIT_CODE) + rewind._stop_mainloop() + time.sleep(0.5) # Acceptable exit time assert not self.isAlive() - socket.close() class TestReplication(unittest.TestCase): @@ -181,7 +166,7 @@ def setUp(self): """Starting a Rewind instance to test replication.""" args = ['--incoming-bind-endpoint', 'tcp://127.0.0.1:8090', '--streaming-bind-endpoint', 'tcp://127.0.0.1:8091'] - self.rewind = _RewindRunnerThread(args, 'tcp://127.0.0.1:8090') + self.rewind = _RewindRunnerThread(args) self.rewind.start() self.context = zmq.Context(3) @@ -263,7 +248,7 @@ def tearDown(self): self.assertTrue(self.rewind.isAlive(), "Did rewind crash? Not running.") - self.rewind.stop(self.context) + self.rewind.stop() self.assertFalse(self.rewind.isAlive(), "Rewind should not have been running. It was.") @@ -289,7 +274,7 @@ def setUp(self): args = ['--incoming-bind-endpoint', 'tcp://127.0.0.1:8090', '--query-bind-endpoint', 'tcp://127.0.0.1:8091'] - self.rewind = _RewindRunnerThread(args, 'tcp://127.0.0.1:8090') + self.rewind = _RewindRunnerThread(args) self.rewind.start() self.context = zmq.Context(3) @@ -492,7 +477,7 @@ def tearDown(self): self.assertTrue(self.rewind.isAlive(), "Did rewind crash? Not running.") - self.rewind.stop(self.context) + self.rewind.stop() self.assertFalse(self.rewind.isAlive(), "Rewind should not have been running. It was.")