Skip to content

Commit

Permalink
Timeouts in new Paxos class.
Browse files Browse the repository at this point in the history
  • Loading branch information
svetlyak40wt committed Feb 18, 2011
1 parent 23c57d8 commit 370b31e
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 25 deletions.
52 changes: 50 additions & 2 deletions lock/paxos.py
@@ -1,23 +1,43 @@
import shlex

from twisted.internet.defer import Deferred
from twisted.internet import base
from twisted.internet import reactor
from collections import deque

base.DelayedCall.debug = True

class ConflictError(RuntimeError):
class PaxosError(RuntimeError):
pass

class PrepareTimeout(PaxosError):
pass

class AcceptTimeout(PaxosError):
pass


def _stop_waiting(timeout):
if timeout is not None and not (timeout.called or timeout.cancelled):
timeout.cancel()


class Paxos(object):
def __init__(self, transport):
def __init__(self, transport, quorum_timeout=2):
self.transport = transport
self.quorum_timeout = quorum_timeout

self.id = 0
self.max_seen_id = 0

self.proposed_value = None
self.deferred = None
self.queue = deque() # queue of (value, deferred) to propose

# delayed calls for timeouts
self._accepted_timeout = None
self._acks_timeout = None

def recv(self, message, client):
message = shlex.split(message)
command = getattr(self, message[0])
Expand All @@ -38,6 +58,12 @@ def _start_paxos(self, value, deferred):
self.deferred = deferred

self._num_acks_to_wait = self.transport.quorum_size

def _timeout_callback():
print '+++ prepare timeout'
self.deferred.errback(PrepareTimeout)

self._acks_timeout = reactor.callLater(self.quorum_timeout, _timeout_callback)
self.transport.broadcast('paxos_prepare %s' % self.id)

def paxos_prepare(self, num, client):
Expand All @@ -51,7 +77,18 @@ def paxos_ack(self, num, client):
if num == self.id:
self._num_acks_to_wait -= 1
if self._num_acks_to_wait == 0:
_stop_waiting(self._acks_timeout)

self._num_accepts_to_wait = self.transport.quorum_size

def _timeout_callback():
print '+++ accept timeout'
self.deferred.errback(AcceptTimeout)

self._accepted_timeout = reactor.callLater(
self.quorum_timeout,
_timeout_callback
)
self.transport.broadcast('paxos_accept %s "%s"' % (self.id, self.proposed_value))

def paxos_accept(self, num, value, client):
Expand All @@ -64,6 +101,7 @@ def paxos_accepted(self, num, client):
if num == self.id:
self._num_accepts_to_wait -= 1
if self._num_accepts_to_wait == 0:
_stop_waiting(self._accepted_timeout)
self.transport.broadcast('paxos_learn %s "%s"' % (self.id, self.proposed_value))

def paxos_learn(self, num, value, client):
Expand All @@ -87,3 +125,13 @@ def paxos_learn(self, num, value, client):
def _send_to(self, client, message):
client.send(message, self.transport)

def _get_timeouts(self):
return [
self._accepted_timeout,
self._acks_timeout,
]

def _cancel_timeouts(self):
for timeout in self._get_timeouts():
_stop_waiting(timeout)

51 changes: 28 additions & 23 deletions lock/test/test_paxos.py
@@ -1,27 +1,16 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import

import random
import time

from twisted.trial import unittest
from twisted.internet.defer import inlineCallbacks, Deferred
from twisted.internet.defer import inlineCallbacks
from twisted.internet import reactor
from random import random

from ..paxos import Paxos, ConflictError


def _wait(predicate):
"""Async waiter for predicate.
Returns deferred and will call callback
when predicate() will return true.
"""
d = Deferred()
def _wait():
if predicate():
d.callback(True)
else:
reactor.callLater(0.1, _wait)
reactor.callLater(0.1, _wait)
return d
from ..paxos import Paxos, PrepareTimeout
from ..utils import wait_calls
from itertools import chain


class Network(object):
Expand All @@ -38,7 +27,15 @@ def learn(self, num, value):
self.log.append((num, value))

def wait_delayed_calls(self):
return _wait(lambda: all((c.cancelled or c.called) for c in self._delayed_calls))
return wait_calls(
lambda: all(
(c is None or c.cancelled or c.called)
for c in chain(
self._delayed_calls,
*(tr.paxos._get_timeouts() for tr in self.transports)
)
)
)


class Transport(object):
Expand All @@ -57,7 +54,7 @@ def learn(self, num, value):
def send(self, message, from_transport):
print '%s sending "%s" to %s' % (from_transport.id, message, self.id)
self.network._delayed_calls.append(
reactor.callLater(random(), self.paxos.recv, message, from_transport)
reactor.callLater(random.random(), self.paxos.recv, message, from_transport)
)

@property
Expand All @@ -67,11 +64,19 @@ def quorum_size(self):

class PaxosTests(unittest.TestCase):
def setUp(self):
self.seed = int(time.time())
random.seed(self.seed)
print 'Random seed: %s' % self.seed

self.net = Network()
self.net.transports = [Transport(i, self.net) for i in xrange(5)]
for tr in self.net.transports:
tr.paxos = Paxos(tr)

def tearDown(self):
for tr in self.net.transports:
tr.paxos._cancel_timeouts()

@inlineCallbacks
def test_basic(self):
self.assertEqual([], self.net.log)
Expand Down Expand Up @@ -100,7 +105,7 @@ def test_two_proposes_from_different_nodes_in_sequence(self):
a = yield self.net.transports[0].paxos.propose('blah')

# Waiting when paxos on node 1 will learn the new value
yield _wait(lambda: self.net.transports[1].paxos.id == 1)
yield wait_calls(lambda: self.net.transports[1].paxos.id == 1)
b = yield self.net.transports[1].paxos.propose('minor')

yield self.net.wait_delayed_calls()
Expand All @@ -124,8 +129,8 @@ def check_success(result):

def _run_second_round():
def check_fail(result):
self.assertEqual(ConflictError, result)
second_round_failed[0] = True
if result.value == PrepareTimeout:
second_round_failed[0] = True

d2 = self.net.transports[1].paxos.propose('minor')
d2.addBoth(check_fail)
Expand Down
21 changes: 21 additions & 0 deletions lock/utils.py
Expand Up @@ -5,6 +5,9 @@

from functools import wraps

from twisted.internet.defer import Deferred
from twisted.internet import reactor

def parse_ip(text):
text = text.strip()
if ':' in text:
Expand Down Expand Up @@ -57,4 +60,22 @@ def trace_all(cls):
setattr(cls, key, trace(value))
return cls


escape = lambda x: x.replace('\\', '\\\\').replace('"', '\\"')


def wait_calls(predicate, check_step = 0.1):
"""Async waiter for predicate.
Returns deferred and will call callback
when predicate() will return true.
"""
d = Deferred()
def _wait():
if predicate():
d.callback(True)
else:
reactor.callLater(check_step, _wait)
reactor.callLater(check_step, _wait)
return d


18 changes: 18 additions & 0 deletions run-tests-many-times.sh
@@ -0,0 +1,18 @@
#!/bin/bash

NUM=${1:-2}
let ERRORS=0

date > tests.log

for i in `seq $NUM`
do
echo "$i iteration"
if ! env/bin/trial lock.test.test_paxos >> tests.log 2>&1
then
let ERRORS=$ERRORS+1
fi
done

echo "Num errors: $ERRORS"

0 comments on commit 370b31e

Please sign in to comment.