diff --git a/dot/metrics/collector.go b/dot/metrics/collector.go new file mode 100644 index 0000000000..eec1a93254 --- /dev/null +++ b/dot/metrics/collector.go @@ -0,0 +1,141 @@ +// Copyright 2019 ChainSafe Systems (ON) Corp. +// This file is part of gossamer. +// +// The gossamer library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The gossamer library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the gossamer library. If not, see . + +package metrics + +import ( + "context" + "runtime" + "sync" + "time" + + ethmetrics "github.com/ethereum/go-ethereum/metrics" +) + +// GaugeMetrics interface allows the exportation of many gauge metrics +// the implementer could exports +type GaugeMetrics interface { + CollectGauge() map[string]int64 +} + +// Collector struct controls the metrics and executes polling to extract the values +type Collector struct { + ctx context.Context + gauges []GaugeMetrics + wg sync.WaitGroup +} + +// NewCollector creates a new Collector +func NewCollector(ctx context.Context) *Collector { + return &Collector{ + ctx: ctx, + wg: sync.WaitGroup{}, + gauges: make([]GaugeMetrics, 0), + } +} + +// Start will start one goroutine to collect all the gauges registered and +// a separate goroutine to collect process metrics +func (c *Collector) Start() { + ethmetrics.Enabled = true + c.wg.Add(2) + + go c.startCollectProccessMetrics() + go c.startCollectGauges() + + c.wg.Wait() +} + +// AddGauge adds a GaugeMetrics implementer on gauges list +func (c *Collector) AddGauge(g GaugeMetrics) { + c.gauges = append(c.gauges, g) +} + +func (c *Collector) startCollectGauges() { + t := time.NewTicker(Refresh) + defer func() { + t.Stop() + c.wg.Done() + }() + + for { + select { + case <-c.ctx.Done(): + return + case <-t.C: + for _, g := range c.gauges { + m := g.CollectGauge() + + for label, value := range m { + pooltx := ethmetrics.GetOrRegisterGauge(label, nil) + pooltx.Update(value) + } + } + } + } +} + +func (c *Collector) startCollectProccessMetrics() { + cpuStats := make([]*ethmetrics.CPUStats, 2) + memStats := make([]*runtime.MemStats, 2) + for i := 0; i < len(memStats); i++ { + cpuStats[i] = new(ethmetrics.CPUStats) + memStats[i] = new(runtime.MemStats) + } + + // Define the various metrics to collect + var ( + cpuSysLoad = ethmetrics.GetOrRegisterGauge("system/cpu/sysload", ethmetrics.DefaultRegistry) + cpuSysWait = ethmetrics.GetOrRegisterGauge("system/cpu/syswait", ethmetrics.DefaultRegistry) + cpuProcLoad = ethmetrics.GetOrRegisterGauge("system/cpu/procload", ethmetrics.DefaultRegistry) + cpuGoroutines = ethmetrics.GetOrRegisterGauge("system/cpu/goroutines", ethmetrics.DefaultRegistry) + + memPauses = ethmetrics.GetOrRegisterMeter("system/memory/pauses", ethmetrics.DefaultRegistry) + memAlloc = ethmetrics.GetOrRegisterMeter("system/memory/allocs", ethmetrics.DefaultRegistry) + memFrees = ethmetrics.GetOrRegisterMeter("system/memory/frees", ethmetrics.DefaultRegistry) + memHeld = ethmetrics.GetOrRegisterGauge("system/memory/held", ethmetrics.DefaultRegistry) + memUsed = ethmetrics.GetOrRegisterGauge("system/memory/used", ethmetrics.DefaultRegistry) + ) + + t := time.NewTicker(Refresh) + defer func() { + t.Stop() + c.wg.Done() + }() + + for i := 1; ; i++ { + select { + case <-c.ctx.Done(): + return + case <-t.C: + location1 := i % 2 + location2 := (i - 1) % 2 + + ethmetrics.ReadCPUStats(cpuStats[location1]) + cpuSysLoad.Update((cpuStats[location1].GlobalTime - cpuStats[location2].GlobalTime) / refreshFreq) + cpuSysWait.Update((cpuStats[location1].GlobalWait - cpuStats[location2].GlobalWait) / refreshFreq) + cpuProcLoad.Update((cpuStats[location1].LocalTime - cpuStats[location2].LocalTime) / refreshFreq) + cpuGoroutines.Update(int64(runtime.NumGoroutine())) + + runtime.ReadMemStats(memStats[location1]) + memPauses.Mark(int64(memStats[location1].PauseTotalNs - memStats[location2].PauseTotalNs)) + memAlloc.Mark(int64(memStats[location1].Mallocs - memStats[location2].Mallocs)) + memFrees.Mark(int64(memStats[location1].Frees - memStats[location2].Frees)) + memHeld.Update(int64(memStats[location1].HeapSys - memStats[location1].HeapReleased)) + memUsed.Update(int64(memStats[location1].Alloc)) + } + } +} diff --git a/dot/metrics/metrics.go b/dot/metrics/metrics.go index d48818ed91..94c2067317 100644 --- a/dot/metrics/metrics.go +++ b/dot/metrics/metrics.go @@ -1,60 +1,53 @@ +// Copyright 2019 ChainSafe Systems (ON) Corp. +// This file is part of gossamer. +// +// The gossamer library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The gossamer library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the gossamer library. If not, see . + package metrics import ( - "runtime" + "fmt" + "net/http" "time" - "github.com/ethereum/go-ethereum/metrics" + log "github.com/ChainSafe/log15" + ethmetrics "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/metrics/prometheus" ) +var logger = log.New("pkg", "metrics") + const ( // Refresh is the refresh time for publishing metrics. Refresh = time.Second * 10 refreshFreq = int64(Refresh / time.Second) ) -// CollectProcessMetrics periodically collects various metrics about the running process. -func CollectProcessMetrics() { - // Create the various data collectors - cpuStats := make([]*metrics.CPUStats, 2) - memStats := make([]*runtime.MemStats, 2) - for i := 0; i < len(memStats); i++ { - cpuStats[i] = new(metrics.CPUStats) - memStats[i] = new(runtime.MemStats) - } - - // Define the various metrics to collect - var ( - cpuSysLoad = metrics.GetOrRegisterGauge("system/cpu/sysload", metrics.DefaultRegistry) - cpuSysWait = metrics.GetOrRegisterGauge("system/cpu/syswait", metrics.DefaultRegistry) - cpuProcLoad = metrics.GetOrRegisterGauge("system/cpu/procload", metrics.DefaultRegistry) - cpuGoroutines = metrics.GetOrRegisterGauge("system/cpu/goroutines", metrics.DefaultRegistry) - - memPauses = metrics.GetOrRegisterMeter("system/memory/pauses", metrics.DefaultRegistry) - memAlloc = metrics.GetOrRegisterMeter("system/memory/allocs", metrics.DefaultRegistry) - memFrees = metrics.GetOrRegisterMeter("system/memory/frees", metrics.DefaultRegistry) - memHeld = metrics.GetOrRegisterGauge("system/memory/held", metrics.DefaultRegistry) - memUsed = metrics.GetOrRegisterGauge("system/memory/used", metrics.DefaultRegistry) - ) - - // Iterate loading the different stats and updating the meters - for i := 1; ; i++ { - location1 := i % 2 - location2 := (i - 1) % 2 - - metrics.ReadCPUStats(cpuStats[location1]) - cpuSysLoad.Update((cpuStats[location1].GlobalTime - cpuStats[location2].GlobalTime) / refreshFreq) - cpuSysWait.Update((cpuStats[location1].GlobalWait - cpuStats[location2].GlobalWait) / refreshFreq) - cpuProcLoad.Update((cpuStats[location1].LocalTime - cpuStats[location2].LocalTime) / refreshFreq) - cpuGoroutines.Update(int64(runtime.NumGoroutine())) - - runtime.ReadMemStats(memStats[location1]) - memPauses.Mark(int64(memStats[location1].PauseTotalNs - memStats[location2].PauseTotalNs)) - memAlloc.Mark(int64(memStats[location1].Mallocs - memStats[location2].Mallocs)) - memFrees.Mark(int64(memStats[location1].Frees - memStats[location2].Frees)) - memHeld.Update(int64(memStats[location1].HeapSys - memStats[location1].HeapReleased)) - memUsed.Update(int64(memStats[location1].Alloc)) +// PublishMetrics function will export the /metrics endpoint to prometheus process +func PublishMetrics(address string) { + ethmetrics.Enabled = true + setupMetricsServer(address) +} - time.Sleep(Refresh) - } +// setupMetricsServer starts a dedicated metrics server at the given address. +func setupMetricsServer(address string) { + m := http.NewServeMux() + m.Handle("/metrics", prometheus.Handler(ethmetrics.DefaultRegistry)) + logger.Info("Starting metrics server", "addr", fmt.Sprintf("http://%s/metrics", address)) + go func() { + if err := http.ListenAndServe(address, m); err != nil { + log.Error("Failure in running metrics server", "err", err) + } + }() } diff --git a/dot/node.go b/dot/node.go index 9af3efdb8a..15eba9210d 100644 --- a/dot/node.go +++ b/dot/node.go @@ -17,8 +17,8 @@ package dot import ( + "context" "fmt" - "net/http" "os" "os/signal" "path" @@ -28,7 +28,7 @@ import ( "syscall" "time" - gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics" + "github.com/ChainSafe/gossamer/dot/metrics" "github.com/ChainSafe/gossamer/dot/network" "github.com/ChainSafe/gossamer/dot/state" "github.com/ChainSafe/gossamer/dot/state/pruner" @@ -39,8 +39,6 @@ import ( "github.com/ChainSafe/gossamer/lib/services" "github.com/ChainSafe/gossamer/lib/utils" log "github.com/ChainSafe/log15" - "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/metrics/prometheus" ) var logger = log.New("pkg", "dot") @@ -332,7 +330,14 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node, } if cfg.Global.PublishMetrics { - publishMetrics(cfg) + c := metrics.NewCollector(context.Background()) + c.AddGauge(stateSrvc) + + go c.Start() + + address := fmt.Sprintf("%s:%d", cfg.RPC.Host, cfg.Global.MetricsPort) + log.Info("Enabling stand-alone metrics HTTP endpoint", "address", address) + metrics.PublishMetrics(address) } gd, err := stateSrvc.Base.LoadGenesisData() @@ -361,28 +366,6 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node, return node, nil } -func publishMetrics(cfg *Config) { - metrics.Enabled = true - address := fmt.Sprintf("%s:%d", cfg.RPC.Host, cfg.Global.MetricsPort) - log.Info("Enabling stand-alone metrics HTTP endpoint", "address", address) - setupMetricsServer(address) - - // Start system runtime metrics collection - go gssmrmetrics.CollectProcessMetrics() -} - -// setupMetricsServer starts a dedicated metrics server at the given address. -func setupMetricsServer(address string) { - m := http.NewServeMux() - m.Handle("/metrics", prometheus.Handler(metrics.DefaultRegistry)) - log.Info("Starting metrics server", "addr", fmt.Sprintf("http://%s/metrics", address)) - go func() { - if err := http.ListenAndServe(address, m); err != nil { - log.Error("Failure in running metrics server", "err", err) - } - }() -} - // stores the global node name to reuse func storeGlobalNodeName(name, basepath string) (err error) { db, err := utils.SetupDatabase(basepath, false) diff --git a/dot/state/service.go b/dot/state/service.go index a617df4df6..0bb239b35c 100644 --- a/dot/state/service.go +++ b/dot/state/service.go @@ -33,6 +33,9 @@ import ( log "github.com/ChainSafe/log15" ) +const readyPoolTransactionsMetrics = "gossamer/ready/pool/transaction/metrics" +const readyPriorityQueueTransactions = "gossamer/ready/queue/transaction/metrics" + var logger = log.New("pkg", "state") // Service is the struct that holds storage, block and network states @@ -395,3 +398,11 @@ func (s *Service) Import(header *types.Header, t *trie.Trie, firstSlot uint64) e return s.db.Close() } + +// CollectGauge exports 2 metrics related to valid transaction pool and queue +func (s *Service) CollectGauge() map[string]int64 { + return map[string]int64{ + readyPoolTransactionsMetrics: int64(s.Transaction.pool.Len()), + readyPriorityQueueTransactions: int64(s.Transaction.queue.Len()), + } +} diff --git a/dot/state/service_test.go b/dot/state/service_test.go index 5278a789b3..00e7258dcf 100644 --- a/dot/state/service_test.go +++ b/dot/state/service_test.go @@ -17,21 +17,25 @@ package state import ( + "context" "fmt" "io/ioutil" "math/big" "testing" "time" + "github.com/ChainSafe/gossamer/dot/metrics" "github.com/ChainSafe/gossamer/dot/state/pruner" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/genesis" + "github.com/ChainSafe/gossamer/lib/transaction" "github.com/ChainSafe/gossamer/lib/trie" "github.com/ChainSafe/gossamer/lib/utils" "github.com/ChainSafe/chaindb" log "github.com/ChainSafe/log15" + ethmetrics "github.com/ethereum/go-ethereum/metrics" "github.com/stretchr/testify/require" ) @@ -400,3 +404,53 @@ func TestService_Import(t *testing.T) { err = serv.Stop() require.NoError(t, err) } + +func TestStateServiceMetrics(t *testing.T) { + testDir := utils.NewTestDir(t) + defer utils.RemoveTestDir(t) + + config := Config{ + Path: testDir, + LogLevel: log.LvlInfo, + } + ethmetrics.Enabled = true + serv := NewService(config) + serv.Transaction = NewTransactionState() + + m := metrics.NewCollector(context.Background()) + m.AddGauge(serv) + go m.Start() + + vtxs := []*transaction.ValidTransaction{ + { + Extrinsic: []byte("a"), + Validity: &transaction.Validity{Priority: 1}, + }, + { + Extrinsic: []byte("b"), + Validity: &transaction.Validity{Priority: 4}, + }, + } + + hashes := make([]common.Hash, len(vtxs)) + for i, v := range vtxs { + h := serv.Transaction.pool.Insert(v) + serv.Transaction.queue.Push(v) + + hashes[i] = h + } + + time.Sleep(time.Second + metrics.Refresh) + gpool := ethmetrics.GetOrRegisterGauge(readyPoolTransactionsMetrics, nil) + gqueue := ethmetrics.GetOrRegisterGauge(readyPriorityQueueTransactions, nil) + + require.Equal(t, int64(2), gpool.Value()) + require.Equal(t, int64(2), gqueue.Value()) + + serv.Transaction.pool.Remove(hashes[0]) + serv.Transaction.queue.Pop() + + time.Sleep(time.Second + metrics.Refresh) + require.Equal(t, int64(1), gpool.Value()) + require.Equal(t, int64(1), gqueue.Value()) +} diff --git a/lib/transaction/pool.go b/lib/transaction/pool.go index 3d126740eb..1f9c3f8944 100644 --- a/lib/transaction/pool.go +++ b/lib/transaction/pool.go @@ -1,3 +1,19 @@ +// Copyright 2019 ChainSafe Systems (ON) Corp. +// This file is part of gossamer. +// +// The gossamer library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The gossamer library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the gossamer library. If not, see . + package transaction import ( @@ -49,3 +65,11 @@ func (p *Pool) Remove(hash common.Hash) { defer p.mu.Unlock() delete(p.transactions, hash) } + +// Len return the current length of the pool +func (p *Pool) Len() int { + p.mu.Lock() + defer p.mu.Unlock() + + return len(p.transactions) +} diff --git a/lib/transaction/priority_queue.go b/lib/transaction/priority_queue.go index f14207bd3f..538d72bc17 100644 --- a/lib/transaction/priority_queue.go +++ b/lib/transaction/priority_queue.go @@ -94,6 +94,7 @@ func NewPriorityQueue() *PriorityQueue { pq: make(priorityQueue, 0), txs: make(map[common.Hash]*Item), } + heap.Init(&spq.pq) return spq } @@ -171,3 +172,11 @@ func (spq *PriorityQueue) Pending() []*ValidTransaction { } return txns } + +// Len return the current length of the queue +func (spq *PriorityQueue) Len() int { + spq.Lock() + defer spq.Unlock() + + return spq.pq.Len() +}