Skip to content

Commit

Permalink
optional quorum as functions
Browse files Browse the repository at this point in the history
  • Loading branch information
ailidani committed Nov 25, 2018
1 parent 3e31d6f commit b17075b
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 72 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Expand Up @@ -6,4 +6,5 @@ tla/*
!tla/wpaxos.tla
.idea
model/.*
aws
aws
ppaxos
6 changes: 0 additions & 6 deletions bin/config.json
Expand Up @@ -22,15 +22,9 @@
"3.3": "http://127.0.0.1:8088"
},
"transport": "tcp",
"quorum": "fgrid",
"f": 0,
"adaptive": true,
"policy": "majority",
"threshold": 3,
"backoff": 0,
"thrifty": false,
"reply_when_commit": false,
"fast_read": false,
"chan_buffer_size": 1024,
"buffer_size": 0,
"multiversion": false,
Expand Down
2 changes: 1 addition & 1 deletion bin/start.sh
Expand Up @@ -2,5 +2,5 @@

PID_FILE=server.pid

./server -log_dir=. -log_level=info -id $1 &
./server -log_dir=. -log_level=info -id $1 -algorithm=paxos &
echo $! >> ${PID_FILE}
13 changes: 8 additions & 5 deletions client.go
Expand Up @@ -48,13 +48,16 @@ func NewHTTPClient(id ID) *HTTPClient {
Http: config.HTTPAddrs,
Client: &http.Client{},
}
i := 0
for node := range c.Addrs {
if node.Zone() == id.Zone() {
i++
if id != "" {
i := 0
for node := range c.Addrs {
if node.Zone() == id.Zone() {
i++
}
}
c.LocalN = i
}
c.LocalN = i

return c
}

Expand Down
27 changes: 10 additions & 17 deletions config.go
Expand Up @@ -15,14 +15,9 @@ type Config struct {
Addrs map[ID]string `json:"address"` // address for node communication
HTTPAddrs map[ID]string `json:"http_address"` // address for client server communication

Quorum string `json:"quorum"` // type of the quorums
F int `json:"f"` // number of failure zones in general grid quorums
Transport string `json:"transport"` // not used
ReplyWhenCommit bool `json:"reply_when_commit"` // reply to client when request is committed, instead of executed
FastRead bool `json:"fast_read"` // read from local copy
Adaptive bool `json:"adaptive"` // adaptive leader change, if true paxos forward request to current leader
Policy string `json:"policy"` // leader change policy {consecutive, majority}
Threshold float64 `json:"threshold"` // threshold for policy in WPaxos {n consecutive or time interval in ms}
Transport string `json:"transport"` // not used
Policy string `json:"policy"` // leader change policy {consecutive, majority}
Threshold float64 `json:"threshold"` // threshold for policy in WPaxos {n consecutive or time interval in ms}

Thrifty bool `json:"thrifty"` // only send messages to a quorum
BufferSize int `json:"buffer_size"` // buffer size for maps
Expand Down Expand Up @@ -61,15 +56,13 @@ func Simulation() {
// only used by init() and master
func MakeDefaultConfig() Config {
return Config{
Transport: "tcp",
ReplyWhenCommit: false,
Adaptive: true,
Policy: "consecutive",
Threshold: 3,
BufferSize: 1024,
ChanBufferSize: 1024,
MultiVersion: false,
Benchmark: DefaultBConfig(),
Transport: "tcp",
Policy: "consecutive",
Threshold: 3,
BufferSize: 1024,
ChanBufferSize: 1024,
MultiVersion: false,
Benchmark: DefaultBConfig(),
}
}

Expand Down
8 changes: 6 additions & 2 deletions epaxos/replica.go
@@ -1,10 +1,14 @@
package epaxos

import (
"flag"

"github.com/ailidani/paxi"
"github.com/ailidani/paxi/log"
)

var replyWhenCommit = flag.Bool("ReplyWhenCommit", false, "Reply to client when request is committed, instead of executed")

type Replica struct {
paxi.Node
log map[paxi.ID]map[int]*instance
Expand Down Expand Up @@ -267,7 +271,7 @@ func (r *Replica) handlePreAcceptReply(m PreAcceptReply) {
Seq: i.seq,
Dep: i.copyDep(),
})
if paxi.GetConfig().ReplyWhenCommit {
if *replyWhenCommit {
i.request.Reply(paxi.Reply{
Command: i.cmd,
})
Expand Down Expand Up @@ -343,7 +347,7 @@ func (r *Replica) handleAcceptReply(m AcceptReply) {
if i.quorum.Majority() {
i.status = COMMITTED
r.updateCommit(r.ID())
if paxi.GetConfig().ReplyWhenCommit {
if *replyWhenCommit {
i.request.Reply(paxi.Reply{
Command: i.cmd,
})
Expand Down
1 change: 1 addition & 0 deletions http.go
Expand Up @@ -47,6 +47,7 @@ func (n *node) handleRoot(w http.ResponseWriter, r *http.Request) {
var err error

// get all http headers
req.Properties = make(map[string]string)
for k := range r.Header {
if k == HTTPClientID {
cmd.ClientID = ID(r.Header.Get(HTTPClientID))
Expand Down
8 changes: 0 additions & 8 deletions master/master.go
Expand Up @@ -15,14 +15,9 @@ var port = flag.Int("port", 1735, "master port number")
var httpPort = flag.Int("http", 8080, "http port")

var n = flag.Int("n", 1, "N number of replicas, default value 1.")
var algorithm = flag.String("algorithm", "paxos", "Consensus algorithm name")
var f = flag.Int("f", 0, "tolerate f zone failures")
var adaptive = flag.Bool("adaptive", true, "Adaptive leader change")
var threshold = flag.Float64("threshold", 3.0, "Threshold for leader change")
var backOff = flag.Int("backoff", 100, "Random backoff time")
var thrifty = flag.Bool("thrifty", false, "")
var transport = flag.String("transport", "tcp", "Transport protocols, including tcp, udp, chan (local)")
var replywhencommit = flag.Bool("replywhencommit", false, "reply to client when request is committed, not executed")

func main() {
flag.Parse()
Expand All @@ -33,10 +28,7 @@ func main() {
out := make(chan paxi.Config)

config := paxi.MakeDefaultConfig()
config.F = *f
config.Adaptive = *adaptive
config.Threshold = *threshold
config.BackOff = *backOff
config.Thrifty = *thrifty

go func() {
Expand Down
13 changes: 0 additions & 13 deletions node.go
Expand Up @@ -105,19 +105,6 @@ func (n *node) handle() {
for {
msg := <-n.MessageChan
v := reflect.ValueOf(msg)

if config.FastRead && v.Type() == reflect.TypeOf(Request{}) {
r := msg.(Request)
if r.Command.IsRead() {
value := n.Execute(r.Command)
r.Reply(Reply{
Command: r.Command,
Value: value,
})
continue
}
}

name := v.Type().String()
f, exists := n.handles[name]
if !exists {
Expand Down
35 changes: 24 additions & 11 deletions paxos/paxos.go
Expand Up @@ -30,17 +30,30 @@ type Paxos struct {

quorum *paxi.Quorum // phase 1 quorum
requests []*paxi.Request // phase 1 pending requests

Q1 func(*paxi.Quorum) bool
Q2 func(*paxi.Quorum) bool
ReplyWhenCommit bool
}

// NewPaxos creates new paxos instance
func NewPaxos(n paxi.Node) *Paxos {
return &Paxos{
Node: n,
log: make(map[int]*entry, paxi.GetConfig().BufferSize),
slot: -1,
quorum: paxi.NewQuorum(),
requests: make([]*paxi.Request, 0),
func NewPaxos(n paxi.Node, options ...func(*Paxos)) *Paxos {
p := &Paxos{
Node: n,
log: make(map[int]*entry, paxi.GetConfig().BufferSize),
slot: -1,
quorum: paxi.NewQuorum(),
requests: make([]*paxi.Request, 0),
Q1: func(q *paxi.Quorum) bool { return q.Majority() },
Q2: func(q *paxi.Quorum) bool { return q.Majority() },
ReplyWhenCommit: false,
}

for _, opt := range options {
opt(p)
}

return p
}

// IsLeader indecates if this node is current leader
Expand Down Expand Up @@ -189,7 +202,7 @@ func (p *Paxos) HandleP1b(m P1b) {
// ack message
if m.Ballot.ID() == p.ID() && m.Ballot == p.ballot {
p.quorum.ACK(m.ID)
if p.quorum.Q1() {
if p.Q1(p.quorum) {
p.active = true
// propose any uncommitted entries
for i := p.execute; i <= p.slot; i++ {
Expand Down Expand Up @@ -273,15 +286,15 @@ func (p *Paxos) HandleP2b(m P2b) {
// if no q2 can be formed, this slot will be retried when received p2a or p3
if m.Ballot.ID() == p.ID() && m.Ballot == p.log[m.Slot].ballot {
p.log[m.Slot].quorum.ACK(m.ID)
if p.log[m.Slot].quorum.Q2() {
if p.Q2(p.log[m.Slot].quorum) {
p.log[m.Slot].commit = true
p.Broadcast(P3{
Ballot: m.Ballot,
Slot: m.Slot,
Command: p.log[m.Slot].command,
})

if paxi.GetConfig().ReplyWhenCommit {
if p.ReplyWhenCommit {
r := p.log[m.Slot].request
r.Reply(paxi.Reply{
Command: r.Command,
Expand Down Expand Up @@ -315,7 +328,7 @@ func (p *Paxos) HandleP3(m P3) {
e.command = m.Command
e.commit = true

if paxi.GetConfig().ReplyWhenCommit {
if p.ReplyWhenCommit {
if e.request != nil {
e.request.Reply(paxi.Reply{
Command: e.request.Command,
Expand Down
12 changes: 6 additions & 6 deletions quorum.go
@@ -1,7 +1,5 @@
package paxi

import "github.com/ailidani/paxi/log"

// Quorum records each acknowledgement and check for different types of quorum satisfied
type Quorum struct {
size int
Expand Down Expand Up @@ -95,27 +93,28 @@ func (q *Quorum) GridColumn() bool {
}

// FGridQ1 is flexible grid quorum for phase 1
func (q *Quorum) FGridQ1() bool {
func (q *Quorum) FGridQ1(Fz int) bool {
zone := 0
for z, n := range q.zones {
if n > config.npz[z]/2 {
zone++
}
}
return zone >= config.z-config.F
return zone >= config.z-Fz
}

// FGridQ2 is flexible grid quorum for phase 2
func (q *Quorum) FGridQ2() bool {
func (q *Quorum) FGridQ2(Fz int) bool {
zone := 0
for z, n := range q.zones {
if n > config.npz[z]/2 {
zone++
}
}
return zone >= config.F+1
return zone >= Fz+1
}

/*
// Q1 returns true if config.Quorum type is satisfied
func (q *Quorum) Q1() bool {
switch config.Quorum {
Expand Down Expand Up @@ -153,3 +152,4 @@ func (q *Quorum) Q2() bool {
return false
}
}
*/
15 changes: 14 additions & 1 deletion wpaxos/kpaxos.go
Expand Up @@ -12,12 +12,25 @@ type kpaxos struct {
paxi.Policy
}

