diff --git a/lock/lock.py b/lock/lock.py index 45a5549..4dbb646 100644 --- a/lock/lock.py +++ b/lock/lock.py @@ -18,9 +18,10 @@ from twisted.python import failure from twisted.web import server -from . utils import parse_ip, parse_ips, escape -from . exceptions import KeyAlreadyExists, KeyNotFound, PaxosFailed -from . web import Root +from .utils import parse_ip, parse_ips, escape +from .exceptions import KeyAlreadyExists, KeyNotFound, PaxosFailed +from .web import Root +from .paxos import Paxos def stop_waiting(timeout): @@ -253,17 +254,21 @@ def connectionLost(self, reason): def lineReceived(self, line): self.log.info('RECV: ' + line) - parsed = shlex.split(line) - command = parsed[0] - args = parsed[1:] - try: - cmd = getattr(self, 'cmd_' + command) - except: - cmd = self.factory.find_callback(line) - if cmd is None: - raise RuntimeError('Unknown command "%s"' % command) + self.factory.paxos.recv(line, self) + #parsed = shlex.split(line) + #command = parsed[0] + #args = parsed[1:] + #try: + # cmd = getattr(self, 'cmd_' + command) + #except: + # cmd = self.factory.find_callback(line) + # if cmd is None: + # raise RuntimeError('Unknown command "%s"' % command) + + #cmd(client = self, *args) - cmd(client = self, *args) + def send(self, line, transport): + self.sendLine(line) def sendLine(self, line): @@ -280,8 +285,10 @@ class LockFactory(ClientFactory): protocol = LockProtocol def __init__(self, config): + self.paxos = Paxos(self, self.learn) + interface, port = parse_ip(config.get('myself', 'listen', '4001')) - server_list = parse_ips(config.get('cluster', 'nodes', '127.0.0.1:4001')) + self.neighbours = parse_ips(config.get('cluster', 'nodes', '127.0.0.1:4001')) self._first_connect_delay = float(config.get('cluster', 'first_connect_delay', 0)) self.port = port @@ -299,10 +306,6 @@ def __init__(self, config): self.connections = {} self._all_connections = [] - self.neighbours = [ - conn for conn in server_list - if conn != (self.interface, self.port) - ] # list of deferreds to be called when # connections with all other nodes will be established @@ -394,14 +397,14 @@ def set_key(self, key, value): # raise KeyAlreadyExists('Key "%s" already exists' % key) value = 'set-key %s "%s"' % (key, escape(value)) - return self._start_paxos(value) + return self.paxos.propose(value) def del_key(self, key): #if key not in self._keys: # raise KeyNotFound('Key "%s" not found' % key) value = 'del-key %s' % key - return self._start_paxos(value) + return self.paxos.propose(value) def add_connection(self, conn): self.connections[conn.other_side] = conn @@ -498,11 +501,37 @@ def clientConnectionFailed(self, connector, reason): reason )) + + # BEGIN Paxos Transport methods def broadcast(self, line): for connection in self.connections.values(): connection.sendLine(line) return len(self.connections) + @property + def quorum_size(self): + return max(2, len(self.connections)/ 2 + 1) + + def learn(self, num, value): + """First callback in the paxos result accepting chain.""" + logging.info('factory.learn %s' % (value,)) + + self.master = (self.interface, self.port) + self._log.append(value) + + splitted = shlex.split(value) + command_name, args = splitted[0], splitted[1:] + + command = '_log_cmd_' + command_name.replace('-', '_') + cmd = getattr(self, command) + + try: + return cmd(*args) + except Exception, e: + logging.exception('command "%s" failed' % command_name) + raise + + # END Paxos Transport methods def on_learn(self, num, value, client = None): logging.info('factory.on_learn %s' % (value,)) diff --git a/lock/paxos.py b/lock/paxos.py index 4af4132..9b4cbd5 100644 --- a/lock/paxos.py +++ b/lock/paxos.py @@ -5,6 +5,8 @@ from twisted.internet import reactor from collections import deque +from .utils import escape + base.DelayedCall.debug = True class PaxosError(RuntimeError): @@ -23,8 +25,9 @@ def _stop_waiting(timeout): class Paxos(object): - def __init__(self, transport, quorum_timeout=2): + def __init__(self, transport, on_learn, quorum_timeout=2): self.transport = transport + self.on_learn = on_learn self.quorum_timeout = quorum_timeout self.id = 0 @@ -61,7 +64,7 @@ def _start_paxos(self, value, deferred): def _timeout_callback(): print '+++ prepare timeout' - self.deferred.errback(PrepareTimeout) + self.deferred.errback(PrepareTimeout()) self._acks_timeout = reactor.callLater(self.quorum_timeout, _timeout_callback) self.transport.broadcast('paxos_prepare %s' % self.id) @@ -83,13 +86,13 @@ def paxos_ack(self, num, client): def _timeout_callback(): print '+++ accept timeout' - self.deferred.errback(AcceptTimeout) + self.deferred.errback(AcceptTimeout()) self._accepted_timeout = reactor.callLater( self.quorum_timeout, _timeout_callback ) - self.transport.broadcast('paxos_accept %s "%s"' % (self.id, self.proposed_value)) + self.transport.broadcast('paxos_accept %s "%s"' % (self.id, escape(self.proposed_value))) def paxos_accept(self, num, value, client): num = int(num) @@ -102,16 +105,18 @@ def paxos_accepted(self, num, client): self._num_accepts_to_wait -= 1 if self._num_accepts_to_wait == 0: _stop_waiting(self._accepted_timeout) - self.transport.broadcast('paxos_learn %s "%s"' % (self.id, self.proposed_value)) + self.transport.broadcast('paxos_learn %s "%s"' % (self.id, escape(self.proposed_value))) def paxos_learn(self, num, value, client): num = int(num) self.id = num - self.transport.learn(num, value) + + result = self.on_learn(num, value) + if self.deferred is not None and \ self.proposed_value == value: - self.deferred.callback((num, value)) + self.deferred.callback(result) if self.queue: # start new Paxos instance diff --git a/lock/test/test_paxos.py b/lock/test/test_paxos.py index 2b7eda6..2de3a60 100644 --- a/lock/test/test_paxos.py +++ b/lock/test/test_paxos.py @@ -25,6 +25,7 @@ def broadcast(self, message, from_transport): def learn(self, num, value): self.log.append((num, value)) + return (num, value) def wait_delayed_calls(self): return wait_calls( @@ -48,9 +49,6 @@ def broadcast(self, message): print '%s broadcasting "%s"' % (self.id, message) return self.network.broadcast(message, self) - def learn(self, num, value): - self.network.learn(num, value) - def send(self, message, from_transport): print '%s sending "%s" to %s' % (from_transport.id, message, self.id) self.network._delayed_calls.append( @@ -71,7 +69,7 @@ def setUp(self): self.net = Network() self.net.transports = [Transport(i, self.net) for i in xrange(5)] for tr in self.net.transports: - tr.paxos = Paxos(tr) + tr.paxos = Paxos(tr, on_learn=self.net.learn) def tearDown(self): for tr in self.net.transports: diff --git a/lock/web.py b/lock/web.py index e7b9984..87af5fd 100644 --- a/lock/web.py +++ b/lock/web.py @@ -15,7 +15,9 @@ from twisted.python.log import err from twisted.python.failure import Failure -from . exceptions import KeyAlreadyExists, KeyNotFound, PaxosFailed +from . exceptions import KeyAlreadyExists, KeyNotFound +from .paxos import PaxosError + def _get_key_from_path(path): return path[1:] @@ -26,6 +28,7 @@ def cb(): return 'Long Call Result' return deferLater(reactor, secs, cb) + def delayed(func): func = inlineCallbacks(func) @@ -92,7 +95,7 @@ def render_POST(self, request): yield self._lock.set_key(key, '') except KeyAlreadyExists: request.setResponseCode(CONFLICT) - except PaxosFailed: + except PaxosError: request.setResponseCode(EXPECTATION_FAILED) @@ -104,7 +107,7 @@ def render_DELETE(self, request): yield self._lock.del_key(key) except KeyNotFound: request.setResponseCode(NOT_FOUND) - except PaxosFailed: + except PaxosError: request.setResponseCode(EXPECTATION_FAILED)