Skip to content

Commit

Permalink
Address nkcr's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilthoniel committed Mar 26, 2020
1 parent 7cd9812 commit 3cc4a4d
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 40 deletions.
46 changes: 26 additions & 20 deletions ledger/byzcoin/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Ledger struct {
bc blockchain.Blockchain
txFactory transactionFactory
gossiper gossip.Gossiper
queue *txQueue
queue *txBag
}

// NewLedger creates a new Byzcoin ledger.
Expand All @@ -55,7 +55,7 @@ func NewLedger(mino mino.Mino) *Ledger {
bc: skipchain.NewSkipchain(mino, cosi),
txFactory: factory,
gossiper: gossip.NewFlat(mino, decoder),
queue: newTxQueue(),
queue: newTxBag(),
}
}

Expand Down Expand Up @@ -107,13 +107,18 @@ func (ldgr *Ledger) routine(actor blockchain.Actor, players mino.Players) {
// This timeout has two purposes. The very first use will determine
// the round time before the first block is proposed after a boot.
// Then it will be used to insure that blocks are still proposed in
// of catastrophic failure in the consensus layer (i.e. too many
// players offline for a while).
// case of catastrophic failure in the consensus layer (i.e. too
// many players offline for a while).
go ldgr.proposeBlock(actor, players)

roundTimeout = time.After(timeoutRoundTime)
case block := <-blocks:
payload := block.GetPayload().(*BlockPayload)
payload, ok := block.GetPayload().(*BlockPayload)
if !ok {
fabric.Logger.Warn().Msgf("found invalid payload type '%T' != '%T'",
block.GetPayload(), payload)
break
}

txRes := make([]TransactionResult, len(payload.GetTxs()))
for i, pb := range payload.GetTxs() {
Expand Down Expand Up @@ -155,16 +160,16 @@ func (ldgr *Ledger) proposeBlock(actor blockchain.Actor, players mino.Players) {

func (ldgr *Ledger) makePayload(txs []Transaction) (*BlockPayload, error) {
payload := &BlockPayload{
Txs: make([]*TransactionProto, 0, len(txs)),
Txs: make([]*TransactionProto, len(txs)),
}

for _, tx := range txs {
for i, tx := range txs {
packed, err := tx.Pack()
if err != nil {
return nil, err
return nil, xerrors.Errorf("failed to pack tx: %v", err)
}

payload.Txs = append(payload.Txs, packed.(*TransactionProto))
payload.Txs[i] = packed.(*TransactionProto)
}

return payload, nil
Expand All @@ -185,16 +190,17 @@ func (ldgr *Ledger) Watch(ctx context.Context) <-chan ledger.TransactionResult {
return
}

payload := block.GetPayload().(*BlockPayload)

for _, pb := range payload.GetTxs() {
tx, err := ldgr.txFactory.FromProto(pb)
if err != nil {
return
}

results <- TransactionResult{
txID: tx.(Transaction).hash,
payload, ok := block.GetPayload().(*BlockPayload)
if ok {
for _, txProto := range payload.GetTxs() {
tx, err := ldgr.txFactory.FromProto(txProto)
if err != nil {
return
}

results <- TransactionResult{
txID: tx.(Transaction).hash,
}
}
}
}
Expand All @@ -218,7 +224,7 @@ func newActor(g gossip.Gossiper) actor {
func (a actor) AddTransaction(in ledger.Transaction) error {
tx, ok := in.(Transaction)
if !ok {
return xerrors.Errorf("invalid message type '%T'", in)
return xerrors.Errorf("message type '%T' but expected '%T'", in, tx)
}

// The gossiper will propagate the transaction to other players but also to
Expand Down
4 changes: 2 additions & 2 deletions ledger/byzcoin/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ func (t Transaction) computeHash(f crypto.HashFactory) (Digest, error) {
h := f.New()
_, err := h.Write([]byte(t.value))
if err != nil {
return Digest{}, xerrors.Errorf("couldn't write value: %v", err)
return Digest{}, xerrors.Errorf("couldn't write t.value: %v", err)
}

d := Digest{}
copy(d[:], h.Sum(nil))
return d, nil
}

// TransactionResult contains the result of an execution of a transaction.
// TransactionResult contains the result of a transaction execution.
//
// - implements ledger.TransactionResult
// - implements fmt.Stringer
Expand Down
4 changes: 2 additions & 2 deletions ledger/byzcoin/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestTransaction_NewTransaction(t *testing.T) {
require.NotEmpty(t, tx.hash)

_, err = newTransaction(fakeHashFactory{err: xerrors.New("oops")}, "abc")
require.EqualError(t, err, "couldn't hash the tx: couldn't write value: oops")
require.EqualError(t, err, "couldn't hash the tx: couldn't write t.value: oops")
}

func TestTransaction_GetID(t *testing.T) {
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestTransactionFactory_FromProto(t *testing.T) {
require.Contains(t, err.Error(), "couldn't hash the tx: ")
}

//------------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// Utility functions

type fakeHash struct {
Expand Down
14 changes: 7 additions & 7 deletions ledger/byzcoin/txqueue.go → ledger/byzcoin/txbag.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ package byzcoin

import "sync"

// txQueue is a storage abstraction where the transactions are stored while
// txBag is a storage abstraction where the transactions are stored while
// waiting to be included in a block.
type txQueue struct {
type txBag struct {
sync.Mutex
buffer map[Digest]Transaction
}

func newTxQueue() *txQueue {
return &txQueue{
func newTxBag() *txBag {
return &txBag{
buffer: make(map[Digest]Transaction),
}
}

// GetAll returns a list of the transactions currently queued.
func (q *txQueue) GetAll() []Transaction {
func (q *txBag) GetAll() []Transaction {
q.Lock()
defer q.Unlock()

Expand All @@ -29,14 +29,14 @@ func (q *txQueue) GetAll() []Transaction {
}

// Add adds the transaction to the queue.
func (q *txQueue) Add(tx Transaction) {
func (q *txBag) Add(tx Transaction) {
q.Lock()
q.buffer[tx.hash] = tx
q.Unlock()
}

// Remove deletes the transactions associated with the transaction results.
func (q *txQueue) Remove(res ...TransactionResult) {
func (q *txBag) Remove(res ...TransactionResult) {
q.Lock()
for _, txResult := range res {
delete(q.buffer, txResult.txID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func TestTxQueue_GetAll(t *testing.T) {
q := newTxQueue()
q := newTxBag()

q.buffer[Digest{1}] = Transaction{}
q.buffer[Digest{2}] = Transaction{}
Expand All @@ -17,7 +17,7 @@ func TestTxQueue_GetAll(t *testing.T) {
}

func TestTxQueue_Add(t *testing.T) {
q := newTxQueue()
q := newTxBag()

q.Add(Transaction{hash: Digest{1}})
q.Add(Transaction{hash: Digest{2}})
Expand All @@ -28,7 +28,7 @@ func TestTxQueue_Add(t *testing.T) {
}

func TestTxQueue_Remove(t *testing.T) {
q := newTxQueue()
q := newTxBag()

q.buffer[Digest{1}] = Transaction{}
q.buffer[Digest{2}] = Transaction{}
Expand Down
1 change: 1 addition & 0 deletions ledger/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
type Transaction interface {
encoding.Packable

// GetID returns a unique identifier for the transaction.
GetID() []byte
}

Expand Down
14 changes: 9 additions & 5 deletions mino/gossip/flat.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

// Flat is an implementation of a message passing protocol that is using a flat
// communication approach.
//
// - implements gossip.Gossiper
type Flat struct {
mino mino.Mino
players mino.Players
Expand All @@ -29,8 +31,8 @@ func NewFlat(m mino.Mino, dec Decoder) *Flat {
}
}

// Start creates the RPC and starts to listen for incoming rumors while
// spreading its own ones.
// Start implements gossip.Gossiper. It creates the RPC and starts to listen for
// incoming rumors while spreading its own ones.
func (flat *Flat) Start(players mino.Players) error {
rpc, err := flat.mino.MakeRPC("flatgossip", handler{Flat: flat})
if err != nil {
Expand All @@ -43,14 +45,15 @@ func (flat *Flat) Start(players mino.Players) error {
return nil
}

// Stop stops the gossiper.
// Stop implements gossip.Gossiper. It stops the gossiper.
func (flat *Flat) Stop() error {
flat.rpc = nil

return nil
}

// Add adds the rumor to the pool of rumors. It will be spread to the players.
// Add implements gossip.Gossiper. It adds the rumor to the pool of rumors. It
// will be spread to the players.
func (flat *Flat) Add(rumor Rumor) error {
if flat.rpc == nil {
return xerrors.New("gossiper not started")
Expand Down Expand Up @@ -83,7 +86,8 @@ func (flat *Flat) Add(rumor Rumor) error {
}
}

// Rumors returns the channel that is populated with new rumors.
// Rumors implements gossip.Gossiper. It returns the channel that is populated
// with new rumors.
func (flat *Flat) Rumors() <-chan Rumor {
return flat.ch
}
Expand Down
2 changes: 1 addition & 1 deletion mino/gossip/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
//go:generate protoc -I ./ --go_out=./ ./messages.proto

// Rumor is the message that must be gossiped through the network. It is using
// the identifier as a unique way of differentiate all the rumors.
// the identifier as a unique way to differentiate all the rumors.
type Rumor interface {
encoding.Packable

Expand Down

0 comments on commit 3cc4a4d

Please sign in to comment.