Skip to content

Commit

Permalink
fix: Use old-style prometheus metrics.
Browse files Browse the repository at this point in the history
  • Loading branch information
tyler-smith committed Jan 16, 2024
1 parent 18677a4 commit b121460
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 223 deletions.
8 changes: 4 additions & 4 deletions eth/filters/dropped_tx_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ func (api *FilterAPI) DroppedTransactions(ctx context.Context) (*rpc.Subscriptio
dropped := make(chan core.DropTxsEvent)
droppedSub := api.sys.backend.SubscribeDropTxsEvent(dropped)

metricsDroppedTxsNew.Inc()
defer metricsDroppedTxsEnd.Inc()
metricsDroppedTxsNew.Inc(1)
defer metricsDroppedTxsEnd.Inc(1)

for {
select {
case d := <-dropped:
metricsDroppedTxsReceived.Add(float64(len(d.Txs)))
metricsDroppedTxsReceived.Inc(int64(len(d.Txs)))
for _, tx := range d.Txs {
notification := &dropNotification{
Tx: newRPCPendingTransaction(tx),
Expand All @@ -142,7 +142,7 @@ func (api *FilterAPI) DroppedTransactions(ctx context.Context) (*rpc.Subscriptio
peerid, _ := txPeerMap.Get(tx.Hash())
notification.Peer, _ = peerIDMap.Load(peerid)
}
metricsDroppedTxsSent.Inc()
metricsDroppedTxsSent.Inc(1)
if err := notifier.Notify(rpcSub.ID, notification); err != nil {
log.Error("dropped_txs_stream: failed to notify", "err", err)
return
Expand Down
225 changes: 23 additions & 202 deletions eth/filters/metrics.go
Original file line number Diff line number Diff line change
@@ -1,209 +1,30 @@
package filters

import (
"os"

"github.com/prometheus/client_golang/prometheus"

bnPrometheus "github.com/ethereum/go-ethereum/bn/prometheus"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)

const streamSubsystem string = "stream"

var (
metricsHostName string

metricsPendingTxsNew = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "pending_txs_new",
Help: "Number of pending tx streams created",
},
)

metricsPendingTxsEnd = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "pending_txs_end",
Help: "Number of pending tx streams ended",
},
)

metricsPendingTxsReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "pending_txs_received",
Help: "Number of pending txs received",
},
)

metricsPendingTxsGasTooLow = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "pending_txs_gas_too_low",
Help: "Number txs ignore because of gas",
},
)

metricsPendingTxsTraceSuccess = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "pending_txs_trace_success",
Help: "Number txs successfully traced",
},
)

metricsPendingTxsTraceFailed = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "pending_txs_trace_failed",
Help: "Number txs failed to trace",
},
)

metricsPendingTxsSent = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "pending_txs_sent",
Help: "Number of pending txs sent",
},
)

metricsBlocksNew = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "blocks_new",
Help: "Number of block streams created",
},
)

metricsBlocksEnd = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "blocks_end",
Help: "Number of block streams ended",
},
)

metricsBlocksReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "blocks_received",
Help: "Number of blocks received",
},
)

metricsBlocksTraceSuccess = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "blocks_trace_success",
Help: "Number of blocks successfully traced",
},
)

metricsBlocksTraceFailed = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "blocks_trace_failed",
Help: "Number of blocks failed to trace",
},
)

metricsBlocksSent = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "blocks_sent",
Help: "Number of blocks sent",
},
)

metricsDroppedTxsNew = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "dropped_txs_new",
Help: "Number of dropped tx streams created",
},
)

metricsDroppedTxsEnd = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "dropped_txs_end",
Help: "Number of dropped tx streams ended",
},
)

metricsDroppedTxsReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "dropped_txs_received",
Help: "Number of dropped txs received",
},
)

metricsDroppedTxsSent = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: streamSubsystem,
Name: "dropped_txs_sent",
Help: "Number of dropped txs sent",
},
)

metricsTracePendingTxTimer = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: streamSubsystem,
Name: "trace_pending_tx_duration",
Help: "Trace pending tx duration in seconds",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
},
[]string{},
)

