Skip to content

Commit

Permalink
[FAB-1452] Setup mock consumer before using disk
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-1452

Consider this:
1. Before we bring up a mock producer and mock consumer we need to
initialize them so that they output the right offsets. We do this via a
process where we add fill-in blocks until we get them to the desired
offset.
2. We move messages between the mock producer to the mock consumer via
goroutine that fetches a message from the producer's disk (an unbuffered
channel) and places it into the consumer's disk (another unbuffered
channel).

Up until now, before proceeding to step 2, we would only make sure that
the producer was set-up. The goroutine then would add a message to the
consumer's disk before the consumer was set-up. As a result, the first
message we would receive past the setup process would sometimes be the
last fill-in message (instead of the first, regular test message).

This changeset adds a hook for checking that consumer has been setup and
modifies the unit tests so that they use it before proceeding with the
message passing between disks.

Change-Id: I3b86f67ffe5110a5165d96bf727f2f5dfa4e462f
Signed-off-by: Kostas Christidis <kostas@christidis.io>
  • Loading branch information
kchristidis committed Dec 24, 2016
1 parent 8f1e830 commit b8369e5
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 12 deletions.
22 changes: 11 additions & 11 deletions orderer/kafka/consumer_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ type mockConsumerImpl struct {
parentConsumer *mocks.Consumer
chainPartitionManager *mocks.PartitionConsumer
chainPartitionConsumer sarama.PartitionConsumer
disk chan *ab.KafkaMessage
isSetup chan struct{}
t *testing.T

disk chan *ab.KafkaMessage
isSetup chan struct{}
targetOffset int64
t *testing.T
}

func mockNewConsumer(t *testing.T, cp ChainPartition, offset int64, disk chan *ab.KafkaMessage) (Consumer, error) {
Expand All @@ -59,6 +61,7 @@ func mockNewConsumer(t *testing.T, cp ChainPartition, offset int64, disk chan *a
}
mc := &mockConsumerImpl{
consumedOffset: 0,
targetOffset: offset,
chainPartition: cp,

parentConsumer: parentConsumer,
Expand All @@ -69,19 +72,13 @@ func mockNewConsumer(t *testing.T, cp ChainPartition, offset int64, disk chan *a
t: t,
}
// Stop-gap hack until sarama issue #745 is resolved:
if offset >= testOldestOffset && offset <= (testNewestOffset-1) {
mc.testFillWithBlocks(offset - 1) // Prepare the consumer so that the next Recv gives you blob #offset
if mc.targetOffset >= testOldestOffset && mc.targetOffset <= (testNewestOffset-1) {
mc.testFillWithBlocks(mc.targetOffset - 1) // Prepare the consumer so that the next Recv gives you blob #targetOffset
} else {
err = fmt.Errorf("Out of range offset (seek number) given to consumer: %d", offset)
return mc, err
}

if mc.consumedOffset == offset-1 {
close(mc.isSetup)
} else {
mc.t.Fatal("Mock consumer failed to initialize itself properly")
}

return mc, err
}

Expand All @@ -101,6 +98,9 @@ func (mc *mockConsumerImpl) Recv() <-chan *sarama.ConsumerMessage {
case outgoingMsg := <-mc.disk:
mc.consumedOffset++
mc.chainPartitionManager.YieldMessage(testNewConsumerMessage(mc.chainPartition, mc.consumedOffset, outgoingMsg))
if mc.consumedOffset == mc.targetOffset-1 {
close(mc.isSetup) // Hook for callers
}
return mc.chainPartitionConsumer.Messages()
}

Expand Down
1 change: 1 addition & 0 deletions orderer/kafka/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func testConsumerRecvFunc(given, expected int64) func(t *testing.T) {
testClose(t, mc)
t.Fatal("Consumer should have proceeded normally:", err)
}
<-mc.(*mockConsumerImpl).isSetup
go func() {
disk <- newRegularMessage([]byte("foo"))
}()
Expand Down
6 changes: 5 additions & 1 deletion orderer/kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func newChain(consenter testableConsenter, support multichain.ConsenterSupport)
halted: false, // Redundant as the default value for booleans is false but added for readability
exitChan: make(chan struct{}),
haltedChan: make(chan struct{}),
setupChan: make(chan struct{}),
}
}

Expand Down Expand Up @@ -130,7 +131,9 @@ type chainImpl struct {
halted bool // For the Enqueue() calls
exitChan chan struct{} // For the Chain's Halt() method

haltedChan chan struct{} // Hook for testing
// Hooks for testing
haltedChan chan struct{}
setupChan chan struct{}
}

// Start allocates the necessary resources for staying up to date with this Chain.
Expand Down Expand Up @@ -158,6 +161,7 @@ func (ch *chainImpl) Start() {
return
}
ch.consumer = consumer
close(ch.setupChan)

// 3. Set the loop the keep up to date with the chain.
go ch.loop()
Expand Down
12 changes: 12 additions & 0 deletions orderer/kafka/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ func TestKafkaConsenterEmptyBatch(t *testing.T) {
case <-time.After(testTimePadding):
t.Fatal("Mock producer not setup in time")
}
// Same for the mock consumer
select {
case <-ch.setupChan:
case <-time.After(testTimePadding):
t.Fatal("Mock consumer not setup in time")
}
}()
wg.Wait()

Expand Down Expand Up @@ -166,6 +172,12 @@ func TestKafkaConsenterConfigStyleMultiBatch(t *testing.T) {
case <-time.After(testTimePadding):
t.Fatal("Mock producer not setup in time")
}
// Same for the mock consumer
select {
case <-ch.setupChan:
case <-time.After(testTimePadding):
t.Fatal("Mock consumer not setup in time")
}
}()
wg.Wait()

Expand Down

0 comments on commit b8369e5

Please sign in to comment.