Skip to content

Commit

Permalink
[FAB-1161] Push genesis block upon orderer init
Browse files Browse the repository at this point in the history
Before this changeset, the genesis block would be pushed with the first
broadcast request. A side-effect of this is that any deliver requests
reaching the orderer before the first broadcast request would panic.

This changeset fixes that by having the orderer push the genesis block
(and create the respective Kafka topic) upon initialization.

Change-Id: Ice5d84b45bcd1565f916f8ac13073380090e6036
Signed-off-by: Kostas Christidis <kostas@christidis.io>
  • Loading branch information
kchristidis committed Nov 20, 2016
1 parent 5ce0d90 commit c1e6fb4
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 55 deletions.
20 changes: 10 additions & 10 deletions orderer/kafka/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,32 +46,32 @@ type broadcasterImpl struct {
prevHash []byte
}

type broadcastSessionResponder struct {
queue chan *ab.BroadcastResponse
}

func newBroadcaster(conf *config.TopLevel) Broadcaster {
genesisBlock, _ := static.New().GenesisBlock()
return &broadcasterImpl{

b := &broadcasterImpl{
producer: newProducer(conf),
config: conf,
batchChan: make(chan *cb.Envelope, conf.General.BatchSize),
messages: genesisBlock.GetData().Data,
nextNumber: 0,
}
}

// Broadcast receives ordering requests by clients and sends back an
// acknowledgement for each received message in order, indicating
// success or type of failure
func (b *broadcasterImpl) Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error {
b.once.Do(func() {
// Send the genesis block to create the topic
// otherwise consumers will throw an exception.
b.sendBlock()
// Spawn the goroutine that cuts blocks
go b.cutBlock(b.config.General.BatchTimeout, b.config.General.BatchSize)
})

return b
}

// Broadcast receives ordering requests by clients and sends back an
// acknowledgement for each received message in order, indicating
// success or type of failure
func (b *broadcasterImpl) Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error {
return b.recvRequests(stream)
}

Expand Down
30 changes: 29 additions & 1 deletion orderer/kafka/broadcast_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,47 @@ limitations under the License.
package kafka

import (
"fmt"
"testing"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/orderer/common/bootstrap/static"
"github.com/hyperledger/fabric/orderer/config"
cb "github.com/hyperledger/fabric/protos/common"
)

func mockNewBroadcaster(t *testing.T, conf *config.TopLevel, seek int64, disk chan []byte) Broadcaster {
genesisBlock, _ := static.New().GenesisBlock()
wait := make(chan struct{})

mb := &broadcasterImpl{
producer: mockNewProducer(t, conf, seek, disk),
config: conf,
batchChan: make(chan *cb.Envelope, conf.General.BatchSize),
messages: [][]byte{[]byte("checkpoint")},
messages: genesisBlock.GetData().Data,
nextNumber: uint64(seek),
}

go func() {
rxBlockBytes := <-disk
rxBlock := &cb.Block{}
if err := proto.Unmarshal(rxBlockBytes, rxBlock); err != nil {
panic(err)
}
if !proto.Equal(rxBlock.GetData(), genesisBlock.GetData()) {
panic(fmt.Errorf("Broadcaster not functioning as expected"))
}
close(wait)
}()

mb.once.Do(func() {
// Send the genesis block to create the topic
// otherwise consumers will throw an exception.
mb.sendBlock()
// Spawn the goroutine that cuts blocks
go mb.cutBlock(mb.config.General.BatchTimeout, mb.config.General.BatchSize)
})
<-wait

return mb
}
51 changes: 7 additions & 44 deletions orderer/kafka/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,15 @@ limitations under the License.
package kafka

import (
"bytes"
"strconv"
"sync"
"testing"
"time"

"github.com/golang/protobuf/proto"
cb "github.com/hyperledger/fabric/protos/common"
)

func TestBroadcastInit(t *testing.T) {
disk := make(chan []byte)

mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
defer testClose(t, mb)

mbs := newMockBroadcastStream(t)
go func() {
if err := mb.Broadcast(mbs); err != nil {
t.Fatal("Broadcast error:", err)
}
}()

for {
select {
case in := <-disk:
block := new(cb.Block)
err := proto.Unmarshal(in, block)
if err != nil {
t.Fatal("Expected a block on the broker's disk")
}
if !(bytes.Equal(block.Data.Data[0], []byte("checkpoint"))) {
t.Fatal("Expected first block to be a checkpoint")
}
return
case <-time.After(500 * time.Millisecond):
t.Fatal("Should have received the initialization block by now")
}
}
}

func TestBroadcastResponse(t *testing.T) {
disk := make(chan []byte)

Expand All @@ -70,8 +39,6 @@ func TestBroadcastResponse(t *testing.T) {
}
}()

<-disk // We tested the checkpoint block in a previous test, so we can ignore it now

// Send a message to the orderer
go func() {
mbs.incoming <- &cb.Envelope{Payload: []byte("single message")}
Expand Down Expand Up @@ -103,8 +70,6 @@ func TestBroadcastBatch(t *testing.T) {
}
}()

<-disk // We tested the checkpoint block in a previous test, so we can ignore it now

// Pump a batch's worth of messages into the system
go func() {
for i := 0; i < int(testConf.General.BatchSize); i++ {
Expand Down Expand Up @@ -158,8 +123,6 @@ func TestBroadcastBatch(t *testing.T) {
}
}()
<-disk // We tested the checkpoint block in a previous test, so we can ignore it now
// Force the response queue to overflow by blocking the broadcast stream's Send() method
mbs.closed = true
defer func() { mbs.closed = false }()
Expand Down Expand Up @@ -201,8 +164,6 @@ func TestBroadcastIncompleteBatch(t *testing.T) {
}
}()

<-disk // We tested the checkpoint block in a previous test, so we can ignore it now

// Pump less than batchSize messages into the system
go func() {
for i := 0; i < messageCount; i++ {
Expand Down Expand Up @@ -239,6 +200,8 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) {
t.Skip("Skipping test as it requires a batchsize > 1")
}

var once sync.Once

messageCount := int(testConf.General.BatchSize) - 1

disk := make(chan []byte)
Expand All @@ -254,8 +217,6 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) {
}()

for i := 0; i < 2; i++ {
<-disk // Checkpoint block in first pass, first incomplete block in second pass -- both tested elsewhere

// Pump less than batchSize messages into the system
go func() {
for i := 0; i < messageCount; i++ {
Expand All @@ -268,6 +229,10 @@ func TestBroadcastConsecutiveIncompleteBatches(t *testing.T) {
for i := 0; i < messageCount; i++ {
<-mbs.outgoing
}

once.Do(func() {
<-disk // First incomplete block, tested elsewhere
})
}

for {
Expand Down Expand Up @@ -301,8 +266,6 @@ func TestBroadcastBatchAndQuitEarly(t *testing.T) {
}
}()

<-disk // We tested the checkpoint block in a previous test, so we can ignore it now

// Pump a batch's worth of messages into the system
go func() {
for i := 0; i < int(testConf.General.BatchSize); i++ {
Expand Down

0 comments on commit c1e6fb4

Please sign in to comment.