metricsTraceBlockTimer = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: streamSubsystem,
Name: "trace_blocks_duration",
Help: "Trace blocks duration in seconds",
Buckets: []float64{.01, .025, .05, .1, .25, .5, 1, 5, 10, 15},
},
[]string{},
)
metricsPendingTxsNew = metrics.NewRegisteredCounter("stream/pending_txs/new", nil)
metricsPendingTxsEnd = metrics.NewRegisteredCounter("stream/pending_txs/end", nil)
metricsPendingTxsReceived = metrics.NewRegisteredCounter("stream/pending_txs/received", nil)
metricsPendingTxsGasTooLow = metrics.NewRegisteredCounter("stream/pending_txs/gas_too_low", nil)
metricsPendingTxsTraceSuccess = metrics.NewRegisteredCounter("stream/pending_txs/trace_success", nil)
metricsPendingTxsTraceFailed = metrics.NewRegisteredCounter("stream/pending_txs/trace_failed", nil)
metricsPendingTxsSent = metrics.NewRegisteredCounter("stream/pending_txs/sent", nil)

metricsBlocksNew = metrics.NewRegisteredCounter("stream/blocks/new", nil)
metricsBlocksEnd = metrics.NewRegisteredCounter("stream/blocks/end", nil)
metricsBlocksReceived = metrics.NewRegisteredCounter("stream/blocks/received", nil)
metricsBlocksTraceSuccess = metrics.NewRegisteredCounter("stream/blocks/trace_success", nil)
metricsBlocksTraceFailed = metrics.NewRegisteredCounter("stream/blocks/trace_failed", nil)
metricsBlocksSent = metrics.NewRegisteredCounter("stream/blocks/sent", nil)

metricsDroppedTxsNew = metrics.NewRegisteredCounter("stream/dropped_txs/new", nil)
metricsDroppedTxsEnd = metrics.NewRegisteredCounter("stream/dropped_txs/end", nil)
metricsDroppedTxsReceived = metrics.NewRegisteredCounter("stream/dropped_txs/received", nil)
metricsDroppedTxsSent = metrics.NewRegisteredCounter("stream/dropped_txs/sent", nil)

metricsTracePendingTxTimer = metrics.NewRegisteredHistogram("stream/pending_txs/trace_duration", nil, metrics.NewExpDecaySample(1028, 0.015))
metricsTraceBlockTimer = metrics.NewRegisteredHistogram("stream/blocks/trace_duration", nil, metrics.NewExpDecaySample(1028, 0.015))
)

func init() {
var err error
metricsHostName, err = os.Hostname()
if err != nil {
log.Error("failed to get hostname for metrics", "err", err)
}

register := func(c prometheus.Collector) {
if err := bnPrometheus.Metrics.Register(c); err != nil {
log.Error("failed to register metrics", "err", err)
}
}

register(metricsPendingTxsNew)
register(metricsPendingTxsEnd)
register(metricsPendingTxsReceived)
register(metricsPendingTxsGasTooLow)
register(metricsPendingTxsTraceSuccess)
register(metricsPendingTxsTraceFailed)
register(metricsPendingTxsSent)

register(metricsBlocksNew)
register(metricsBlocksEnd)
register(metricsBlocksReceived)
register(metricsBlocksTraceSuccess)
register(metricsBlocksTraceFailed)
register(metricsBlocksSent)

register(metricsDroppedTxsNew)
register(metricsDroppedTxsEnd)
register(metricsDroppedTxsReceived)
register(metricsDroppedTxsSent)

register(metricsTracePendingTxTimer)
register(metricsTraceBlockTimer)
}
34 changes: 17 additions & 17 deletions eth/filters/trace_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func (api *FilterAPI) NewPendingTransactionsWithTrace(ctx context.Context, trace
return
}

metricsPendingTxsNew.Inc()
defer metricsPendingTxsEnd.Inc()
metricsPendingTxsNew.Inc(1)
defer metricsPendingTxsEnd.Inc(1)