func Q1(q *paxi.Quorum) bool {
return q.FGridQ1(*fz)
}

func Q2(q *paxi.Quorum) bool {
return q.FGridQ2(*fz)
}

func newKPaxos(key paxi.Key, node paxi.Node) *kpaxos {
k := &kpaxos{}
k.Node = node
k.key = key
k.Policy = paxi.NewPolicy()
k.Paxos = paxos.NewPaxos(k)

quorum := func(p *paxos.Paxos) {
p.Q1 = Q1
p.Q2 = Q2
}
k.Paxos = paxos.NewPaxos(k, quorum)

// zone := int(key)%paxi.GetConfig().Z() + 1
// id := paxi.NewID(zone, 1)
Expand Down
7 changes: 6 additions & 1 deletion wpaxos/replica.go
@@ -1,10 +1,15 @@
package wpaxos

import (
"flag"

"github.com/ailidani/paxi"
"github.com/ailidani/paxi/log"
)

var adaptive = flag.Bool("adaptive", true, "stable leader, if true paxos forward request to current leader")
var fz = flag.Int("fz", 0, "f_z fault tolerent zones")

// Replica is WPaxos replica node
type Replica struct {
paxi.Node
Expand Down Expand Up @@ -40,7 +45,7 @@ func (r *Replica) handleRequest(m paxi.Request) {
r.init(key)

p := r.paxi[key]
if paxi.GetConfig().Adaptive {
if *adaptive {
if p.IsLeader() || p.Ballot() == 0 {
p.HandleRequest(m)
to := p.Hit(m.NodeID)
Expand Down

0 comments on commit b17075b

Please sign in to comment.