Navigation Menu

Skip to content

Commit

Permalink
Old code was replaced with new Paxos class.
Browse files Browse the repository at this point in the history
  • Loading branch information
svetlyak40wt committed Feb 19, 2011
1 parent 370b31e commit 3e0b1f5
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 34 deletions.
69 changes: 49 additions & 20 deletions lock/lock.py
Expand Up @@ -18,9 +18,10 @@
from twisted.python import failure
from twisted.web import server

from . utils import parse_ip, parse_ips, escape
from . exceptions import KeyAlreadyExists, KeyNotFound, PaxosFailed
from . web import Root
from .utils import parse_ip, parse_ips, escape
from .exceptions import KeyAlreadyExists, KeyNotFound, PaxosFailed
from .web import Root
from .paxos import Paxos


def stop_waiting(timeout):
Expand Down Expand Up @@ -253,17 +254,21 @@ def connectionLost(self, reason):

def lineReceived(self, line):
self.log.info('RECV: ' + line)
parsed = shlex.split(line)
command = parsed[0]
args = parsed[1:]
try:
cmd = getattr(self, 'cmd_' + command)
except:
cmd = self.factory.find_callback(line)
if cmd is None:
raise RuntimeError('Unknown command "%s"' % command)
self.factory.paxos.recv(line, self)
#parsed = shlex.split(line)
#command = parsed[0]
#args = parsed[1:]
#try:
# cmd = getattr(self, 'cmd_' + command)
#except:
# cmd = self.factory.find_callback(line)
# if cmd is None:
# raise RuntimeError('Unknown command "%s"' % command)

#cmd(client = self, *args)

cmd(client = self, *args)
def send(self, line, transport):
self.sendLine(line)


def sendLine(self, line):
Expand All @@ -280,8 +285,10 @@ class LockFactory(ClientFactory):
protocol = LockProtocol

def __init__(self, config):
self.paxos = Paxos(self, self.learn)

interface, port = parse_ip(config.get('myself', 'listen', '4001'))
server_list = parse_ips(config.get('cluster', 'nodes', '127.0.0.1:4001'))
self.neighbours = parse_ips(config.get('cluster', 'nodes', '127.0.0.1:4001'))
self._first_connect_delay = float(config.get('cluster', 'first_connect_delay', 0))

self.port = port
Expand All @@ -299,10 +306,6 @@ def __init__(self, config):

self.connections = {}
self._all_connections = []
self.neighbours = [
conn for conn in server_list
if conn != (self.interface, self.port)
]

# list of deferreds to be called when
# connections with all other nodes will be established
Expand Down Expand Up @@ -394,14 +397,14 @@ def set_key(self, key, value):
# raise KeyAlreadyExists('Key "%s" already exists' % key)

value = 'set-key %s "%s"' % (key, escape(value))
return self._start_paxos(value)
return self.paxos.propose(value)

def del_key(self, key):
#if key not in self._keys:
# raise KeyNotFound('Key "%s" not found' % key)

value = 'del-key %s' % key
return self._start_paxos(value)
return self.paxos.propose(value)

def add_connection(self, conn):
self.connections[conn.other_side] = conn
Expand Down Expand Up @@ -498,11 +501,37 @@ def clientConnectionFailed(self, connector, reason):
reason
))


# BEGIN Paxos Transport methods
def broadcast(self, line):
for connection in self.connections.values():
connection.sendLine(line)
return len(self.connections)

@property
def quorum_size(self):
return max(2, len(self.connections)/ 2 + 1)

def learn(self, num, value):
"""First callback in the paxos result accepting chain."""
logging.info('factory.learn %s' % (value,))

self.master = (self.interface, self.port)
self._log.append(value)

splitted = shlex.split(value)
command_name, args = splitted[0], splitted[1:]

command = '_log_cmd_' + command_name.replace('-', '_')
cmd = getattr(self, command)

try:
return cmd(*args)
except Exception, e:
logging.exception('command "%s" failed' % command_name)
raise

# END Paxos Transport methods

def on_learn(self, num, value, client = None):
logging.info('factory.on_learn %s' % (value,))
Expand Down
19 changes: 12 additions & 7 deletions lock/paxos.py
Expand Up @@ -5,6 +5,8 @@
from twisted.internet import reactor
from collections import deque

from .utils import escape

base.DelayedCall.debug = True

class PaxosError(RuntimeError):
Expand All @@ -23,8 +25,9 @@ def _stop_waiting(timeout):


