From 54d38ca17772f5ab7ac397419411ab82a03a5fcf Mon Sep 17 00:00:00 2001 From: David Echelberger Date: Thu, 3 Feb 2022 16:13:06 -0500 Subject: [PATCH 01/14] [new-prom-metrics] prometheus metrics to track broadcasts, private msgs, and token mint/burn/transfers Signed-off-by: David Echelberger --- internal/assets/manager.go | 30 ++++++--- internal/assets/manager_test.go | 10 +++ internal/assets/token_transfer.go | 31 +++++++-- internal/broadcast/manager.go | 9 +++ internal/broadcast/manager_test.go | 10 +++ internal/broadcast/message.go | 6 ++ internal/events/aggregator.go | 34 +++++++++- internal/events/aggregator_test.go | 6 +- internal/events/event_manager.go | 4 +- internal/events/operation_update.go | 12 ++++ internal/events/tokens_transferred.go | 21 ++++++ internal/metrics/broadcast.go | 64 +++++++++++++++++++ internal/metrics/metrics.go | 7 ++ internal/metrics/private_msg.go | 64 +++++++++++++++++++ internal/metrics/token_burn.go | 64 +++++++++++++++++++ internal/metrics/token_mint.go | 64 +++++++++++++++++++ internal/metrics/token_transfer.go | 64 +++++++++++++++++++ internal/privatemessaging/message.go | 6 ++ internal/privatemessaging/privatemessaging.go | 9 +++ .../privatemessaging/privatemessaging_test.go | 10 +++ mocks/assetmocks/manager.go | 16 +++++ mocks/broadcastmocks/manager.go | 16 +++++ mocks/privatemessagingmocks/manager.go | 16 +++++ 23 files changed, 553 insertions(+), 20 deletions(-) create mode 100644 internal/metrics/broadcast.go create mode 100644 internal/metrics/private_msg.go create mode 100644 internal/metrics/token_burn.go create mode 100644 internal/metrics/token_mint.go create mode 100644 internal/metrics/token_transfer.go diff --git a/internal/assets/manager.go b/internal/assets/manager.go index 9d5e4c4834..cfa3686d8d 100644 --- a/internal/assets/manager.go +++ b/internal/assets/manager.go @@ -18,6 +18,7 @@ package assets import ( "context" + "time" "github.com/hyperledger/firefly/internal/broadcast" "github.com/hyperledger/firefly/internal/config" @@ -55,21 +56,25 @@ type Manager interface { GetTokenConnectors(ctx context.Context, ns string) ([]*fftypes.TokenConnector, error) + GetStartTime() time.Time + Start() error WaitStop() } type assetManager struct { - ctx context.Context - database database.Plugin - txHelper txcommon.Helper - identity identity.Manager - data data.Manager - syncasync syncasync.Bridge - broadcast broadcast.Manager - messaging privatemessaging.Manager - tokens map[string]tokens.Plugin - retry retry.Retry + ctx context.Context + database database.Plugin + txHelper txcommon.Helper + identity identity.Manager + data data.Manager + syncasync syncasync.Bridge + broadcast broadcast.Manager + messaging privatemessaging.Manager + tokens map[string]tokens.Plugin + retry retry.Retry + metricsEnabled bool + startTime time.Time } func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, pm privatemessaging.Manager, ti map[string]tokens.Plugin) (Manager, error) { @@ -91,6 +96,7 @@ func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manage MaximumDelay: config.GetDuration(config.AssetManagerRetryMaxDelay), Factor: config.GetFloat64(config.AssetManagerRetryFactor), }, + metricsEnabled: config.GetBool(config.MetricsEnabled), } return am, nil } @@ -138,6 +144,10 @@ func (am *assetManager) GetTokenConnectors(ctx context.Context, ns string) ([]*f return connectors, nil } +func (am *assetManager) GetStartTime() time.Time { + return am.startTime +} + func (am *assetManager) Start() error { return nil } diff --git a/internal/assets/manager_test.go b/internal/assets/manager_test.go index ccd867ea6e..c9a9dfdbb9 100644 --- a/internal/assets/manager_test.go +++ b/internal/assets/manager_test.go @@ -17,6 +17,7 @@ package assets import ( "context" "testing" + "time" "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/mocks/broadcastmocks" @@ -56,6 +57,15 @@ func newTestAssets(t *testing.T) (*assetManager, func()) { return am, cancel } +func TestGetStartTime(t *testing.T) { + am, cancel := newTestAssets(t) + now := time.Now() + am.startTime = now + defer cancel() + + assert.Equal(t, am.GetStartTime(), now) +} + func TestInitFail(t *testing.T) { _, err := NewAssetManager(context.Background(), nil, nil, nil, nil, nil, nil, nil) assert.Regexp(t, "FF10128", err) diff --git a/internal/assets/token_transfer.go b/internal/assets/token_transfer.go index e67ea32b0e..c92094527e 100644 --- a/internal/assets/token_transfer.go +++ b/internal/assets/token_transfer.go @@ -19,9 +19,12 @@ package assets import ( "context" "fmt" + "time" + "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/internal/sysmessaging" "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/pkg/database" @@ -43,19 +46,21 @@ func (am *assetManager) GetTokenTransferByID(ctx context.Context, ns, id string) func (am *assetManager) NewTransfer(ns string, transfer *fftypes.TokenTransferInput) sysmessaging.MessageSender { sender := &transferSender{ - mgr: am, - namespace: ns, - transfer: transfer, + metricsEnabled: config.GetBool(config.MetricsEnabled), + mgr: am, + namespace: ns, + transfer: transfer, } sender.setDefaults() return sender } type transferSender struct { - mgr *assetManager - namespace string - transfer *fftypes.TokenTransferInput - resolved bool + mgr *assetManager + namespace string + transfer *fftypes.TokenTransferInput + resolved bool + metricsEnabled bool } // sendMethod is the specific operation requested of the transferSender. @@ -124,6 +129,10 @@ func (am *assetManager) MintTokens(ctx context.Context, ns string, transfer *fft return nil, err } + if am.metricsEnabled { + metrics.MintSubmittedCounter.Inc() + am.startTime = time.Now() + } sender := am.NewTransfer(ns, transfer) if waitConfirm { err = sender.SendAndWait(ctx) @@ -139,6 +148,10 @@ func (am *assetManager) BurnTokens(ctx context.Context, ns string, transfer *fft return nil, err } + if am.metricsEnabled { + metrics.BurnSubmittedCounter.Inc() + am.startTime = time.Now() + } sender := am.NewTransfer(ns, transfer) if waitConfirm { err = sender.SendAndWait(ctx) @@ -157,6 +170,10 @@ func (am *assetManager) TransferTokens(ctx context.Context, ns string, transfer return nil, i18n.NewError(ctx, i18n.MsgCannotTransferToSelf) } + if am.metricsEnabled { + metrics.TransferSubmittedCounter.Inc() + am.startTime = time.Now() + } sender := am.NewTransfer(ns, transfer) if waitConfirm { err = sender.SendAndWait(ctx) diff --git a/internal/broadcast/manager.go b/internal/broadcast/manager.go index 292d331159..00f0a35b15 100644 --- a/internal/broadcast/manager.go +++ b/internal/broadcast/manager.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "encoding/json" + "time" "github.com/hyperledger/firefly/internal/batch" "github.com/hyperledger/firefly/internal/batchpin" @@ -48,6 +49,7 @@ type Manager interface { BroadcastTokenPool(ctx context.Context, ns string, pool *fftypes.TokenPoolAnnouncement, waitConfirm bool) (msg *fftypes.Message, err error) Start() error WaitStop() + GetStartTime() time.Time } type broadcastManager struct { @@ -62,6 +64,12 @@ type broadcastManager struct { syncasync syncasync.Bridge batchpin batchpin.Submitter maxBatchPayloadLength int64 + metricsEnabled bool + startTime time.Time +} + +func (bm *broadcastManager) GetStartTime() time.Time { + return bm.startTime } func NewBroadcastManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, bi blockchain.Plugin, dx dataexchange.Plugin, pi publicstorage.Plugin, ba batch.Manager, sa syncasync.Bridge, bp batchpin.Submitter) (Manager, error) { @@ -80,6 +88,7 @@ func NewBroadcastManager(ctx context.Context, di database.Plugin, im identity.Ma syncasync: sa, batchpin: bp, maxBatchPayloadLength: config.GetByteSize(config.BroadcastBatchPayloadLimit), + metricsEnabled: config.GetBool(config.MetricsEnabled), } bo := batch.Options{ BatchMaxSize: config.GetUint(config.BroadcastBatchSize), diff --git a/internal/broadcast/manager_test.go b/internal/broadcast/manager_test.go index ad393f3026..ebf3b3160f 100644 --- a/internal/broadcast/manager_test.go +++ b/internal/broadcast/manager_test.go @@ -23,6 +23,7 @@ import ( "io" "io/ioutil" "testing" + "time" "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/mocks/batchmocks" @@ -72,6 +73,15 @@ func newTestBroadcast(t *testing.T) (*broadcastManager, func()) { return b.(*broadcastManager), cancel } +func TestGetStartTime(t *testing.T) { + am, cancel := newTestBroadcast(t) + now := time.Now() + am.startTime = now + defer cancel() + + assert.Equal(t, am.GetStartTime(), now) +} + func TestInitFail(t *testing.T) { _, err := NewBroadcastManager(context.Background(), nil, nil, nil, nil, nil, nil, nil, nil, nil) assert.Regexp(t, "FF10128", err) diff --git a/internal/broadcast/message.go b/internal/broadcast/message.go index 23a14a46ee..1351d55eea 100644 --- a/internal/broadcast/message.go +++ b/internal/broadcast/message.go @@ -19,8 +19,10 @@ package broadcast import ( "context" "encoding/json" + "time" "github.com/hyperledger/firefly/internal/i18n" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/internal/sysmessaging" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" @@ -37,6 +39,10 @@ func (bm *broadcastManager) NewBroadcast(ns string, in *fftypes.MessageInOut) sy } func (bm *broadcastManager) BroadcastMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) { + if bm.metricsEnabled { + metrics.BroadcastSubmittedCounter.Inc() + bm.startTime = time.Now() + } broadcast := bm.NewBroadcast(ns, in) if waitConfirm { err = broadcast.SendAndWait(ctx) diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index a12617b82f..cc48eaf8aa 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -21,11 +21,15 @@ import ( "crypto/sha256" "database/sql/driver" "encoding/binary" + "time" + "github.com/hyperledger/firefly/internal/broadcast" "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/definitions" "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/internal/metrics" + "github.com/hyperledger/firefly/internal/privatemessaging" "github.com/hyperledger/firefly/internal/retry" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" @@ -36,6 +40,7 @@ const ( ) type aggregator struct { + bm broadcast.Manager ctx context.Context database database.Plugin definitions definitions.DefinitionHandlers @@ -43,20 +48,25 @@ type aggregator struct { eventPoller *eventPoller newPins chan int64 offchainBatches chan *fftypes.UUID + pm privatemessaging.Manager queuedRewinds chan *fftypes.UUID retry *retry.Retry + metricsEnabled bool } -func newAggregator(ctx context.Context, di database.Plugin, sh definitions.DefinitionHandlers, dm data.Manager, en *eventNotifier) *aggregator { +func newAggregator(ctx context.Context, di database.Plugin, sh definitions.DefinitionHandlers, dm data.Manager, en *eventNotifier, bm broadcast.Manager, pm privatemessaging.Manager) *aggregator { batchSize := config.GetInt(config.EventAggregatorBatchSize) ag := &aggregator{ + bm: bm, ctx: log.WithLogField(ctx, "role", "aggregator"), database: di, definitions: sh, data: dm, newPins: make(chan int64), offchainBatches: make(chan *fftypes.UUID, 1), // hops to queuedRewinds with a shouldertab on the event poller + pm: pm, queuedRewinds: make(chan *fftypes.UUID, batchSize), + metricsEnabled: config.GetBool(config.MetricsEnabled), } firstEvent := fftypes.SubOptsFirstEvent(config.GetString(config.EventAggregatorFirstEvent)) ag.eventPoller = newEventPoller(ctx, di, en, &eventPollerConf{ @@ -560,6 +570,28 @@ func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.M return nil }) + // Metrics for broadcast/private message + if ag.metricsEnabled { + switch msg.Header.Type { + case fftypes.MessageTypeBroadcast: + metrics.BroadcastHistogram.Observe(time.Since(ag.bm.GetStartTime()).Seconds()) + if eventType == fftypes.EventTypeMessageConfirmed { + metrics.BroadcastConfirmedCounter.Inc() + } else if eventType == fftypes.EventTypeMessageRejected { + metrics.BroadcastRejectedCounter.Inc() + } + case fftypes.MessageTypePrivate: + metrics.PrivateMsgHistogram.Observe(time.Since(ag.pm.GetStartTime()).Seconds()) + if eventType == fftypes.EventTypeMessageConfirmed { + metrics.PrivateMsgConfirmedCounter.Inc() + } else if eventType == fftypes.EventTypeMessageRejected { + metrics.PrivateMsgRejectedCounter.Inc() + } + default: + break + } + } + return true, nil } diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index 461ae1e24d..26465a4b0f 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -24,9 +24,11 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/definitions" + "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/definitionsmocks" + "github.com/hyperledger/firefly/mocks/privatemessagingmocks" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" "github.com/stretchr/testify/assert" @@ -37,8 +39,10 @@ func newTestAggregator() (*aggregator, func()) { mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} msh := &definitionsmocks.DefinitionHandlers{} + mbm := &broadcastmocks.Manager{} + mpm := &privatemessagingmocks.Manager{} ctx, cancel := context.WithCancel(context.Background()) - ag := newAggregator(ctx, mdi, msh, mdm, newEventNotifier(ctx, "ut")) + ag := newAggregator(ctx, mdi, msh, mdm, newEventNotifier(ctx, "ut"), mbm, mpm) return ag, cancel } diff --git a/internal/events/event_manager.go b/internal/events/event_manager.go index ee06536ef3..1387f117de 100644 --- a/internal/events/event_manager.go +++ b/internal/events/event_manager.go @@ -94,6 +94,7 @@ type eventManager struct { opCorrelationRetries int defaultTransport string internalEvents *system.Events + metricsEnabled bool } func NewEventManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, pi publicstorage.Plugin, di database.Plugin, im identity.Manager, dh definitions.DefinitionHandlers, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, am assets.Manager) (EventManager, error) { @@ -123,7 +124,8 @@ func NewEventManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, pi publ opCorrelationRetries: config.GetInt(config.EventAggregatorOpCorrelationRetries), newEventNotifier: newEventNotifier, newPinNotifier: newPinNotifier, - aggregator: newAggregator(ctx, di, dh, dm, newPinNotifier), + aggregator: newAggregator(ctx, di, dh, dm, newPinNotifier, bm, pm), + metricsEnabled: config.GetBool(config.MetricsEnabled), } ie, _ := eifactory.GetPlugin(ctx, system.SystemEventsTransport) em.internalEvents = ie.(*system.Events) diff --git a/internal/events/operation_update.go b/internal/events/operation_update.go index 5d6f18be49..f75db7d2e4 100644 --- a/internal/events/operation_update.go +++ b/internal/events/operation_update.go @@ -20,6 +20,7 @@ import ( "context" "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" ) @@ -42,6 +43,17 @@ func (em *eventManager) operationUpdateCtx(ctx context.Context, operationID *fft // Special handling for OpTypeTokenTransfer, which writes an event when it fails if op.Type == fftypes.OpTypeTokenTransfer && txState == fftypes.OpStatusFailed { event := fftypes.NewEvent(fftypes.EventTypeTransferOpFailed, op.Namespace, op.ID) + + if em.metricsEnabled { + // Mint + metrics.MintRejectedCounter.Inc() + // TODO: Figure out way to determine transfer type + // Transfer + // metrics.TransferRejectedCounter.Inc() + // Burn + // metrics.BurnRejectedCounter.Inc() + } + if err := em.database.InsertEvent(ctx, event); err != nil { return err } diff --git a/internal/events/tokens_transferred.go b/internal/events/tokens_transferred.go index 5fc3666e2f..fef7b2eab6 100644 --- a/internal/events/tokens_transferred.go +++ b/internal/events/tokens_transferred.go @@ -18,8 +18,10 @@ package events import ( "context" + "time" "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" @@ -52,6 +54,7 @@ func (em *eventManager) loadTransferOperation(ctx context.Context, tx *fftypes.U } func (em *eventManager) persistTokenTransfer(ctx context.Context, transfer *tokens.TokenTransfer) (valid bool, err error) { + countMetric := true // Check that transfer has not already been recorded if existing, err := em.database.GetTokenTransferByProtocolID(ctx, transfer.Connector, transfer.ProtocolID); err != nil { return false, err @@ -87,6 +90,7 @@ func (em *eventManager) persistTokenTransfer(ctx context.Context, transfer *toke if existing, err := em.database.GetTokenTransfer(ctx, transfer.LocalID); err != nil { return false, err } else if existing != nil { + countMetric = false transfer.LocalID = fftypes.NewUUID() } } else { @@ -108,6 +112,23 @@ func (em *eventManager) persistTokenTransfer(ctx context.Context, transfer *toke return false, err } log.L(ctx).Infof("Token transfer recorded id=%s author=%s", transfer.ProtocolID, transfer.Key) + + if em.metricsEnabled && countMetric { + switch transfer.Type { + case fftypes.TokenTransferTypeMint: + metrics.MintHistogram.Observe(time.Since(em.assets.GetStartTime()).Seconds()) + metrics.MintConfirmedCounter.Inc() + case fftypes.TokenTransferTypeTransfer: + metrics.TransferHistogram.Observe(time.Since(em.assets.GetStartTime()).Seconds()) + metrics.TransferConfirmedCounter.Inc() + case fftypes.TokenTransferTypeBurn: + metrics.BurnHistogram.Observe(time.Since(em.assets.GetStartTime()).Seconds()) + metrics.BurnConfirmedCounter.Inc() + default: + break + } + } + return true, nil } diff --git a/internal/metrics/broadcast.go b/internal/metrics/broadcast.go new file mode 100644 index 0000000000..545e398a49 --- /dev/null +++ b/internal/metrics/broadcast.go @@ -0,0 +1,64 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var BroadcastSubmittedCounter prometheus.Counter +var BroadcastConfirmedCounter prometheus.Counter +var BroadcastRejectedCounter prometheus.Counter +var BroadcastHistogram prometheus.Histogram + +// BroadcastSubmittedCounterName is the prometheus metric for tracking the total number of broadcasts submitted +var BroadcastSubmittedCounterName = "ff_broadcast_submitted_total" + +// BroadcastConfirmedCounterName is the prometheus metric for tracking the total number of broadcasts confirmed +var BroadcastConfirmedCounterName = "ff_broadcast_confirmed_total" + +// BroadcastRejectedCounterName is the prometheus metric for tracking the total number of broadcasts rejected +var BroadcastRejectedCounterName = "ff_broadcast_rejected_total" + +// BroadcastHistogramName is the prometheus metric for tracking the total number of broadcast messages - histogram +var BroadcastHistogramName = "ff_broadcast_histogram" + +func InitBroadcastMetrics() { + BroadcastSubmittedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: BroadcastSubmittedCounterName, + Help: "Number of submitted broadcasts", + }) + BroadcastConfirmedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: BroadcastConfirmedCounterName, + Help: "Number of confirmed broadcasts", + }) + BroadcastRejectedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: BroadcastRejectedCounterName, + Help: "Number of rejected broadcasts", + }) + BroadcastHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: BroadcastHistogramName, + Help: "Histogram of broadcasts, bucketed by time to finished", + }) +} + +func RegisterBroadcastMetrics() { + registry.MustRegister(BroadcastSubmittedCounter) + registry.MustRegister(BroadcastConfirmedCounter) + registry.MustRegister(BroadcastRejectedCounter) + registry.MustRegister(BroadcastHistogram) +} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 4a56f11b0b..95cfcab379 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -71,6 +71,9 @@ func newInstrumentation(subsystem string) *muxprom.Instrumentation { } func initMetricsCollectors() { + InitBroadcastMetrics() + InitPrivateMsgMetrics() + InitTokenMintMetrics() BatchPinCounter = prometheus.NewCounter(prometheus.CounterOpts{ Name: MetricsBatchPin, Help: "Number of batch pins submitted", @@ -81,6 +84,10 @@ func registerMetricsCollectors() { registry.MustRegister(collectors.NewGoCollector()) registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) registry.MustRegister(BatchPinCounter) + + RegisterBroadcastMetrics() + RegisterPrivateMsgMetrics() + RegisterTokenMintMetrics() } // Clear will reset the Prometheus metrics registry and instrumentations, useful for testing diff --git a/internal/metrics/private_msg.go b/internal/metrics/private_msg.go new file mode 100644 index 0000000000..1f3c7cea96 --- /dev/null +++ b/internal/metrics/private_msg.go @@ -0,0 +1,64 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var PrivateMsgSubmittedCounter prometheus.Counter +var PrivateMsgConfirmedCounter prometheus.Counter +var PrivateMsgRejectedCounter prometheus.Counter +var PrivateMsgHistogram prometheus.Histogram + +// PrivateMsgSubmittedCounterName is the prometheus metric for tracking the total number of private messages submitted +var PrivateMsgSubmittedCounterName = "ff_private_msg_submitted_total" + +// PrivateMsgConfirmedCounterName is the prometheus metric for tracking the total number of private messages confirmed +var PrivateMsgConfirmedCounterName = "ff_private_msg_confirmed_total" + +// PrivateMsgRejectedCounterName is the prometheus metric for tracking the total number of private messages rejected +var PrivateMsgRejectedCounterName = "ff_private_msg_rejected_total" + +// PrivateMsgHistogramName is the prometheus metric for tracking the total number of private messages - histogram +var PrivateMsgHistogramName = "ff_private_msg_histogram" + +func InitPrivateMsgMetrics() { + PrivateMsgSubmittedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: PrivateMsgSubmittedCounterName, + Help: "Number of submitted private messages", + }) + PrivateMsgConfirmedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: PrivateMsgConfirmedCounterName, + Help: "Number of confirmed private messages", + }) + PrivateMsgRejectedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: PrivateMsgRejectedCounterName, + Help: "Number of rejected private messages", + }) + PrivateMsgHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: PrivateMsgHistogramName, + Help: "Histogram of private messages, bucketed by time to finished", + }) +} + +func RegisterPrivateMsgMetrics() { + registry.MustRegister(PrivateMsgSubmittedCounter) + registry.MustRegister(PrivateMsgConfirmedCounter) + registry.MustRegister(PrivateMsgRejectedCounter) + registry.MustRegister(PrivateMsgHistogram) +} diff --git a/internal/metrics/token_burn.go b/internal/metrics/token_burn.go new file mode 100644 index 0000000000..9b0eefea7f --- /dev/null +++ b/internal/metrics/token_burn.go @@ -0,0 +1,64 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var BurnSubmittedCounter prometheus.Counter +var BurnConfirmedCounter prometheus.Counter +var BurnRejectedCounter prometheus.Counter +var BurnHistogram prometheus.Histogram + +// BurnSubmittedCounterName is the prometheus metric for tracking the total number of burns submitted +var BurnSubmittedCounterName = "ff_burn_submitted_total" + +// BurnConfirmedCounterName is the prometheus metric for tracking the total number of burns confirmed +var BurnConfirmedCounterName = "ff_burn_confirmed_total" + +// BurnRejectedCounterName is the prometheus metric for tracking the total number of burns rejected +var BurnRejectedCounterName = "ff_burn_rejected_total" + +// BurnHistogramName is the prometheus metric for tracking the total number of burns - histogram +var BurnHistogramName = "ff_burn_histogram" + +func InitTokenBurnMetrics() { + BurnSubmittedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: BurnSubmittedCounterName, + Help: "Number of submitted burns", + }) + BurnConfirmedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: BurnConfirmedCounterName, + Help: "Number of confirmed burns", + }) + BurnRejectedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: BurnRejectedCounterName, + Help: "Number of rejected burns", + }) + BurnHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: BurnHistogramName, + Help: "Histogram of burns, bucketed by time to finished", + }) +} + +func RegisterTokenBurnMetrics() { + registry.MustRegister(BurnSubmittedCounter) + registry.MustRegister(BurnConfirmedCounter) + registry.MustRegister(BurnRejectedCounter) + registry.MustRegister(BurnHistogram) +} diff --git a/internal/metrics/token_mint.go b/internal/metrics/token_mint.go new file mode 100644 index 0000000000..36aa9eb4dd --- /dev/null +++ b/internal/metrics/token_mint.go @@ -0,0 +1,64 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var MintSubmittedCounter prometheus.Counter +var MintConfirmedCounter prometheus.Counter +var MintRejectedCounter prometheus.Counter +var MintHistogram prometheus.Histogram + +// MintSubmittedCounterName is the prometheus metric for tracking the total number of mints submitted +var MintSubmittedCounterName = "ff_mint_submitted_total" + +// MintConfirmedCounterName is the prometheus metric for tracking the total number of mints confirmed +var MintConfirmedCounterName = "ff_mint_confirmed_total" + +// MintRejectedCounterName is the prometheus metric for tracking the total number of mints rejected +var MintRejectedCounterName = "ff_mint_rejected_total" + +// MintHistogramName is the prometheus metric for tracking the total number of mints - histogram +var MintHistogramName = "ff_mint_histogram" + +func InitTokenMintMetrics() { + MintSubmittedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: MintSubmittedCounterName, + Help: "Number of submitted mints", + }) + MintConfirmedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: MintConfirmedCounterName, + Help: "Number of confirmed mints", + }) + MintRejectedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: MintRejectedCounterName, + Help: "Number of rejected mints", + }) + MintHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: MintHistogramName, + Help: "Histogram of mints, bucketed by time to finished", + }) +} + +func RegisterTokenMintMetrics() { + registry.MustRegister(MintSubmittedCounter) + registry.MustRegister(MintConfirmedCounter) + registry.MustRegister(MintRejectedCounter) + registry.MustRegister(MintHistogram) +} diff --git a/internal/metrics/token_transfer.go b/internal/metrics/token_transfer.go new file mode 100644 index 0000000000..0aaf2571ac --- /dev/null +++ b/internal/metrics/token_transfer.go @@ -0,0 +1,64 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var TransferSubmittedCounter prometheus.Counter +var TransferConfirmedCounter prometheus.Counter +var TransferRejectedCounter prometheus.Counter +var TransferHistogram prometheus.Histogram + +// TransferSubmittedCounterName is the prometheus metric for tracking the total number of transfers submitted +var TransferSubmittedCounterName = "ff_transfer_submitted_total" + +// TransferConfirmedCounterName is the prometheus metric for tracking the total number of transfers confirmed +var TransferConfirmedCounterName = "ff_transfer_confirmed_total" + +// TransferRejectedCounterName is the prometheus metric for tracking the total number of transfers rejected +var TransferRejectedCounterName = "ff_transfer_rejected_total" + +// TransferHistogramName is the prometheus metric for tracking the total number of transfers - histogram +var TransferHistogramName = "ff_transfer_histogram" + +func InitTokenTransferMetrics() { + TransferSubmittedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: TransferSubmittedCounterName, + Help: "Number of submitted transfers", + }) + TransferConfirmedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: TransferConfirmedCounterName, + Help: "Number of confirmed transfers", + }) + TransferRejectedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: TransferRejectedCounterName, + Help: "Number of rejected transfers", + }) + TransferHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: TransferHistogramName, + Help: "Histogram of transfers, bucketed by time to finished", + }) +} + +func RegisterTokenTransferMetrics() { + registry.MustRegister(TransferSubmittedCounter) + registry.MustRegister(TransferConfirmedCounter) + registry.MustRegister(TransferRejectedCounter) + registry.MustRegister(TransferHistogram) +} diff --git a/internal/privatemessaging/message.go b/internal/privatemessaging/message.go index bbe9ac1ab1..8ff6043f71 100644 --- a/internal/privatemessaging/message.go +++ b/internal/privatemessaging/message.go @@ -18,8 +18,10 @@ package privatemessaging import ( "context" + "time" "github.com/hyperledger/firefly/internal/i18n" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/internal/sysmessaging" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" @@ -36,6 +38,10 @@ func (pm *privateMessaging) NewMessage(ns string, in *fftypes.MessageInOut) sysm } func (pm *privateMessaging) SendMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) { + if pm.metricsEnabled { + metrics.PrivateMsgSubmittedCounter.Inc() + pm.startTime = time.Now() + } message := pm.NewMessage(ns, in) if waitConfirm { err = message.SendAndWait(ctx) diff --git a/internal/privatemessaging/privatemessaging.go b/internal/privatemessaging/privatemessaging.go index dde0e4deb4..2d2dff91ad 100644 --- a/internal/privatemessaging/privatemessaging.go +++ b/internal/privatemessaging/privatemessaging.go @@ -19,6 +19,7 @@ package privatemessaging import ( "context" "encoding/json" + "time" "github.com/hyperledger/firefly/internal/batch" "github.com/hyperledger/firefly/internal/batchpin" @@ -44,6 +45,7 @@ type Manager interface { NewMessage(ns string, msg *fftypes.MessageInOut) sysmessaging.MessageSender SendMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) RequestReply(ctx context.Context, ns string, request *fftypes.MessageInOut) (reply *fftypes.MessageInOut, err error) + GetStartTime() time.Time } type privateMessaging struct { @@ -63,6 +65,8 @@ type privateMessaging struct { localNodeID *fftypes.UUID // lookup and cached on first use, as might not be registered at startup opCorrelationRetries int maxBatchPayloadLength int64 + metricsEnabled bool + startTime time.Time } func NewPrivateMessaging(ctx context.Context, di database.Plugin, im identity.Manager, dx dataexchange.Plugin, bi blockchain.Plugin, ba batch.Manager, dm data.Manager, sa syncasync.Bridge, bp batchpin.Submitter) (Manager, error) { @@ -93,6 +97,7 @@ func NewPrivateMessaging(ctx context.Context, di database.Plugin, im identity.Ma }, opCorrelationRetries: config.GetInt(config.PrivateMessagingOpCorrelationRetries), maxBatchPayloadLength: config.GetByteSize(config.PrivateMessagingBatchPayloadLimit), + metricsEnabled: config.GetBool(config.MetricsEnabled), } pm.groupManager.groupCache = ccache.New( // We use a LRU cache with a size-aware max @@ -116,6 +121,10 @@ func NewPrivateMessaging(ctx context.Context, di database.Plugin, im identity.Ma return pm, nil } +func (pm *privateMessaging) GetStartTime() time.Time { + return pm.startTime +} + func (pm *privateMessaging) Start() error { return pm.exchange.Start() } diff --git a/internal/privatemessaging/privatemessaging_test.go b/internal/privatemessaging/privatemessaging_test.go index 776794fc57..ac9cd20934 100644 --- a/internal/privatemessaging/privatemessaging_test.go +++ b/internal/privatemessaging/privatemessaging_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/mocks/batchmocks" @@ -172,6 +173,15 @@ func TestDispatchBatchWithBlobs(t *testing.T) { mdx.AssertExpectations(t) } +func TestGetStartTime(t *testing.T) { + am, cancel := newTestPrivateMessaging(t) + now := time.Now() + am.startTime = now + defer cancel() + + assert.Equal(t, am.GetStartTime(), now) +} + func TestNewPrivateMessagingMissingDeps(t *testing.T) { _, err := NewPrivateMessaging(context.Background(), nil, nil, nil, nil, nil, nil, nil, nil) assert.Regexp(t, "FF10128", err) diff --git a/mocks/assetmocks/manager.go b/mocks/assetmocks/manager.go index 726f7cf428..50395a62ed 100644 --- a/mocks/assetmocks/manager.go +++ b/mocks/assetmocks/manager.go @@ -11,6 +11,8 @@ import ( mock "github.com/stretchr/testify/mock" sysmessaging "github.com/hyperledger/firefly/internal/sysmessaging" + + time "time" ) // Manager is an autogenerated mock type for the Manager type @@ -78,6 +80,20 @@ func (_m *Manager) CreateTokenPool(ctx context.Context, ns string, pool *fftypes return r0, r1 } +// GetStartTime provides a mock function with given fields: +func (_m *Manager) GetStartTime() time.Time { + ret := _m.Called() + + var r0 time.Time + if rf, ok := ret.Get(0).(func() time.Time); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(time.Time) + } + + return r0 +} + // GetTokenAccountPools provides a mock function with given fields: ctx, ns, key, filter func (_m *Manager) GetTokenAccountPools(ctx context.Context, ns string, key string, filter database.AndFilter) ([]*fftypes.TokenAccountPool, *database.FilterResult, error) { ret := _m.Called(ctx, ns, key, filter) diff --git a/mocks/broadcastmocks/manager.go b/mocks/broadcastmocks/manager.go index f419341a4f..7b797bdc65 100644 --- a/mocks/broadcastmocks/manager.go +++ b/mocks/broadcastmocks/manager.go @@ -9,6 +9,8 @@ import ( mock "github.com/stretchr/testify/mock" sysmessaging "github.com/hyperledger/firefly/internal/sysmessaging" + + time "time" ) // Manager is an autogenerated mock type for the Manager type @@ -177,6 +179,20 @@ func (_m *Manager) BroadcastTokenPool(ctx context.Context, ns string, pool *ffty return r0, r1 } +// GetStartTime provides a mock function with given fields: +func (_m *Manager) GetStartTime() time.Time { + ret := _m.Called() + + var r0 time.Time + if rf, ok := ret.Get(0).(func() time.Time); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(time.Time) + } + + return r0 +} + // NewBroadcast provides a mock function with given fields: ns, in func (_m *Manager) NewBroadcast(ns string, in *fftypes.MessageInOut) sysmessaging.MessageSender { ret := _m.Called(ns, in) diff --git a/mocks/privatemessagingmocks/manager.go b/mocks/privatemessagingmocks/manager.go index be51e56a61..8763a81ee4 100644 --- a/mocks/privatemessagingmocks/manager.go +++ b/mocks/privatemessagingmocks/manager.go @@ -11,6 +11,8 @@ import ( mock "github.com/stretchr/testify/mock" sysmessaging "github.com/hyperledger/firefly/internal/sysmessaging" + + time "time" ) // Manager is an autogenerated mock type for the Manager type @@ -94,6 +96,20 @@ func (_m *Manager) GetGroupsNS(ctx context.Context, ns string, filter database.A return r0, r1, r2 } +// GetStartTime provides a mock function with given fields: +func (_m *Manager) GetStartTime() time.Time { + ret := _m.Called() + + var r0 time.Time + if rf, ok := ret.Get(0).(func() time.Time); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(time.Time) + } + + return r0 +} + // NewMessage provides a mock function with given fields: ns, msg func (_m *Manager) NewMessage(ns string, msg *fftypes.MessageInOut) sysmessaging.MessageSender { ret := _m.Called(ns, msg) From 7f46f1f6060fce5ec24d91fed542d4a9ba23548a Mon Sep 17 00:00:00 2001 From: David Echelberger Date: Fri, 4 Feb 2022 14:42:08 -0500 Subject: [PATCH 02/14] [new-prom-metrics] using global time map Signed-off-by: David Echelberger --- internal/assets/manager.go | 8 -------- internal/assets/manager_test.go | 10 ---------- internal/assets/token_transfer.go | 11 +++++------ internal/broadcast/manager.go | 7 ------- internal/broadcast/manager_test.go | 13 +++---------- internal/broadcast/message.go | 2 +- internal/events/aggregator.go | 14 +++++--------- internal/events/aggregator_test.go | 6 +----- internal/events/event_manager.go | 2 +- internal/events/tokens_transferred.go | 8 +++++--- internal/metrics/metrics.go | 3 +++ internal/privatemessaging/message.go | 2 +- internal/privatemessaging/privatemessaging.go | 7 ------- .../privatemessaging/privatemessaging_test.go | 13 +++---------- mocks/assetmocks/manager.go | 16 ---------------- mocks/broadcastmocks/manager.go | 16 ---------------- mocks/privatemessagingmocks/manager.go | 16 ---------------- 17 files changed, 28 insertions(+), 126 deletions(-) diff --git a/internal/assets/manager.go b/internal/assets/manager.go index cfa3686d8d..854a2490ac 100644 --- a/internal/assets/manager.go +++ b/internal/assets/manager.go @@ -18,7 +18,6 @@ package assets import ( "context" - "time" "github.com/hyperledger/firefly/internal/broadcast" "github.com/hyperledger/firefly/internal/config" @@ -56,8 +55,6 @@ type Manager interface { GetTokenConnectors(ctx context.Context, ns string) ([]*fftypes.TokenConnector, error) - GetStartTime() time.Time - Start() error WaitStop() } @@ -74,7 +71,6 @@ type assetManager struct { tokens map[string]tokens.Plugin retry retry.Retry metricsEnabled bool - startTime time.Time } func NewAssetManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, pm privatemessaging.Manager, ti map[string]tokens.Plugin) (Manager, error) { @@ -144,10 +140,6 @@ func (am *assetManager) GetTokenConnectors(ctx context.Context, ns string) ([]*f return connectors, nil } -func (am *assetManager) GetStartTime() time.Time { - return am.startTime -} - func (am *assetManager) Start() error { return nil } diff --git a/internal/assets/manager_test.go b/internal/assets/manager_test.go index c9a9dfdbb9..ccd867ea6e 100644 --- a/internal/assets/manager_test.go +++ b/internal/assets/manager_test.go @@ -17,7 +17,6 @@ package assets import ( "context" "testing" - "time" "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/mocks/broadcastmocks" @@ -57,15 +56,6 @@ func newTestAssets(t *testing.T) (*assetManager, func()) { return am, cancel } -func TestGetStartTime(t *testing.T) { - am, cancel := newTestAssets(t) - now := time.Now() - am.startTime = now - defer cancel() - - assert.Equal(t, am.GetStartTime(), now) -} - func TestInitFail(t *testing.T) { _, err := NewAssetManager(context.Background(), nil, nil, nil, nil, nil, nil, nil) assert.Regexp(t, "FF10128", err) diff --git a/internal/assets/token_transfer.go b/internal/assets/token_transfer.go index c3eb834991..9ea102bf4a 100644 --- a/internal/assets/token_transfer.go +++ b/internal/assets/token_transfer.go @@ -45,6 +45,8 @@ func (am *assetManager) GetTokenTransferByID(ctx context.Context, ns, id string) } func (am *assetManager) NewTransfer(ns string, transfer *fftypes.TokenTransferInput) sysmessaging.MessageSender { + metrics.Registry() + config.Set(config.MetricsEnabled, true) sender := &transferSender{ metricsEnabled: config.GetBool(config.MetricsEnabled), mgr: am, @@ -128,10 +130,9 @@ func (am *assetManager) MintTokens(ctx context.Context, ns string, transfer *fft if err := am.validateTransfer(ctx, ns, transfer); err != nil { return nil, err } - if am.metricsEnabled { metrics.MintSubmittedCounter.Inc() - am.startTime = time.Now() + metrics.TimeMap[transfer.TX.ID.String()] = time.Now() } sender := am.NewTransfer(ns, transfer) if waitConfirm { @@ -147,10 +148,9 @@ func (am *assetManager) BurnTokens(ctx context.Context, ns string, transfer *fft if err := am.validateTransfer(ctx, ns, transfer); err != nil { return nil, err } - if am.metricsEnabled { metrics.BurnSubmittedCounter.Inc() - am.startTime = time.Now() + metrics.TimeMap[transfer.TX.ID.String()] = time.Now() } sender := am.NewTransfer(ns, transfer) if waitConfirm { @@ -169,10 +169,9 @@ func (am *assetManager) TransferTokens(ctx context.Context, ns string, transfer if transfer.From == transfer.To { return nil, i18n.NewError(ctx, i18n.MsgCannotTransferToSelf) } - if am.metricsEnabled { metrics.TransferSubmittedCounter.Inc() - am.startTime = time.Now() + metrics.TimeMap[transfer.TX.ID.String()] = time.Now() } sender := am.NewTransfer(ns, transfer) if waitConfirm { diff --git a/internal/broadcast/manager.go b/internal/broadcast/manager.go index f2e0be6297..9230b664ca 100644 --- a/internal/broadcast/manager.go +++ b/internal/broadcast/manager.go @@ -20,7 +20,6 @@ import ( "bytes" "context" "encoding/json" - "time" "github.com/hyperledger/firefly/internal/batch" "github.com/hyperledger/firefly/internal/batchpin" @@ -49,7 +48,6 @@ type Manager interface { BroadcastTokenPool(ctx context.Context, ns string, pool *fftypes.TokenPoolAnnouncement, waitConfirm bool) (msg *fftypes.Message, err error) Start() error WaitStop() - GetStartTime() time.Time } type broadcastManager struct { @@ -65,11 +63,6 @@ type broadcastManager struct { batchpin batchpin.Submitter maxBatchPayloadLength int64 metricsEnabled bool - startTime time.Time -} - -func (bm *broadcastManager) GetStartTime() time.Time { - return bm.startTime } func NewBroadcastManager(ctx context.Context, di database.Plugin, im identity.Manager, dm data.Manager, bi blockchain.Plugin, dx dataexchange.Plugin, pi publicstorage.Plugin, ba batch.Manager, sa syncasync.Bridge, bp batchpin.Submitter) (Manager, error) { diff --git a/internal/broadcast/manager_test.go b/internal/broadcast/manager_test.go index ebf3b3160f..6d1f2d7bb4 100644 --- a/internal/broadcast/manager_test.go +++ b/internal/broadcast/manager_test.go @@ -23,9 +23,9 @@ import ( "io" "io/ioutil" "testing" - "time" "github.com/hyperledger/firefly/internal/config" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/mocks/batchmocks" "github.com/hyperledger/firefly/mocks/batchpinmocks" "github.com/hyperledger/firefly/mocks/blockchainmocks" @@ -43,6 +43,8 @@ import ( func newTestBroadcast(t *testing.T) (*broadcastManager, func()) { config.Reset() + metrics.Registry() + config.Set(config.MetricsEnabled, true) mdi := &databasemocks.Plugin{} mim := &identitymanagermocks.Manager{} mdm := &datamocks.Manager{} @@ -73,15 +75,6 @@ func newTestBroadcast(t *testing.T) (*broadcastManager, func()) { return b.(*broadcastManager), cancel } -func TestGetStartTime(t *testing.T) { - am, cancel := newTestBroadcast(t) - now := time.Now() - am.startTime = now - defer cancel() - - assert.Equal(t, am.GetStartTime(), now) -} - func TestInitFail(t *testing.T) { _, err := NewBroadcastManager(context.Background(), nil, nil, nil, nil, nil, nil, nil, nil, nil) assert.Regexp(t, "FF10128", err) diff --git a/internal/broadcast/message.go b/internal/broadcast/message.go index 1351d55eea..c58e0669d0 100644 --- a/internal/broadcast/message.go +++ b/internal/broadcast/message.go @@ -41,7 +41,7 @@ func (bm *broadcastManager) NewBroadcast(ns string, in *fftypes.MessageInOut) sy func (bm *broadcastManager) BroadcastMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) { if bm.metricsEnabled { metrics.BroadcastSubmittedCounter.Inc() - bm.startTime = time.Now() + metrics.TimeMap[in.Header.ID.String()] = time.Now() } broadcast := bm.NewBroadcast(ns, in) if waitConfirm { diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index cc48eaf8aa..50f04a575e 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -23,13 +23,11 @@ import ( "encoding/binary" "time" - "github.com/hyperledger/firefly/internal/broadcast" "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/definitions" "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/internal/metrics" - "github.com/hyperledger/firefly/internal/privatemessaging" "github.com/hyperledger/firefly/internal/retry" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" @@ -40,7 +38,6 @@ const ( ) type aggregator struct { - bm broadcast.Manager ctx context.Context database database.Plugin definitions definitions.DefinitionHandlers @@ -48,23 +45,20 @@ type aggregator struct { eventPoller *eventPoller newPins chan int64 offchainBatches chan *fftypes.UUID - pm privatemessaging.Manager queuedRewinds chan *fftypes.UUID retry *retry.Retry metricsEnabled bool } -func newAggregator(ctx context.Context, di database.Plugin, sh definitions.DefinitionHandlers, dm data.Manager, en *eventNotifier, bm broadcast.Manager, pm privatemessaging.Manager) *aggregator { +func newAggregator(ctx context.Context, di database.Plugin, sh definitions.DefinitionHandlers, dm data.Manager, en *eventNotifier) *aggregator { batchSize := config.GetInt(config.EventAggregatorBatchSize) ag := &aggregator{ - bm: bm, ctx: log.WithLogField(ctx, "role", "aggregator"), database: di, definitions: sh, data: dm, newPins: make(chan int64), offchainBatches: make(chan *fftypes.UUID, 1), // hops to queuedRewinds with a shouldertab on the event poller - pm: pm, queuedRewinds: make(chan *fftypes.UUID, batchSize), metricsEnabled: config.GetBool(config.MetricsEnabled), } @@ -572,16 +566,18 @@ func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.M // Metrics for broadcast/private message if ag.metricsEnabled { + timeElapsed := time.Since(metrics.TimeMap[msg.Header.ID.String()]).Seconds() + delete(metrics.TimeMap, msg.Header.ID.String()) switch msg.Header.Type { case fftypes.MessageTypeBroadcast: - metrics.BroadcastHistogram.Observe(time.Since(ag.bm.GetStartTime()).Seconds()) + metrics.BroadcastHistogram.Observe(timeElapsed) if eventType == fftypes.EventTypeMessageConfirmed { metrics.BroadcastConfirmedCounter.Inc() } else if eventType == fftypes.EventTypeMessageRejected { metrics.BroadcastRejectedCounter.Inc() } case fftypes.MessageTypePrivate: - metrics.PrivateMsgHistogram.Observe(time.Since(ag.pm.GetStartTime()).Seconds()) + metrics.PrivateMsgHistogram.Observe(timeElapsed) if eventType == fftypes.EventTypeMessageConfirmed { metrics.PrivateMsgConfirmedCounter.Inc() } else if eventType == fftypes.EventTypeMessageRejected { diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index 26465a4b0f..461ae1e24d 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -24,11 +24,9 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/definitions" - "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/definitionsmocks" - "github.com/hyperledger/firefly/mocks/privatemessagingmocks" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" "github.com/stretchr/testify/assert" @@ -39,10 +37,8 @@ func newTestAggregator() (*aggregator, func()) { mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} msh := &definitionsmocks.DefinitionHandlers{} - mbm := &broadcastmocks.Manager{} - mpm := &privatemessagingmocks.Manager{} ctx, cancel := context.WithCancel(context.Background()) - ag := newAggregator(ctx, mdi, msh, mdm, newEventNotifier(ctx, "ut"), mbm, mpm) + ag := newAggregator(ctx, mdi, msh, mdm, newEventNotifier(ctx, "ut")) return ag, cancel } diff --git a/internal/events/event_manager.go b/internal/events/event_manager.go index 1387f117de..03eca7a129 100644 --- a/internal/events/event_manager.go +++ b/internal/events/event_manager.go @@ -124,7 +124,7 @@ func NewEventManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, pi publ opCorrelationRetries: config.GetInt(config.EventAggregatorOpCorrelationRetries), newEventNotifier: newEventNotifier, newPinNotifier: newPinNotifier, - aggregator: newAggregator(ctx, di, dh, dm, newPinNotifier, bm, pm), + aggregator: newAggregator(ctx, di, dh, dm, newPinNotifier), metricsEnabled: config.GetBool(config.MetricsEnabled), } ie, _ := eifactory.GetPlugin(ctx, system.SystemEventsTransport) diff --git a/internal/events/tokens_transferred.go b/internal/events/tokens_transferred.go index fef7b2eab6..54ab1ddeb5 100644 --- a/internal/events/tokens_transferred.go +++ b/internal/events/tokens_transferred.go @@ -114,15 +114,17 @@ func (em *eventManager) persistTokenTransfer(ctx context.Context, transfer *toke log.L(ctx).Infof("Token transfer recorded id=%s author=%s", transfer.ProtocolID, transfer.Key) if em.metricsEnabled && countMetric { + timeElapsed := time.Since(metrics.TimeMap[transfer.TX.ID.String()]).Seconds() + delete(metrics.TimeMap, transfer.TX.ID.String()) switch transfer.Type { case fftypes.TokenTransferTypeMint: - metrics.MintHistogram.Observe(time.Since(em.assets.GetStartTime()).Seconds()) + metrics.MintHistogram.Observe(timeElapsed) metrics.MintConfirmedCounter.Inc() case fftypes.TokenTransferTypeTransfer: - metrics.TransferHistogram.Observe(time.Since(em.assets.GetStartTime()).Seconds()) + metrics.TransferHistogram.Observe(timeElapsed) metrics.TransferConfirmedCounter.Inc() case fftypes.TokenTransferTypeBurn: - metrics.BurnHistogram.Observe(time.Since(em.assets.GetStartTime()).Seconds()) + metrics.BurnHistogram.Observe(timeElapsed) metrics.BurnConfirmedCounter.Inc() default: break diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 95cfcab379..46124cb589 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -17,6 +17,8 @@ package metrics import ( + "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" muxprom "gitlab.com/hfuss/mux-prometheus/pkg/middleware" @@ -26,6 +28,7 @@ var registry *prometheus.Registry var adminInstrumentation *muxprom.Instrumentation var restInstrumentation *muxprom.Instrumentation var BatchPinCounter prometheus.Counter +var TimeMap = make(map[string]time.Time) // MetricsBatchPin is the prometheus metric for total number of batch pins submitted var MetricsBatchPin = "ff_batchpin_total" diff --git a/internal/privatemessaging/message.go b/internal/privatemessaging/message.go index 8ff6043f71..acc8f01f89 100644 --- a/internal/privatemessaging/message.go +++ b/internal/privatemessaging/message.go @@ -40,7 +40,7 @@ func (pm *privateMessaging) NewMessage(ns string, in *fftypes.MessageInOut) sysm func (pm *privateMessaging) SendMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) { if pm.metricsEnabled { metrics.PrivateMsgSubmittedCounter.Inc() - pm.startTime = time.Now() + metrics.TimeMap[in.Header.ID.String()] = time.Now() } message := pm.NewMessage(ns, in) if waitConfirm { diff --git a/internal/privatemessaging/privatemessaging.go b/internal/privatemessaging/privatemessaging.go index d155048a97..90d0bee2a2 100644 --- a/internal/privatemessaging/privatemessaging.go +++ b/internal/privatemessaging/privatemessaging.go @@ -19,7 +19,6 @@ package privatemessaging import ( "context" "encoding/json" - "time" "github.com/hyperledger/firefly/internal/batch" "github.com/hyperledger/firefly/internal/batchpin" @@ -45,7 +44,6 @@ type Manager interface { NewMessage(ns string, msg *fftypes.MessageInOut) sysmessaging.MessageSender SendMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) RequestReply(ctx context.Context, ns string, request *fftypes.MessageInOut) (reply *fftypes.MessageInOut, err error) - GetStartTime() time.Time } type privateMessaging struct { @@ -66,7 +64,6 @@ type privateMessaging struct { opCorrelationRetries int maxBatchPayloadLength int64 metricsEnabled bool - startTime time.Time } func NewPrivateMessaging(ctx context.Context, di database.Plugin, im identity.Manager, dx dataexchange.Plugin, bi blockchain.Plugin, ba batch.Manager, dm data.Manager, sa syncasync.Bridge, bp batchpin.Submitter) (Manager, error) { @@ -121,10 +118,6 @@ func NewPrivateMessaging(ctx context.Context, di database.Plugin, im identity.Ma return pm, nil } -func (pm *privateMessaging) GetStartTime() time.Time { - return pm.startTime -} - func (pm *privateMessaging) Start() error { return pm.exchange.Start() } diff --git a/internal/privatemessaging/privatemessaging_test.go b/internal/privatemessaging/privatemessaging_test.go index ac9cd20934..da2be4942a 100644 --- a/internal/privatemessaging/privatemessaging_test.go +++ b/internal/privatemessaging/privatemessaging_test.go @@ -20,9 +20,9 @@ import ( "context" "fmt" "testing" - "time" "github.com/hyperledger/firefly/internal/config" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/mocks/batchmocks" "github.com/hyperledger/firefly/mocks/batchpinmocks" "github.com/hyperledger/firefly/mocks/blockchainmocks" @@ -37,10 +37,12 @@ import ( ) func newTestPrivateMessaging(t *testing.T) (*privateMessaging, func()) { + metrics.Registry() config.Reset() config.Set(config.NodeName, "node1") config.Set(config.GroupCacheTTL, "1m") config.Set(config.GroupCacheSize, "1m") + config.Set(config.MetricsEnabled, true) mdi := &databasemocks.Plugin{} mim := &identitymanagermocks.Manager{} @@ -173,15 +175,6 @@ func TestDispatchBatchWithBlobs(t *testing.T) { mdx.AssertExpectations(t) } -func TestGetStartTime(t *testing.T) { - am, cancel := newTestPrivateMessaging(t) - now := time.Now() - am.startTime = now - defer cancel() - - assert.Equal(t, am.GetStartTime(), now) -} - func TestNewPrivateMessagingMissingDeps(t *testing.T) { _, err := NewPrivateMessaging(context.Background(), nil, nil, nil, nil, nil, nil, nil, nil) assert.Regexp(t, "FF10128", err) diff --git a/mocks/assetmocks/manager.go b/mocks/assetmocks/manager.go index 50395a62ed..726f7cf428 100644 --- a/mocks/assetmocks/manager.go +++ b/mocks/assetmocks/manager.go @@ -11,8 +11,6 @@ import ( mock "github.com/stretchr/testify/mock" sysmessaging "github.com/hyperledger/firefly/internal/sysmessaging" - - time "time" ) // Manager is an autogenerated mock type for the Manager type @@ -80,20 +78,6 @@ func (_m *Manager) CreateTokenPool(ctx context.Context, ns string, pool *fftypes return r0, r1 } -// GetStartTime provides a mock function with given fields: -func (_m *Manager) GetStartTime() time.Time { - ret := _m.Called() - - var r0 time.Time - if rf, ok := ret.Get(0).(func() time.Time); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(time.Time) - } - - return r0 -} - // GetTokenAccountPools provides a mock function with given fields: ctx, ns, key, filter func (_m *Manager) GetTokenAccountPools(ctx context.Context, ns string, key string, filter database.AndFilter) ([]*fftypes.TokenAccountPool, *database.FilterResult, error) { ret := _m.Called(ctx, ns, key, filter) diff --git a/mocks/broadcastmocks/manager.go b/mocks/broadcastmocks/manager.go index 7b797bdc65..f419341a4f 100644 --- a/mocks/broadcastmocks/manager.go +++ b/mocks/broadcastmocks/manager.go @@ -9,8 +9,6 @@ import ( mock "github.com/stretchr/testify/mock" sysmessaging "github.com/hyperledger/firefly/internal/sysmessaging" - - time "time" ) // Manager is an autogenerated mock type for the Manager type @@ -179,20 +177,6 @@ func (_m *Manager) BroadcastTokenPool(ctx context.Context, ns string, pool *ffty return r0, r1 } -// GetStartTime provides a mock function with given fields: -func (_m *Manager) GetStartTime() time.Time { - ret := _m.Called() - - var r0 time.Time - if rf, ok := ret.Get(0).(func() time.Time); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(time.Time) - } - - return r0 -} - // NewBroadcast provides a mock function with given fields: ns, in func (_m *Manager) NewBroadcast(ns string, in *fftypes.MessageInOut) sysmessaging.MessageSender { ret := _m.Called(ns, in) diff --git a/mocks/privatemessagingmocks/manager.go b/mocks/privatemessagingmocks/manager.go index 8763a81ee4..be51e56a61 100644 --- a/mocks/privatemessagingmocks/manager.go +++ b/mocks/privatemessagingmocks/manager.go @@ -11,8 +11,6 @@ import ( mock "github.com/stretchr/testify/mock" sysmessaging "github.com/hyperledger/firefly/internal/sysmessaging" - - time "time" ) // Manager is an autogenerated mock type for the Manager type @@ -96,20 +94,6 @@ func (_m *Manager) GetGroupsNS(ctx context.Context, ns string, filter database.A return r0, r1, r2 } -// GetStartTime provides a mock function with given fields: -func (_m *Manager) GetStartTime() time.Time { - ret := _m.Called() - - var r0 time.Time - if rf, ok := ret.Get(0).(func() time.Time); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(time.Time) - } - - return r0 -} - // NewMessage provides a mock function with given fields: ns, msg func (_m *Manager) NewMessage(ns string, msg *fftypes.MessageInOut) sysmessaging.MessageSender { ret := _m.Called(ns, msg) From a62009e85084f8445bdc0a7eb7337b143ef9fdf6 Mon Sep 17 00:00:00 2001 From: David Echelberger Date: Sun, 6 Feb 2022 17:28:35 -0500 Subject: [PATCH 03/14] [new-prom-metrics] adding global time map Signed-off-by: David Echelberger --- internal/assets/token_transfer.go | 7 +++---- internal/broadcast/message.go | 3 +-- internal/events/aggregator.go | 6 +++--- internal/events/tokens_transferred.go | 4 ++-- internal/metrics/metrics.go | 20 +++++++++++++++++++- internal/privatemessaging/message.go | 3 +-- 6 files changed, 29 insertions(+), 14 deletions(-) diff --git a/internal/assets/token_transfer.go b/internal/assets/token_transfer.go index 9ea102bf4a..5a635fc1a2 100644 --- a/internal/assets/token_transfer.go +++ b/internal/assets/token_transfer.go @@ -19,7 +19,6 @@ package assets import ( "context" "fmt" - "time" "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/i18n" @@ -132,7 +131,7 @@ func (am *assetManager) MintTokens(ctx context.Context, ns string, transfer *fft } if am.metricsEnabled { metrics.MintSubmittedCounter.Inc() - metrics.TimeMap[transfer.TX.ID.String()] = time.Now() + metrics.AddTime(transfer.TX.ID.String()) } sender := am.NewTransfer(ns, transfer) if waitConfirm { @@ -150,7 +149,7 @@ func (am *assetManager) BurnTokens(ctx context.Context, ns string, transfer *fft } if am.metricsEnabled { metrics.BurnSubmittedCounter.Inc() - metrics.TimeMap[transfer.TX.ID.String()] = time.Now() + metrics.AddTime(transfer.TX.ID.String()) } sender := am.NewTransfer(ns, transfer) if waitConfirm { @@ -171,7 +170,7 @@ func (am *assetManager) TransferTokens(ctx context.Context, ns string, transfer } if am.metricsEnabled { metrics.TransferSubmittedCounter.Inc() - metrics.TimeMap[transfer.TX.ID.String()] = time.Now() + metrics.AddTime(transfer.TX.ID.String()) } sender := am.NewTransfer(ns, transfer) if waitConfirm { diff --git a/internal/broadcast/message.go b/internal/broadcast/message.go index c58e0669d0..cd44a16de0 100644 --- a/internal/broadcast/message.go +++ b/internal/broadcast/message.go @@ -19,7 +19,6 @@ package broadcast import ( "context" "encoding/json" - "time" "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/internal/metrics" @@ -41,7 +40,7 @@ func (bm *broadcastManager) NewBroadcast(ns string, in *fftypes.MessageInOut) sy func (bm *broadcastManager) BroadcastMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) { if bm.metricsEnabled { metrics.BroadcastSubmittedCounter.Inc() - metrics.TimeMap[in.Header.ID.String()] = time.Now() + metrics.AddTime(in.Header.ID.String()) } broadcast := bm.NewBroadcast(ns, in) if waitConfirm { diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index bef177c0d2..7f1e66ae09 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -388,10 +388,10 @@ func (ag *aggregator) attemptMessageDispatch(ctx context.Context, msg *fftypes.M return nil }) - // Metrics for broadcast/private message + // Increment confirmer/rejected metrics for broadcast or private message if ag.metricsEnabled { - timeElapsed := time.Since(metrics.TimeMap[msg.Header.ID.String()]).Seconds() - delete(metrics.TimeMap, msg.Header.ID.String()) + timeElapsed := time.Since(metrics.GetTime(msg.Header.ID.String())).Seconds() + metrics.DeleteTime(msg.Header.ID.String()) switch msg.Header.Type { case fftypes.MessageTypeBroadcast: metrics.BroadcastHistogram.Observe(timeElapsed) diff --git a/internal/events/tokens_transferred.go b/internal/events/tokens_transferred.go index 54ab1ddeb5..86c6cdc306 100644 --- a/internal/events/tokens_transferred.go +++ b/internal/events/tokens_transferred.go @@ -114,8 +114,8 @@ func (em *eventManager) persistTokenTransfer(ctx context.Context, transfer *toke log.L(ctx).Infof("Token transfer recorded id=%s author=%s", transfer.ProtocolID, transfer.Key) if em.metricsEnabled && countMetric { - timeElapsed := time.Since(metrics.TimeMap[transfer.TX.ID.String()]).Seconds() - delete(metrics.TimeMap, transfer.TX.ID.String()) + timeElapsed := time.Since(metrics.GetTime(transfer.TX.ID.String())).Seconds() + metrics.DeleteTime(transfer.TX.ID.String()) switch transfer.Type { case fftypes.TokenTransferTypeMint: metrics.MintHistogram.Observe(timeElapsed) diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 46124cb589..16c7ebbaef 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -17,6 +17,7 @@ package metrics import ( + "sync" "time" "github.com/prometheus/client_golang/prometheus" @@ -28,7 +29,8 @@ var registry *prometheus.Registry var adminInstrumentation *muxprom.Instrumentation var restInstrumentation *muxprom.Instrumentation var BatchPinCounter prometheus.Counter -var TimeMap = make(map[string]time.Time) +var timeMap = make(map[string]time.Time) +var mutex = &sync.Mutex{} // MetricsBatchPin is the prometheus metric for total number of batch pins submitted var MetricsBatchPin = "ff_batchpin_total" @@ -99,3 +101,19 @@ func Clear() { adminInstrumentation = nil restInstrumentation = nil } + +func AddTime(id string) { + mutex.Lock() + timeMap[id] = time.Now() + mutex.Unlock() +} + +func GetTime(id string) time.Time { + return timeMap[id] +} + +func DeleteTime(id string) { + mutex.Lock() + delete(timeMap, id) + mutex.Unlock() +} diff --git a/internal/privatemessaging/message.go b/internal/privatemessaging/message.go index acc8f01f89..bf931ea5e7 100644 --- a/internal/privatemessaging/message.go +++ b/internal/privatemessaging/message.go @@ -18,7 +18,6 @@ package privatemessaging import ( "context" - "time" "github.com/hyperledger/firefly/internal/i18n" "github.com/hyperledger/firefly/internal/metrics" @@ -40,7 +39,7 @@ func (pm *privateMessaging) NewMessage(ns string, in *fftypes.MessageInOut) sysm func (pm *privateMessaging) SendMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) { if pm.metricsEnabled { metrics.PrivateMsgSubmittedCounter.Inc() - metrics.TimeMap[in.Header.ID.String()] = time.Now() + metrics.AddTime(in.Header.ID.String()) } message := pm.NewMessage(ns, in) if waitConfirm { From c4d8455a5433f462c109a20f24098e78378179bd Mon Sep 17 00:00:00 2001 From: David Echelberger Date: Mon, 7 Feb 2022 00:34:56 -0500 Subject: [PATCH 04/14] [new-prom-metrics] global timemap enhancements Signed-off-by: David Echelberger --- internal/assets/token_transfer.go | 12 ++++++------ internal/broadcast/message.go | 4 +++- internal/events/tokens_transferred.go | 4 ++-- internal/metrics/metrics.go | 8 ++++++-- internal/privatemessaging/message.go | 5 +++++ 5 files changed, 22 insertions(+), 11 deletions(-) diff --git a/internal/assets/token_transfer.go b/internal/assets/token_transfer.go index 5a635fc1a2..322551f4df 100644 --- a/internal/assets/token_transfer.go +++ b/internal/assets/token_transfer.go @@ -129,11 +129,11 @@ func (am *assetManager) MintTokens(ctx context.Context, ns string, transfer *fft if err := am.validateTransfer(ctx, ns, transfer); err != nil { return nil, err } + sender := am.NewTransfer(ns, transfer) if am.metricsEnabled { metrics.MintSubmittedCounter.Inc() - metrics.AddTime(transfer.TX.ID.String()) + metrics.AddTime(transfer.LocalID.String()) } - sender := am.NewTransfer(ns, transfer) if waitConfirm { err = sender.SendAndWait(ctx) } else { @@ -147,11 +147,11 @@ func (am *assetManager) BurnTokens(ctx context.Context, ns string, transfer *fft if err := am.validateTransfer(ctx, ns, transfer); err != nil { return nil, err } + sender := am.NewTransfer(ns, transfer) if am.metricsEnabled { metrics.BurnSubmittedCounter.Inc() - metrics.AddTime(transfer.TX.ID.String()) + metrics.AddTime(transfer.LocalID.String()) } - sender := am.NewTransfer(ns, transfer) if waitConfirm { err = sender.SendAndWait(ctx) } else { @@ -168,11 +168,11 @@ func (am *assetManager) TransferTokens(ctx context.Context, ns string, transfer if transfer.From == transfer.To { return nil, i18n.NewError(ctx, i18n.MsgCannotTransferToSelf) } + sender := am.NewTransfer(ns, transfer) if am.metricsEnabled { metrics.TransferSubmittedCounter.Inc() - metrics.AddTime(transfer.TX.ID.String()) + metrics.AddTime(transfer.LocalID.String()) } - sender := am.NewTransfer(ns, transfer) if waitConfirm { err = sender.SendAndWait(ctx) } else { diff --git a/internal/broadcast/message.go b/internal/broadcast/message.go index cd44a16de0..164e959d0b 100644 --- a/internal/broadcast/message.go +++ b/internal/broadcast/message.go @@ -38,11 +38,13 @@ func (bm *broadcastManager) NewBroadcast(ns string, in *fftypes.MessageInOut) sy } func (bm *broadcastManager) BroadcastMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) { + broadcast := bm.NewBroadcast(ns, in) + if bm.metricsEnabled { metrics.BroadcastSubmittedCounter.Inc() metrics.AddTime(in.Header.ID.String()) } - broadcast := bm.NewBroadcast(ns, in) + if waitConfirm { err = broadcast.SendAndWait(ctx) } else { diff --git a/internal/events/tokens_transferred.go b/internal/events/tokens_transferred.go index 86c6cdc306..02d813f297 100644 --- a/internal/events/tokens_transferred.go +++ b/internal/events/tokens_transferred.go @@ -114,8 +114,8 @@ func (em *eventManager) persistTokenTransfer(ctx context.Context, transfer *toke log.L(ctx).Infof("Token transfer recorded id=%s author=%s", transfer.ProtocolID, transfer.Key) if em.metricsEnabled && countMetric { - timeElapsed := time.Since(metrics.GetTime(transfer.TX.ID.String())).Seconds() - metrics.DeleteTime(transfer.TX.ID.String()) + timeElapsed := time.Since(metrics.GetTime(transfer.LocalID.String())).Seconds() + metrics.DeleteTime(transfer.LocalID.String()) switch transfer.Type { case fftypes.TokenTransferTypeMint: metrics.MintHistogram.Observe(timeElapsed) diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 16c7ebbaef..956595033b 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -29,7 +29,7 @@ var registry *prometheus.Registry var adminInstrumentation *muxprom.Instrumentation var restInstrumentation *muxprom.Instrumentation var BatchPinCounter prometheus.Counter -var timeMap = make(map[string]time.Time) +var timeMap map[string]time.Time var mutex = &sync.Mutex{} // MetricsBatchPin is the prometheus metric for total number of batch pins submitted @@ -83,6 +83,7 @@ func initMetricsCollectors() { Name: MetricsBatchPin, Help: "Number of batch pins submitted", }) + timeMap = make(map[string]time.Time) } func registerMetricsCollectors() { @@ -100,11 +101,14 @@ func Clear() { registry = nil adminInstrumentation = nil restInstrumentation = nil + timeMap = make(map[string]time.Time) } func AddTime(id string) { mutex.Lock() - timeMap[id] = time.Now() + if len(id) > 0 { + timeMap[id] = time.Now() + } mutex.Unlock() } diff --git a/internal/privatemessaging/message.go b/internal/privatemessaging/message.go index bf931ea5e7..b8a50998fa 100644 --- a/internal/privatemessaging/message.go +++ b/internal/privatemessaging/message.go @@ -42,6 +42,11 @@ func (pm *privateMessaging) SendMessage(ctx context.Context, ns string, in *ffty metrics.AddTime(in.Header.ID.String()) } message := pm.NewMessage(ns, in) + + if pm.metricsEnabled { + metrics.PrivateMsgSubmittedCounter.Inc() + metrics.AddTime(in.Header.ID.String()) + } if waitConfirm { err = message.SendAndWait(ctx) } else { From 9d0cd9a36e7ee77fbbbbbc2c22b95e0a10573ffc Mon Sep 17 00:00:00 2001 From: David Echelberger Date: Mon, 7 Feb 2022 00:59:29 -0500 Subject: [PATCH 05/14] [new-prom-metrics] removing duplicate metric Signed-off-by: David Echelberger --- internal/assets/token_transfer.go | 6 +++--- internal/broadcast/message.go | 2 +- internal/metrics/metrics.go | 4 +--- internal/privatemessaging/message.go | 6 +----- 4 files changed, 6 insertions(+), 12 deletions(-) diff --git a/internal/assets/token_transfer.go b/internal/assets/token_transfer.go index 322551f4df..9f6e66d401 100644 --- a/internal/assets/token_transfer.go +++ b/internal/assets/token_transfer.go @@ -130,7 +130,7 @@ func (am *assetManager) MintTokens(ctx context.Context, ns string, transfer *fft return nil, err } sender := am.NewTransfer(ns, transfer) - if am.metricsEnabled { + if am.metricsEnabled && len(transfer.LocalID.String()) > 0 { metrics.MintSubmittedCounter.Inc() metrics.AddTime(transfer.LocalID.String()) } @@ -148,7 +148,7 @@ func (am *assetManager) BurnTokens(ctx context.Context, ns string, transfer *fft return nil, err } sender := am.NewTransfer(ns, transfer) - if am.metricsEnabled { + if am.metricsEnabled && len(transfer.LocalID.String()) > 0 { metrics.BurnSubmittedCounter.Inc() metrics.AddTime(transfer.LocalID.String()) } @@ -169,7 +169,7 @@ func (am *assetManager) TransferTokens(ctx context.Context, ns string, transfer return nil, i18n.NewError(ctx, i18n.MsgCannotTransferToSelf) } sender := am.NewTransfer(ns, transfer) - if am.metricsEnabled { + if am.metricsEnabled && len(transfer.LocalID.String()) > 0 { metrics.TransferSubmittedCounter.Inc() metrics.AddTime(transfer.LocalID.String()) } diff --git a/internal/broadcast/message.go b/internal/broadcast/message.go index 164e959d0b..db4f73af3c 100644 --- a/internal/broadcast/message.go +++ b/internal/broadcast/message.go @@ -40,7 +40,7 @@ func (bm *broadcastManager) NewBroadcast(ns string, in *fftypes.MessageInOut) sy func (bm *broadcastManager) BroadcastMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) { broadcast := bm.NewBroadcast(ns, in) - if bm.metricsEnabled { + if bm.metricsEnabled && len(in.Header.ID.String()) > 0 { metrics.BroadcastSubmittedCounter.Inc() metrics.AddTime(in.Header.ID.String()) } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 956595033b..f376bf25d3 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -106,9 +106,7 @@ func Clear() { func AddTime(id string) { mutex.Lock() - if len(id) > 0 { - timeMap[id] = time.Now() - } + timeMap[id] = time.Now() mutex.Unlock() } diff --git a/internal/privatemessaging/message.go b/internal/privatemessaging/message.go index b8a50998fa..9a1425116b 100644 --- a/internal/privatemessaging/message.go +++ b/internal/privatemessaging/message.go @@ -37,13 +37,9 @@ func (pm *privateMessaging) NewMessage(ns string, in *fftypes.MessageInOut) sysm } func (pm *privateMessaging) SendMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) { - if pm.metricsEnabled { - metrics.PrivateMsgSubmittedCounter.Inc() - metrics.AddTime(in.Header.ID.String()) - } message := pm.NewMessage(ns, in) - if pm.metricsEnabled { + if pm.metricsEnabled && len(in.Header.ID.String()) > 0 { metrics.PrivateMsgSubmittedCounter.Inc() metrics.AddTime(in.Header.ID.String()) } From 634a5d8d9fc30f1933c8c8c051be5523b3716191 Mon Sep 17 00:00:00 2001 From: David Echelberger Date: Tue, 8 Feb 2022 09:54:05 -0500 Subject: [PATCH 06/14] [new-prom-metrics] fixing concurrent metrics issue Signed-off-by: David Echelberger --- internal/metrics/metrics.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index f376bf25d3..8a5c31bd1c 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -111,7 +111,10 @@ func AddTime(id string) { } func GetTime(id string) time.Time { - return timeMap[id] + mutex.Lock() + time := timeMap[id] + mutex.Unlock() + return time } func DeleteTime(id string) { From a704fefc1621689e5ead06e2da827c972d8a7c42 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 8 Feb 2022 13:53:23 -0500 Subject: [PATCH 07/14] Logging in rewind Signed-off-by: Peter Broadhurst --- internal/batch/batch_manager.go | 7 +++++-- internal/broadcast/message.go | 2 +- internal/privatemessaging/message.go | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/internal/batch/batch_manager.go b/internal/batch/batch_manager.go index d1dbdfe9a0..49495a6b07 100644 --- a/internal/batch/batch_manager.go +++ b/internal/batch/batch_manager.go @@ -219,7 +219,7 @@ func (bm *batchManager) assembleMessageData(msg *fftypes.Message) (data []*fftyp if !foundAll { return nil, i18n.NewError(bm.ctx, i18n.MsgDataNotFound, msg.Header.ID) } - log.L(bm.ctx).Infof("Detected new batch-pinned message %s", msg.Header.ID) + log.L(bm.ctx).Infof("Detected new batch-pinned message %s sequence=%d", msg.Header.ID, msg.Sequence) return data, nil } @@ -227,10 +227,12 @@ func (bm *batchManager) markRewind(rewindTo int64) { bm.rewindMux.Lock() // Make sure we only rewind backwards - as we might get multiple shoulder taps // for different message sequences during a single poll cycle. - if bm.rewindTo < 0 || rewindTo < bm.rewindTo { + previousRewind := bm.rewindTo + if previousRewind < 0 || rewindTo < previousRewind { bm.rewindTo = rewindTo } bm.rewindMux.Unlock() + log.L(bm.ctx).Debugf("Marking rewind to sequence=%d (previous=%d)", rewindTo, previousRewind) } func (bm *batchManager) popRewind() int64 { @@ -245,6 +247,7 @@ func (bm *batchManager) readPage() ([]*fftypes.Message, error) { rewindTo := bm.popRewind() if rewindTo >= 0 && rewindTo < bm.offset { + log.L(bm.ctx).Debugf("Rewinding to sequence=%d", rewindTo) if err := bm.updateOffset(true, rewindTo); err != nil { return nil, err } diff --git a/internal/broadcast/message.go b/internal/broadcast/message.go index 81a93eb919..673f91e0d4 100644 --- a/internal/broadcast/message.go +++ b/internal/broadcast/message.go @@ -166,7 +166,7 @@ func (s *broadcastSender) sendInternal(ctx context.Context, method sendMethod) ( if err := s.mgr.database.UpsertMessage(ctx, &s.msg.Message, database.UpsertOptimizationNew); err != nil { return err } - log.L(ctx).Infof("Sent broadcast message %s:%s", s.msg.Header.Namespace, s.msg.Header.ID) + log.L(ctx).Infof("Sent broadcast message %s:%s sequence=%d", s.msg.Header.Namespace, s.msg.Header.ID, s.msg.Sequence) return err } diff --git a/internal/privatemessaging/message.go b/internal/privatemessaging/message.go index 3376b1265d..2cb7bef6a4 100644 --- a/internal/privatemessaging/message.go +++ b/internal/privatemessaging/message.go @@ -185,7 +185,7 @@ func (s *messageSender) sendInternal(ctx context.Context, method sendMethod) err if err := s.mgr.database.UpsertMessage(ctx, &s.msg.Message, database.UpsertOptimizationNew); err != nil { return err } - log.L(ctx).Infof("Sent private message %s:%s", s.msg.Header.Namespace, s.msg.Header.ID) + log.L(ctx).Infof("Sent private message %s:%s sequence=%d", s.msg.Header.Namespace, s.msg.Header.ID, s.msg.Sequence) if method == methodSendImmediate { if err := s.sendUnpinned(ctx); err != nil { From 3934af8111c08e85fcdcedb90633e2292395bc20 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 8 Feb 2022 14:25:43 -0500 Subject: [PATCH 08/14] Add more debug to show dispatch attempts Signed-off-by: Peter Broadhurst --- internal/batch/batch_manager.go | 2 +- internal/events/aggregator.go | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/batch/batch_manager.go b/internal/batch/batch_manager.go index 49495a6b07..1e0c7f19b6 100644 --- a/internal/batch/batch_manager.go +++ b/internal/batch/batch_manager.go @@ -51,7 +51,7 @@ func NewBatchManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, di data startupOffsetRetryAttempts: config.GetInt(config.OrchestratorStartupAttempts), dispatchers: make(map[fftypes.MessageType]*dispatcher), shoulderTap: make(chan bool, 1), - newMessages: make(chan int64, readPageSize), + newMessages: make(chan int64), sequencerClosed: make(chan struct{}), retry: &retry.Retry{ InitialDelay: config.GetDuration(config.BatchRetryInitDelay), diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index 0e1c231954..83360f29d5 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -243,6 +243,8 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, stat } func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, pin *fftypes.Pin, msgBaseIndex int64, msg *fftypes.Message, state *batchState) (err error) { + l := log.L(ctx) + // Check if it's ready to be processed unmaskedContexts := make([]*fftypes.Bytes32, 0, len(msg.Header.Topics)) nextPins := make([]*nextPinState, 0, len(msg.Header.Topics)) @@ -250,14 +252,14 @@ func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, // Private messages have one or more masked "pin" hashes that allow us to work // out if it's the next message in the sequence, given the previous messages if msg.Header.Group == nil || len(msg.Pins) == 0 || len(msg.Header.Topics) != len(msg.Pins) { - log.L(ctx).Errorf("Message '%s' in batch '%s' has invalid pin data pins=%v topics=%v", msg.Header.ID, batch.ID, msg.Pins, msg.Header.Topics) + l.Errorf("Message '%s' in batch '%s' has invalid pin data pins=%v topics=%v", msg.Header.ID, batch.ID, msg.Pins, msg.Header.Topics) return nil } for i, pinStr := range msg.Pins { var msgContext fftypes.Bytes32 err := msgContext.UnmarshalText([]byte(pinStr)) if err != nil { - log.L(ctx).Errorf("Message '%s' in batch '%s' has invalid pin at index %d: '%s'", msg.Header.ID, batch.ID, i, pinStr) + l.Errorf("Message '%s' in batch '%s' has invalid pin at index %d: '%s'", msg.Header.ID, batch.ID, i, pinStr) return nil } nextPin, err := state.CheckMaskedContextReady(ctx, msg, msg.Header.Topics[i], pin.Sequence, &msgContext) @@ -280,6 +282,7 @@ func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, } + l.Infof("Attempt dispatch msg=%s broadcastContexts=%v privatePins=%v", msg.Header.ID, unmaskedContexts, msg.Pins) dispatched, err := ag.attemptMessageDispatch(ctx, msg, state) if err != nil { return err From 9ce34ddb055a4a60b3f3a8e17cdb01d7cf71477a Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 8 Feb 2022 14:41:29 -0500 Subject: [PATCH 09/14] Fix logging of unmasked context when blocked Signed-off-by: Peter Broadhurst --- internal/events/aggregator.go | 4 ++-- internal/events/aggregator_batch_state.go | 6 +++--- internal/events/aggregator_batch_state_test.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index 83360f29d5..4d674e7a12 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -274,7 +274,7 @@ func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, h.Write([]byte(topic)) msgContext := fftypes.HashResult(h) unmaskedContexts = append(unmaskedContexts, msgContext) - ready, err := state.CheckUnmaskedContextReady(ctx, *msgContext, msg, msg.Header.Topics[i], pin.Sequence) + ready, err := state.CheckUnmaskedContextReady(ctx, msgContext, msg, msg.Header.Topics[i], pin.Sequence) if err != nil || !ready { return err } @@ -282,7 +282,7 @@ func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, } - l.Infof("Attempt dispatch msg=%s broadcastContexts=%v privatePins=%v", msg.Header.ID, unmaskedContexts, msg.Pins) + l.Debugf("Attempt dispatch msg=%s broadcastContexts=%v privatePins=%v", msg.Header.ID, unmaskedContexts, msg.Pins) dispatched, err := ag.attemptMessageDispatch(ctx, msg, state) if err != nil { return err diff --git a/internal/events/aggregator_batch_state.go b/internal/events/aggregator_batch_state.go index 287c5cd561..5510c54445 100644 --- a/internal/events/aggregator_batch_state.go +++ b/internal/events/aggregator_batch_state.go @@ -137,12 +137,12 @@ func (bs *batchState) RunFinalize(ctx context.Context) error { return bs.flushPins(ctx) } -func (bs *batchState) CheckUnmaskedContextReady(ctx context.Context, contextUnmasked fftypes.Bytes32, msg *fftypes.Message, topic string, firstMsgPinSequence int64) (bool, error) { +func (bs *batchState) CheckUnmaskedContextReady(ctx context.Context, contextUnmasked *fftypes.Bytes32, msg *fftypes.Message, topic string, firstMsgPinSequence int64) (bool, error) { - ucs, found := bs.unmaskedContexts[contextUnmasked] + ucs, found := bs.unmaskedContexts[*contextUnmasked] if !found { ucs = &contextState{blockedBy: -1} - bs.unmaskedContexts[contextUnmasked] = ucs + bs.unmaskedContexts[*contextUnmasked] = ucs // We need to check there's no earlier sequences with the same unmasked context fb := database.PinQueryFactory.NewFilterLimit(ctx, 1) // only need the first one diff --git a/internal/events/aggregator_batch_state_test.go b/internal/events/aggregator_batch_state_test.go index e846082b7c..c35ec69783 100644 --- a/internal/events/aggregator_batch_state_test.go +++ b/internal/events/aggregator_batch_state_test.go @@ -52,7 +52,7 @@ func TestSetContextBlockedByNoState(t *testing.T) { unmaskedContext := fftypes.NewRandB32() bs.SetContextBlockedBy(ag.ctx, *unmaskedContext, 10) - ready, err := bs.CheckUnmaskedContextReady(ag.ctx, *unmaskedContext, &fftypes.Message{}, "topic1", 1) + ready, err := bs.CheckUnmaskedContextReady(ag.ctx, unmaskedContext, &fftypes.Message{}, "topic1", 1) assert.NoError(t, err) assert.False(t, ready) } From d4e153ea3390c61d3cd4cd51dc52129e066b9d78 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 8 Feb 2022 15:03:05 -0500 Subject: [PATCH 10/14] Ensure index 0 hits Signed-off-by: Peter Broadhurst --- internal/events/aggregator.go | 14 +++++++------- internal/events/aggregator_batch_state.go | 1 + internal/events/aggregator_test.go | 2 ++ 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index 4d674e7a12..53acfa0815 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -192,7 +192,7 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, stat // We must check all the contexts in the message, and mark them dispatched together. dupMsgCheck := make(map[fftypes.UUID]bool) for _, pin := range pins { - l.Debugf("Aggregating pin %.10d batch=%s hash=%s masked=%t", pin.Sequence, pin.Batch, pin.Hash, pin.Masked) + l.Debugf("Aggregating pin %.10d batch=%s index=%d hash=%s masked=%t", pin.Sequence, pin.Batch, pin.Index, pin.Hash, pin.Masked) if batch == nil || *batch.ID != *pin.Batch { batch, err = ag.database.GetBatchByID(ctx, pin.Batch) @@ -207,17 +207,17 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, stat // Extract the message from the batch - where the index is of a topic within a message var msg *fftypes.Message - var i int64 = -1 + var batchPinIndex int64 var msgBaseIndex int64 - for iM := 0; i < pin.Index && iM < len(batch.Payload.Messages); iM++ { + for iM := 0; batchPinIndex <= pin.Index && iM < len(batch.Payload.Messages); iM++ { msg = batch.Payload.Messages[iM] - msgBaseIndex = i - for iT := 0; i < pin.Index && iT < len(msg.Header.Topics); iT++ { - i++ + msgBaseIndex = batchPinIndex + for iT := 0; batchPinIndex < pin.Index && iT < len(msg.Header.Topics); iT++ { + batchPinIndex++ } } - if i < pin.Index { + if batchPinIndex < pin.Index { l.Errorf("Batch %s does not have message-topic index %d - pin %s is invalid", pin.Batch, pin.Index, pin.Hash) continue } diff --git a/internal/events/aggregator_batch_state.go b/internal/events/aggregator_batch_state.go index 5510c54445..917dd6c39e 100644 --- a/internal/events/aggregator_batch_state.go +++ b/internal/events/aggregator_batch_state.go @@ -261,6 +261,7 @@ func (bs *batchState) flushPins(ctx context.Context) error { fb.Gte("index", dm.firstPinIndex), fb.Lte("index", dm.lastPinIndex), ) + log.L(ctx).Debugf("Marking message dispatched batch=%s msg=%s firstIndex=%d lastIndex=%d", dm.batchID, dm.msgID, dm.firstPinIndex, dm.lastPinIndex) update := database.PinQueryFactory.NewUpdate(ctx).Set("dispatched", true) if err := bs.database.UpdatePins(ctx, filter, update); err != nil { return err diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index fd9c0197b5..5ea592574f 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -24,6 +24,7 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/definitions" + "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/definitionsmocks" @@ -166,6 +167,7 @@ func TestAggregationMaskedZeroNonceMatch(t *testing.T) { } func TestAggregationMaskedNextSequenceMatch(t *testing.T) { + log.SetLevel("debug") ag, cancel := newTestAggregator() defer cancel() From f2bb6bbca4d806cc5fc028b962bce9a7f64ba20f Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 8 Feb 2022 15:48:28 -0500 Subject: [PATCH 11/14] Extra logging Signed-off-by: Peter Broadhurst --- internal/events/aggregator.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index 53acfa0815..469f340741 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -192,7 +192,6 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, stat // We must check all the contexts in the message, and mark them dispatched together. dupMsgCheck := make(map[fftypes.UUID]bool) for _, pin := range pins { - l.Debugf("Aggregating pin %.10d batch=%s index=%d hash=%s masked=%t", pin.Sequence, pin.Batch, pin.Index, pin.Hash, pin.Masked) if batch == nil || *batch.ID != *pin.Batch { batch, err = ag.database.GetBatchByID(ctx, pin.Batch) @@ -217,11 +216,11 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, stat } } + l.Debugf("Aggregating pin %.10d batch=%s msg=%s pinIndex=%d msgBaseIndex=%d hash=%s masked=%t", pin.Sequence, pin.Batch, msg.Header.ID, pin.Index, msgBaseIndex, pin.Hash, pin.Masked) if batchPinIndex < pin.Index { l.Errorf("Batch %s does not have message-topic index %d - pin %s is invalid", pin.Batch, pin.Index, pin.Hash) continue } - l.Tracef("Batch %s message %d: %+v", batch.ID, pin.Index, msg) if msg == nil || msg.Header.ID == nil { l.Errorf("null message entry %d in batch '%s'", pin.Index, batch.ID) continue From 9f13716ecba15fa5acc3b1fd98b23650fdabaa01 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 8 Feb 2022 15:58:12 -0500 Subject: [PATCH 12/14] Avoid incrementing to next message Signed-off-by: Peter Broadhurst --- internal/events/aggregator.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index 469f340741..a4af24ee3a 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -208,16 +208,19 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, stat var msg *fftypes.Message var batchPinIndex int64 var msgBaseIndex int64 - for iM := 0; batchPinIndex <= pin.Index && iM < len(batch.Payload.Messages); iM++ { + for iM := 0; iM < len(batch.Payload.Messages); iM++ { msg = batch.Payload.Messages[iM] msgBaseIndex = batchPinIndex for iT := 0; batchPinIndex < pin.Index && iT < len(msg.Header.Topics); iT++ { batchPinIndex++ } + if batchPinIndex == pin.Index { + break + } } l.Debugf("Aggregating pin %.10d batch=%s msg=%s pinIndex=%d msgBaseIndex=%d hash=%s masked=%t", pin.Sequence, pin.Batch, msg.Header.ID, pin.Index, msgBaseIndex, pin.Hash, pin.Masked) - if batchPinIndex < pin.Index { + if batchPinIndex != pin.Index { l.Errorf("Batch %s does not have message-topic index %d - pin %s is invalid", pin.Batch, pin.Index, pin.Hash) continue } From 1b9f576063dc10f77a59260d06da9012dd478612 Mon Sep 17 00:00:00 2001 From: David Echelberger Date: Tue, 8 Feb 2022 16:55:32 -0500 Subject: [PATCH 13/14] [new-prom-metrics] unit testing for prom metrics --- internal/assets/manager_test.go | 3 + internal/events/aggregator_test.go | 74 +++++++++++- internal/events/event_manager_test.go | 3 + internal/events/operation_update.go | 20 ++-- internal/events/tokens_transferred.go | 2 - internal/events/tokens_transferred_test.go | 124 +++++++++++++++++++++ internal/metrics/metrics.go | 4 + 7 files changed, 216 insertions(+), 14 deletions(-) diff --git a/internal/assets/manager_test.go b/internal/assets/manager_test.go index ccd867ea6e..851838ccf2 100644 --- a/internal/assets/manager_test.go +++ b/internal/assets/manager_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/hyperledger/firefly/internal/config" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" @@ -36,6 +37,8 @@ import ( func newTestAssets(t *testing.T) (*assetManager, func()) { config.Reset() + metrics.Registry() + config.Set(config.MetricsEnabled, true) mdi := &databasemocks.Plugin{} mim := &identitymanagermocks.Manager{} mdm := &datamocks.Manager{} diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index fd9c0197b5..c1d4a1823b 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -24,6 +24,7 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/definitions" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/definitionsmocks" @@ -34,6 +35,8 @@ import ( ) func newTestAggregator() (*aggregator, func()) { + metrics.Registry() + config.Set(config.MetricsEnabled, true) mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} msh := &definitionsmocks.DefinitionHandlers{} @@ -1306,6 +1309,75 @@ func TestAttemptMessageDispatchGroupInit(t *testing.T) { } +func TestAttemptMessageDispatchSuccessBroadcast(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + bs := newBatchState(ag) + + mdi := ag.database.(*databasemocks.Plugin) + mdm := ag.data.(*datamocks.Manager) + mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil) + mdm.On("ValidateAll", ag.ctx, mock.Anything).Return(true, nil) + mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(nil) + mdi.On("InsertEvent", ag.ctx, mock.Anything).Return(nil) + + _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Type: fftypes.MessageTypeBroadcast, + }, + }, bs) + assert.NoError(t, err) +} + +func TestAttemptMessageDispatchRejectedBroadcast(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + bs := newBatchState(ag) + + mdi := ag.database.(*databasemocks.Plugin) + mdm := ag.data.(*datamocks.Manager) + mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil) + mdm.On("ValidateAll", ag.ctx, mock.Anything).Return(false, nil) + mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(nil) + mdi.On("InsertEvent", ag.ctx, mock.Anything).Return(nil) + + _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Type: fftypes.MessageTypeBroadcast, + }, + Data: fftypes.DataRefs{ + {ID: fftypes.NewUUID()}, + }, + }, bs) + assert.NoError(t, err) +} + +func TestAttemptMessageDispatchRejectedPrivate(t *testing.T) { + ag, cancel := newTestAggregator() + defer cancel() + bs := newBatchState(ag) + + mdi := ag.database.(*databasemocks.Plugin) + mdm := ag.data.(*datamocks.Manager) + mdm.On("GetMessageData", ag.ctx, mock.Anything, true).Return([]*fftypes.Data{}, true, nil) + mdm.On("ValidateAll", ag.ctx, mock.Anything).Return(false, nil) + mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(nil) + mdi.On("InsertEvent", ag.ctx, mock.Anything).Return(nil) + + _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Type: fftypes.MessageTypePrivate, + }, + Data: fftypes.DataRefs{ + {ID: fftypes.NewUUID()}, + }, + }, bs) + assert.NoError(t, err) +} + func TestAttemptMessageUpdateMessageFail(t *testing.T) { ag, cancel := newTestAggregator() defer cancel() @@ -1318,7 +1390,7 @@ func TestAttemptMessageUpdateMessageFail(t *testing.T) { mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ - Header: fftypes.MessageHeader{ID: fftypes.NewUUID()}, + Header: fftypes.MessageHeader{ID: fftypes.NewUUID(), Type: fftypes.MessageTypeBroadcast}, }, bs) assert.NoError(t, err) diff --git a/internal/events/event_manager_test.go b/internal/events/event_manager_test.go index fa6b222ba6..e4dd27191a 100644 --- a/internal/events/event_manager_test.go +++ b/internal/events/event_manager_test.go @@ -23,6 +23,7 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/events/system" + "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/mocks/assetmocks" "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/databasemocks" @@ -43,6 +44,8 @@ var testNodeID = fftypes.NewUUID() func newTestEventManager(t *testing.T) (*eventManager, func()) { config.Reset() + metrics.Registry() + config.Set(config.MetricsEnabled, true) ctx, cancel := context.WithCancel(context.Background()) mdi := &databasemocks.Plugin{} mim := &identitymanagermocks.Manager{} diff --git a/internal/events/operation_update.go b/internal/events/operation_update.go index f75db7d2e4..dc594bdf10 100644 --- a/internal/events/operation_update.go +++ b/internal/events/operation_update.go @@ -20,7 +20,6 @@ import ( "context" "github.com/hyperledger/firefly/internal/log" - "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" ) @@ -43,16 +42,15 @@ func (em *eventManager) operationUpdateCtx(ctx context.Context, operationID *fft // Special handling for OpTypeTokenTransfer, which writes an event when it fails if op.Type == fftypes.OpTypeTokenTransfer && txState == fftypes.OpStatusFailed { event := fftypes.NewEvent(fftypes.EventTypeTransferOpFailed, op.Namespace, op.ID) - - if em.metricsEnabled { - // Mint - metrics.MintRejectedCounter.Inc() - // TODO: Figure out way to determine transfer type - // Transfer - // metrics.TransferRejectedCounter.Inc() - // Burn - // metrics.BurnRejectedCounter.Inc() - } + // TODO: Figure out way to determine transfer type + // if em.metricsEnabled { + // Mint + // metrics.MintRejectedCounter.Inc() + // Transfer + // metrics.TransferRejectedCounter.Inc() + // Burn + // metrics.BurnRejectedCounter.Inc() + // } if err := em.database.InsertEvent(ctx, event); err != nil { return err diff --git a/internal/events/tokens_transferred.go b/internal/events/tokens_transferred.go index 02d813f297..3700833ec9 100644 --- a/internal/events/tokens_transferred.go +++ b/internal/events/tokens_transferred.go @@ -126,8 +126,6 @@ func (em *eventManager) persistTokenTransfer(ctx context.Context, transfer *toke case fftypes.TokenTransferTypeBurn: metrics.BurnHistogram.Observe(timeElapsed) metrics.BurnConfirmedCounter.Inc() - default: - break } } diff --git a/internal/events/tokens_transferred_test.go b/internal/events/tokens_transferred_test.go index c768a87d7e..a0477327f9 100644 --- a/internal/events/tokens_transferred_test.go +++ b/internal/events/tokens_transferred_test.go @@ -381,6 +381,68 @@ func TestTokensTransferredWithMessageReceived(t *testing.T) { mti.AssertExpectations(t) } +func TestTokensMintWithMessageSend(t *testing.T) { + em, cancel := newTestEventManager(t) + defer cancel() + + mdi := em.database.(*databasemocks.Plugin) + mti := &tokenmocks.Plugin{} + + uri := "firefly://token/1" + info := fftypes.JSONObject{"some": "info"} + transfer := &tokens.TokenTransfer{ + PoolProtocolID: "F1", + TokenTransfer: fftypes.TokenTransfer{ + Type: fftypes.TokenTransferTypeMint, + TokenIndex: "0", + URI: uri, + Connector: "erc1155", + Key: "0x12345", + From: "0x1", + To: "0x2", + ProtocolID: "123", + Message: fftypes.NewUUID(), + Amount: *fftypes.NewFFBigInt(1), + }, + Event: blockchain.Event{ + BlockchainTXID: "0xffffeeee", + ProtocolID: "0000/0000/0000", + Info: info, + }, + } + pool := &fftypes.TokenPool{ + Namespace: "ns1", + } + message := &fftypes.Message{ + BatchID: fftypes.NewUUID(), + State: fftypes.MessageStateStaged, + } + + mdi.On("GetTokenTransferByProtocolID", em.ctx, "erc1155", "123").Return(nil, nil).Times(2) + mdi.On("GetTokenPoolByProtocolID", em.ctx, "erc1155", "F1").Return(pool, nil).Times(2) + mdi.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *fftypes.BlockchainEvent) bool { + return e.Namespace == pool.Namespace && e.Name == transfer.Event.Name + })).Return(nil).Times(2) + mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { + return ev.Type == fftypes.EventTypeBlockchainEvent && ev.Namespace == pool.Namespace + })).Return(nil).Times(2) + mdi.On("UpsertTokenTransfer", em.ctx, &transfer.TokenTransfer).Return(nil).Times(2) + mdi.On("UpdateTokenBalances", em.ctx, &transfer.TokenTransfer).Return(nil).Times(2) + mdi.On("GetMessageByID", em.ctx, mock.Anything).Return(message, nil).Times(2) + mdi.On("ReplaceMessage", em.ctx, mock.MatchedBy(func(msg *fftypes.Message) bool { + return msg.State == fftypes.MessageStateReady + })).Return(fmt.Errorf("pop")) + mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { + return ev.Type == fftypes.EventTypeTransferConfirmed && ev.Reference == transfer.LocalID && ev.Namespace == pool.Namespace + })).Return(nil).Once() + + err := em.TokensTransferred(mti, transfer) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mti.AssertExpectations(t) +} + func TestTokensTransferredWithMessageSend(t *testing.T) { em, cancel := newTestEventManager(t) defer cancel() @@ -442,3 +504,65 @@ func TestTokensTransferredWithMessageSend(t *testing.T) { mdi.AssertExpectations(t) mti.AssertExpectations(t) } + +func TestTokensBurndWithMessageSend(t *testing.T) { + em, cancel := newTestEventManager(t) + defer cancel() + + mdi := em.database.(*databasemocks.Plugin) + mti := &tokenmocks.Plugin{} + + uri := "firefly://token/1" + info := fftypes.JSONObject{"some": "info"} + transfer := &tokens.TokenTransfer{ + PoolProtocolID: "F1", + TokenTransfer: fftypes.TokenTransfer{ + Type: fftypes.TokenTransferTypeBurn, + TokenIndex: "0", + URI: uri, + Connector: "erc1155", + Key: "0x12345", + From: "0x1", + To: "0x2", + ProtocolID: "123", + Message: fftypes.NewUUID(), + Amount: *fftypes.NewFFBigInt(1), + }, + Event: blockchain.Event{ + BlockchainTXID: "0xffffeeee", + ProtocolID: "0000/0000/0000", + Info: info, + }, + } + pool := &fftypes.TokenPool{ + Namespace: "ns1", + } + message := &fftypes.Message{ + BatchID: fftypes.NewUUID(), + State: fftypes.MessageStateStaged, + } + + mdi.On("GetTokenTransferByProtocolID", em.ctx, "erc1155", "123").Return(nil, nil).Times(2) + mdi.On("GetTokenPoolByProtocolID", em.ctx, "erc1155", "F1").Return(pool, nil).Times(2) + mdi.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *fftypes.BlockchainEvent) bool { + return e.Namespace == pool.Namespace && e.Name == transfer.Event.Name + })).Return(nil).Times(2) + mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { + return ev.Type == fftypes.EventTypeBlockchainEvent && ev.Namespace == pool.Namespace + })).Return(nil).Times(2) + mdi.On("UpsertTokenTransfer", em.ctx, &transfer.TokenTransfer).Return(nil).Times(2) + mdi.On("UpdateTokenBalances", em.ctx, &transfer.TokenTransfer).Return(nil).Times(2) + mdi.On("GetMessageByID", em.ctx, mock.Anything).Return(message, nil).Times(2) + mdi.On("ReplaceMessage", em.ctx, mock.MatchedBy(func(msg *fftypes.Message) bool { + return msg.State == fftypes.MessageStateReady + })).Return(fmt.Errorf("pop")) + mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { + return ev.Type == fftypes.EventTypeTransferConfirmed && ev.Reference == transfer.LocalID && ev.Namespace == pool.Namespace + })).Return(nil).Once() + + err := em.TokensTransferred(mti, transfer) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mti.AssertExpectations(t) +} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 8a5c31bd1c..711239e38d 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -79,6 +79,8 @@ func initMetricsCollectors() { InitBroadcastMetrics() InitPrivateMsgMetrics() InitTokenMintMetrics() + InitTokenTransferMetrics() + InitTokenBurnMetrics() BatchPinCounter = prometheus.NewCounter(prometheus.CounterOpts{ Name: MetricsBatchPin, Help: "Number of batch pins submitted", @@ -94,6 +96,8 @@ func registerMetricsCollectors() { RegisterBroadcastMetrics() RegisterPrivateMsgMetrics() RegisterTokenMintMetrics() + RegisterTokenTransferMetrics() + RegisterTokenBurnMetrics() } // Clear will reset the Prometheus metrics registry and instrumentations, useful for testing From 32732eab6b7db740d0302c82c81f8fe9799e0bd9 Mon Sep 17 00:00:00 2001 From: David Echelberger Date: Tue, 8 Feb 2022 17:09:32 -0500 Subject: [PATCH 14/14] [new-prom-metrics] removing batch logs --- internal/batch/batch_manager.go | 9 +++---- internal/events/aggregator.go | 27 ++++++++----------- internal/events/aggregator_batch_state.go | 7 +++-- .../events/aggregator_batch_state_test.go | 2 +- internal/events/aggregator_test.go | 4 +-- 5 files changed, 19 insertions(+), 30 deletions(-) diff --git a/internal/batch/batch_manager.go b/internal/batch/batch_manager.go index 1e0c7f19b6..d1dbdfe9a0 100644 --- a/internal/batch/batch_manager.go +++ b/internal/batch/batch_manager.go @@ -51,7 +51,7 @@ func NewBatchManager(ctx context.Context, ni sysmessaging.LocalNodeInfo, di data startupOffsetRetryAttempts: config.GetInt(config.OrchestratorStartupAttempts), dispatchers: make(map[fftypes.MessageType]*dispatcher), shoulderTap: make(chan bool, 1), - newMessages: make(chan int64), + newMessages: make(chan int64, readPageSize), sequencerClosed: make(chan struct{}), retry: &retry.Retry{ InitialDelay: config.GetDuration(config.BatchRetryInitDelay), @@ -219,7 +219,7 @@ func (bm *batchManager) assembleMessageData(msg *fftypes.Message) (data []*fftyp if !foundAll { return nil, i18n.NewError(bm.ctx, i18n.MsgDataNotFound, msg.Header.ID) } - log.L(bm.ctx).Infof("Detected new batch-pinned message %s sequence=%d", msg.Header.ID, msg.Sequence) + log.L(bm.ctx).Infof("Detected new batch-pinned message %s", msg.Header.ID) return data, nil } @@ -227,12 +227,10 @@ func (bm *batchManager) markRewind(rewindTo int64) { bm.rewindMux.Lock() // Make sure we only rewind backwards - as we might get multiple shoulder taps // for different message sequences during a single poll cycle. - previousRewind := bm.rewindTo - if previousRewind < 0 || rewindTo < previousRewind { + if bm.rewindTo < 0 || rewindTo < bm.rewindTo { bm.rewindTo = rewindTo } bm.rewindMux.Unlock() - log.L(bm.ctx).Debugf("Marking rewind to sequence=%d (previous=%d)", rewindTo, previousRewind) } func (bm *batchManager) popRewind() int64 { @@ -247,7 +245,6 @@ func (bm *batchManager) readPage() ([]*fftypes.Message, error) { rewindTo := bm.popRewind() if rewindTo >= 0 && rewindTo < bm.offset { - log.L(bm.ctx).Debugf("Rewinding to sequence=%d", rewindTo) if err := bm.updateOffset(true, rewindTo); err != nil { return nil, err } diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index 248057733a..129bfe3fb3 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -196,6 +196,7 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, stat // We must check all the contexts in the message, and mark them dispatched together. dupMsgCheck := make(map[fftypes.UUID]bool) for _, pin := range pins { + l.Debugf("Aggregating pin %.10d batch=%s hash=%s masked=%t", pin.Sequence, pin.Batch, pin.Hash, pin.Masked) if batch == nil || *batch.ID != *pin.Batch { batch, err = ag.database.GetBatchByID(ctx, pin.Batch) @@ -210,24 +211,21 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, stat // Extract the message from the batch - where the index is of a topic within a message var msg *fftypes.Message - var batchPinIndex int64 + var i int64 = -1 var msgBaseIndex int64 - for iM := 0; iM < len(batch.Payload.Messages); iM++ { + for iM := 0; i < pin.Index && iM < len(batch.Payload.Messages); iM++ { msg = batch.Payload.Messages[iM] - msgBaseIndex = batchPinIndex - for iT := 0; batchPinIndex < pin.Index && iT < len(msg.Header.Topics); iT++ { - batchPinIndex++ - } - if batchPinIndex == pin.Index { - break + msgBaseIndex = i + for iT := 0; i < pin.Index && iT < len(msg.Header.Topics); iT++ { + i++ } } - l.Debugf("Aggregating pin %.10d batch=%s msg=%s pinIndex=%d msgBaseIndex=%d hash=%s masked=%t", pin.Sequence, pin.Batch, msg.Header.ID, pin.Index, msgBaseIndex, pin.Hash, pin.Masked) - if batchPinIndex != pin.Index { + if i < pin.Index { l.Errorf("Batch %s does not have message-topic index %d - pin %s is invalid", pin.Batch, pin.Index, pin.Hash) continue } + l.Tracef("Batch %s message %d: %+v", batch.ID, pin.Index, msg) if msg == nil || msg.Header.ID == nil { l.Errorf("null message entry %d in batch '%s'", pin.Index, batch.ID) continue @@ -249,8 +247,6 @@ func (ag *aggregator) processPins(ctx context.Context, pins []*fftypes.Pin, stat } func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, pin *fftypes.Pin, msgBaseIndex int64, msg *fftypes.Message, state *batchState) (err error) { - l := log.L(ctx) - // Check if it's ready to be processed unmaskedContexts := make([]*fftypes.Bytes32, 0, len(msg.Header.Topics)) nextPins := make([]*nextPinState, 0, len(msg.Header.Topics)) @@ -258,14 +254,14 @@ func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, // Private messages have one or more masked "pin" hashes that allow us to work // out if it's the next message in the sequence, given the previous messages if msg.Header.Group == nil || len(msg.Pins) == 0 || len(msg.Header.Topics) != len(msg.Pins) { - l.Errorf("Message '%s' in batch '%s' has invalid pin data pins=%v topics=%v", msg.Header.ID, batch.ID, msg.Pins, msg.Header.Topics) + log.L(ctx).Errorf("Message '%s' in batch '%s' has invalid pin data pins=%v topics=%v", msg.Header.ID, batch.ID, msg.Pins, msg.Header.Topics) return nil } for i, pinStr := range msg.Pins { var msgContext fftypes.Bytes32 err := msgContext.UnmarshalText([]byte(pinStr)) if err != nil { - l.Errorf("Message '%s' in batch '%s' has invalid pin at index %d: '%s'", msg.Header.ID, batch.ID, i, pinStr) + log.L(ctx).Errorf("Message '%s' in batch '%s' has invalid pin at index %d: '%s'", msg.Header.ID, batch.ID, i, pinStr) return nil } nextPin, err := state.CheckMaskedContextReady(ctx, msg, msg.Header.Topics[i], pin.Sequence, &msgContext) @@ -280,7 +276,7 @@ func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, h.Write([]byte(topic)) msgContext := fftypes.HashResult(h) unmaskedContexts = append(unmaskedContexts, msgContext) - ready, err := state.CheckUnmaskedContextReady(ctx, msgContext, msg, msg.Header.Topics[i], pin.Sequence) + ready, err := state.CheckUnmaskedContextReady(ctx, *msgContext, msg, msg.Header.Topics[i], pin.Sequence) if err != nil || !ready { return err } @@ -288,7 +284,6 @@ func (ag *aggregator) processMessage(ctx context.Context, batch *fftypes.Batch, } - l.Debugf("Attempt dispatch msg=%s broadcastContexts=%v privatePins=%v", msg.Header.ID, unmaskedContexts, msg.Pins) dispatched, err := ag.attemptMessageDispatch(ctx, msg, state) if err != nil { return err diff --git a/internal/events/aggregator_batch_state.go b/internal/events/aggregator_batch_state.go index 917dd6c39e..287c5cd561 100644 --- a/internal/events/aggregator_batch_state.go +++ b/internal/events/aggregator_batch_state.go @@ -137,12 +137,12 @@ func (bs *batchState) RunFinalize(ctx context.Context) error { return bs.flushPins(ctx) } -func (bs *batchState) CheckUnmaskedContextReady(ctx context.Context, contextUnmasked *fftypes.Bytes32, msg *fftypes.Message, topic string, firstMsgPinSequence int64) (bool, error) { +func (bs *batchState) CheckUnmaskedContextReady(ctx context.Context, contextUnmasked fftypes.Bytes32, msg *fftypes.Message, topic string, firstMsgPinSequence int64) (bool, error) { - ucs, found := bs.unmaskedContexts[*contextUnmasked] + ucs, found := bs.unmaskedContexts[contextUnmasked] if !found { ucs = &contextState{blockedBy: -1} - bs.unmaskedContexts[*contextUnmasked] = ucs + bs.unmaskedContexts[contextUnmasked] = ucs // We need to check there's no earlier sequences with the same unmasked context fb := database.PinQueryFactory.NewFilterLimit(ctx, 1) // only need the first one @@ -261,7 +261,6 @@ func (bs *batchState) flushPins(ctx context.Context) error { fb.Gte("index", dm.firstPinIndex), fb.Lte("index", dm.lastPinIndex), ) - log.L(ctx).Debugf("Marking message dispatched batch=%s msg=%s firstIndex=%d lastIndex=%d", dm.batchID, dm.msgID, dm.firstPinIndex, dm.lastPinIndex) update := database.PinQueryFactory.NewUpdate(ctx).Set("dispatched", true) if err := bs.database.UpdatePins(ctx, filter, update); err != nil { return err diff --git a/internal/events/aggregator_batch_state_test.go b/internal/events/aggregator_batch_state_test.go index c35ec69783..e846082b7c 100644 --- a/internal/events/aggregator_batch_state_test.go +++ b/internal/events/aggregator_batch_state_test.go @@ -52,7 +52,7 @@ func TestSetContextBlockedByNoState(t *testing.T) { unmaskedContext := fftypes.NewRandB32() bs.SetContextBlockedBy(ag.ctx, *unmaskedContext, 10) - ready, err := bs.CheckUnmaskedContextReady(ag.ctx, unmaskedContext, &fftypes.Message{}, "topic1", 1) + ready, err := bs.CheckUnmaskedContextReady(ag.ctx, *unmaskedContext, &fftypes.Message{}, "topic1", 1) assert.NoError(t, err) assert.False(t, ready) } diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index 8fa9b6c896..f795466841 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -24,7 +24,6 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/definitions" - "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/internal/metrics" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" @@ -170,7 +169,6 @@ func TestAggregationMaskedZeroNonceMatch(t *testing.T) { } func TestAggregationMaskedNextSequenceMatch(t *testing.T) { - log.SetLevel("debug") ag, cancel := newTestAggregator() defer cancel() @@ -1392,7 +1390,7 @@ func TestAttemptMessageUpdateMessageFail(t *testing.T) { mdi.On("UpdateMessage", ag.ctx, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) _, err := ag.attemptMessageDispatch(ag.ctx, &fftypes.Message{ - Header: fftypes.MessageHeader{ID: fftypes.NewUUID(), Type: fftypes.MessageTypeBroadcast}, + Header: fftypes.MessageHeader{ID: fftypes.NewUUID()}, }, bs) assert.NoError(t, err)