Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

op-node: Add channel_input_bytes metric #5385

Merged
merged 3 commits into from
Apr 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions op-node/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type Metricer interface {
RecordSequencerBuildingDiffTime(duration time.Duration)
RecordSequencerSealingTime(duration time.Duration)
Document() []metrics.DocumentedMetric
RecordChannelInputBytes(num int)
// P2P Metrics
SetPeerScores(scores map[string]float64)
ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
Expand Down Expand Up @@ -131,6 +132,8 @@ type Metrics struct {
GossipEventsTotal *prometheus.CounterVec
BandwidthTotal *prometheus.GaugeVec

ChannelInputBytes prometheus.Counter

registry *prometheus.Registry
factory metrics.Factory
}
Expand Down Expand Up @@ -331,6 +334,12 @@ func NewMetrics(procName string) *Metrics {
"direction",
}),

ChannelInputBytes: factory.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "channel_input_bytes",
Help: "Number of compressed bytes added to the channel",
}),

P2PReqDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: ns,
Subsystem: "p2p",
Expand Down Expand Up @@ -635,6 +644,10 @@ func (m *Metrics) PayloadsQuarantineSize(n int) {
m.PayloadsQuarantineTotal.Set(float64(n))
}

func (m *Metrics) RecordChannelInputBytes(inputCompressedBytes int) {
m.ChannelInputBytes.Add(float64(inputCompressedBytes))
}

type noopMetricer struct{}

var NoopMetrics Metricer = new(noopMetricer)
Expand Down Expand Up @@ -737,3 +750,6 @@ func (n *noopMetricer) ServerPayloadByNumberEvent(num uint64, resultCode byte, d

func (n *noopMetricer) PayloadsQuarantineSize(int) {
}

func (n *noopMetricer) RecordChannelInputBytes(int) {
}
10 changes: 7 additions & 3 deletions op-node/rollup/derive/channel_in_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@ type ChannelInReader struct {
nextBatchFn func() (BatchWithL1InclusionBlock, error)

prev *ChannelBank

metrics Metrics
}

var _ ResetableStage = (*ChannelInReader)(nil)

// NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use.
func NewChannelInReader(log log.Logger, prev *ChannelBank) *ChannelInReader {
func NewChannelInReader(log log.Logger, prev *ChannelBank, metrics Metrics) *ChannelInReader {
return &ChannelInReader{
log: log,
prev: prev,
log: log,
prev: prev,
metrics: metrics,
}
}

Expand All @@ -41,6 +44,7 @@ func (cr *ChannelInReader) Origin() eth.L1BlockRef {
func (cr *ChannelInReader) WriteChannel(data []byte) error {
if f, err := BatchReader(bytes.NewBuffer(data), cr.Origin()); err == nil {
cr.nextBatchFn = f
cr.metrics.RecordChannelInputBytes(len(data))
return nil
} else {
cr.log.Error("Error creating batch reader from channel data", "err", err)
Expand Down
3 changes: 2 additions & 1 deletion op-node/rollup/derive/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Metrics interface {
RecordL1Ref(name string, ref eth.L1BlockRef)
RecordL2Ref(name string, ref eth.L2BlockRef)
RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)
RecordChannelInputBytes(inputCompresedBytes int)
}

type L1Fetcher interface {
Expand Down Expand Up @@ -82,7 +83,7 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
l1Src := NewL1Retrieval(log, dataSrc, l1Traversal)
frameQueue := NewFrameQueue(log, l1Src)
bank := NewChannelBank(log, cfg, frameQueue, l1Fetcher)
chInReader := NewChannelInReader(log, bank)
chInReader := NewChannelInReader(log, bank, metrics)
batchQueue := NewBatchQueue(log, cfg, chInReader)
attrBuilder := NewFetchingAttributesBuilder(cfg, l1Fetcher, engine)
attributesQueue := NewAttributesQueue(log, cfg, attrBuilder, batchQueue)
Expand Down
1 change: 1 addition & 0 deletions op-node/rollup/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Metrics interface {

RecordL1Ref(name string, ref eth.L1BlockRef)
RecordL2Ref(name string, ref eth.L2BlockRef)
RecordChannelInputBytes(inputCompresedBytes int)

RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)

Expand Down
19 changes: 14 additions & 5 deletions op-node/testutils/metrics.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package testutils

import "github.com/ethereum-optimism/optimism/op-node/eth"
import (
"github.com/ethereum-optimism/optimism/op-node/eth"
)

// TestDerivationMetrics implements the metrics used in the derivation pipeline as no-op operations.
// Optionally a test may hook into the metrics
type TestDerivationMetrics struct {
FnRecordL1ReorgDepth func(d uint64)
FnRecordL1Ref func(name string, ref eth.L1BlockRef)
FnRecordL2Ref func(name string, ref eth.L2BlockRef)
FnRecordUnsafePayloads func(length uint64, memSize uint64, next eth.BlockID)
FnRecordL1ReorgDepth func(d uint64)
FnRecordL1Ref func(name string, ref eth.L1BlockRef)
FnRecordL2Ref func(name string, ref eth.L2BlockRef)
FnRecordUnsafePayloads func(length uint64, memSize uint64, next eth.BlockID)
FnRecordChannelInputBytes func(inputCompresedBytes int)
}

func (t *TestDerivationMetrics) RecordL1ReorgDepth(d uint64) {
Expand All @@ -35,6 +38,12 @@ func (t *TestDerivationMetrics) RecordUnsafePayloadsBuffer(length uint64, memSiz
}
}

func (t *TestDerivationMetrics) RecordChannelInputBytes(inputCompresedBytes int) {
if t.FnRecordChannelInputBytes != nil {
t.FnRecordChannelInputBytes(inputCompresedBytes)
}
}

type TestRPCMetrics struct{}

func (n *TestRPCMetrics) RecordRPCServerRequest(method string) func() {
Expand Down