class Paxos(object):
def __init__(self, transport, quorum_timeout=2):
def __init__(self, transport, on_learn, quorum_timeout=2):
self.transport = transport
self.on_learn = on_learn
self.quorum_timeout = quorum_timeout

self.id = 0
Expand Down Expand Up @@ -61,7 +64,7 @@ def _start_paxos(self, value, deferred):

def _timeout_callback():
print '+++ prepare timeout'
self.deferred.errback(PrepareTimeout)
self.deferred.errback(PrepareTimeout())

self._acks_timeout = reactor.callLater(self.quorum_timeout, _timeout_callback)
self.transport.broadcast('paxos_prepare %s' % self.id)
Expand All @@ -83,13 +86,13 @@ def paxos_ack(self, num, client):

def _timeout_callback():
print '+++ accept timeout'
self.deferred.errback(AcceptTimeout)
self.deferred.errback(AcceptTimeout())

self._accepted_timeout = reactor.callLater(
self.quorum_timeout,
_timeout_callback
)
self.transport.broadcast('paxos_accept %s "%s"' % (self.id, self.proposed_value))
self.transport.broadcast('paxos_accept %s "%s"' % (self.id, escape(self.proposed_value)))

def paxos_accept(self, num, value, client):
num = int(num)
Expand All @@ -102,16 +105,18 @@ def paxos_accepted(self, num, client):
self._num_accepts_to_wait -= 1
if self._num_accepts_to_wait == 0:
_stop_waiting(self._accepted_timeout)
self.transport.broadcast('paxos_learn %s "%s"' % (self.id, self.proposed_value))
self.transport.broadcast('paxos_learn %s "%s"' % (self.id, escape(self.proposed_value)))

def paxos_learn(self, num, value, client):
num = int(num)
self.id = num
self.transport.learn(num, value)

result = self.on_learn(num, value)

if self.deferred is not None and \
self.proposed_value == value:

self.deferred.callback((num, value))
self.deferred.callback(result)

if self.queue:
# start new Paxos instance
Expand Down
6 changes: 2 additions & 4 deletions lock/test/test_paxos.py
Expand Up @@ -25,6 +25,7 @@ def broadcast(self, message, from_transport):

def learn(self, num, value):
self.log.append((num, value))
return (num, value)

def wait_delayed_calls(self):
return wait_calls(
Expand All @@ -48,9 +49,6 @@ def broadcast(self, message):
print '%s broadcasting "%s"' % (self.id, message)
return self.network.broadcast(message, self)

def learn(self, num, value):
self.network.learn(num, value)

def send(self, message, from_transport):
print '%s sending "%s" to %s' % (from_transport.id, message, self.id)
self.network._delayed_calls.append(
Expand All @@ -71,7 +69,7 @@ def setUp(self):
self.net = Network()
self.net.transports = [Transport(i, self.net) for i in xrange(5)]
for tr in self.net.transports:
tr.paxos = Paxos(tr)
tr.paxos = Paxos(tr, on_learn=self.net.learn)

def tearDown(self):
for tr in self.net.transports:
Expand Down
9 changes: 6 additions & 3 deletions lock/web.py
Expand Up @@ -15,7 +15,9 @@
from twisted.python.log import err
from twisted.python.failure import Failure

from . exceptions import KeyAlreadyExists, KeyNotFound, PaxosFailed
from . exceptions import KeyAlreadyExists, KeyNotFound
from .paxos import PaxosError


def _get_key_from_path(path):
return path[1:]
Expand All @@ -26,6 +28,7 @@ def cb():
return 'Long Call Result'
return deferLater(reactor, secs, cb)


def delayed(func):
func = inlineCallbacks(func)

Expand Down Expand Up @@ -92,7 +95,7 @@ def render_POST(self, request):
yield self._lock.set_key(key, '')
except KeyAlreadyExists:
request.setResponseCode(CONFLICT)
except PaxosFailed:
except PaxosError:
request.setResponseCode(EXPECTATION_FAILED)


Expand All @@ -104,7 +107,7 @@ def render_DELETE(self, request):
yield self._lock.del_key(key)
except KeyNotFound:
request.setResponseCode(NOT_FOUND)
except PaxosFailed:
except PaxosError:
request.setResponseCode(EXPECTATION_FAILED)


0 comments on commit 3e0b1f5

Please sign in to comment.