Skip to content

Commit

Permalink
Merge "[FAB-10654] Enhance eventhub logging with client IP" into rele…
Browse files Browse the repository at this point in the history
…ase-1.0
  • Loading branch information
Jason Yellick authored and Gerrit Code Review committed Jul 26, 2018
2 parents 428d286 + be075c4 commit b23007c
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 57 deletions.
23 changes: 23 additions & 0 deletions common/util/net.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package util

import (
"golang.org/x/net/context"
"google.golang.org/grpc/peer"
)

func ExtractRemoteAddress(ctx context.Context) string {
var remoteAddress string
p, ok := peer.FromContext(ctx)
if !ok {
return ""
}
if address := p.Addr; address != nil {
remoteAddress = address.String()
}
return remoteAddress
}
35 changes: 35 additions & 0 deletions common/util/net_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package util

import (
"testing"

"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"google.golang.org/grpc/peer"
)

type addr struct {
}

func (*addr) Network() string {
return ""
}

func (*addr) String() string {
return "1.2.3.4:5000"
}

func TestExtractAddress(t *testing.T) {
ctx := context.Background()
assert.Zero(t, ExtractRemoteAddress(ctx))

ctx = peer.NewContext(ctx, &peer.Peer{
Addr: &addr{},
})
assert.Equal(t, "1.2.3.4:5000", ExtractRemoteAddress(ctx))
}
16 changes: 7 additions & 9 deletions events/producer/eventhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,31 @@ import (

// SendProducerBlockEvent sends block event to clients
func SendProducerBlockEvent(block *common.Block) error {
logger.Debugf("Entry")
defer logger.Debugf("Exit")
bevent := &common.Block{}
bevent.Header = block.Header
bevent.Metadata = block.Metadata
bevent.Data = &common.BlockData{}
var channelId string
var channelID string
for _, d := range block.Data.Data {
ebytes := d
if ebytes != nil {
if env, err := utils.GetEnvelopeFromBlock(ebytes); err != nil {
logger.Errorf("error getting tx from block(%s)\n", err)
logger.Errorf("error getting tx from block: %s", err)
} else if env != nil {
// get the payload from the envelope
payload, err := utils.GetPayload(env)
if err != nil {
return fmt.Errorf("could not extract payload from envelope, err %s", err)
return fmt.Errorf("could not extract payload from envelope: %s", err)
}

chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return err
}
channelId = chdr.ChannelId
channelID = chdr.ChannelId

if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION {
logger.Debugf("Channel [%s]: Block event for block number [%d] contains transaction id: %s", channelId, block.Header.Number, chdr.TxId)
logger.Debugf("Channel [%s]: Block event for block number [%d] contains transaction id: %s", channelID, block.Header.Number, chdr.TxId)
tx, err := utils.GetTransaction(payload.Data)
if err != nil {
return fmt.Errorf("error unmarshalling transaction payload for block event: %s", err)
Expand Down Expand Up @@ -94,15 +92,15 @@ func SendProducerBlockEvent(block *common.Block) error {
}
ebytes, err = utils.GetBytesEnvelope(env)
if err != nil {
return fmt.Errorf("cannot marshal transaction %s", err)
return fmt.Errorf("cannot marshal transaction: %s", err)
}
}
}
}
bevent.Data.Data = append(bevent.Data.Data, ebytes)
}

logger.Infof("Channel [%s]: Sending event for block number [%d]", channelId, block.Header.Number)
logger.Infof("Channel [%s]: Sending event for block number [%d]", channelID, block.Header.Number)

return Send(CreateBlockEvent(bevent))
}
Expand Down
17 changes: 6 additions & 11 deletions events/producer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func AddEventType(eventType pb.EventType) error {
logger.Debugf("Registering %s", pb.EventType_name[int32(eventType)])
if _, ok := gEventProcessor.eventConsumers[eventType]; ok {
gEventProcessor.Unlock()
return fmt.Errorf("event type exists %s", pb.EventType_name[int32(eventType)])
return fmt.Errorf("event type %s already exists", pb.EventType_name[int32(eventType)])
}

switch eventType {
Expand All @@ -279,7 +279,7 @@ func AddEventType(eventType pb.EventType) error {
}

func registerHandler(ie *pb.Interest, h *handler) error {
logger.Debugf("registering event type: %s", ie.EventType)
logger.Debugf("Registering event type: %s", ie.EventType)
gEventProcessor.Lock()
defer gEventProcessor.Unlock()
if hl, ok := gEventProcessor.eventConsumers[ie.EventType]; !ok {
Expand All @@ -292,7 +292,7 @@ func registerHandler(ie *pb.Interest, h *handler) error {
}

func deRegisterHandler(ie *pb.Interest, h *handler) error {
logger.Debugf("deregistering event type: %s", ie.EventType)
logger.Debugf("Deregistering event type: %s", ie.EventType)

gEventProcessor.Lock()
defer gEventProcessor.Unlock()
Expand All @@ -309,8 +309,6 @@ func deRegisterHandler(ie *pb.Interest, h *handler) error {

//Send sends the event to interested consumers
func Send(e *pb.Event) error {
logger.Debugf("Entry")
defer logger.Debugf("Exit")
if e.Event == nil {
logger.Error("event not set")
return fmt.Errorf("event not set")
Expand All @@ -322,24 +320,21 @@ func Send(e *pb.Event) error {
}

if gEventProcessor.timeout < 0 {
logger.Debugf("Event processor timeout < 0")
select {
case gEventProcessor.eventChannel <- e:
default:
return fmt.Errorf("could not send the blocking event")
return fmt.Errorf("could not add block event to event processor queue")
}
} else if gEventProcessor.timeout == 0 {
logger.Debugf("Event processor timeout = 0")
gEventProcessor.eventChannel <- e
} else {
logger.Debugf("Event processor timeout > 0")
select {
case gEventProcessor.eventChannel <- e:
case <-time.After(gEventProcessor.timeout):
return fmt.Errorf("could not send the blocking event")
return fmt.Errorf("could not add block event to event processor queue")
}
}

logger.Debugf("Event sent successfully")
logger.Debugf("Event added to event processor queue")
return nil
}
80 changes: 44 additions & 36 deletions events/producer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,32 @@ import (

"github.com/golang/protobuf/proto"

"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/msp/mgmt"
pb "github.com/hyperledger/fabric/protos/peer"
)

type handler struct {
ChatStream pb.Events_ChatServer
interestedEvents map[string]*pb.Interest
RemoteAddr string
}

func newEventHandler(stream pb.Events_ChatServer) (*handler, error) {
d := &handler{
ChatStream: stream,
h := &handler{
ChatStream: stream,
interestedEvents: make(map[string]*pb.Interest),
RemoteAddr: util.ExtractRemoteAddress(stream.Context()),
}
d.interestedEvents = make(map[string]*pb.Interest)
return d, nil
logger.Debug("event handler created for", h.RemoteAddr)
return h, nil
}

// Stop stops this handler
func (d *handler) Stop() error {
d.deregisterAll()
d.interestedEvents = nil
func (h *handler) Stop() error {
h.deregisterAll()
h.interestedEvents = nil
logger.Debug("handler stopped for", h.RemoteAddr)
return nil
}

Expand All @@ -56,83 +61,86 @@ func getInterestKey(interest pb.Interest) string {
case pb.EventType_CHAINCODE:
key = "/" + strconv.Itoa(int(pb.EventType_CHAINCODE)) + "/" + interest.GetChaincodeRegInfo().ChaincodeId + "/" + interest.GetChaincodeRegInfo().EventName
default:
logger.Errorf("unknown interest type %s", interest.EventType)
logger.Errorf("unsupported interest type: %s", interest.EventType)
}

return key
}

func (d *handler) register(iMsg []*pb.Interest) error {
func (h *handler) register(iMsg []*pb.Interest) error {
// Could consider passing interest array to registerHandler
// and only lock once for entire array here
for _, v := range iMsg {
if err := registerHandler(v, d); err != nil {
logger.Errorf("could not register %s: %s", v, err)
if err := registerHandler(v, h); err != nil {
logger.Errorf("could not register %s for %s: %s", v, h.RemoteAddr, err)
continue
}
d.interestedEvents[getInterestKey(*v)] = v
h.interestedEvents[getInterestKey(*v)] = v
}

return nil
}

func (d *handler) deregister(iMsg []*pb.Interest) error {
func (h *handler) deregister(iMsg []*pb.Interest) error {
for _, v := range iMsg {
if err := deRegisterHandler(v, d); err != nil {
logger.Errorf("could not deregister %s", v)
if err := deRegisterHandler(v, h); err != nil {
logger.Errorf("could not deregister %s for %s: %s", v, h.RemoteAddr, err)
continue
}
delete(d.interestedEvents, getInterestKey(*v))
delete(h.interestedEvents, getInterestKey(*v))
}
return nil
}

func (d *handler) deregisterAll() {
for k, v := range d.interestedEvents {
if err := deRegisterHandler(v, d); err != nil {
logger.Errorf("could not deregister %s", v)
func (h *handler) deregisterAll() {
for k, v := range h.interestedEvents {
if err := deRegisterHandler(v, h); err != nil {
logger.Errorf("could not deregister %s for %s: %s", v, h.RemoteAddr, err)
continue
}
delete(d.interestedEvents, k)
delete(h.interestedEvents, k)
}
}

// HandleMessage handles the Openchain messages for the Peer.
func (d *handler) HandleMessage(msg *pb.SignedEvent) error {
func (h *handler) HandleMessage(msg *pb.SignedEvent) error {
evt, err := validateEventMessage(msg)
if err != nil {
return fmt.Errorf("event message must be properly signed by an identity from the same organization as the peer: [%s]", err)
return fmt.Errorf("event message must be properly signed by an identity from the same organization as the peer for %s: %s", h.RemoteAddr, err)
}

switch evt.Event.(type) {
case *pb.Event_Register:
eventsObj := evt.GetRegister()
if err := d.register(eventsObj.Events); err != nil {
return fmt.Errorf("could not register events %s", err)
if err := h.register(eventsObj.Events); err != nil {
return fmt.Errorf("could not register events for %s: %s", h.RemoteAddr, err)
}
case *pb.Event_Unregister:
eventsObj := evt.GetUnregister()
if err := d.deregister(eventsObj.Events); err != nil {
return fmt.Errorf("could not unregister events %s", err)
if err := h.deregister(eventsObj.Events); err != nil {
return fmt.Errorf("could not deregister events for %s: %s", h.RemoteAddr, err)
}
case nil:
default:
return fmt.Errorf("invalide type from client %T", evt.Event)
return fmt.Errorf("invalid event type received from %s: %T", h.RemoteAddr, evt.Event)
}
//TODO return supported events.. for now just return the received msg
if err := d.ChatStream.Send(evt); err != nil {
return fmt.Errorf("error sending response to %v: %s", msg, err)
if err := h.ChatStream.Send(evt); err != nil {
return fmt.Errorf("error sending response to %s: %s", h.RemoteAddr, err)
}

return nil
}

// SendMessage sends a message to the remote PEER through the stream
func (d *handler) SendMessage(msg *pb.Event) error {
err := d.ChatStream.Send(msg)
func (h *handler) SendMessage(msg *pb.Event) error {
logger.Debug("sending event to", h.RemoteAddr)
err := h.ChatStream.Send(msg)
if err != nil {
return fmt.Errorf("error Sending message through ChatStream: %s", err)
logger.Debugf("sending event failed for %s: %s", h.RemoteAddr, err)
return fmt.Errorf("error sending message through ChatStream: %s", err)
}
logger.Debug("event sent successfully to", h.RemoteAddr)
return nil
}

Expand Down Expand Up @@ -164,18 +172,18 @@ func validateEventMessage(signedEvt *pb.SignedEvent) (*pb.Event, error) {
// Load MSPPrincipal for policy
principal, err := principalGetter.Get(mgmt.Members)
if err != nil {
return nil, fmt.Errorf("failed getting local MSP principal [member]: [%s]", err)
return nil, fmt.Errorf("failed getting local MSP principal [member]: %s", err)
}

id, err := localMSP.DeserializeIdentity(evt.Creator)
if err != nil {
return nil, fmt.Errorf("failed deserializing event creator: [%s]", err)
return nil, fmt.Errorf("failed deserializing event creator: %s", err)
}

// Verify that event's creator satisfies the principal
err = id.SatisfiesPrincipal(principal)
if err != nil {
return nil, fmt.Errorf("failed verifying the creator satisfies local MSP's [member] principal: [%s]", err)
return nil, fmt.Errorf("failed verifying the creator satisfies local MSP's [member] principal: %s", err)
}

// Verify the signature
Expand Down
2 changes: 1 addition & 1 deletion events/producer/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func (*mockstream) SetTrailer(metadata.MD) {
}

func (*mockstream) Context() context.Context {
panic("not implemented")
return context.Background()
}

func (*mockstream) SendMsg(m interface{}) error {
Expand Down

0 comments on commit b23007c

Please sign in to comment.