Skip to content

Commit

Permalink
feat: Metrics
Browse files Browse the repository at this point in the history
Signed-off-by: MeihaoZuyu <anqur@riema.xyz>
  • Loading branch information
MeihaoZuyu committed May 14, 2024
1 parent 7d97405 commit 1a57785
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 127 deletions.
8 changes: 2 additions & 6 deletions apis/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"

"github.com/RiemaLabs/modular-indexer-committee/internal/metrics"
"github.com/RiemaLabs/modular-indexer-committee/ord"
"github.com/RiemaLabs/modular-indexer-committee/ord/stateless"
)
Expand Down Expand Up @@ -220,19 +221,14 @@ func StartService(queue *stateless.Queue, enableCommittee, enableDebug, enablePp
}
r := gin.Default()

// TODO: Medium. Add the TRUSTED_PROXIES to our config
// trustedProxies := os.Getenv("TRUSTED_PROXIES")
// if trustedProxies != "" {
// r.SetTrustedProxies([]string{trustedProxies})
// }

r.Use(gin.Recovery(), gin.Logger(), cors.New(cors.Config{
AllowOrigins: []string{"*"},
AllowMethods: []string{"POST", "GET"},
AllowHeaders: []string{"*"},
AllowCredentials: true,
MaxAge: 12 * time.Hour,
}))
r.Use(metrics.HTTP)

if enablePprof {
pprof.Register(r)
Expand Down
5 changes: 4 additions & 1 deletion cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ type RuntimeArguments struct {
ConfigFilePath string
CommitteeIndexerName string
CommitteeIndexerURL string
ProtocolName string
ProtocolName string
MetricAddr string
}

func NewRuntimeArguments() *RuntimeArguments {
Expand Down Expand Up @@ -58,6 +59,7 @@ leveraging Bitcoin's immutable and decentralized nature to provide a Turing-comp
log.Printf("The name of the committee indexer service is %s\n", arguments.CommitteeIndexerName)
log.Printf("The url of the committee indexer service is %s\n", arguments.CommitteeIndexerURL)
log.Printf("The meta protocol chosen is %s\n", arguments.ProtocolName)
log.Println("Metrics listen at:", arguments.MetricAddr)

Execution(arguments)
},
Expand All @@ -73,5 +75,6 @@ leveraging Bitcoin's immutable and decentralized nature to provide a Turing-comp
rootCmd.Flags().StringVarP(&arguments.CommitteeIndexerName, "name", "n", "", "Indicate the name of the committee indexer service")
rootCmd.Flags().StringVarP(&arguments.CommitteeIndexerURL, "url", "u", "", "Indicate the url of the committee indexer service")
rootCmd.Flags().StringVar(&arguments.ProtocolName, "protocol", "brc-20", "Indicate the meta protocol supported by the committee indexer")
rootCmd.Flags().StringVar(&arguments.MetricAddr, "metrics", "0.0.0.0:8081", "Metrics listening address")
return rootCmd
}
9 changes: 4 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ require (
github.com/gin-contrib/pprof v1.4.0
github.com/gin-gonic/gin v1.9.1
github.com/holiman/uint256 v1.2.4
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_model v0.5.0
github.com/spf13/cobra v1.8.0
golang.org/x/crypto v0.21.0
gorm.io/driver/postgres v1.5.7
Expand Down Expand Up @@ -143,7 +145,6 @@ require (
github.com/lightningnetwork/lnd/tor v1.1.0 // indirect
github.com/ltcsuite/ltcd v0.0.0-20190101042124-f37f8bf35796 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/mholt/archiver/v3 v3.5.0 // indirect
github.com/miekg/dns v1.1.43 // indirect
github.com/mmcloughlin/addchain v0.4.0 // indirect
Expand All @@ -154,10 +155,8 @@ require (
github.com/pierrec/lz4/v4 v4.1.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.12.0 // indirect
github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/rogpeppe/fastuuid v1.2.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
Expand Down
72 changes: 8 additions & 64 deletions go.sum

Large diffs are not rendered by default.

86 changes: 86 additions & 0 deletions internal/metrics/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package metrics

import (
"errors"
"log"
"net/http"
"strconv"
"time"

"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

const (
StageInitializing = iota + 1
StageCatchup
StageServing
StageUpdating
StageReorg
)

func fqn(name string) string {
return prometheus.BuildFQName("nubit", "modular_committee", name)
}

var (
Version = prometheus.NewGaugeVec(
prometheus.GaugeOpts{Name: fqn("version")},
[]string{"version"},
)

Stage = prometheus.NewGauge(prometheus.GaugeOpts{Name: fqn("stage")})

DBQueryDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: fqn("dbquery_duration"),
Buckets: []float64{0.02, 0.05, 0.1, 0.2, 0.5, 1, 5},
},
[]string{"op"},
)

CurrentHeight = prometheus.NewGauge(prometheus.GaugeOpts{Name: fqn("current_height")})

HttpDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: fqn("http_duration"),
Buckets: []float64{0.01, 0.05, 0.1, 0.2, 0.5, 1, 5, 15},
},
[]string{"method", "path", "status"},
)
)

