Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
54d38ca
[new-prom-metrics] prometheus metrics to track broadcasts, private ms…
Feb 3, 2022
90f804a
Merge branch 'main' of github.com:hyperledger/firefly into new-prom-m…
Feb 4, 2022
7f46f1f
[new-prom-metrics] using global time map
Feb 4, 2022
36c6067
Merge branch 'main' of github.com:hyperledger/firefly into new-prom-m…
Feb 4, 2022
fe7e49e
Merge branch 'fix-481' of github.com:kaleido-io/firefly into new-prom…
Feb 6, 2022
a62009e
[new-prom-metrics] adding global time map
Feb 6, 2022
c4d8455
[new-prom-metrics] global timemap enhancements
Feb 7, 2022
9d0cd9a
[new-prom-metrics] removing duplicate metric
Feb 7, 2022
ba41a98
Merge branch 'main' of github.com:hyperledger/firefly into new-prom-m…
Feb 7, 2022
ecdd2e2
Merge branch 'main' of github.com:hyperledger/firefly into new-prom-m…
Feb 8, 2022
95200be
Merge branch 'main' of github.com:hyperledger/firefly into new-prom-m…
Feb 8, 2022
634a5d8
[new-prom-metrics] fixing concurrent metrics issue
Feb 8, 2022
efa3249
Merge branch 'main' of github.com:hyperledger/firefly into new-prom-m…
Feb 8, 2022
97bc641
Merge branch 'main' of github.com:hyperledger/firefly into new-prom-m…
Feb 8, 2022
354f4e8
Merge branch 'main' of github.com:hyperledger/firefly into new-prom-m…
Feb 8, 2022
a704fef
Logging in rewind
peterbroadhurst Feb 8, 2022
c66e526
Merge branch 'batch-logging' of github.com:kaleido-io/firefly into ne…
Feb 8, 2022
3934af8
Add more debug to show dispatch attempts
peterbroadhurst Feb 8, 2022
6436457
Merge branch 'batch-logging' of github.com:kaleido-io/firefly into ne…
Feb 8, 2022
9ce34dd
Fix logging of unmasked context when blocked
peterbroadhurst Feb 8, 2022
7066aa1
Merge branch 'batch-logging' of github.com:kaleido-io/firefly into ne…
Feb 8, 2022
d4e153e
Ensure index 0 hits
peterbroadhurst Feb 8, 2022
a03a716
Merge branch 'batch-logging' of github.com:kaleido-io/firefly into ne…
Feb 8, 2022
f2bb6bb
Extra logging
peterbroadhurst Feb 8, 2022
e48ef44
Merge branch 'batch-logging' of github.com:kaleido-io/firefly into ne…
Feb 8, 2022
9f13716
Avoid incrementing to next message
peterbroadhurst Feb 8, 2022
41def40
Merge branch 'batch-logging' of github.com:kaleido-io/firefly into ne…
Feb 8, 2022
1b9f576
[new-prom-metrics] unit testing for prom metrics
dechdev Feb 8, 2022
366f29e
[new-prom-metrics] merge with main
dechdev Feb 8, 2022
32732ea
[new-prom-metrics] removing batch logs
dechdev Feb 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions internal/assets/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,17 @@ type Manager interface {
}

type assetManager struct {
ctx context.Context
database database.Plugin
txHelper txcommon.Helper
identity identity.Manager
data data.Manager
syncasync syncasync.Bridge
broadcast broadcast.Manager
messaging privatemessaging.Manager
tokens map[string]tokens.Plugin
retry retry.Retry
ctx context.Context
database database.Plugin
txHelper txcommon.Helper
identity identity.Manager
data data.Manager
syncasync syncasync.Bridge
broadcast broadcast.Manager
messaging privatemessaging.Manager
tokens map[string]tokens.Plugin
retry retry.Retry
metricsEnabled bool
}