for {
select {
Expand Down Expand Up @@ -94,7 +94,7 @@ func (api *FilterAPI) NewPendingTransactionsWithTrace(ctx context.Context, trace
return
}

metricsPendingTxsReceived.Add(float64(len(txs)))
metricsPendingTxsReceived.Inc(int64(len(txs)))
for _, tx := range txs {
msg, _ = core.TransactionToMessage(tx, signer, header.BaseFee)
if err != nil {
Expand All @@ -104,20 +104,20 @@ func (api *FilterAPI) NewPendingTransactionsWithTrace(ctx context.Context, trace

if msg.GasFeeCap.Cmp(header.BaseFee) < 0 {
log.Trace("pending_txs_stream: tx gas fee too low", "tx", tx.Hash(), "gasFeeCap", msg.GasFeeCap, "baseFee", header.BaseFee)
metricsPendingTxsGasTooLow.Inc()
metricsPendingTxsGasTooLow.Inc(1)
continue
}

traceCtx.TxHash = tx.Hash()
timer := prometheus.NewTimer(metricsTracePendingTxTimer.With(nil))
startTime := time.Now()
trace, err := traceTx(msg, traceCtx, blockCtx, chainConfig, statedb, tracerOpts)
if err != nil {
log.Error("pending_txs_stream: failed to trace tx", "err", err, "tx", tx.Hash())
metricsPendingTxsTraceFailed.Inc()
metricsPendingTxsTraceFailed.Inc(1)
continue
}
timer.ObserveDuration()
metricsPendingTxsTraceSuccess.Inc()
metricsTracePendingTxTimer.Update(time.Since(startTime).Milliseconds())
metricsPendingTxsTraceSuccess.Inc(1)

gasPrice := hexutil.Big(*tx.GasPrice())
rpcTx := newRPCPendingTransaction(tx)
Expand All @@ -137,7 +137,7 @@ func (api *FilterAPI) NewPendingTransactionsWithTrace(ctx context.Context, trace
log.Error("pending_txs_stream: failed to notify", "err", err)
return
}
metricsPendingTxsSent.Add(float64(len(tracedTxs)))
metricsPendingTxsSent.Inc(int64(len(tracedTxs)))
case <-rpcSub.Err():
return
case <-notifier.Closed():
Expand Down Expand Up @@ -174,8 +174,8 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON
return
}

metricsBlocksNew.Inc()
defer metricsBlocksEnd.Inc()
metricsBlocksNew.Inc(1)
defer metricsBlocksEnd.Inc(1)

var hashes []common.Hash
for {
Expand All @@ -197,7 +197,7 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON
return
}

metricsBlocksReceived.Add(float64(len(hashes)))
metricsBlocksReceived.Inc(int64(len(hashes)))
for _, hash := range hashes {
block, err := api.sys.backend.BlockByHash(ctx, hash)
if err != nil {
Expand All @@ -214,11 +214,11 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON

trace, err := traceBlock(block, chainConfig, api.sys.chain, tracerOpts)
if err != nil {
metricsBlocksTraceFailed.Inc()
metricsBlocksTraceFailed.Inc(1)
log.Error("block_stream: failed to trace block", "err", err, "block", block.Number())
continue
}
metricsBlocksTraceSuccess.Inc()
metricsBlocksTraceSuccess.Inc(1)
marshalBlock["trace"] = trace

marshalReceipts := make(map[common.Hash]map[string]interface{})
Expand Down Expand Up @@ -256,7 +256,7 @@ func (api *FilterAPI) NewFullBlocksWithTrace(ctx context.Context, tracerOptsJSON
return
}
log.Info("block_stream: sent block", "hash", hash, "number", block.Number(), "sub_id", rpcSub.ID)
metricsBlocksSent.Inc()
metricsBlocksSent.Inc(1)
}
}
}()
Expand Down Expand Up @@ -308,7 +308,7 @@ func traceBlock(block *types.Block, chainConfig *params.ChainConfig, chain *core
results = make([]*blocknative.Trace, len(txs))
)

timer := prometheus.NewTimer(metricsTraceBlockTimer.With(nil))
startTime := time.Now()
for i, tx := range txs {
msg, err := core.TransactionToMessage(tx, signer, block.BaseFee())
if err != nil {
Expand All @@ -326,7 +326,7 @@ func traceBlock(block *types.Block, chainConfig *params.ChainConfig, chain *core
}
statedb.Finalise(is158)
}
timer.ObserveDuration()
metricsTraceBlockTimer.Update(time.Since(startTime).Milliseconds())

return results, nil
}
Expand Down

0 comments on commit b121460

Please sign in to comment.