func ObserveDBQuery(op string, started time.Time) {
DBQueryDuration.WithLabelValues(op).Observe(time.Since(started).Seconds())
}

func HTTP(c *gin.Context) {
started := time.Now()

c.Next()

HttpDuration.WithLabelValues(
c.Request.Method,
c.Request.URL.Path,
strconv.Itoa(c.Writer.Status()),
).Observe(time.Since(started).Seconds())
}

func init() {
prometheus.MustRegister(
Version,
Stage,
DBQueryDuration,
CurrentHeight,
HttpDuration,
)
}

func ListenAndServe(addr string) {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
if err := (&http.Server{Addr: addr, Handler: mux}).ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
log.Fatal(err)
}
}
60 changes: 60 additions & 0 deletions internal/metrics/metric_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package metrics

import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/client_golang/prometheus/testutil/promlint"
prompb "github.com/prometheus/client_model/go"
)

func TestHTTP(t *testing.T) {
const (
elapsed = 100 * time.Millisecond
rootPath = "/"
metricsPath = "/metrics"
)
g := gin.New()
g.Use(HTTP)
g.GET(rootPath, func(*gin.Context) { time.Sleep(elapsed) })
g.GET(metricsPath, gin.WrapH(promhttp.Handler()))
testServer := httptest.NewServer(g.Handler())
if _, err := testServer.Client().Get(testServer.URL); err != nil {
t.Fatal(err)
}
rsp, err := testServer.Client().Get(testServer.URL + metricsPath)
if err != nil {
t.Fatal(err)
}
defer func() { _ = rsp.Body.Close() }()
l := promlint.New(rsp.Body)
l.AddCustomValidations(func(mf *prompb.MetricFamily) []error {
for _, metric := range mf.GetMetric() {
if h := metric.Histogram; h != nil {
if sum := time.Duration(*h.SampleSum * float64(time.Second)); sum <= elapsed {
t.Fatal(sum)
}
if count := *h.SampleCount; count != 1 {
t.Fatal(count)
}
if v := *metric.Label[0].Value; v != http.MethodGet {
t.Fatal(v)
}
if v := *metric.Label[1].Value; v != rootPath {
t.Fatal(v)
}
if v := *metric.Label[2].Value; v != "200" {
t.Fatal(v)
}
}
}
return nil
})
if _, err := l.Lint(); err != nil {
t.Fatal(err)
}
}
15 changes: 13 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ import (

"github.com/RiemaLabs/modular-indexer-committee/apis"
"github.com/RiemaLabs/modular-indexer-committee/checkpoint"
"github.com/RiemaLabs/modular-indexer-committee/internal/metrics"
"github.com/RiemaLabs/modular-indexer-committee/ord"
"github.com/RiemaLabs/modular-indexer-committee/ord/getter"
"github.com/RiemaLabs/modular-indexer-committee/ord/stateless"
)

