Skip to content

Commit

Permalink
Merge pull request #1941 from jyellick/deduplicate-with-censorship-risk
Browse files Browse the repository at this point in the history
Deduplicate requests based on last execution time
  • Loading branch information
srderson committed Jun 24, 2016
2 parents 1116c74 + 262bba2 commit 84b64ae
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 22 deletions.
66 changes: 44 additions & 22 deletions consensus/obcpbft/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type broadcaster struct {

type sendRequest struct {
msg *pb.Message
done chan struct{}
done chan bool
}

func newBroadcaster(self uint64, N int, f int, c communicator) *broadcaster {
Expand Down Expand Up @@ -73,64 +73,65 @@ func (b *broadcaster) Wait() {
b.closed.Wait()
}

func (b *broadcaster) drainerSend(dest uint64, send *sendRequest, printedValidatorNotFound bool) bool {
func (b *broadcaster) drainerSend(dest uint64, send *sendRequest, successLastTime bool) bool {
// Note, successLastTime is purely used to avoid flooding the log with unnecessary warning messages when a network problem is encountered
defer func() {
send.done <- struct{}{}
b.closed.Done()
}()
h, err := getValidatorHandle(dest)
if err != nil {
if !printedValidatorNotFound {
if successLastTime {
logger.Warningf("could not get handle for replica %d", dest)
}
time.Sleep(time.Second)
return true
}

if printedValidatorNotFound {
logger.Infof("Found handle for replica %d", dest)
printedValidatorNotFound = false
send.done <- false
return false
}

err = b.comm.Unicast(send.msg, h)
if err != nil {
logger.Warningf("could not send to replica %d: %v", dest, err)
if successLastTime {
logger.Warningf("could not send to replica %d: %v", dest, err)
}
send.done <- false
return false
}

return false
send.done <- true
return true

}

func (b *broadcaster) drainer(dest uint64) {
printedValidatorNotFound := false
successLastTime := false

for {
select {
case send := <-b.msgChans[dest]:
successLastTime = b.drainerSend(dest, send, successLastTime)
case <-b.closedCh:
for {
// Drain the message channel to free calling waiters before we shut down
select {
case send := <-b.msgChans[dest]:
send.done <- struct{}{}
send.done <- false
b.closed.Done()
default:
return
}
}
case send := <-b.msgChans[dest]:
printedValidatorNotFound = b.drainerSend(dest, send, printedValidatorNotFound)
}
}
}

func (b *broadcaster) unicastOne(msg *pb.Message, dest uint64, wait chan struct{}) {
func (b *broadcaster) unicastOne(msg *pb.Message, dest uint64, wait chan bool) {
select {
case b.msgChans[dest] <- &sendRequest{
msg: msg,
done: wait,
}:
default:
// If this channel is full, we must discard the message and flag it as done
wait <- struct{}{}
wait <- false
b.closed.Done()
}
}
Expand All @@ -152,7 +153,7 @@ func (b *broadcaster) send(msg *pb.Message, dest *uint64) error {
required = destCount - b.f
}

wait := make(chan struct{}, destCount)
wait := make(chan bool, destCount)

if dest != nil {
b.closed.Add(1)
Expand All @@ -164,8 +165,29 @@ func (b *broadcaster) send(msg *pb.Message, dest *uint64) error {
}
}

for i := 0; i < required; i++ {
<-wait
succeeded := 0
timer := time.NewTimer(time.Second) // TODO, make this configurable

// This loop will try to send, until one of:
// a) the required number of sends succeed
// b) all sends complete regardless of success
// c) the timeout expires and the required number of sends have returned
outer:
for i := 0; i < destCount; i++ {
select {
case success := <-wait:
if success {
succeeded++
if succeeded >= required {
break outer
}
}
case <-timer.C:
for i := i; i < required; i++ {
<-wait
}
break outer
}
}

return nil
Expand Down
86 changes: 86 additions & 0 deletions consensus/obcpbft/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,89 @@ func TestBroadcastUnicast(t *testing.T) {
t.Errorf("broadcast did not send to dest peer: %v", sent)
}
}

// mockFailComm is a test communicator built on mockComm whose Unicast
// unconditionally reports an error.
type mockFailComm struct {
	mockComm

	// done is created and closed by TestBroadcastAllFail; this type's
	// Unicast never reads it.
	done chan struct{}
}

// Unicast ignores both arguments and always returns a non-nil error, so
// every send attempted through this communicator fails.
func (m *mockFailComm) Unicast(msg *pb.Message, dest *pb.PeerID) error {
	err := fmt.Errorf("always fails on purpose")
	return err
}

// TestBroadcastAllFail issues a series of broadcasts through a communicator
// whose Unicast always errors, then checks that the broadcaster can still be
// closed within one second.
func TestBroadcastAllFail(t *testing.T) {
	comm := &mockFailComm{
		mockComm: mockComm{
			self:  1,
			n:     4,
			msgCh: make(chan mockMsg),
		},
		done: make(chan struct{}),
	}

	// self=1, N=4, f=1
	bc := newBroadcaster(1, 4, 1, comm)

	const attempts = 20
	for i := 0; i < attempts; i++ {
		payload := []byte(fmt.Sprintf("%d", i))
		bc.Broadcast(&pb.Message{Payload: payload})
	}

	closed := make(chan struct{})
	go func() {
		close(comm.done)
		// If the broadcasts were still being retried despite the failures,
		// Close would block here and the timeout below would fire.
		bc.Close()
		close(closed)
	}()

	select {
	case <-closed:
		// Close returned in time.
	case <-time.After(time.Second):
		t.Fatal("Could not successfully close broadcaster, after 1 second")
	}
}

// mockIndefinitelyStuckComm is a test communicator built on mockComm whose
// Unicast blocks for the peer named "vp0" and fails for every peer.
type mockIndefinitelyStuckComm struct {
	mockComm

	// done releases Unicast calls blocked on "vp0" once it is closed.
	done chan struct{}
}

// Unicast always returns an error. When the destination is named "vp0" it
// first waits until m.done is closed, simulating a peer whose send hangs.
func (m *mockIndefinitelyStuckComm) Unicast(msg *pb.Message, dest *pb.PeerID) error {
	if stuck := dest.Name == "vp0"; stuck {
		// Park here until the test closes the channel.
		<-m.done
	}
	return fmt.Errorf("Always failing, on purpose, with vp0 stuck")
}

// TestBroadcastIndefinitelyStuck verifies that Broadcast calls return within
// ten seconds even though every Unicast fails and the one addressed to "vp0"
// blocks until the test releases it.
func TestBroadcastIndefinitelyStuck(t *testing.T) {
	comm := &mockIndefinitelyStuckComm{
		mockComm: mockComm{
			self:  1,
			n:     4,
			msgCh: make(chan mockMsg),
		},
		done: make(chan struct{}),
	}

	// self=1, N=4, f=1
	bc := newBroadcaster(1, 4, 1, comm)

	finished := make(chan struct{})

	// Run the broadcasts off the test goroutine so a hang can be detected.
	go func() {
		const attempts = 3
		for i := 0; i < attempts; i++ {
			payload := []byte(fmt.Sprintf("%d", i))
			bc.Broadcast(&pb.Message{Payload: payload})
		}
		close(finished)
	}()

	select {
	case <-finished:
		// All broadcasts returned.
	case <-time.After(10 * time.Second):
		t.Errorf("Got blocked for too long")
	}

	// Release any Unicast still parked on "vp0" before shutting down.
	close(comm.done)
	bc.Close()
}
11 changes: 11 additions & 0 deletions consensus/obcpbft/obc-batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type obcBatch struct {

reqStore *requestStore // Holds the outstanding and pending requests

deduplicator *deduplicator

persistForward
}

Expand Down Expand Up @@ -105,6 +107,8 @@ func newObcBatch(id uint64, config *viper.Viper, stack consensus.Stack) *obcBatc

op.reqStore = newRequestStore()

op.deduplicator = newDeduplicator()

op.idleChan = make(chan struct{})
close(op.idleChan) // TODO remove eventually

Expand Down Expand Up @@ -213,6 +217,8 @@ func (op *obcBatch) execute(seqNo uint64, raw []byte) {
logger.Debugf("Batch replica %d missing transaction %s outstanding=%v, pending=%v", op.pbft.id, tx.Uuid, outstanding, pending)
}
txs = append(txs, tx)

op.deduplicator.Execute(req)
}

meta, _ := proto.Marshal(&Metadata{seqNo})
Expand Down Expand Up @@ -310,6 +316,11 @@ func (op *obcBatch) processMessage(ocMsg *pb.Message, senderHandle *pb.PeerID) e
}

if req := batchMsg.GetRequest(); req != nil {
if !op.deduplicator.IsNew(req) {
logger.Warningf("Replica %d ignoring request as it is too old", op.pbft.id)
return nil
}

if (op.pbft.primary(op.pbft.view) == op.pbft.id) && op.pbft.activeView {
return op.leaderProcReq(req)
}
Expand Down

0 comments on commit 84b64ae

Please sign in to comment.