Skip to content

Commit

Permalink
Consensus: add a close function to actors
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilthoniel committed Mar 19, 2020
1 parent c2b3572 commit 0fc032d
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 47 deletions.
19 changes: 16 additions & 3 deletions consensus/cosipbft/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (c *Consensus) Listen(v consensus.Validator) (consensus.Actor, error) {
}

actor := pbftActor{
closing: make(chan struct{}),
hashFactory: c.factory.GetHashFactory(),
}

Expand All @@ -92,14 +93,16 @@ func (c *Consensus) Listen(v consensus.Validator) (consensus.Actor, error) {
}

type pbftActor struct {
closing chan struct{}
hashFactory crypto.HashFactory
cosiActor cosi.Actor
rpc mino.RPC
}

// Propose takes the proposal and send it to the participants of the consensus.
// It returns nil if the consensus is reached and that the participant are
// committed to it, otherwise it returns the refusal reason.
// Propose implements consensus.Actor. It takes the proposal and send it to the
// participants of the consensus. It returns nil if the consensus is reached and
// that the participant are committed to it, otherwise it returns the refusal
// reason.
func (a pbftActor) Propose(p consensus.Proposal, players mino.Players) error {
prepareReq, err := newPrepareRequest(p, a.hashFactory)
if err != nil {
Expand Down Expand Up @@ -144,6 +147,9 @@ func (a pbftActor) Propose(p consensus.Proposal, players mino.Players) error {
// TODO: timeout in context ?
resps, errs := a.rpc.Call(propagateReq, ca)
select {
case <-a.closing:
// Abort the RPC call.
// TODO: add context to the RPC.Call
case <-resps:
case err := <-errs:
return xerrors.Errorf("couldn't propagate the link: %v", err)
Expand All @@ -152,6 +158,13 @@ func (a pbftActor) Propose(p consensus.Proposal, players mino.Players) error {
return nil
}

// Close implements consensus.Actor. It announces a close event to allow current
// execution to be aborted.
func (a pbftActor) Close() error {
close(a.closing)
return nil
}

type handler struct {
*Consensus
validator consensus.Validator
Expand Down
12 changes: 11 additions & 1 deletion consensus/cosipbft/mod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func checkSignatureValue(t *testing.T, pb *any.Any, value uint64) {
require.Equal(t, value, wrapper.GetValue())
}

func TestConsensus_Propose(t *testing.T) {
func TestActor_Propose(t *testing.T) {
rpc := &fakeRPC{}
cosiActor := &fakeCosiActor{}
actor := &pbftActor{
Expand Down Expand Up @@ -175,6 +175,16 @@ func TestConsensus_Propose(t *testing.T) {
checkSignatureValue(t, propagate.GetCommit(), 2)
}

func TestActor_Close(t *testing.T) {
actor := &pbftActor{
closing: make(chan struct{}),
}

require.NoError(t, actor.Close())
_, ok := <-actor.closing
require.False(t, ok)
}

type badCosiActor struct {
cosi.CollectiveSigning
delay int
Expand Down
2 changes: 2 additions & 0 deletions consensus/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type Actor interface {
// Propose performs the consensus algorithm using the list of nodes
// as participants.
Propose(proposal Proposal, players mino.Players) error

Close() error
}

// Consensus is an interface that provides primitives to propose data to a set
Expand Down
4 changes: 2 additions & 2 deletions consensus/qsc/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (b *bTLCB) execute(ctx context.Context, message proto.Message) (*View, erro
fabric.Logger.Trace().Msgf("%d going through prepare broadcast", b.node)
prepareSet, err := b.b1.execute(ctx, m)
if err != nil {
return nil, xerrors.Errorf("couldn't broadcast: %v", err)
return nil, xerrors.Errorf("couldn't broadcast: %w", err)
}

m2, err := b.makeMessage(&MessageSet{Messages: prepareSet.GetReceived()})
Expand All @@ -276,7 +276,7 @@ func (b *bTLCB) execute(ctx context.Context, message proto.Message) (*View, erro
fabric.Logger.Trace().Msgf("%d going through commit broadcast", b.node)
commitSet, err := b.b2.execute(ctx, m2)
if err != nil {
return nil, xerrors.Errorf("couldn't broadcast: %v", err)
return nil, xerrors.Errorf("couldn't broadcast: %w", err)
}

ret, err := b.merge(prepareSet, commitSet)
Expand Down
71 changes: 43 additions & 28 deletions consensus/qsc/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ var protoenc encoding.ProtoMarshaler = encoding.NewProtoEncoder()
// decide to include them in the common state.
type Consensus struct {
ch chan consensus.Proposal
closing chan struct{}
history history
broadcast broadcast
historiesFactory historiesFactory

Stopped chan struct{}
}

// NewQSC returns a new instance of QSC.
Expand All @@ -48,10 +47,10 @@ func NewQSC(node int64, mino mino.Mino, players mino.Players) (*Consensus, error

return &Consensus{
ch: make(chan consensus.Proposal),
closing: make(chan struct{}),
history: make(history, 0),
broadcast: bc,
historiesFactory: defaultHistoriesFactory{},
Stopped: make(chan struct{}),
}, nil
}

Expand All @@ -73,37 +72,47 @@ func (c *Consensus) Listen(val consensus.Validator) (consensus.Actor, error) {
for {
var proposal consensus.Proposal
select {
case prop, ok := <-c.ch:
if !ok {
fabric.Logger.Trace().Msg("closing")
return
}

proposal = prop
case <-c.closing:
fabric.Logger.Trace().Msg("closing")
return
case proposal = <-c.ch:
default:
// If the current node does not have anything to propose, it
// still has to participate so it sends an empty proposal.
proposal = nil
}

err := c.executeRound(proposal, val)
ctx, cancel := context.WithTimeout(context.Background(), EpochTimeout)

go func() {
// This Go routine is responsible for listening a close event
// from the actor.
select {
case <-ctx.Done():
case <-c.closing:
// Cancel the execution of the next time step.
cancel()
}
}()

err := c.executeRound(ctx, proposal, val)
if err != nil {
fabric.Logger.Err(err).Msg("failed to execute a time step")
select {
case <-c.closing:
default:
// Only log if the consensus has not been closed properly.
fabric.Logger.Err(err).Msg("failed to execute a time step")
}
}

cancel()
}
}()

return actor{ch: c.ch}, nil
return actor{ch: c.ch, closing: c.closing}, nil
}

// Close stops and cleans the main loop.
func (c *Consensus) Close() error {
close(c.ch)

return nil
}

func (c *Consensus) executeRound(prop consensus.Proposal, val consensus.Validator) error {
func (c *Consensus) executeRound(ctx context.Context, prop consensus.Proposal, val consensus.Validator) error {
// 1. Choose the message and the random value. The new epoch will be
// appended to the current history.
e := epoch{
Expand All @@ -121,12 +130,9 @@ func (c *Consensus) executeRound(prop consensus.Proposal, val consensus.Validato

// 2. Broadcast our history to the network and get back messages
// from this time step.
ctx, cancel := context.WithTimeout(context.Background(), EpochTimeout)
defer cancel()

prepareSet, err := c.broadcast.send(ctx, newHistory)
if err != nil {
return xerrors.Errorf("couldn't broadcast: %v", err)
return xerrors.Errorf("couldn't broadcast: %w", err)
}

// 3. Get the best history from the received messages.
Expand All @@ -138,7 +144,7 @@ func (c *Consensus) executeRound(prop consensus.Proposal, val consensus.Validato
// 4. Broadcast what we received in step 3.
commitSet, err := c.broadcast.send(ctx, Bp.getBest())
if err != nil {
return xerrors.Errorf("couldn't broadcast: %v", err)
return xerrors.Errorf("couldn't broadcast: %w", err)
}

// 5. Get the best history from the second broadcast.
Expand Down Expand Up @@ -177,11 +183,20 @@ func (c *Consensus) executeRound(prop consensus.Proposal, val consensus.Validato
//
// - implements consensus.Actor
type actor struct {
ch chan consensus.Proposal
ch chan consensus.Proposal
closing chan struct{}
}

// Propose implements consensus.Actor. It sends the proposal to the qsc loop.
// Propose implements consensus.Actor. It sends the proposal to the qsc loop. If
// the actor has been closed, it will panic.
func (a actor) Propose(proposal consensus.Proposal, players mino.Players) error {
a.ch <- proposal
return nil
}

// Close implements consensus.Actor. It stops and cleans the main loop.
func (a actor) Close() error {
close(a.closing)

return nil
}
24 changes: 11 additions & 13 deletions consensus/qsc/mod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,22 @@ func TestQSC_Basic(t *testing.T) {
actors[i] = actor
}

wg := sync.WaitGroup{}
wg.Add(n)
for j := 0; j < n; j++ {
actor := actors[j]
go func() {
defer wg.Done()
for i := 0; i < k; i++ {
err := actor.Propose(newFakeProposal(), nil)
require.NoError(t, err)
}
}()
}
wg.Wait()

for _, val := range validators {
val.wg.Wait()
}

for _, c := range cons {
require.NoError(t, c.Close())
for _, actor := range actors {
actor.Close()
}

require.Equal(t, cons[0].history, cons[1].history)
Expand All @@ -83,39 +79,41 @@ func TestQSC_ExecuteRound(t *testing.T) {
historiesFactory: factory,
}

ctx := context.Background()

bc.err = xerrors.New("oops")
err := qsc.executeRound(fakeProposal{}, &fakeValidator{})
err := qsc.executeRound(ctx, fakeProposal{}, &fakeValidator{})
require.EqualError(t, err, "couldn't broadcast: oops")

bc.delay = 1
factory.err = xerrors.New("oops")
err = qsc.executeRound(fakeProposal{}, &fakeValidator{})
err = qsc.executeRound(ctx, fakeProposal{}, &fakeValidator{})
require.Error(t, err)
require.True(t, xerrors.Is(err, encoding.NewDecodingError("broadcasted set", nil)))

bc.delay = 1
factory.delay = 1
err = qsc.executeRound(fakeProposal{}, &fakeValidator{})
err = qsc.executeRound(ctx, fakeProposal{}, &fakeValidator{})
require.EqualError(t, err, "couldn't broadcast: oops")

bc.err = nil
factory.delay = 1
err = qsc.executeRound(fakeProposal{}, &fakeValidator{})
err = qsc.executeRound(ctx, fakeProposal{}, &fakeValidator{})
require.Error(t, err)
require.True(t, xerrors.Is(err, encoding.NewDecodingError("received set", nil)))

factory.delay = 2
err = qsc.executeRound(fakeProposal{}, &fakeValidator{})
err = qsc.executeRound(ctx, fakeProposal{}, &fakeValidator{})
require.Error(t, err)
require.True(t, xerrors.Is(err, encoding.NewDecodingError("broadcasted set", nil)))

factory.delay = 3
err = qsc.executeRound(fakeProposal{}, &fakeValidator{})
err = qsc.executeRound(ctx, fakeProposal{}, &fakeValidator{})
require.Error(t, err)
require.True(t, xerrors.Is(err, encoding.NewDecodingError("received set", nil)))

factory.err = nil
err = qsc.executeRound(fakeProposal{}, badValidator{})
err = qsc.executeRound(ctx, fakeProposal{}, badValidator{})
require.EqualError(t, err, "couldn't commit: oops")
}

Expand Down

0 comments on commit 0fc032d

Please sign in to comment.