func CatchupStage(ordGetter getter.OrdGetter, arguments *RuntimeArguments, initHeight uint, latestHeight uint) (*stateless.Queue, error) {
metrics.Stage.Set(metrics.StageCatchup)

// Fetch the latest block height.
header := stateless.LoadHeader(arguments.EnableStateRootCache, initHeight)
curHeight := header.Height
Expand Down Expand Up @@ -51,7 +54,6 @@ func CatchupStage(ordGetter getter.OrdGetter, arguments *RuntimeArguments, initH
}
header.Lock()
stateless.Exec(header, ordTransfer, i)
// header.Height ++
header.Paging(ordGetter, false, stateless.NodeResolveFn)
header.Unlock()
if i%1000 == 0 {
Expand Down Expand Up @@ -97,6 +99,8 @@ func CatchupStage(ordGetter getter.OrdGetter, arguments *RuntimeArguments, initH
}

func ServiceStage(ordGetter getter.OrdGetter, arguments *RuntimeArguments, queue *stateless.Queue, interval time.Duration) {
metrics.Stage.Set(metrics.StageServing)

// Create a channel to listen for SIGINT (Ctrl+C) signal
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT)
Expand Down Expand Up @@ -127,10 +131,12 @@ func ServiceStage(ordGetter getter.OrdGetter, arguments *RuntimeArguments, queue
}

if curHeight < latestHeight {
metrics.Stage.Set(metrics.StageUpdating)
err := queue.Update(ordGetter, latestHeight)
if err != nil {
log.Fatalf("Failed to update the queue: %v", err)
}
metrics.Stage.Set(metrics.StageServing)
}

reorgHeight, err := queue.CheckForReorg(ordGetter)
Expand All @@ -140,10 +146,12 @@ func ServiceStage(ordGetter getter.OrdGetter, arguments *RuntimeArguments, queue
}

if reorgHeight != 0 {
metrics.Stage.Set(metrics.StageReorg)
err := queue.Recovery(ordGetter, reorgHeight)
if err != nil {
log.Fatalf("Failed to update the queue: %v", err)
}
metrics.Stage.Set(metrics.StageServing)
}

if arguments.EnableCommittee {
Expand Down Expand Up @@ -218,10 +226,13 @@ func ServiceStage(ordGetter getter.OrdGetter, arguments *RuntimeArguments, queue
}

func Execution(arguments *RuntimeArguments) {

// TODO: High. Get the version from Git Tag.
Version = "v0.2.0"

go metrics.ListenAndServe(arguments.MetricAddr)
metrics.Version.WithLabelValues(Version).Set(1)
metrics.Stage.Set(metrics.StageInitializing)

// Get the configuration.
configFile, err := os.ReadFile(arguments.ConfigFilePath)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions ord/getter/opiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package getter

import (
"fmt"
"time"

"gorm.io/driver/postgres"
"gorm.io/gorm"

"github.com/RiemaLabs/modular-indexer-committee/internal/metrics"
)

type DatabaseConfig struct {
Expand Down Expand Up @@ -41,6 +44,8 @@ func NewOPIOrdGetter(config *DatabaseConfig) (*OPIOrdGetter, error) {
}

func (opi *OPIOrdGetter) GetLatestBlockHeight() (uint, error) {
defer metrics.ObserveDBQuery("getLatestBlockHeight", time.Now())

var blockHeight int
sql := `
SELECT block_height
Expand All @@ -54,6 +59,8 @@ func (opi *OPIOrdGetter) GetLatestBlockHeight() (uint, error) {
}

func (opi *OPIOrdGetter) GetBlockHash(blockHeight uint) (string, error) {
defer metrics.ObserveDBQuery("getBlockHash", time.Now())

var blockHash string
sql := `
SELECT block_hash
Expand All @@ -68,6 +75,8 @@ func (opi *OPIOrdGetter) GetBlockHash(blockHeight uint) (string, error) {
}

func (opi *OPIOrdGetter) GetOrdTransfers(blockHeight uint) ([]OrdTransfer, error) {
defer metrics.ObserveDBQuery("getOrdTransfers", time.Now())

var ordTransfers []OrdTransfer
sql := `
SELECT ot.id, ot.inscription_id, ot.block_height, ot.old_satpoint, ot.new_satpoint, ot.new_pkscript, ot.new_wallet, ot.sent_as_fee, oc."content", oc.content_type, onti.parent_id
Expand Down
18 changes: 4 additions & 14 deletions ord/stateless/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,13 @@ import (
"fmt"
"sort"

verkle "github.com/ethereum/go-verkle"
uint256 "github.com/holiman/uint256"
"github.com/ethereum/go-verkle"
"github.com/holiman/uint256"

"github.com/RiemaLabs/modular-indexer-committee/internal/metrics"
"github.com/RiemaLabs/modular-indexer-committee/ord/getter"
)

func NewHeader(getter *getter.OrdGetter, initState *DiffState) *Header {
myHeader := Header{
Root: verkle.New(),
Height: initState.Height,
Hash: initState.Hash,
KV: make(KeyValueMap),
Access: AccessList{},
IntermediateKV: KeyValueMap{},
}
return &myHeader
}

func (h *Header) insert(key []byte, value []byte, nodeResolverFn verkle.NodeResolverFn) {
if len(key) != verkle.KeySize {
panic(fmt.Errorf("the length the key to insert bytes must be %d, current is: %d", verkle.KeySize, len(key)))
Expand Down Expand Up @@ -223,6 +212,7 @@ func (h *Header) Paging(ordGetter getter.OrdGetter, queryHash bool, nodeResolver
h.IntermediateKV = KeyValueMap{}
// Update height and hash
h.Height++
metrics.CurrentHeight.Set(float64(h.Height))
if queryHash {
hash, err := ordGetter.GetBlockHash(h.Height)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion ord/stateless/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func (queue *Queue) Update(getter getter.OrdGetter, latestHeight uint) error {
}

queue.Header.OrdTrans = ordTransfer
// header.Height ++
queue.Header.Paging(getter, true, NodeResolveFn)
}
return nil
Expand Down
Loading

0 comments on commit 1a57785

Please sign in to comment.