diff --git a/ledger/byzcoin/mod.go b/ledger/byzcoin/mod.go index 552098db2..54d8031ff 100644 --- a/ledger/byzcoin/mod.go +++ b/ledger/byzcoin/mod.go @@ -37,7 +37,7 @@ type Ledger struct { bc blockchain.Blockchain txFactory transactionFactory gossiper gossip.Gossiper - queue *txQueue + queue *txBag } // NewLedger creates a new Byzcoin ledger. @@ -55,7 +55,7 @@ func NewLedger(mino mino.Mino) *Ledger { bc: skipchain.NewSkipchain(mino, cosi), txFactory: factory, gossiper: gossip.NewFlat(mino, decoder), - queue: newTxQueue(), + queue: newTxBag(), } } @@ -107,13 +107,18 @@ func (ldgr *Ledger) routine(actor blockchain.Actor, players mino.Players) { // This timeout has two purposes. The very first use will determine // the round time before the first block is proposed after a boot. // Then it will be used to insure that blocks are still proposed in - // of catastrophic failure in the consensus layer (i.e. too many - // players offline for a while). + // case of catastrophic failure in the consensus layer (i.e. too + // many players offline for a while). go ldgr.proposeBlock(actor, players) roundTimeout = time.After(timeoutRoundTime) case block := <-blocks: - payload := block.GetPayload().(*BlockPayload) + payload, ok := block.GetPayload().(*BlockPayload) + if !ok { + fabric.Logger.Warn().Msgf("found invalid payload type '%T' != '%T'", + block.GetPayload(), payload) + break + } txRes := make([]TransactionResult, len(payload.GetTxs())) for i, pb := range payload.GetTxs() { @@ -155,16 +160,16 @@ func (ldgr *Ledger) proposeBlock(actor blockchain.Actor, players mino.Players) { func (ldgr *Ledger) makePayload(txs []Transaction) (*BlockPayload, error) { payload := &BlockPayload{ - Txs: make([]*TransactionProto, 0, len(txs)), + Txs: make([]*TransactionProto, len(txs)), } - for _, tx := range txs { + for i, tx := range txs { packed, err := tx.Pack() if err != nil { - return nil, err + return nil, xerrors.Errorf("failed to pack tx: %v", err) } - payload.Txs = append(payload.Txs, packed.(*TransactionProto)) + payload.Txs[i] = packed.(*TransactionProto) } return payload, nil @@ -185,16 +190,17 @@ func (ldgr *Ledger) Watch(ctx context.Context) <-chan ledger.TransactionResult { return } - payload := block.GetPayload().(*BlockPayload) - - for _, pb := range payload.GetTxs() { - tx, err := ldgr.txFactory.FromProto(pb) - if err != nil { - return - } - - results <- TransactionResult{ - txID: tx.(Transaction).hash, + payload, ok := block.GetPayload().(*BlockPayload) + if ok { + for _, txProto := range payload.GetTxs() { + tx, err := ldgr.txFactory.FromProto(txProto) + if err != nil { + return + } + + results <- TransactionResult{ + txID: tx.(Transaction).hash, + } } } } @@ -218,7 +224,7 @@ func newActor(g gossip.Gossiper) actor { func (a actor) AddTransaction(in ledger.Transaction) error { tx, ok := in.(Transaction) if !ok { - return xerrors.Errorf("invalid message type '%T'", in) + return xerrors.Errorf("message type '%T' but expected '%T'", in, tx) } // The gossiper will propagate the transaction to other players but also to diff --git a/ledger/byzcoin/transaction.go b/ledger/byzcoin/transaction.go index 8d81e7312..e1b80baa4 100644 --- a/ledger/byzcoin/transaction.go +++ b/ledger/byzcoin/transaction.go @@ -59,7 +59,7 @@ func (t Transaction) computeHash(f crypto.HashFactory) (Digest, error) { h := f.New() _, err := h.Write([]byte(t.value)) if err != nil { - return Digest{}, xerrors.Errorf("couldn't write value: %v", err) + return Digest{}, xerrors.Errorf("couldn't write t.value: %v", err) } d := Digest{} @@ -67,7 +67,7 @@ func (t Transaction) computeHash(f crypto.HashFactory) (Digest, error) { return d, nil } -// TransactionResult contains the result of an execution of a transaction. +// TransactionResult contains the result of a transaction execution. // // - implements ledger.TransactionResult // - implements fmt.Stringer diff --git a/ledger/byzcoin/transaction_test.go b/ledger/byzcoin/transaction_test.go index 1cff9f147..2652b1ef5 100644 --- a/ledger/byzcoin/transaction_test.go +++ b/ledger/byzcoin/transaction_test.go @@ -26,7 +26,7 @@ func TestTransaction_NewTransaction(t *testing.T) { require.NotEmpty(t, tx.hash) _, err = newTransaction(fakeHashFactory{err: xerrors.New("oops")}, "abc") - require.EqualError(t, err, "couldn't hash the tx: couldn't write value: oops") + require.EqualError(t, err, "couldn't hash the tx: couldn't write t.value: oops") } func TestTransaction_GetID(t *testing.T) { @@ -102,7 +102,7 @@ func TestTransactionFactory_FromProto(t *testing.T) { require.Contains(t, err.Error(), "couldn't hash the tx: ") } -//------------------------------------------------------------------------------ +// ----------------------------------------------------------------------------- // Utility functions type fakeHash struct { diff --git a/ledger/byzcoin/txqueue.go b/ledger/byzcoin/txbag.go similarity index 69% rename from ledger/byzcoin/txqueue.go rename to ledger/byzcoin/txbag.go index 3326c3447..8d85c8486 100644 --- a/ledger/byzcoin/txqueue.go +++ b/ledger/byzcoin/txbag.go @@ -2,21 +2,21 @@ package byzcoin import "sync" -// txQueue is a storage abstraction where the transactions are stored while +// txBag is a storage abstraction where the transactions are stored while // waiting to be included in a block. -type txQueue struct { +type txBag struct { sync.Mutex buffer map[Digest]Transaction } -func newTxQueue() *txQueue { - return &txQueue{ +func newTxBag() *txBag { + return &txBag{ buffer: make(map[Digest]Transaction), } } // GetAll returns a list of the transactions currently queued. -func (q *txQueue) GetAll() []Transaction { +func (q *txBag) GetAll() []Transaction { q.Lock() defer q.Unlock() @@ -29,14 +29,14 @@ func (q *txQueue) GetAll() []Transaction { } // Add adds the transaction to the queue. -func (q *txQueue) Add(tx Transaction) { +func (q *txBag) Add(tx Transaction) { q.Lock() q.buffer[tx.hash] = tx q.Unlock() } // Remove deletes the transactions associated with the transaction results. -func (q *txQueue) Remove(res ...TransactionResult) { +func (q *txBag) Remove(res ...TransactionResult) { q.Lock() for _, txResult := range res { delete(q.buffer, txResult.txID) diff --git a/ledger/byzcoin/txqueue_test.go b/ledger/byzcoin/txbag_test.go similarity index 94% rename from ledger/byzcoin/txqueue_test.go rename to ledger/byzcoin/txbag_test.go index dab97d9a0..bb7f69184 100644 --- a/ledger/byzcoin/txqueue_test.go +++ b/ledger/byzcoin/txbag_test.go @@ -7,7 +7,7 @@ import ( ) func TestTxQueue_GetAll(t *testing.T) { - q := newTxQueue() + q := newTxBag() q.buffer[Digest{1}] = Transaction{} q.buffer[Digest{2}] = Transaction{} @@ -17,7 +17,7 @@ func TestTxQueue_GetAll(t *testing.T) { } func TestTxQueue_Add(t *testing.T) { - q := newTxQueue() + q := newTxBag() q.Add(Transaction{hash: Digest{1}}) q.Add(Transaction{hash: Digest{2}}) @@ -28,7 +28,7 @@ func TestTxQueue_Add(t *testing.T) { } func TestTxQueue_Remove(t *testing.T) { - q := newTxQueue() + q := newTxBag() q.buffer[Digest{1}] = Transaction{} q.buffer[Digest{2}] = Transaction{} diff --git a/ledger/mod.go b/ledger/mod.go index 5e9d3573d..6f68b6b38 100644 --- a/ledger/mod.go +++ b/ledger/mod.go @@ -12,6 +12,7 @@ import ( type Transaction interface { encoding.Packable + // GetID returns a unique identifier for the transaction. GetID() []byte } diff --git a/mino/gossip/flat.go b/mino/gossip/flat.go index aaf49cdc4..476c4d651 100644 --- a/mino/gossip/flat.go +++ b/mino/gossip/flat.go @@ -12,6 +12,8 @@ import ( // Flat is an implementation of a message passing protocol that is using a flat // communication approach. +// +// - implements gossip.Gossiper type Flat struct { mino mino.Mino players mino.Players @@ -29,8 +31,8 @@ func NewFlat(m mino.Mino, dec Decoder) *Flat { } } -// Start creates the RPC and starts to listen for incoming rumors while -// spreading its own ones. +// Start implements gossip.Gossiper. It creates the RPC and starts to listen for +// incoming rumors while spreading its own ones. func (flat *Flat) Start(players mino.Players) error { rpc, err := flat.mino.MakeRPC("flatgossip", handler{Flat: flat}) if err != nil { @@ -43,14 +45,15 @@ func (flat *Flat) Start(players mino.Players) error { return nil } -// Stop stops the gossiper. +// Stop implements gossip.Gossiper. It stops the gossiper. func (flat *Flat) Stop() error { flat.rpc = nil return nil } -// Add adds the rumor to the pool of rumors. It will be spread to the players. +// Add implements gossip.Gossiper. It adds the rumor to the pool of rumors. It +// will be spread to the players. func (flat *Flat) Add(rumor Rumor) error { if flat.rpc == nil { return xerrors.New("gossiper not started") @@ -83,7 +86,8 @@ func (flat *Flat) Add(rumor Rumor) error { } } -// Rumors returns the channel that is populated with new rumors. +// Rumors implements gossip.Gossiper. It returns the channel that is populated +// with new rumors. func (flat *Flat) Rumors() <-chan Rumor { return flat.ch } diff --git a/mino/gossip/mod.go b/mino/gossip/mod.go index 1c0b83d3f..a7d210b9c 100644 --- a/mino/gossip/mod.go +++ b/mino/gossip/mod.go @@ -9,7 +9,7 @@ import ( //go:generate protoc -I ./ --go_out=./ ./messages.proto // Rumor is the message that must be gossiped through the network. It is using -// the identifier as a unique way of differentiate all the rumors. +// the identifier as a unique way to differentiate all the rumors. type Rumor interface { encoding.Packable