func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, pm privatemessaging.Manager, ti map[string]tokens.Plugin) (Manager, error) {
Expand All @@ -91,6 +92,7 @@ func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manage
MaximumDelay: config.GetDuration(config.AssetManagerRetryMaxDelay),
Factor: config.GetFloat64(config.AssetManagerRetryFactor),
},
metricsEnabled: config.GetBool(config.MetricsEnabled),
}
return am, nil
}
Expand Down
3 changes: 3 additions & 0 deletions internal/assets/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/hyperledger/firefly/internal/config"
"github.com/hyperledger/firefly/internal/metrics"
"github.com/hyperledger/firefly/mocks/broadcastmocks"
"github.com/hyperledger/firefly/mocks/databasemocks"
"github.com/hyperledger/firefly/mocks/datamocks"
Expand All @@ -36,6 +37,8 @@ import (

func newTestAssets(t *testing.T) (*assetManager, func()) {
config.Reset()
metrics.Registry()
config.Set(config.MetricsEnabled, true)
mdi := &databasemocks.Plugin{}
mim := &identitymanagermocks.Manager{}
mdm := &datamocks.Manager{}
Expand Down
35 changes: 25 additions & 10 deletions internal/assets/token_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"context"
"fmt"

"github.com/hyperledger/firefly/internal/config"
"github.com/hyperledger/firefly/internal/i18n"
"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/internal/metrics"
"github.com/hyperledger/firefly/internal/sysmessaging"
"github.com/hyperledger/firefly/internal/txcommon"
"github.com/hyperledger/firefly/pkg/database"
Expand All @@ -42,20 +44,24 @@ func (am *assetManager) GetTokenTransferByID(ctx context.Context, ns, id string)
}

func (am *assetManager) NewTransfer(ns string, transfer *fftypes.TokenTransferInput) sysmessaging.MessageSender {
metrics.Registry()
config.Set(config.MetricsEnabled, true)
sender := &transferSender{
mgr: am,
namespace: ns,
transfer: transfer,
metricsEnabled: config.GetBool(config.MetricsEnabled),
mgr: am,
namespace: ns,
transfer: transfer,
}
sender.setDefaults()
return sender
}

type transferSender struct {
mgr *assetManager
namespace string
transfer *fftypes.TokenTransferInput
resolved bool
mgr *assetManager
namespace string
transfer *fftypes.TokenTransferInput
resolved bool
metricsEnabled bool
}

// sendMethod is the specific operation requested of the transferSender.
Expand Down Expand Up @@ -123,8 +129,11 @@ func (am *assetManager) MintTokens(ctx context.Context, ns string, transfer *fft
if err := am.validateTransfer(ctx, ns, transfer); err != nil {
return nil, err
}

sender := am.NewTransfer(ns, transfer)
if am.metricsEnabled && len(transfer.LocalID.String()) > 0 {
metrics.MintSubmittedCounter.Inc()
metrics.AddTime(transfer.LocalID.String())
}
if waitConfirm {
err = sender.SendAndWait(ctx)
} else {
Expand All @@ -138,8 +147,11 @@ func (am *assetManager) BurnTokens(ctx context.Context, ns string, transfer *fft
if err := am.validateTransfer(ctx, ns, transfer); err != nil {
return nil, err
}

sender := am.NewTransfer(ns, transfer)
if am.metricsEnabled && len(transfer.LocalID.String()) > 0 {
metrics.BurnSubmittedCounter.Inc()
metrics.AddTime(transfer.LocalID.String())
}
if waitConfirm {
err = sender.SendAndWait(ctx)
} else {
Expand All @@ -156,8 +168,11 @@ func (am *assetManager) TransferTokens(ctx context.Context, ns string, transfer
if transfer.From == transfer.To {
return nil, i18n.NewError(ctx, i18n.MsgCannotTransferToSelf)
}

sender := am.NewTransfer(ns, transfer)
if am.metricsEnabled && len(transfer.LocalID.String()) > 0 {
metrics.TransferSubmittedCounter.Inc()
metrics.AddTime(transfer.LocalID.String())
}
if waitConfirm {
err = sender.SendAndWait(ctx)
} else {
Expand Down
2 changes: 2 additions & 0 deletions internal/broadcast/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type broadcastManager struct {
syncasync syncasync.Bridge
batchpin batchpin.Submitter
maxBatchPayloadLength int64
metricsEnabled bool
}

func NewBroadcastManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, bi blockchain.Plugin, dx dataexchange.Plugin, pi publicstorage.Plugin, ba batch.Manager, sa syncasync.Bridge, bp batchpin.Submitter) (Manager, error) {
Expand All @@ -80,6 +81,7 @@ func NewBroadcastManager(ctx context.Context, di database.Plugin, im identity.Ma
syncasync: sa,
batchpin: bp,
maxBatchPayloadLength: config.GetByteSize(config.BroadcastBatchPayloadLimit),
metricsEnabled: config.GetBool(config.MetricsEnabled),
}
bo := batch.Options{
BatchMaxSize: config.GetUint(config.BroadcastBatchSize),
Expand Down
3 changes: 3 additions & 0 deletions internal/broadcast/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"testing"

"github.com/hyperledger/firefly/internal/config"
"github.com/hyperledger/firefly/internal/metrics"
"github.com/hyperledger/firefly/mocks/batchmocks"
"github.com/hyperledger/firefly/mocks/batchpinmocks"
"github.com/hyperledger/firefly/mocks/blockchainmocks"
Expand All @@ -42,6 +43,8 @@ import (

func newTestBroadcast(t *testing.T) (*broadcastManager, func()) {
config.Reset()
metrics.Registry()
config.Set(config.MetricsEnabled, true)
mdi := &databasemocks.Plugin{}
mim := &identitymanagermocks.Manager{}
mdm := &datamocks.Manager{}
Expand Down
9 changes: 8 additions & 1 deletion internal/broadcast/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/hyperledger/firefly/internal/i18n"
"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/internal/metrics"
"github.com/hyperledger/firefly/internal/sysmessaging"
"github.com/hyperledger/firefly/pkg/database"
"github.com/hyperledger/firefly/pkg/fftypes"
Expand All @@ -39,6 +40,12 @@ func (bm *broadcastManager) NewBroadcast(ns string, in *fftypes.MessageInOut) sy

func (bm *broadcastManager) BroadcastMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) {
broadcast := bm.NewBroadcast(ns, in)

if bm.metricsEnabled && len(in.Header.ID.String()) > 0 {
metrics.BroadcastSubmittedCounter.Inc()
metrics.AddTime(in.Header.ID.String())
}

if waitConfirm {
err = broadcast.SendAndWait(ctx)
} else {
Expand Down Expand Up @@ -166,7 +173,7 @@ func (s *broadcastSender) sendInternal(ctx context.Context, method sendMethod) (
if err := s.mgr.database.UpsertMessage(ctx, &s.msg.Message, database.UpsertOptimizationNew); err != nil {
return err
}
log.L(ctx).Infof("Sent broadcast message %s:%s", s.msg.Header.Namespace, s.msg.Header.ID)
log.L(ctx).Infof("Sent broadcast message %s:%s sequence=%d", s.msg.Header.Namespace, s.msg.Header.ID, s.msg.Sequence)

return err
}
Expand Down
28 changes: 28 additions & 0 deletions internal/events/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"context"
"crypto/sha256"
"database/sql/driver"
"time"

"github.com/hyperledger/firefly/internal/config"
"github.com/hyperledger/firefly/internal/data"
"github.com/hyperledger/firefly/internal/definitions"
"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/internal/metrics"
"github.com/hyperledger/firefly/internal/retry"
"github.com/hyperledger/firefly/pkg/database"
"github.com/hyperledger/firefly/pkg/fftypes"
Expand All @@ -44,6 +46,7 @@ type aggregator struct {
offchainBatches chan *fftypes.UUID
queuedRewinds chan *fftypes.UUID
retry *retry.Retry
metricsEnabled bool
}

func newAggregator(ctx context.Context, di database.Plugin, sh definitions.DefinitionHandlers, dm data.Manager, en *eventNotifier) *aggregator {
Expand All @@ -56,6 +59,7 @@ func newAggregator(ctx context.Context, di database.Plugin, sh definitions.Defin
newPins: make(chan int64),
offchainBatches: make(chan *fftypes.UUID, 1), // hops to queuedRewinds with a shouldertab on the event poller
queuedRewinds: make(chan *fftypes.UUID, batchSize),
metricsEnabled: config.GetBool(config.MetricsEnabled),
}
firstEvent := fftypes.SubOptsFirstEvent(config.GetString(config.EventAggregatorFirstEvent))
ag.eventPoller = newEventPoller(ctx, di, en, &eventPollerConf{
Expand Down Expand Up @@ -384,6 +388,30 @@ func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.M
return nil
})

// Increment confirmer/rejected metrics for broadcast or private message
if ag.metricsEnabled {
timeElapsed := time.Since(metrics.GetTime(msg.Header.ID.String())).Seconds()
metrics.DeleteTime(msg.Header.ID.String())
switch msg.Header.Type {
case fftypes.MessageTypeBroadcast:
metrics.BroadcastHistogram.Observe(timeElapsed)
if eventType == fftypes.EventTypeMessageConfirmed {
metrics.BroadcastConfirmedCounter.Inc()
} else if eventType == fftypes.EventTypeMessageRejected {
metrics.BroadcastRejectedCounter.Inc()
}
case fftypes.MessageTypePrivate:
metrics.PrivateMsgHistogram.Observe(timeElapsed)
if eventType == fftypes.EventTypeMessageConfirmed {
metrics.PrivateMsgConfirmedCounter.Inc()
} else if eventType == fftypes.EventTypeMessageRejected {
metrics.PrivateMsgRejectedCounter.Inc()
}
default:
break
}
}

return true, nil
}

Expand Down
72 changes: 72 additions & 0 deletions internal/events/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/hyperledger/firefly/internal/config"
"github.com/hyperledger/firefly/internal/definitions"
"github.com/hyperledger/firefly/internal/metrics"
"github.com/hyperledger/firefly/mocks/databasemocks"
"github.com/hyperledger/firefly/mocks/datamocks"
"github.com/hyperledger/firefly/mocks/definitionsmocks"
Expand All @@ -34,6 +35,8 @@ import (
)

func newTestAggregator() (*aggregator, func()) {
metrics.Registry()
config.Set(config.MetricsEnabled, true)
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
msh := &definitionsmocks.DefinitionHandlers{}
Expand Down Expand Up @@ -1306,6 +1309,75 @@ func TestAttemptMessageDispatchGroupInit(t *testing.T) {

}

func TestAttemptMessageDispatchSuccessBroadcast(t *testing.T) {
ag, cancel := newTestAggregator()
defer cancel()
bs := newBatchState(ag)

mdi := ag.database.(*databasemocks.Plugin)
mdm := ag.data.(*datamocks.Manager)
mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil)
mdm.On("ValidateAll", ag.ctx, mock.Anything).Return(true, nil)
mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(nil)
mdi.On("InsertEvent", ag.ctx, mock.Anything).Return(nil)

_, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{
Header: fftypes.MessageHeader{
ID: fftypes.NewUUID(),
Type: fftypes.MessageTypeBroadcast,
},
}, bs)
assert.NoError(t, err)
}

func TestAttemptMessageDispatchRejectedBroadcast(t *testing.T) {
ag, cancel := newTestAggregator()
defer cancel()
bs := newBatchState(ag)

mdi := ag.database.(*databasemocks.Plugin)
mdm := ag.data.(*datamocks.Manager)
mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil)
mdm.On("ValidateAll", ag.ctx, mock.Anything).Return(false, nil)
mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(nil)
mdi.On("InsertEvent", ag.ctx, mock.Anything).Return(nil)

_, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{
Header: fftypes.MessageHeader{
ID: fftypes.NewUUID(),
Type: fftypes.MessageTypeBroadcast,
},
Data: fftypes.DataRefs{
{ID: fftypes.NewUUID()},
},
}, bs)
assert.NoError(t, err)
}

func TestAttemptMessageDispatchRejectedPrivate(t *testing.T) {
ag, cancel := newTestAggregator()
defer cancel()
bs := newBatchState(ag)

mdi := ag.database.(*databasemocks.Plugin)
mdm := ag.data.(*datamocks.Manager)
mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil)
mdm.On("ValidateAll", ag.ctx, mock.Anything).Return(false, nil)
mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(nil)
mdi.On("InsertEvent", ag.ctx, mock.Anything).Return(nil)

_, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{
Header: fftypes.MessageHeader{
ID: fftypes.NewUUID(),
Type: fftypes.MessageTypePrivate,
},
Data: fftypes.DataRefs{
{ID: fftypes.NewUUID()},
},
}, bs)
assert.NoError(t, err)
}

func TestAttemptMessageUpdateMessageFail(t *testing.T) {
ag, cancel := newTestAggregator()
defer cancel()
Expand Down
2 changes: 2 additions & 0 deletions internal/events/event_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type eventManager struct {
opCorrelationRetries int
defaultTransport string
internalEvents *system.Events
metricsEnabled bool
}

func NewEventManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, pi publicstorage.Plugin, di database.Plugin, im identity.Manager, dh definitions.DefinitionHandlers, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, am assets.Manager) (EventManager, error) {
Expand Down Expand Up @@ -124,6 +125,7 @@ func NewEventManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, pi publ
newEventNotifier: newEventNotifier,
newPinNotifier: newPinNotifier,
aggregator: newAggregator(ctx, di, dh, dm, newPinNotifier),
metricsEnabled: config.GetBool(config.MetricsEnabled),
}
ie, _ := eifactory.GetPlugin(ctx, system.SystemEventsTransport)
em.internalEvents = ie.(*system.Events)
Expand Down
3 changes: 3 additions & 0 deletions internal/events/event_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/hyperledger/firefly/internal/config"
"github.com/hyperledger/firefly/internal/events/system"
"github.com/hyperledger/firefly/internal/metrics"
"github.com/hyperledger/firefly/mocks/assetmocks"
"github.com/hyperledger/firefly/mocks/broadcastmocks"
"github.com/hyperledger/firefly/mocks/databasemocks"
Expand All @@ -43,6 +44,8 @@ var testNodeID = fftypes.NewUUID()

func newTestEventManager(t *testing.T) (*eventManager, func()) {
config.Reset()
metrics.Registry()
config.Set(config.MetricsEnabled, true)
ctx, cancel := context.WithCancel(context.Background())
mdi := &databasemocks.Plugin{}
mim := &identitymanagermocks.Manager{}
Expand Down
Loading