Skip to content

Commit

Permalink
[FAB-3899] Improve test coverage - events
Browse files Browse the repository at this point in the history
This CR improves the test coverage for the events/producer package
from 15% to around 85% and the events package from not counted to 70%.

Change-Id: Ifb0c9207f936c1bf5d845616405b4e8134579655
Signed-off-by: Will Lahti <wtlahti@us.ibm.com>
  • Loading branch information
wlahti committed May 17, 2017
1 parent fd026e9 commit 8629463
Show file tree
Hide file tree
Showing 5 changed files with 592 additions and 14 deletions.
32 changes: 28 additions & 4 deletions events/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,12 @@ func (ec *EventsClient) send(emsg *ehpb.Event) error {

// RegisterAsync - registers interest in a event and doesn't wait for a response
func (ec *EventsClient) RegisterAsync(ies []*ehpb.Interest) error {
emsg := &ehpb.Event{Event: &ehpb.Event_Register{Register: &ehpb.Register{Events: ies}}}
var err error
creator, err := getCreatorFromLocalMSP()
if err != nil {
return fmt.Errorf("error getting creator from MSP: %s", err)
}
emsg := &ehpb.Event{Event: &ehpb.Event_Register{Register: &ehpb.Register{Events: ies}}, Creator: creator}

if err = ec.send(emsg); err != nil {
consumerLogger.Errorf("error on Register send %s\n", err)
}
Expand Down Expand Up @@ -131,8 +135,12 @@ func (ec *EventsClient) register(ies []*ehpb.Interest) error {

// UnregisterAsync - Unregisters interest in a event and doesn't wait for a response
func (ec *EventsClient) UnregisterAsync(ies []*ehpb.Interest) error {
emsg := &ehpb.Event{Event: &ehpb.Event_Unregister{Unregister: &ehpb.Unregister{Events: ies}}}
var err error
creator, err := getCreatorFromLocalMSP()
if err != nil {
return fmt.Errorf("error getting creator from MSP: %s", err)
}
emsg := &ehpb.Event{Event: &ehpb.Event_Unregister{Unregister: &ehpb.Unregister{Events: ies}}, Creator: creator}

if err = ec.send(emsg); err != nil {
err = fmt.Errorf("error on unregister send %s\n", err)
}
Expand Down Expand Up @@ -223,3 +231,19 @@ func (ec *EventsClient) Stop() error {
}
return ec.stream.CloseSend()
}

func getCreatorFromLocalMSP() ([]byte, error) {
localMsp := mspmgmt.GetLocalMSP()
if localMsp == nil {
return nil, errors.New("nil local MSP manager")
}
signer, err := localMsp.GetDefaultSigningIdentity()
if err != nil {
return nil, fmt.Errorf("could not obtain the default signing identity, err %s", err)
}
creator, err := signer.Serialize()
if err != nil {
return nil, fmt.Errorf("error serializing the signer: %s", err)
}
return creator, nil
}
14 changes: 11 additions & 3 deletions events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/hyperledger/fabric/core/config"
"github.com/hyperledger/fabric/events/consumer"
"github.com/hyperledger/fabric/events/producer"
"github.com/hyperledger/fabric/msp/mgmt/testtools"
"github.com/hyperledger/fabric/protos/common"
ehpb "github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
Expand Down Expand Up @@ -165,7 +166,7 @@ func TestReceiveMessage(t *testing.T) {
case <-adapter.notfy:
case <-time.After(2 * time.Second):
t.Fail()
t.Logf("timed out on messge")
t.Logf("timed out on message")
}
}
func TestReceiveAnyMessage(t *testing.T) {
Expand All @@ -190,7 +191,7 @@ func TestReceiveAnyMessage(t *testing.T) {
case <-adapter.notfy:
case <-time.After(5 * time.Second):
t.Fail()
t.Logf("timed out on messge")
t.Logf("timed out on message")
}
}
}
Expand Down Expand Up @@ -327,6 +328,14 @@ func BenchmarkMessages(b *testing.B) {
}

func TestMain(m *testing.M) {
// setup crypto algorithms
// setup the MSP manager so that we can sign/verify
err := msptesttools.LoadMSPSetupForTesting()
if err != nil {
fmt.Printf("Could not initialize msp, err %s", err)
os.Exit(-1)
return
}
SetupTestConfig()
var opts []grpc.ServerOption
if viper.GetBool("peer.tls.enabled") {
Expand All @@ -353,7 +362,6 @@ func TestMain(m *testing.M) {
ehServer := producer.NewEventsServer(100, 0)
ehpb.RegisterEventsServer(grpcServer, ehServer)

fmt.Printf("Starting events server\n")
go grpcServer.Serve(lis)

var regTimeout = 5 * time.Second
Expand Down
5 changes: 2 additions & 3 deletions events/producer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,7 @@ func AddEventType(eventType pb.EventType) error {
}

func registerHandler(ie *pb.Interest, h *handler) error {
logger.Debugf("registerHandler %s", ie.EventType)

logger.Debugf("registering event type: %s", ie.EventType)
gEventProcessor.Lock()
defer gEventProcessor.Unlock()
if hl, ok := gEventProcessor.eventConsumers[ie.EventType]; !ok {
Expand All @@ -286,7 +285,7 @@ func registerHandler(ie *pb.Interest, h *handler) error {
}

func deRegisterHandler(ie *pb.Interest, h *handler) error {
logger.Debugf("deRegisterHandler %s", ie.EventType)
logger.Debugf("deregistering event type: %s", ie.EventType)

gEventProcessor.Lock()
defer gEventProcessor.Unlock()
Expand Down
166 changes: 166 additions & 0 deletions events/producer/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package producer

import (
"context"
"io"
"testing"
"time"

"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/protos/peer"
ehpb "github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)

var peerAddress = "0.0.0.0:60303"

type client struct {
conn *grpc.ClientConn
stream peer.Events_ChatClient
}

func newClient() *client {
conn, err := comm.NewClientConnectionWithAddress(peerAddress, true, false, nil)
if err != nil {
panic(err)
}

stream, err := peer.NewEventsClient(conn).Chat(context.Background())
if err != nil {
panic(err)
}

cl := &client{
conn: conn,
stream: stream,
}
go cl.processEvents()
return cl
}

func (c *client) register(ies []*peer.Interest) error {
emsg := &peer.Event{Event: &peer.Event_Register{Register: &peer.Register{Events: ies}}, Creator: signerSerialized}
se, err := utils.GetSignedEvent(emsg, signer)
if err != nil {
return err
}
return c.stream.Send(se)
}

func (c *client) unregister(ies []*peer.Interest) error {
emsg := &peer.Event{Event: &peer.Event_Unregister{Unregister: &peer.Unregister{Events: ies}}, Creator: signerSerialized}
se, err := utils.GetSignedEvent(emsg, signer)
if err != nil {
return err
}
return c.stream.Send(se)
}

func (c *client) processEvents() error {
defer c.stream.CloseSend()
for {
_, err := c.stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
}
}

func TestEvents(t *testing.T) {
test := func(duration time.Duration) {
t.Log(duration)
f := func() {
Send(nil)
}
assert.Panics(t, f)
Send(&peer.Event{})
gEventProcessorBck := gEventProcessor
gEventProcessor = nil
e, err := createEvent()
assert.NoError(t, err)
Send(e)
gEventProcessor = gEventProcessorBck
Send(e)
}
for _, timeout := range []time.Duration{0, -1, 1} {
gEventProcessor.timeout = timeout
test(timeout)
}
}

func TestDeRegister(t *testing.T) {
f := func() {
deRegisterHandler(nil, nil)
}
assert.Panics(t, f)
assert.Error(t, deRegisterHandler(&peer.Interest{EventType: 100}, nil))
assert.Error(t, deRegisterHandler(&peer.Interest{EventType: peer.EventType_BLOCK}, nil))
}

func TestRegister(t *testing.T) {
f := func() {
registerHandler(nil, nil)
}
assert.Panics(t, f)

// attempt to register handlers (invalid type or nil handlers)
assert.Error(t, registerHandler(&peer.Interest{EventType: 100}, nil))

// attempt to register valid handler
recvChan := make(chan *streamEvent)
stream := &mockstream{c: recvChan}
handler, err := newEventHandler(stream)
assert.Nil(t, err, "error should have been nil")
assert.NoError(t, registerHandler(&peer.Interest{EventType: peer.EventType_BLOCK}, handler))
}

func TestProcessEvents(t *testing.T) {
cl := newClient()
interests := []*peer.Interest{
{EventType: peer.EventType_BLOCK},
{EventType: peer.EventType_CHAINCODE, RegInfo: &peer.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &peer.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event1"}}},
{EventType: peer.EventType_CHAINCODE, RegInfo: &peer.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &peer.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event2"}}},
}
cl.register(interests)
e, err := createEvent()
assert.NoError(t, err)
go Send(e)
time.Sleep(time.Second * 2)
cl.unregister(interests)
time.Sleep(time.Second * 2)
}

func TestInitializeEvents_twice(t *testing.T) {
initializeEventsTwice := func() {
initializeEvents(
uint(viper.GetInt("peer.events.buffersize")),
viper.GetDuration("peer.events.timeout"))
}
assert.Panics(t, initializeEventsTwice)
}

func TestAddEventType_alreadyDefined(t *testing.T) {
assert.Error(t, AddEventType(ehpb.EventType_CHAINCODE), "chaincode type already defined")
}
Loading

0 comments on commit 8629463

Please sign in to comment.