diff --git a/internal/assets/manager.go b/internal/assets/manager.go index 9d5e4c4834..854a2490ac 100644 --- a/internal/assets/manager.go +++ b/internal/assets/manager.go @@ -60,16 +60,17 @@ type Manager interface { } type assetManager struct { - ctx context.Context - database database.Plugin - txHelper txcommon.Helper - identity identity.Manager - data data.Manager - syncasync syncasync.Bridge - broadcast broadcast.Manager - messaging privatemessaging.Manager - tokens map[string]tokens.Plugin - retry retry.Retry + ctx context.Context + database database.Plugin + txHelper txcommon.Helper + identity identity.Manager + data data.Manager + syncasync syncasync.Bridge + broadcast broadcast.Manager + messaging privatemessaging.Manager + tokens map[string]tokens.Plugin + retry retry.Retry + metricsEnabled bool } func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, pm privatemessaging.Manager, ti map[string]tokens.Plugin) (Manager, error) { @@ -91,6 +92,7 @@ func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manage MaximumDelay: config.GetDuration(config.AssetManagerRetryMaxDelay), Factor: config.GetFloat64(config.AssetManagerRetryFactor), }, + metricsEnabled: config.GetBool(config.MetricsEnabled), } return am, nil } diff --git a/internal/assets/manager_test.go b/internal/assets/manager_test.go index ccd867ea6e..851838ccf2 100644 --- a/internal/assets/manager_test.go +++ b/internal/assets/manager_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/hyperledger/firefly/internal/config" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" @@ -36,6 +37,8 @@ import ( func newTestAssets(t *testing.T) (*assetManager, func()) { config.Reset() + metrics.Registry() + config.Set(config.MetricsEnabled, true) mdi := &databasemocks.Plugin{} mim := &identitymanagermocks.Manager{} mdm := &datamocks.Manager{} diff --git a/internal/assets/token_transfer.go b/internal/assets/token_transfer.go index f11c4bc354..9f6e66d401 100644 --- a/internal/assets/token_transfer.go +++ b/internal/assets/token_transfer.go @@ -20,8 +20,10 @@ import ( "context" "fmt" + "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/internal/sysmessaging" "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/pkg/database" @@ -42,20 +44,24 @@ func (am *assetManager) GetTokenTransferByID(ctx context.Context, ns, id string) } func (am *assetManager) NewTransfer(ns string, transfer *fftypes.TokenTransferInput) sysmessaging.MessageSender { + metrics.Registry() + config.Set(config.MetricsEnabled, true) sender := &transferSender{ - mgr: am, - namespace: ns, - transfer: transfer, + metricsEnabled: config.GetBool(config.MetricsEnabled), + mgr: am, + namespace: ns, + transfer: transfer, } sender.setDefaults() return sender } type transferSender struct { - mgr *assetManager - namespace string - transfer *fftypes.TokenTransferInput - resolved bool + mgr *assetManager + namespace string + transfer *fftypes.TokenTransferInput + resolved bool + metricsEnabled bool } // sendMethod is the specific operation requested of the transferSender. @@ -123,8 +129,11 @@ func (am *assetManager) MintTokens(ctx context.Context, ns string, transfer *fft if err := am.validateTransfer(ctx, ns, transfer); err != nil { return nil, err } - sender := am.NewTransfer(ns, transfer) + if am.metricsEnabled && len(transfer.LocalID.String()) > 0 { + metrics.MintSubmittedCounter.Inc() + metrics.AddTime(transfer.LocalID.String()) + } if waitConfirm { err = sender.SendAndWait(ctx) } else { @@ -138,8 +147,11 @@ func (am *assetManager) BurnTokens(ctx context.Context, ns string, transfer *fft if err := am.validateTransfer(ctx, ns, transfer); err != nil { return nil, err } - sender := am.NewTransfer(ns, transfer) + if am.metricsEnabled && len(transfer.LocalID.String()) > 0 { + metrics.BurnSubmittedCounter.Inc() + metrics.AddTime(transfer.LocalID.String()) + } if waitConfirm { err = sender.SendAndWait(ctx) } else { @@ -156,8 +168,11 @@ func (am *assetManager) TransferTokens(ctx context.Context, ns string, transfer if transfer.From == transfer.To { return nil, i18n.NewError(ctx, i18n.MsgCannotTransferToSelf) } - sender := am.NewTransfer(ns, transfer) + if am.metricsEnabled && len(transfer.LocalID.String()) > 0 { + metrics.TransferSubmittedCounter.Inc() + metrics.AddTime(transfer.LocalID.String()) + } if waitConfirm { err = sender.SendAndWait(ctx) } else { diff --git a/internal/broadcast/manager.go b/internal/broadcast/manager.go index 725b307e9d..9230b664ca 100644 --- a/internal/broadcast/manager.go +++ b/internal/broadcast/manager.go @@ -62,6 +62,7 @@ type broadcastManager struct { syncasync syncasync.Bridge batchpin batchpin.Submitter maxBatchPayloadLength int64 + metricsEnabled bool } func NewBroadcastManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, bi blockchain.Plugin, dx dataexchange.Plugin, pi publicstorage.Plugin, ba batch.Manager, sa syncasync.Bridge, bp batchpin.Submitter) (Manager, error) { @@ -80,6 +81,7 @@ func NewBroadcastManager(ctx context.Context, di database.Plugin, im identity.Ma syncasync: sa, batchpin: bp, maxBatchPayloadLength: config.GetByteSize(config.BroadcastBatchPayloadLimit), + metricsEnabled: config.GetBool(config.MetricsEnabled), } bo := batch.Options{ BatchMaxSize: config.GetUint(config.BroadcastBatchSize), diff --git a/internal/broadcast/manager_test.go b/internal/broadcast/manager_test.go index ad393f3026..6d1f2d7bb4 100644 --- a/internal/broadcast/manager_test.go +++ b/internal/broadcast/manager_test.go @@ -25,6 +25,7 @@ import ( "testing" "github.com/hyperledger/firefly/internal/config" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/mocks/batchmocks" "github.com/hyperledger/firefly/mocks/batchpinmocks" "github.com/hyperledger/firefly/mocks/blockchainmocks" @@ -42,6 +43,8 @@ import ( func newTestBroadcast(t *testing.T) (*broadcastManager, func()) { config.Reset() + metrics.Registry() + config.Set(config.MetricsEnabled, true) mdi := &databasemocks.Plugin{} mim := &identitymanagermocks.Manager{} mdm := &datamocks.Manager{} diff --git a/internal/broadcast/message.go b/internal/broadcast/message.go index 81a93eb919..4b6d31f3b1 100644 --- a/internal/broadcast/message.go +++ b/internal/broadcast/message.go @@ -22,6 +22,7 @@ import ( "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/internal/sysmessaging" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" @@ -39,6 +40,12 @@ func (bm *broadcastManager) NewBroadcast(ns string, in *fftypes.MessageInOut) sy func (bm *broadcastManager) BroadcastMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) { broadcast := bm.NewBroadcast(ns, in) + + if bm.metricsEnabled && len(in.Header.ID.String()) > 0 { + metrics.BroadcastSubmittedCounter.Inc() + metrics.AddTime(in.Header.ID.String()) + } + if waitConfirm { err = broadcast.SendAndWait(ctx) } else { @@ -166,7 +173,7 @@ func (s *broadcastSender) sendInternal(ctx context.Context, method sendMethod) ( if err := s.mgr.database.UpsertMessage(ctx, &s.msg.Message, database.UpsertOptimizationNew); err != nil { return err } - log.L(ctx).Infof("Sent broadcast message %s:%s", s.msg.Header.Namespace, s.msg.Header.ID) + log.L(ctx).Infof("Sent broadcast message %s:%s sequence=%d", s.msg.Header.Namespace, s.msg.Header.ID, s.msg.Sequence) return err } diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index 0e1c231954..129bfe3fb3 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -20,11 +20,13 @@ import ( "context" "crypto/sha256" "database/sql/driver" + "time" "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/definitions" "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/internal/retry" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" @@ -44,6 +46,7 @@ type aggregator struct { offchainBatches chan *fftypes.UUID queuedRewinds chan *fftypes.UUID retry *retry.Retry + metricsEnabled bool } func newAggregator(ctx context.Context, di database.Plugin, sh definitions.DefinitionHandlers, dm data.Manager, en *eventNotifier) *aggregator { @@ -56,6 +59,7 @@ func newAggregator(ctx context.Context, di database.Plugin, sh definitions.Defin newPins: make(chan int64), offchainBatches: make(chan *fftypes.UUID, 1), // hops to queuedRewinds with a shouldertab on the event poller queuedRewinds: make(chan *fftypes.UUID, batchSize), + metricsEnabled: config.GetBool(config.MetricsEnabled), } firstEvent := fftypes.SubOptsFirstEvent(config.GetString(config.EventAggregatorFirstEvent)) ag.eventPoller = newEventPoller(ctx, di, en, &eventPollerConf{ @@ -384,6 +388,30 @@ func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.M return nil }) + // Increment confirmer/rejected metrics for broadcast or private message + if ag.metricsEnabled { + timeElapsed := time.Since(metrics.GetTime(msg.Header.ID.String())).Seconds() + metrics.DeleteTime(msg.Header.ID.String()) + switch msg.Header.Type { + case fftypes.MessageTypeBroadcast: + metrics.BroadcastHistogram.Observe(timeElapsed) + if eventType == fftypes.EventTypeMessageConfirmed { + metrics.BroadcastConfirmedCounter.Inc() + } else if eventType == fftypes.EventTypeMessageRejected { + metrics.BroadcastRejectedCounter.Inc() + } + case fftypes.MessageTypePrivate: + metrics.PrivateMsgHistogram.Observe(timeElapsed) + if eventType == fftypes.EventTypeMessageConfirmed { + metrics.PrivateMsgConfirmedCounter.Inc() + } else if eventType == fftypes.EventTypeMessageRejected { + metrics.PrivateMsgRejectedCounter.Inc() + } + default: + break + } + } + return true, nil } diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index fd9c0197b5..f795466841 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -24,6 +24,7 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/definitions" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/definitionsmocks" @@ -34,6 +35,8 @@ import ( ) func newTestAggregator() (*aggregator, func()) { + metrics.Registry() + config.Set(config.MetricsEnabled, true) mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} msh := &definitionsmocks.DefinitionHandlers{} @@ -1306,6 +1309,75 @@ func TestAttemptMessageDispatchGroupInit(t *testing.T) { } +func TestAttemptMessageDispatchSuccessBroadcast(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + bs := newBatchState(ag) + + mdi := ag.database.(*databasemocks.Plugin) + mdm := ag.data.(*datamocks.Manager) + mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil) + mdm.On("ValidateAll", ag.ctx, mock.Anything).Return(true, nil) + mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(nil) + mdi.On("InsertEvent", ag.ctx, mock.Anything).Return(nil) + + _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Type: fftypes.MessageTypeBroadcast, + }, + }, bs) + assert.NoError(t, err) +} + +func TestAttemptMessageDispatchRejectedBroadcast(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + bs := newBatchState(ag) + + mdi := ag.database.(*databasemocks.Plugin) + mdm := ag.data.(*datamocks.Manager) + mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil) + mdm.On("ValidateAll", ag.ctx, mock.Anything).Return(false, nil) + mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(nil) + mdi.On("InsertEvent", ag.ctx, mock.Anything).Return(nil) + + _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Type: fftypes.MessageTypeBroadcast, + }, + Data: fftypes.DataRefs{ + {ID: fftypes.NewUUID()}, + }, + }, bs) + assert.NoError(t, err) +} + +func TestAttemptMessageDispatchRejectedPrivate(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + bs := newBatchState(ag) + + mdi := ag.database.(*databasemocks.Plugin) + mdm := ag.data.(*datamocks.Manager) + mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil) + mdm.On("ValidateAll", ag.ctx, mock.Anything).Return(false, nil) + mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(nil) + mdi.On("InsertEvent", ag.ctx, mock.Anything).Return(nil) + + _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Type: fftypes.MessageTypePrivate, + }, + Data: fftypes.DataRefs{ + {ID: fftypes.NewUUID()}, + }, + }, bs) + assert.NoError(t, err) +} + func TestAttemptMessageUpdateMessageFail(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() diff --git a/internal/events/event_manager.go b/internal/events/event_manager.go index ee06536ef3..03eca7a129 100644 --- a/internal/events/event_manager.go +++ b/internal/events/event_manager.go @@ -94,6 +94,7 @@ type eventManager struct { opCorrelationRetries int defaultTransport string internalEvents *system.Events + metricsEnabled bool } func NewEventManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, pi publicstorage.Plugin, di database.Plugin, im identity.Manager, dh definitions.DefinitionHandlers, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, am assets.Manager) (EventManager, error) { @@ -124,6 +125,7 @@ func NewEventManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, pi publ newEventNotifier: newEventNotifier, newPinNotifier: newPinNotifier, aggregator: newAggregator(ctx, di, dh, dm, newPinNotifier), + metricsEnabled: config.GetBool(config.MetricsEnabled), } ie, _ := eifactory.GetPlugin(ctx, system.SystemEventsTransport) em.internalEvents = ie.(*system.Events) diff --git a/internal/events/event_manager_test.go b/internal/events/event_manager_test.go index fa6b222ba6..e4dd27191a 100644 --- a/internal/events/event_manager_test.go +++ b/internal/events/event_manager_test.go @@ -23,6 +23,7 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/events/system" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/mocks/assetmocks" "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/databasemocks" @@ -43,6 +44,8 @@ var testNodeID = fftypes.NewUUID() func newTestEventManager(t *testing.T) (*eventManager, func()) { config.Reset() + metrics.Registry() + config.Set(config.MetricsEnabled, true) ctx, cancel := context.WithCancel(context.Background()) mdi := &databasemocks.Plugin{} mim := &identitymanagermocks.Manager{} diff --git a/internal/events/operation_update.go b/internal/events/operation_update.go index 5d6f18be49..dc594bdf10 100644 --- a/internal/events/operation_update.go +++ b/internal/events/operation_update.go @@ -42,6 +42,16 @@ func (em *eventManager) operationUpdateCtx(ctx context.Context, operationID *fft // Special handling for OpTypeTokenTransfer, which writes an event when it fails if op.Type == fftypes.OpTypeTokenTransfer && txState == fftypes.OpStatusFailed { event := fftypes.NewEvent(fftypes.EventTypeTransferOpFailed, op.Namespace, op.ID) + // TODO: Figure out way to determine transfer type + // if em.metricsEnabled { + // Mint + // metrics.MintRejectedCounter.Inc() + // Transfer + // metrics.TransferRejectedCounter.Inc() + // Burn + // metrics.BurnRejectedCounter.Inc() + // } + if err := em.database.InsertEvent(ctx, event); err != nil { return err } diff --git a/internal/events/tokens_transferred.go b/internal/events/tokens_transferred.go index 5fc3666e2f..3700833ec9 100644 --- a/internal/events/tokens_transferred.go +++ b/internal/events/tokens_transferred.go @@ -18,8 +18,10 @@ package events import ( "context" + "time" "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" @@ -52,6 +54,7 @@ func (em *eventManager) loadTransferOperation(ctx context.Context, tx *fftypes.U } func (em *eventManager) persistTokenTransfer(ctx context.Context, transfer *tokens.TokenTransfer) (valid bool, err error) { + countMetric := true // Check that transfer has not already been recorded if existing, err := em.database.GetTokenTransferByProtocolID(ctx, transfer.Connector, transfer.ProtocolID); err != nil { return false, err @@ -87,6 +90,7 @@ func (em *eventManager) persistTokenTransfer(ctx context.Context, transfer *toke if existing, err := em.database.GetTokenTransfer(ctx, transfer.LocalID); err != nil { return false, err } else if existing != nil { + countMetric = false transfer.LocalID = fftypes.NewUUID() } } else { @@ -108,6 +112,23 @@ func (em *eventManager) persistTokenTransfer(ctx context.Context, transfer *toke return false, err } log.L(ctx).Infof("Token transfer recorded id=%s author=%s", transfer.ProtocolID, transfer.Key) + + if em.metricsEnabled && countMetric { + timeElapsed := time.Since(metrics.GetTime(transfer.LocalID.String())).Seconds() + metrics.DeleteTime(transfer.LocalID.String()) + switch transfer.Type { + case fftypes.TokenTransferTypeMint: + metrics.MintHistogram.Observe(timeElapsed) + metrics.MintConfirmedCounter.Inc() + case fftypes.TokenTransferTypeTransfer: + metrics.TransferHistogram.Observe(timeElapsed) + metrics.TransferConfirmedCounter.Inc() + case fftypes.TokenTransferTypeBurn: + metrics.BurnHistogram.Observe(timeElapsed) + metrics.BurnConfirmedCounter.Inc() + } + } + return true, nil } diff --git a/internal/events/tokens_transferred_test.go b/internal/events/tokens_transferred_test.go index c768a87d7e..a0477327f9 100644 --- a/internal/events/tokens_transferred_test.go +++ b/internal/events/tokens_transferred_test.go @@ -381,6 +381,68 @@ func TestTokensTransferredWithMessageReceived(t *testing.T) { mti.AssertExpectations(t) } +func TestTokensMintWithMessageSend(t *testing.T) { + em, cancel := newTestEventManager(t) + defer cancel() + + mdi := em.database.(*databasemocks.Plugin) + mti := &tokenmocks.Plugin{} + + uri := "firefly://token/1" + info := fftypes.JSONObject{"some": "info"} + transfer := &tokens.TokenTransfer{ + PoolProtocolID: "F1", + TokenTransfer: fftypes.TokenTransfer{ + Type: fftypes.TokenTransferTypeMint, + TokenIndex: "0", + URI: uri, + Connector: "erc1155", + Key: "0x12345", + From: "0x1", + To: "0x2", + ProtocolID: "123", + Message: fftypes.NewUUID(), + Amount: *fftypes.NewFFBigInt(1), + }, + Event: blockchain.Event{ + BlockchainTXID: "0xffffeeee", + ProtocolID: "0000/0000/0000", + Info: info, + }, + } + pool := &fftypes.TokenPool{ + Namespace: "ns1", + } + message := &fftypes.Message{ + BatchID: fftypes.NewUUID(), + State: fftypes.MessageStateStaged, + } + + mdi.On("GetTokenTransferByProtocolID", em.ctx, "erc1155", "123").Return(nil, nil).Times(2) + mdi.On("GetTokenPoolByProtocolID", em.ctx, "erc1155", "F1").Return(pool, nil).Times(2) + mdi.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *fftypes.BlockchainEvent) bool { + return e.Namespace == pool.Namespace && e.Name == transfer.Event.Name + })).Return(nil).Times(2) + mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { + return ev.Type == fftypes.EventTypeBlockchainEvent && ev.Namespace == pool.Namespace + })).Return(nil).Times(2) + mdi.On("UpsertTokenTransfer", em.ctx, &transfer.TokenTransfer).Return(nil).Times(2) + mdi.On("UpdateTokenBalances", em.ctx, &transfer.TokenTransfer).Return(nil).Times(2) + mdi.On("GetMessageByID", em.ctx, mock.Anything).Return(message, nil).Times(2) + mdi.On("ReplaceMessage", em.ctx, mock.MatchedBy(func(msg *fftypes.Message) bool { + return msg.State == fftypes.MessageStateReady + })).Return(fmt.Errorf("pop")) + mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { + return ev.Type == fftypes.EventTypeTransferConfirmed && ev.Reference == transfer.LocalID && ev.Namespace == pool.Namespace + })).Return(nil).Once() + + err := em.TokensTransferred(mti, transfer) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mti.AssertExpectations(t) +} + func TestTokensTransferredWithMessageSend(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() @@ -442,3 +504,65 @@ func TestTokensTransferredWithMessageSend(t *testing.T) { mdi.AssertExpectations(t) mti.AssertExpectations(t) } + +func TestTokensBurndWithMessageSend(t *testing.T) { + em, cancel := newTestEventManager(t) + defer cancel() + + mdi := em.database.(*databasemocks.Plugin) + mti := &tokenmocks.Plugin{} + + uri := "firefly://token/1" + info := fftypes.JSONObject{"some": "info"} + transfer := &tokens.TokenTransfer{ + PoolProtocolID: "F1", + TokenTransfer: fftypes.TokenTransfer{ + Type: fftypes.TokenTransferTypeBurn, + TokenIndex: "0", + URI: uri, + Connector: "erc1155", + Key: "0x12345", + From: "0x1", + To: "0x2", + ProtocolID: "123", + Message: fftypes.NewUUID(), + Amount: *fftypes.NewFFBigInt(1), + }, + Event: blockchain.Event{ + BlockchainTXID: "0xffffeeee", + ProtocolID: "0000/0000/0000", + Info: info, + }, + } + pool := &fftypes.TokenPool{ + Namespace: "ns1", + } + message := &fftypes.Message{ + BatchID: fftypes.NewUUID(), + State: fftypes.MessageStateStaged, + } + + mdi.On("GetTokenTransferByProtocolID", em.ctx, "erc1155", "123").Return(nil, nil).Times(2) + mdi.On("GetTokenPoolByProtocolID", em.ctx, "erc1155", "F1").Return(pool, nil).Times(2) + mdi.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *fftypes.BlockchainEvent) bool { + return e.Namespace == pool.Namespace && e.Name == transfer.Event.Name + })).Return(nil).Times(2) + mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { + return ev.Type == fftypes.EventTypeBlockchainEvent && ev.Namespace == pool.Namespace + })).Return(nil).Times(2) + mdi.On("UpsertTokenTransfer", em.ctx, &transfer.TokenTransfer).Return(nil).Times(2) + mdi.On("UpdateTokenBalances", em.ctx, &transfer.TokenTransfer).Return(nil).Times(2) + mdi.On("GetMessageByID", em.ctx, mock.Anything).Return(message, nil).Times(2) + mdi.On("ReplaceMessage", em.ctx, mock.MatchedBy(func(msg *fftypes.Message) bool { + return msg.State == fftypes.MessageStateReady + })).Return(fmt.Errorf("pop")) + mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { + return ev.Type == fftypes.EventTypeTransferConfirmed && ev.Reference == transfer.LocalID && ev.Namespace == pool.Namespace + })).Return(nil).Once() + + err := em.TokensTransferred(mti, transfer) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mti.AssertExpectations(t) +} diff --git a/internal/metrics/broadcast.go b/internal/metrics/broadcast.go new file mode 100644 index 0000000000..545e398a49 --- /dev/null +++ b/internal/metrics/broadcast.go @@ -0,0 +1,64 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var BroadcastSubmittedCounter prometheus.Counter +var BroadcastConfirmedCounter prometheus.Counter +var BroadcastRejectedCounter prometheus.Counter +var BroadcastHistogram prometheus.Histogram + +// BroadcastSubmittedCounterName is the prometheus metric for tracking the total number of broadcasts submitted +var BroadcastSubmittedCounterName = "ff_broadcast_submitted_total" + +// BroadcastConfirmedCounterName is the prometheus metric for tracking the total number of broadcasts confirmed +var BroadcastConfirmedCounterName = "ff_broadcast_confirmed_total" + +// BroadcastRejectedCounterName is the prometheus metric for tracking the total number of broadcasts rejected +var BroadcastRejectedCounterName = "ff_broadcast_rejected_total" + +// BroadcastHistogramName is the prometheus metric for tracking the total number of broadcast messages - histogram +var BroadcastHistogramName = "ff_broadcast_histogram" + +func InitBroadcastMetrics() { + BroadcastSubmittedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: BroadcastSubmittedCounterName, + Help: "Number of submitted broadcasts", + }) + BroadcastConfirmedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: BroadcastConfirmedCounterName, + Help: "Number of confirmed broadcasts", + }) + BroadcastRejectedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: BroadcastRejectedCounterName, + Help: "Number of rejected broadcasts", + }) + BroadcastHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: BroadcastHistogramName, + Help: "Histogram of broadcasts, bucketed by time to finished", + }) +} + +func RegisterBroadcastMetrics() { + registry.MustRegister(BroadcastSubmittedCounter) + registry.MustRegister(BroadcastConfirmedCounter) + registry.MustRegister(BroadcastRejectedCounter) + registry.MustRegister(BroadcastHistogram) +} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 4a56f11b0b..711239e38d 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -17,6 +17,9 @@ package metrics import ( + "sync" + "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" muxprom "gitlab.com/hfuss/mux-prometheus/pkg/middleware" @@ -26,6 +29,8 @@ var registry *prometheus.Registry var adminInstrumentation *muxprom.Instrumentation var restInstrumentation *muxprom.Instrumentation var BatchPinCounter prometheus.Counter +var timeMap map[string]time.Time +var mutex = &sync.Mutex{} // MetricsBatchPin is the prometheus metric for total number of batch pins submitted var MetricsBatchPin = "ff_batchpin_total" @@ -71,16 +76,28 @@ func newInstrumentation(subsystem string) *muxprom.Instrumentation { } func initMetricsCollectors() { + InitBroadcastMetrics() + InitPrivateMsgMetrics() + InitTokenMintMetrics() + InitTokenTransferMetrics() + InitTokenBurnMetrics() BatchPinCounter = prometheus.NewCounter(prometheus.CounterOpts{ Name: MetricsBatchPin, Help: "Number of batch pins submitted", }) + timeMap = make(map[string]time.Time) } func registerMetricsCollectors() { registry.MustRegister(collectors.NewGoCollector()) registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) registry.MustRegister(BatchPinCounter) + + RegisterBroadcastMetrics() + RegisterPrivateMsgMetrics() + RegisterTokenMintMetrics() + RegisterTokenTransferMetrics() + RegisterTokenBurnMetrics() } // Clear will reset the Prometheus metrics registry and instrumentations, useful for testing @@ -88,4 +105,24 @@ func Clear() { registry = nil adminInstrumentation = nil restInstrumentation = nil + timeMap = make(map[string]time.Time) +} + +func AddTime(id string) { + mutex.Lock() + timeMap[id] = time.Now() + mutex.Unlock() +} + +func GetTime(id string) time.Time { + mutex.Lock() + time := timeMap[id] + mutex.Unlock() + return time +} + +func DeleteTime(id string) { + mutex.Lock() + delete(timeMap, id) + mutex.Unlock() } diff --git a/internal/metrics/private_msg.go b/internal/metrics/private_msg.go new file mode 100644 index 0000000000..1f3c7cea96 --- /dev/null +++ b/internal/metrics/private_msg.go @@ -0,0 +1,64 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var PrivateMsgSubmittedCounter prometheus.Counter +var PrivateMsgConfirmedCounter prometheus.Counter +var PrivateMsgRejectedCounter prometheus.Counter +var PrivateMsgHistogram prometheus.Histogram + +// PrivateMsgSubmittedCounterName is the prometheus metric for tracking the total number of private messages submitted +var PrivateMsgSubmittedCounterName = "ff_private_msg_submitted_total" + +// PrivateMsgConfirmedCounterName is the prometheus metric for tracking the total number of private messages confirmed +var PrivateMsgConfirmedCounterName = "ff_private_msg_confirmed_total" + +// PrivateMsgRejectedCounterName is the prometheus metric for tracking the total number of private messages rejected +var PrivateMsgRejectedCounterName = "ff_private_msg_rejected_total" + +// PrivateMsgHistogramName is the prometheus metric for tracking the total number of private messages - histogram +var PrivateMsgHistogramName = "ff_private_msg_histogram" + +func InitPrivateMsgMetrics() { + PrivateMsgSubmittedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: PrivateMsgSubmittedCounterName, + Help: "Number of submitted private messages", + }) + PrivateMsgConfirmedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: PrivateMsgConfirmedCounterName, + Help: "Number of confirmed private messages", + }) + PrivateMsgRejectedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: PrivateMsgRejectedCounterName, + Help: "Number of rejected private messages", + }) + PrivateMsgHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: PrivateMsgHistogramName, + Help: "Histogram of private messages, bucketed by time to finished", + }) +} + +func RegisterPrivateMsgMetrics() { + registry.MustRegister(PrivateMsgSubmittedCounter) + registry.MustRegister(PrivateMsgConfirmedCounter) + registry.MustRegister(PrivateMsgRejectedCounter) + registry.MustRegister(PrivateMsgHistogram) +} diff --git a/internal/metrics/token_burn.go b/internal/metrics/token_burn.go new file mode 100644 index 0000000000..9b0eefea7f --- /dev/null +++ b/internal/metrics/token_burn.go @@ -0,0 +1,64 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var BurnSubmittedCounter prometheus.Counter +var BurnConfirmedCounter prometheus.Counter +var BurnRejectedCounter prometheus.Counter +var BurnHistogram prometheus.Histogram + +// BurnSubmittedCounterName is the prometheus metric for tracking the total number of burns submitted +var BurnSubmittedCounterName = "ff_burn_submitted_total" + +// BurnConfirmedCounterName is the prometheus metric for tracking the total number of burns confirmed +var BurnConfirmedCounterName = "ff_burn_confirmed_total" + +// BurnRejectedCounterName is the prometheus metric for tracking the total number of burns rejected +var BurnRejectedCounterName = "ff_burn_rejected_total" + +// BurnHistogramName is the prometheus metric for tracking the total number of burns - histogram +var BurnHistogramName = "ff_burn_histogram" + +func InitTokenBurnMetrics() { + BurnSubmittedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: BurnSubmittedCounterName, + Help: "Number of submitted burns", + }) + BurnConfirmedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: BurnConfirmedCounterName, + Help: "Number of confirmed burns", + }) + BurnRejectedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: BurnRejectedCounterName, + Help: "Number of rejected burns", + }) + BurnHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: BurnHistogramName, + Help: "Histogram of burns, bucketed by time to finished", + }) +} + +func RegisterTokenBurnMetrics() { + registry.MustRegister(BurnSubmittedCounter) + registry.MustRegister(BurnConfirmedCounter) + registry.MustRegister(BurnRejectedCounter) + registry.MustRegister(BurnHistogram) +} diff --git a/internal/metrics/token_mint.go b/internal/metrics/token_mint.go new file mode 100644 index 0000000000..36aa9eb4dd --- /dev/null +++ b/internal/metrics/token_mint.go @@ -0,0 +1,64 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var MintSubmittedCounter prometheus.Counter +var MintConfirmedCounter prometheus.Counter +var MintRejectedCounter prometheus.Counter +var MintHistogram prometheus.Histogram + +// MintSubmittedCounterName is the prometheus metric for tracking the total number of mints submitted +var MintSubmittedCounterName = "ff_mint_submitted_total" + +// MintConfirmedCounterName is the prometheus metric for tracking the total number of mints confirmed +var MintConfirmedCounterName = "ff_mint_confirmed_total" + +// MintRejectedCounterName is the prometheus metric for tracking the total number of mints rejected +var MintRejectedCounterName = "ff_mint_rejected_total" + +// MintHistogramName is the prometheus metric for tracking the total number of mints - histogram +var MintHistogramName = "ff_mint_histogram" + +func InitTokenMintMetrics() { + MintSubmittedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: MintSubmittedCounterName, + Help: "Number of submitted mints", + }) + MintConfirmedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: MintConfirmedCounterName, + Help: "Number of confirmed mints", + }) + MintRejectedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: MintRejectedCounterName, + Help: "Number of rejected mints", + }) + MintHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: MintHistogramName, + Help: "Histogram of mints, bucketed by time to finished", + }) +} + +func RegisterTokenMintMetrics() { + registry.MustRegister(MintSubmittedCounter) + registry.MustRegister(MintConfirmedCounter) + registry.MustRegister(MintRejectedCounter) + registry.MustRegister(MintHistogram) +} diff --git a/internal/metrics/token_transfer.go b/internal/metrics/token_transfer.go new file mode 100644 index 0000000000..0aaf2571ac --- /dev/null +++ b/internal/metrics/token_transfer.go @@ -0,0 +1,64 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var TransferSubmittedCounter prometheus.Counter +var TransferConfirmedCounter prometheus.Counter +var TransferRejectedCounter prometheus.Counter +var TransferHistogram prometheus.Histogram + +// TransferSubmittedCounterName is the prometheus metric for tracking the total number of transfers submitted +var TransferSubmittedCounterName = "ff_transfer_submitted_total" + +// TransferConfirmedCounterName is the prometheus metric for tracking the total number of transfers confirmed +var TransferConfirmedCounterName = "ff_transfer_confirmed_total" + +// TransferRejectedCounterName is the prometheus metric for tracking the total number of transfers rejected +var TransferRejectedCounterName = "ff_transfer_rejected_total" + +// TransferHistogramName is the prometheus metric for tracking the total number of transfers - histogram +var TransferHistogramName = "ff_transfer_histogram" + +func InitTokenTransferMetrics() { + TransferSubmittedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: TransferSubmittedCounterName, + Help: "Number of submitted transfers", + }) + TransferConfirmedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: TransferConfirmedCounterName, + Help: "Number of confirmed transfers", + }) + TransferRejectedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: TransferRejectedCounterName, + Help: "Number of rejected transfers", + }) + TransferHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: TransferHistogramName, + Help: "Histogram of transfers, bucketed by time to finished", + }) +} + +func RegisterTokenTransferMetrics() { + registry.MustRegister(TransferSubmittedCounter) + registry.MustRegister(TransferConfirmedCounter) + registry.MustRegister(TransferRejectedCounter) + registry.MustRegister(TransferHistogram) +} diff --git a/internal/privatemessaging/message.go b/internal/privatemessaging/message.go index 3376b1265d..88a0d0e13e 100644 --- a/internal/privatemessaging/message.go +++ b/internal/privatemessaging/message.go @@ -21,6 +21,7 @@ import ( "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/internal/sysmessaging" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" @@ -38,6 +39,11 @@ func (pm *privateMessaging) NewMessage(ns string, in *fftypes.MessageInOut) sysm func (pm *privateMessaging) SendMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) { message := pm.NewMessage(ns, in) + + if pm.metricsEnabled && len(in.Header.ID.String()) > 0 { + metrics.PrivateMsgSubmittedCounter.Inc() + metrics.AddTime(in.Header.ID.String()) + } if waitConfirm { err = message.SendAndWait(ctx) } else { @@ -185,7 +191,7 @@ func (s *messageSender) sendInternal(ctx context.Context, method sendMethod) err if err := s.mgr.database.UpsertMessage(ctx, &s.msg.Message, database.UpsertOptimizationNew); err != nil { return err } - log.L(ctx).Infof("Sent private message %s:%s", s.msg.Header.Namespace, s.msg.Header.ID) + log.L(ctx).Infof("Sent private message %s:%s sequence=%d", s.msg.Header.Namespace, s.msg.Header.ID, s.msg.Sequence) if method == methodSendImmediate { if err := s.sendUnpinned(ctx); err != nil { diff --git a/internal/privatemessaging/privatemessaging.go b/internal/privatemessaging/privatemessaging.go index b93b666749..90d0bee2a2 100644 --- a/internal/privatemessaging/privatemessaging.go +++ b/internal/privatemessaging/privatemessaging.go @@ -63,6 +63,7 @@ type privateMessaging struct { localNodeID *fftypes.UUID // lookup and cached on first use, as might not be registered at startup opCorrelationRetries int maxBatchPayloadLength int64 + metricsEnabled bool } func NewPrivateMessaging(ctx context.Context, di database.Plugin, im identity.Manager, dx dataexchange.Plugin, bi blockchain.Plugin, ba batch.Manager, dm data.Manager, sa syncasync.Bridge, bp batchpin.Submitter) (Manager, error) { @@ -93,6 +94,7 @@ func NewPrivateMessaging(ctx context.Context, di database.Plugin, im identity.Ma }, opCorrelationRetries: config.GetInt(config.PrivateMessagingOpCorrelationRetries), maxBatchPayloadLength: config.GetByteSize(config.PrivateMessagingBatchPayloadLimit), + metricsEnabled: config.GetBool(config.MetricsEnabled), } pm.groupManager.groupCache = ccache.New( // We use a LRU cache with a size-aware max diff --git a/internal/privatemessaging/privatemessaging_test.go b/internal/privatemessaging/privatemessaging_test.go index 776794fc57..da2be4942a 100644 --- a/internal/privatemessaging/privatemessaging_test.go +++ b/internal/privatemessaging/privatemessaging_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/hyperledger/firefly/internal/config" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/mocks/batchmocks" "github.com/hyperledger/firefly/mocks/batchpinmocks" "github.com/hyperledger/firefly/mocks/blockchainmocks" @@ -36,10 +37,12 @@ import ( ) func newTestPrivateMessaging(t *testing.T) (*privateMessaging, func()) { + metrics.Registry() config.Reset() config.Set(config.NodeName, "node1") config.Set(config.GroupCacheTTL, "1m") config.Set(config.GroupCacheSize, "1m") + config.Set(config.MetricsEnabled, true) mdi := &databasemocks.Plugin{} mim := &identitymanagermocks.Manager{}