Skip to content

Commit

Permalink
[FAB-3233] Enhance logging when peer sends block event
Browse files Browse the repository at this point in the history
This CR adds logging messages to help debug issues that may occur
when sending block events from the peer. It also cleans up the loggers
for the events/producer package to use the same logger object.

Change-Id: Ic6f8d5cc9699174a6d44740d8f6e04f50e56b3c9
Signed-off-by: Will Lahti <wtlahti@us.ibm.com>
  • Loading branch information
wlahti committed Apr 19, 2017
1 parent a747010 commit f540a5a
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 23 deletions.
15 changes: 8 additions & 7 deletions events/producer/eventhelper.go
Expand Up @@ -22,21 +22,17 @@ import (
"github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"
)

var logger *logging.Logger // package-level logger

func init() {
logger = logging.MustGetLogger("eventhub_producer")
}

// SendProducerBlockEvent sends block event to clients
func SendProducerBlockEvent(block *common.Block) error {
logger.Debugf("Entry")
defer logger.Debugf("Exit")
bevent := &common.Block{}
bevent.Header = block.Header
bevent.Metadata = block.Metadata
bevent.Data = &common.BlockData{}
var channelId string
for _, d := range block.Data.Data {
ebytes := d
if ebytes != nil {
Expand All @@ -53,8 +49,10 @@ func SendProducerBlockEvent(block *common.Block) error {
if err != nil {
return err
}
channelId = chdr.ChannelId

if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION {
logger.Debugf("Channel [%s]: Block event for block number [%d] contains transaction id: %s", channelId, block.Header.Number, chdr.TxId)
tx, err := utils.GetTransaction(payload.Data)
if err != nil {
return fmt.Errorf("error unmarshalling transaction payload for block event: %s", err)
Expand Down Expand Up @@ -103,6 +101,9 @@ func SendProducerBlockEvent(block *common.Block) error {
}
bevent.Data.Data = append(bevent.Data.Data, ebytes)
}

logger.Infof("Channel [%s]: Sending event for block number [%d]", channelId, block.Header.Number)

return Send(CreateBlockEvent(bevent))
}

Expand Down
19 changes: 13 additions & 6 deletions events/producer/events.go
Expand Up @@ -210,7 +210,7 @@ type eventProcessor struct {
var gEventProcessor *eventProcessor

func (ep *eventProcessor) start() {
producerLogger.Info("Event processor started")
logger.Info("Event processor started")
for {
//wait for event
e := <-ep.eventChannel
Expand All @@ -219,7 +219,7 @@ func (ep *eventProcessor) start() {
eType := getMessageType(e)
ep.Lock()
if hl, _ = ep.eventConsumers[eType]; hl == nil {
producerLogger.Errorf("event of type %s does not exist", eType)
logger.Errorf("Event of type %s does not exist", eType)
ep.Unlock()
continue
}
Expand Down Expand Up @@ -252,7 +252,7 @@ func initializeEvents(bufferSize uint, tout int) {
//AddEventType supported event
func AddEventType(eventType pb.EventType) error {
gEventProcessor.Lock()
producerLogger.Debugf("registering %s", pb.EventType_name[int32(eventType)])
logger.Debugf("Registering %s", pb.EventType_name[int32(eventType)])
if _, ok := gEventProcessor.eventConsumers[eventType]; ok {
gEventProcessor.Unlock()
return fmt.Errorf("event type exists %s", pb.EventType_name[int32(eventType)])
Expand All @@ -272,7 +272,7 @@ func AddEventType(eventType pb.EventType) error {
}

func registerHandler(ie *pb.Interest, h *handler) error {
producerLogger.Debugf("registerHandler %s", ie.EventType)
logger.Debugf("registerHandler %s", ie.EventType)

gEventProcessor.Lock()
defer gEventProcessor.Unlock()
Expand All @@ -286,7 +286,7 @@ func registerHandler(ie *pb.Interest, h *handler) error {
}

func deRegisterHandler(ie *pb.Interest, h *handler) error {
producerLogger.Debugf("deRegisterHandler %s", ie.EventType)
logger.Debugf("deRegisterHandler %s", ie.EventType)

gEventProcessor.Lock()
defer gEventProcessor.Unlock()
Expand All @@ -303,30 +303,37 @@ func deRegisterHandler(ie *pb.Interest, h *handler) error {

//Send sends the event to interested consumers
func Send(e *pb.Event) error {
logger.Debugf("Entry")
defer logger.Debugf("Exit")
if e.Event == nil {
producerLogger.Error("event not set")
logger.Error("event not set")
return fmt.Errorf("event not set")
}

if gEventProcessor == nil {
logger.Debugf("Event processor is nil")
return nil
}

if gEventProcessor.timeout < 0 {
logger.Debugf("Event processor timeout < 0")
select {
case gEventProcessor.eventChannel <- e:
default:
return fmt.Errorf("could not send the blocking event")
}
} else if gEventProcessor.timeout == 0 {
logger.Debugf("Event processor timeout = 0")
gEventProcessor.eventChannel <- e
} else {
logger.Debugf("Event processor timeout > 0")
select {
case gEventProcessor.eventChannel <- e:
case <-time.After(time.Duration(gEventProcessor.timeout) * time.Millisecond):
return fmt.Errorf("could not send the blocking event")
}
}

logger.Debugf("Event sent successfully")
return nil
}
10 changes: 5 additions & 5 deletions events/producer/handler.go
Expand Up @@ -57,7 +57,7 @@ func getInterestKey(interest pb.Interest) string {
case pb.EventType_CHAINCODE:
key = "/" + strconv.Itoa(int(pb.EventType_CHAINCODE)) + "/" + interest.GetChaincodeRegInfo().ChaincodeId + "/" + interest.GetChaincodeRegInfo().EventName
default:
producerLogger.Errorf("unknown interest type %s", interest.EventType)
logger.Errorf("unknown interest type %s", interest.EventType)
}

return key
Expand All @@ -68,7 +68,7 @@ func (d *handler) register(iMsg []*pb.Interest) error {
// and only lock once for entire array here
for _, v := range iMsg {
if err := registerHandler(v, d); err != nil {
producerLogger.Errorf("could not register %s: %s", v, err)
logger.Errorf("could not register %s: %s", v, err)
continue
}
d.interestedEvents[getInterestKey(*v)] = v
Expand All @@ -80,7 +80,7 @@ func (d *handler) register(iMsg []*pb.Interest) error {
func (d *handler) deregister(iMsg []*pb.Interest) error {
for _, v := range iMsg {
if err := deRegisterHandler(v, d); err != nil {
producerLogger.Errorf("could not deregister %s", v)
logger.Errorf("could not deregister %s", v)
continue
}
delete(d.interestedEvents, getInterestKey(*v))
Expand All @@ -91,7 +91,7 @@ func (d *handler) deregister(iMsg []*pb.Interest) error {
func (d *handler) deregisterAll() {
for k, v := range d.interestedEvents {
if err := deRegisterHandler(v, d); err != nil {
producerLogger.Errorf("could not deregister %s", v)
logger.Errorf("could not deregister %s", v)
continue
}
delete(d.interestedEvents, k)
Expand Down Expand Up @@ -149,7 +149,7 @@ func (d *handler) SendMessage(msg *pb.Event) error {
// minimally viable release. Eventually events will be made channel-specific, at which point this method
// should be revisited
func validateEventMessage(signedEvt *pb.SignedEvent) (*pb.Event, error) {
producerLogger.Debugf("ValidateEventMessage starts for signed event %p", signedEvt)
logger.Debugf("ValidateEventMessage starts for signed event %p", signedEvt)

// messages from the client for registering and unregistering must be signed
// and accompanied by the signing certificate in the "Creator" field
Expand Down
9 changes: 4 additions & 5 deletions events/producer/producer.go
Expand Up @@ -27,7 +27,7 @@ import (

const defaultTimeout = time.Second * 3

var producerLogger = logging.MustGetLogger("eventhub_producer")
var logger = logging.MustGetLogger("eventhub_producer")

// EventsServer implementation of the Peer service
type EventsServer struct {
Expand Down Expand Up @@ -57,19 +57,18 @@ func (p *EventsServer) Chat(stream pb.Events_ChatServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
producerLogger.Debug("Received EOF, ending Chat")
logger.Debug("Received EOF, ending Chat")
return nil
}
if err != nil {
e := fmt.Errorf("error during Chat, stopping handler: %s", err)
producerLogger.Error(e.Error())
logger.Error(e.Error())
return e
}
err = handler.HandleMessage(in)
if err != nil {
producerLogger.Errorf("error handling message: %s", err)
logger.Errorf("Error handling message: %s", err)
return err
}

}
}

0 comments on commit f540a5a

Please sign in to comment.