From 681cfafbd22ceaf5d2e5bf632799a65984232816 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20J=C3=BAnior?= Date: Thu, 24 Jun 2021 10:14:07 -0400 Subject: [PATCH 01/12] pool ready transaction metrics --- lib/transaction/pool.go | 26 +++++++++++++++- lib/transaction/pool_test.go | 52 +++++++++++++++++++++++++++++++ lib/transaction/priority_queue.go | 2 ++ 3 files changed, 79 insertions(+), 1 deletion(-) diff --git a/lib/transaction/pool.go b/lib/transaction/pool.go index 3d126740eb..857737fd6f 100644 --- a/lib/transaction/pool.go +++ b/lib/transaction/pool.go @@ -2,10 +2,15 @@ package transaction import ( "sync" + "time" "github.com/ChainSafe/gossamer/lib/common" + ethmetrics "github.com/ethereum/go-ethereum/metrics" ) +const collectTxMetricsTimeout = time.Second * 5 +const readyTransactionsMetrics = "gossamer/ready/transaction/metrics" + // Pool represents the transaction pool type Pool struct { transactions map[common.Hash]*ValidTransaction @@ -14,9 +19,12 @@ type Pool struct { // NewPool returns a new empty Pool func NewPool() *Pool { - return &Pool{ + p := &Pool{ transactions: make(map[common.Hash]*ValidTransaction), } + + go p.collectMetrics() + return p } // Transactions returns all the transactions in the pool @@ -49,3 +57,19 @@ func (p *Pool) Remove(hash common.Hash) { defer p.mu.Unlock() delete(p.transactions, hash) } + +func (p *Pool) collectMetrics() { + t := time.NewTicker(collectTxMetricsTimeout) + defer t.Stop() + + for range t.C { + p.collect() + } + +} + +func (p *Pool) collect() { + ethmetrics.Enabled = true + pooltx := ethmetrics.GetOrRegisterGauge(readyTransactionsMetrics, nil) + pooltx.Update(int64(len(p.transactions))) +} diff --git a/lib/transaction/pool_test.go b/lib/transaction/pool_test.go index 8abbe4219a..9af5354ce0 100644 --- a/lib/transaction/pool_test.go +++ b/lib/transaction/pool_test.go @@ -1,11 +1,15 @@ package transaction import ( + "fmt" "sort" "testing" + "time" "github.com/ChainSafe/gossamer/lib/common" "github.com/stretchr/testify/require" + + ethmetrics "github.com/ethereum/go-ethereum/metrics" ) func TestPool(t *testing.T) { @@ -50,3 +54,51 @@ func TestPool(t *testing.T) { } require.Equal(t, 0, len(p.Transactions())) } + +func TestPoolCollectMetrics(t *testing.T) { + //reset metric + ethmetrics.Unregister(readyTransactionsMetrics) + + ethmetrics.Enabled = true + txmetrics := ethmetrics.GetOrRegisterGauge(readyTransactionsMetrics, nil) + + require.Equal(t, int64(0), txmetrics.Value()) + + validtx := []*ValidTransaction{ + { + Extrinsic: []byte("a"), + Validity: &Validity{Priority: 1}, + }, + { + Extrinsic: []byte("b"), + Validity: &Validity{Priority: 4}, + }, + { + Extrinsic: []byte("c"), + Validity: &Validity{Priority: 2}, + }, + { + Extrinsic: []byte("d"), + Validity: &Validity{Priority: 17}, + }, + { + Extrinsic: []byte("e"), + Validity: &Validity{Priority: 2}, + }, + } + + h := make([]common.Hash, len(validtx)) + p := NewPool() + for i, v := range validtx { + h[i] = p.Insert(v) + } + + time.Sleep(collectTxMetricsTimeout + time.Second) + require.Equal(t, int64(len(validtx)), txmetrics.Value()) + + p.Remove(h[0]) + + time.Sleep(collectTxMetricsTimeout + time.Second) + fmt.Println(len(p.transactions)) + require.Equal(t, int64(len(validtx)-1), txmetrics.Value()) +} diff --git a/lib/transaction/priority_queue.go b/lib/transaction/priority_queue.go index f14207bd3f..810bec4f6c 100644 --- a/lib/transaction/priority_queue.go +++ b/lib/transaction/priority_queue.go @@ -25,6 +25,8 @@ import ( "github.com/ChainSafe/gossamer/lib/common" ) +const readyPriorityQueueTransactions = "gossamer/ready/transaction/metrics" + // ErrTransactionExists is returned when trying to add a transaction to the queue that already exists var ErrTransactionExists = errors.New("transaction is already in queue") From 5f62d09a032e6958cb0faa1b3fa9f98e3a4fbc81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20J=C3=BAnior?= Date: Thu, 24 Jun 2021 10:46:08 -0400 Subject: [PATCH 02/12] chore: add priority queue metrics --- dot/metrics/collector.go | 28 ++++++++++++++++++++++++++ lib/transaction/pool.go | 25 ++++++++--------------- lib/transaction/priority_queue.go | 10 +++++++++ lib/transaction/priority_queue_test.go | 19 +++++++++++++++++ 4 files changed, 65 insertions(+), 17 deletions(-) create mode 100644 dot/metrics/collector.go diff --git a/dot/metrics/collector.go b/dot/metrics/collector.go new file mode 100644 index 0000000000..85ba3186d2 --- /dev/null +++ b/dot/metrics/collector.go @@ -0,0 +1,28 @@ +package metrics + +import ( + "time" + + ethmetrics "github.com/ethereum/go-ethereum/metrics" +) + +type GaugeCollector interface { + Update() int64 +} + +func CollectGaugeMetrics(timeout time.Duration, label string, c GaugeCollector) { + t := time.NewTicker(timeout) + defer t.Stop() + + collectGauge(label, c) + + for range t.C { + collectGauge(label, c) + } +} + +func collectGauge(label string, c GaugeCollector) { + ethmetrics.Enabled = true + pooltx := ethmetrics.GetOrRegisterGauge(label, nil) + pooltx.Update(c.Update()) +} diff --git a/lib/transaction/pool.go b/lib/transaction/pool.go index 857737fd6f..3595949375 100644 --- a/lib/transaction/pool.go +++ b/lib/transaction/pool.go @@ -4,8 +4,8 @@ import ( "sync" "time" + "github.com/ChainSafe/gossamer/dot/metrics" "github.com/ChainSafe/gossamer/lib/common" - ethmetrics "github.com/ethereum/go-ethereum/metrics" ) const collectTxMetricsTimeout = time.Second * 5 @@ -23,7 +23,12 @@ func NewPool() *Pool { transactions: make(map[common.Hash]*ValidTransaction), } - go p.collectMetrics() + go metrics.CollectGaugeMetrics( + collectTxMetricsTimeout, + readyTransactionsMetrics, + p, + ) + return p } @@ -58,18 +63,4 @@ func (p *Pool) Remove(hash common.Hash) { delete(p.transactions, hash) } -func (p *Pool) collectMetrics() { - t := time.NewTicker(collectTxMetricsTimeout) - defer t.Stop() - - for range t.C { - p.collect() - } - -} - -func (p *Pool) collect() { - ethmetrics.Enabled = true - pooltx := ethmetrics.GetOrRegisterGauge(readyTransactionsMetrics, nil) - pooltx.Update(int64(len(p.transactions))) -} +func (p *Pool) Update() int64 { return int64(len(p.transactions)) } diff --git a/lib/transaction/priority_queue.go b/lib/transaction/priority_queue.go index 810bec4f6c..8e9c0fc0ff 100644 --- a/lib/transaction/priority_queue.go +++ b/lib/transaction/priority_queue.go @@ -21,6 +21,7 @@ import ( "errors" "sync" + "github.com/ChainSafe/gossamer/dot/metrics" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" ) @@ -96,6 +97,13 @@ func NewPriorityQueue() *PriorityQueue { pq: make(priorityQueue, 0), txs: make(map[common.Hash]*Item), } + + go metrics.CollectGaugeMetrics( + collectTxMetricsTimeout, + readyPriorityQueueTransactions, + spq, + ) + heap.Init(&spq.pq) return spq } @@ -173,3 +181,5 @@ func (spq *PriorityQueue) Pending() []*ValidTransaction { } return txns } + +func (spq *PriorityQueue) Update() int64 { return int64(spq.pq.Len()) } diff --git a/lib/transaction/priority_queue_test.go b/lib/transaction/priority_queue_test.go index 3f9e4e72a0..d8a08eb3fb 100644 --- a/lib/transaction/priority_queue_test.go +++ b/lib/transaction/priority_queue_test.go @@ -19,6 +19,10 @@ package transaction import ( "reflect" "testing" + "time" + + ethmetrics "github.com/ethereum/go-ethereum/metrics" + "github.com/stretchr/testify/require" ) func TestPriorityQueue(t *testing.T) { @@ -45,6 +49,13 @@ func TestPriorityQueue(t *testing.T) { }, } + //check metrics + ethmetrics.Unregister(readyPriorityQueueTransactions) + ethmetrics.Enabled = true + + priorityQueueM := ethmetrics.GetOrRegisterGauge(readyPriorityQueueTransactions, nil) + require.Equal(t, int64(0), priorityQueueM.Value()) + pq := NewPriorityQueue() expected := []int{3, 1, 2, 4, 0} @@ -52,6 +63,10 @@ func TestPriorityQueue(t *testing.T) { pq.Push(node) } + // wait for metrics to be collected + time.Sleep(collectTxMetricsTimeout + time.Second) + require.Equal(t, int64(len(tests)), priorityQueueM.Value()) + for i, exp := range expected { n := pq.Pop() if !reflect.DeepEqual(n, tests[exp]) { @@ -60,6 +75,10 @@ func TestPriorityQueue(t *testing.T) { t.Fatalf("Fail: iteration %d got %v expected %v", i, n, tests[exp]) } } + + // wait for metrics to be collected + time.Sleep(collectTxMetricsTimeout + time.Second) + require.Equal(t, int64(pq.pq.Len()), priorityQueueM.Value()) } func TestPriorityQueueAgain(t *testing.T) { From 34b7c33f0732efdc3b3b097f691fc0fad270c191 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20J=C3=BAnior?= Date: Thu, 24 Jun 2021 11:21:59 -0400 Subject: [PATCH 03/12] chore: fix lint --- dot/metrics/collector.go | 4 ++++ lib/transaction/pool.go | 1 + lib/transaction/priority_queue.go | 1 + 3 files changed, 6 insertions(+) diff --git a/dot/metrics/collector.go b/dot/metrics/collector.go index 85ba3186d2..40fce98a6f 100644 --- a/dot/metrics/collector.go +++ b/dot/metrics/collector.go @@ -6,10 +6,14 @@ import ( ethmetrics "github.com/ethereum/go-ethereum/metrics" ) +// GaugeCollector abstracts the Update function to collectors +// just implement the collection type GaugeCollector interface { Update() int64 } +// CollectGaugeMetrics receives an timeout, label and a gauge collector +// and acquire the metrics timeout by timeout to a ethereum metrics gauge func CollectGaugeMetrics(timeout time.Duration, label string, c GaugeCollector) { t := time.NewTicker(timeout) defer t.Stop() diff --git a/lib/transaction/pool.go b/lib/transaction/pool.go index 3595949375..78dacacd3f 100644 --- a/lib/transaction/pool.go +++ b/lib/transaction/pool.go @@ -63,4 +63,5 @@ func (p *Pool) Remove(hash common.Hash) { delete(p.transactions, hash) } +// Update returns the total of valid transactions in the pool func (p *Pool) Update() int64 { return int64(len(p.transactions)) } diff --git a/lib/transaction/priority_queue.go b/lib/transaction/priority_queue.go index 8e9c0fc0ff..d9de455f03 100644 --- a/lib/transaction/priority_queue.go +++ b/lib/transaction/priority_queue.go @@ -182,4 +182,5 @@ func (spq *PriorityQueue) Pending() []*ValidTransaction { return txns } +// Update returns the total of valid transactions in the priority queue func (spq *PriorityQueue) Update() int64 { return int64(spq.pq.Len()) } From cc20c9f40a3d661c0542ccbc5fd265b6f7c0a439 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20J=C3=BAnior?= Date: Thu, 24 Jun 2021 15:45:27 -0400 Subject: [PATCH 04/12] chore: remove println --- lib/transaction/pool_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/transaction/pool_test.go b/lib/transaction/pool_test.go index 9af5354ce0..eb86ebb116 100644 --- a/lib/transaction/pool_test.go +++ b/lib/transaction/pool_test.go @@ -1,7 +1,6 @@ package transaction import ( - "fmt" "sort" "testing" "time" @@ -99,6 +98,5 @@ func TestPoolCollectMetrics(t *testing.T) { p.Remove(h[0]) time.Sleep(collectTxMetricsTimeout + time.Second) - fmt.Println(len(p.transactions)) require.Equal(t, int64(len(validtx)-1), txmetrics.Value()) } From 4f15bd1ab9dddae92c08ecc72653cc4eeac46378 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20J=C3=BAnior?= Date: Thu, 24 Jun 2021 16:08:32 -0400 Subject: [PATCH 05/12] chore: add copyright comments --- dot/metrics/collector.go | 16 ++++++++++++++++ dot/metrics/metrics.go | 16 ++++++++++++++++ lib/transaction/pool.go | 16 ++++++++++++++++ 3 files changed, 48 insertions(+) diff --git a/dot/metrics/collector.go b/dot/metrics/collector.go index 40fce98a6f..7b9a4f676a 100644 --- a/dot/metrics/collector.go +++ b/dot/metrics/collector.go @@ -1,3 +1,19 @@ +// Copyright 2019 ChainSafe Systems (ON) Corp. +// This file is part of gossamer. +// +// The gossamer library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The gossamer library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the gossamer library. If not, see . + package metrics import ( diff --git a/dot/metrics/metrics.go b/dot/metrics/metrics.go index d48818ed91..1e9840e0cc 100644 --- a/dot/metrics/metrics.go +++ b/dot/metrics/metrics.go @@ -1,3 +1,19 @@ +// Copyright 2019 ChainSafe Systems (ON) Corp. +// This file is part of gossamer. +// +// The gossamer library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The gossamer library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the gossamer library. If not, see . + package metrics import ( diff --git a/lib/transaction/pool.go b/lib/transaction/pool.go index 78dacacd3f..a23f7fbd8c 100644 --- a/lib/transaction/pool.go +++ b/lib/transaction/pool.go @@ -1,3 +1,19 @@ +// Copyright 2019 ChainSafe Systems (ON) Corp. +// This file is part of gossamer. +// +// The gossamer library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The gossamer library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the gossamer library. If not, see . + package transaction import ( From 8227b2f38114ce5e537f4eff1ad9a1dd50f5424e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20J=C3=BAnior?= Date: Mon, 28 Jun 2021 11:47:06 -0400 Subject: [PATCH 06/12] chore: resolve comments --- dot/metrics/collector.go | 114 ++++++++++++++++++++++--- dot/metrics/metrics.go | 66 +++++--------- dot/node.go | 36 ++------ dot/state/service.go | 10 +++ dot/state/service_test.go | 54 ++++++++++++ lib/transaction/pool.go | 21 ++--- lib/transaction/pool_test.go | 50 ----------- lib/transaction/priority_queue.go | 15 +--- lib/transaction/priority_queue_test.go | 19 ----- 9 files changed, 203 insertions(+), 182 deletions(-) diff --git a/dot/metrics/collector.go b/dot/metrics/collector.go index 7b9a4f676a..1998c424c5 100644 --- a/dot/metrics/collector.go +++ b/dot/metrics/collector.go @@ -17,32 +17,118 @@ package metrics import ( + "context" + "runtime" "time" ethmetrics "github.com/ethereum/go-ethereum/metrics" ) -// GaugeCollector abstracts the Update function to collectors -// just implement the collection -type GaugeCollector interface { - Update() int64 +type GaugeMetrics interface { + CollectGauge() map[string]int64 } -// CollectGaugeMetrics receives an timeout, label and a gauge collector -// and acquire the metrics timeout by timeout to a ethereum metrics gauge -func CollectGaugeMetrics(timeout time.Duration, label string, c GaugeCollector) { - t := time.NewTicker(timeout) +type Collector struct { + ctx context.Context + gauges []GaugeMetrics +} + +func NewCollector(ctx context.Context) *Collector { + return &Collector{ + ctx: ctx, + gauges: make([]GaugeMetrics, 0), + } +} + +func (c *Collector) Start() { + go c.startCollectProccessMetrics() + go c.startCollectGauges() +} + +func (c *Collector) Stop() { + for _, g := range c.gauges { + m := g.CollectGauge() + + for label := range m { + ethmetrics.Unregister(label) + } + } +} + +func (c *Collector) AddGauge(g GaugeMetrics) { + c.gauges = append(c.gauges, g) +} + +func (c *Collector) startCollectGauges() { + ethmetrics.Enabled = true + + t := time.NewTicker(Refresh) defer t.Stop() - collectGauge(label, c) + for { + select { + case <-c.ctx.Done(): + return + case <-t.C: + for _, g := range c.gauges { + m := g.CollectGauge() - for range t.C { - collectGauge(label, c) + for label, value := range m { + pooltx := ethmetrics.GetOrRegisterGauge(label, nil) + pooltx.Update(value) + } + } + } } } -func collectGauge(label string, c GaugeCollector) { +func (c *Collector) startCollectProccessMetrics() { ethmetrics.Enabled = true - pooltx := ethmetrics.GetOrRegisterGauge(label, nil) - pooltx.Update(c.Update()) + + cpuStats := make([]*ethmetrics.CPUStats, 2) + memStats := make([]*runtime.MemStats, 2) + for i := 0; i < len(memStats); i++ { + cpuStats[i] = new(ethmetrics.CPUStats) + memStats[i] = new(runtime.MemStats) + } + + // Define the various metrics to collect + var ( + cpuSysLoad = ethmetrics.GetOrRegisterGauge("system/cpu/sysload", ethmetrics.DefaultRegistry) + cpuSysWait = ethmetrics.GetOrRegisterGauge("system/cpu/syswait", ethmetrics.DefaultRegistry) + cpuProcLoad = ethmetrics.GetOrRegisterGauge("system/cpu/procload", ethmetrics.DefaultRegistry) + cpuGoroutines = ethmetrics.GetOrRegisterGauge("system/cpu/goroutines", ethmetrics.DefaultRegistry) + + memPauses = ethmetrics.GetOrRegisterMeter("system/memory/pauses", ethmetrics.DefaultRegistry) + memAlloc = ethmetrics.GetOrRegisterMeter("system/memory/allocs", ethmetrics.DefaultRegistry) + memFrees = ethmetrics.GetOrRegisterMeter("system/memory/frees", ethmetrics.DefaultRegistry) + memHeld = ethmetrics.GetOrRegisterGauge("system/memory/held", ethmetrics.DefaultRegistry) + memUsed = ethmetrics.GetOrRegisterGauge("system/memory/used", ethmetrics.DefaultRegistry) + ) + + t := time.NewTicker(Refresh) + defer t.Stop() + + for i := 1; ; i++ { + select { + case <-c.ctx.Done(): + return + case <-t.C: + location1 := i % 2 + location2 := (i - 1) % 2 + + ethmetrics.ReadCPUStats(cpuStats[location1]) + cpuSysLoad.Update((cpuStats[location1].GlobalTime - cpuStats[location2].GlobalTime) / refreshFreq) + cpuSysWait.Update((cpuStats[location1].GlobalWait - cpuStats[location2].GlobalWait) / refreshFreq) + cpuProcLoad.Update((cpuStats[location1].LocalTime - cpuStats[location2].LocalTime) / refreshFreq) + cpuGoroutines.Update(int64(runtime.NumGoroutine())) + + runtime.ReadMemStats(memStats[location1]) + memPauses.Mark(int64(memStats[location1].PauseTotalNs - memStats[location2].PauseTotalNs)) + memAlloc.Mark(int64(memStats[location1].Mallocs - memStats[location2].Mallocs)) + memFrees.Mark(int64(memStats[location1].Frees - memStats[location2].Frees)) + memHeld.Update(int64(memStats[location1].HeapSys - memStats[location1].HeapReleased)) + memUsed.Update(int64(memStats[location1].Alloc)) + } + } } diff --git a/dot/metrics/metrics.go b/dot/metrics/metrics.go index 1e9840e0cc..e94f98157f 100644 --- a/dot/metrics/metrics.go +++ b/dot/metrics/metrics.go @@ -17,60 +17,36 @@ package metrics import ( - "runtime" + "fmt" + "net/http" "time" - "github.com/ethereum/go-ethereum/metrics" + log "github.com/ChainSafe/log15" + ethmetrics "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/metrics/prometheus" ) +var logger = log.New("pkg", "metrics") + const ( // Refresh is the refresh time for publishing metrics. Refresh = time.Second * 10 refreshFreq = int64(Refresh / time.Second) ) -// CollectProcessMetrics periodically collects various metrics about the running process. -func CollectProcessMetrics() { - // Create the various data collectors - cpuStats := make([]*metrics.CPUStats, 2) - memStats := make([]*runtime.MemStats, 2) - for i := 0; i < len(memStats); i++ { - cpuStats[i] = new(metrics.CPUStats) - memStats[i] = new(runtime.MemStats) - } - - // Define the various metrics to collect - var ( - cpuSysLoad = metrics.GetOrRegisterGauge("system/cpu/sysload", metrics.DefaultRegistry) - cpuSysWait = metrics.GetOrRegisterGauge("system/cpu/syswait", metrics.DefaultRegistry) - cpuProcLoad = metrics.GetOrRegisterGauge("system/cpu/procload", metrics.DefaultRegistry) - cpuGoroutines = metrics.GetOrRegisterGauge("system/cpu/goroutines", metrics.DefaultRegistry) - - memPauses = metrics.GetOrRegisterMeter("system/memory/pauses", metrics.DefaultRegistry) - memAlloc = metrics.GetOrRegisterMeter("system/memory/allocs", metrics.DefaultRegistry) - memFrees = metrics.GetOrRegisterMeter("system/memory/frees", metrics.DefaultRegistry) - memHeld = metrics.GetOrRegisterGauge("system/memory/held", metrics.DefaultRegistry) - memUsed = metrics.GetOrRegisterGauge("system/memory/used", metrics.DefaultRegistry) - ) - - // Iterate loading the different stats and updating the meters - for i := 1; ; i++ { - location1 := i % 2 - location2 := (i - 1) % 2 - - metrics.ReadCPUStats(cpuStats[location1]) - cpuSysLoad.Update((cpuStats[location1].GlobalTime - cpuStats[location2].GlobalTime) / refreshFreq) - cpuSysWait.Update((cpuStats[location1].GlobalWait - cpuStats[location2].GlobalWait) / refreshFreq) - cpuProcLoad.Update((cpuStats[location1].LocalTime - cpuStats[location2].LocalTime) / refreshFreq) - cpuGoroutines.Update(int64(runtime.NumGoroutine())) - - runtime.ReadMemStats(memStats[location1]) - memPauses.Mark(int64(memStats[location1].PauseTotalNs - memStats[location2].PauseTotalNs)) - memAlloc.Mark(int64(memStats[location1].Mallocs - memStats[location2].Mallocs)) - memFrees.Mark(int64(memStats[location1].Frees - memStats[location2].Frees)) - memHeld.Update(int64(memStats[location1].HeapSys - memStats[location1].HeapReleased)) - memUsed.Update(int64(memStats[location1].Alloc)) +func PublishMetrics(address string) { + ethmetrics.Enabled = true + setupMetricsServer(address) +} - time.Sleep(Refresh) - } +// setupMetricsServer starts a dedicated metrics server at the given address. +func setupMetricsServer(address string) { + m := http.NewServeMux() + m.Handle("/metrics", prometheus.Handler(ethmetrics.DefaultRegistry)) + logger.Info("Starting metrics server", "addr", fmt.Sprintf("http://%s/metrics", address)) + go func() { + if err := http.ListenAndServe(address, m); err != nil { + log.Error("Failure in running metrics server", "err", err) + } + }() } diff --git a/dot/node.go b/dot/node.go index 016436a866..8042501fb9 100644 --- a/dot/node.go +++ b/dot/node.go @@ -17,8 +17,8 @@ package dot import ( + "context" "fmt" - "net/http" "os" "os/signal" "path" @@ -28,7 +28,7 @@ import ( "syscall" "time" - gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics" + "github.com/ChainSafe/gossamer/dot/metrics" "github.com/ChainSafe/gossamer/dot/network" "github.com/ChainSafe/gossamer/dot/state" "github.com/ChainSafe/gossamer/dot/state/pruner" @@ -39,8 +39,6 @@ import ( "github.com/ChainSafe/gossamer/lib/services" "github.com/ChainSafe/gossamer/lib/utils" log "github.com/ChainSafe/log15" - "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/metrics/prometheus" ) var logger = log.New("pkg", "dot") @@ -330,7 +328,13 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node, } if cfg.Global.PublishMetrics { - publishMetrics(cfg) + c := metrics.NewCollector(context.Background()) + c.AddGauge(stateSrvc) + c.Start() + + address := fmt.Sprintf("%s:%d", cfg.RPC.Host, cfg.Global.MetricsPort) + log.Info("Enabling stand-alone metrics HTTP endpoint", "address", address) + metrics.PublishMetrics(address) } gd, err := stateSrvc.Base.LoadGenesisData() @@ -360,28 +364,6 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node, return node, nil } -func publishMetrics(cfg *Config) { - metrics.Enabled = true - address := fmt.Sprintf("%s:%d", cfg.RPC.Host, cfg.Global.MetricsPort) - log.Info("Enabling stand-alone metrics HTTP endpoint", "address", address) - setupMetricsServer(address) - - // Start system runtime metrics collection - go gssmrmetrics.CollectProcessMetrics() -} - -// setupMetricsServer starts a dedicated metrics server at the given address. -func setupMetricsServer(address string) { - m := http.NewServeMux() - m.Handle("/metrics", prometheus.Handler(metrics.DefaultRegistry)) - log.Info("Starting metrics server", "addr", fmt.Sprintf("http://%s/metrics", address)) - go func() { - if err := http.ListenAndServe(address, m); err != nil { - log.Error("Failure in running metrics server", "err", err) - } - }() -} - // stores the global node name to reuse func storeGlobalNodeName(name, basepath string) (err error) { db, err := utils.SetupDatabase(basepath, false) diff --git a/dot/state/service.go b/dot/state/service.go index fd187d6865..21da65d84c 100644 --- a/dot/state/service.go +++ b/dot/state/service.go @@ -33,6 +33,9 @@ import ( log "github.com/ChainSafe/log15" ) +const readyPoolTransactionsMetrics = "gossamer/ready/pool/transaction/metrics" +const readyPriorityQueueTransactions = "gossamer/ready/queue/transaction/metrics" + var logger = log.New("pkg", "state") // Service is the struct that holds storage, block and network states @@ -396,3 +399,10 @@ func (s *Service) Import(header *types.Header, t *trie.Trie, firstSlot uint64) e return s.db.Close() } + +func (s *Service) CollectGauge() map[string]int64 { + return map[string]int64{ + readyPoolTransactionsMetrics: int64(s.Transaction.pool.Len()), + readyPriorityQueueTransactions: int64(s.Transaction.queue.Len()), + } +} diff --git a/dot/state/service_test.go b/dot/state/service_test.go index 70d0a0d48d..4b780481b3 100644 --- a/dot/state/service_test.go +++ b/dot/state/service_test.go @@ -17,21 +17,25 @@ package state import ( + "context" "fmt" "io/ioutil" "math/big" "testing" "time" + "github.com/ChainSafe/gossamer/dot/metrics" "github.com/ChainSafe/gossamer/dot/state/pruner" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/genesis" + "github.com/ChainSafe/gossamer/lib/transaction" "github.com/ChainSafe/gossamer/lib/trie" "github.com/ChainSafe/gossamer/lib/utils" "github.com/ChainSafe/chaindb" log "github.com/ChainSafe/log15" + ethmetrics "github.com/ethereum/go-ethereum/metrics" "github.com/stretchr/testify/require" ) @@ -399,3 +403,53 @@ func TestService_Import(t *testing.T) { err = serv.Stop() require.NoError(t, err) } + +func TestStateServiceMetrics(t *testing.T) { + testDir := utils.NewTestDir(t) + defer utils.RemoveTestDir(t) + + config := Config{ + Path: testDir, + LogLevel: log.LvlInfo, + } + ethmetrics.Enabled = true + serv := NewService(config) + serv.Transaction = NewTransactionState() + + m := metrics.NewCollector(context.Background()) + m.AddGauge(serv) + m.Start() + + vtxs := []*transaction.ValidTransaction{ + { + Extrinsic: []byte("a"), + Validity: &transaction.Validity{Priority: 1}, + }, + { + Extrinsic: []byte("b"), + Validity: &transaction.Validity{Priority: 4}, + }, + } + + hashes := make([]common.Hash, len(vtxs)) + for i, v := range vtxs { + h := serv.Transaction.pool.Insert(v) + serv.Transaction.queue.Push(v) + + hashes[i] = h + } + + time.Sleep(time.Second + metrics.Refresh) + gpool := ethmetrics.GetOrRegisterGauge(readyPoolTransactionsMetrics, nil) + gqueue := ethmetrics.GetOrRegisterGauge(readyPriorityQueueTransactions, nil) + + require.Equal(t, int64(2), gpool.Value()) + require.Equal(t, int64(2), gqueue.Value()) + + serv.Transaction.pool.Remove(hashes[0]) + serv.Transaction.queue.Pop() + + time.Sleep(time.Second + metrics.Refresh) + require.Equal(t, int64(2), gpool.Value()) + require.Equal(t, int64(2), gqueue.Value()) +} diff --git a/lib/transaction/pool.go b/lib/transaction/pool.go index a23f7fbd8c..ae7fd38cda 100644 --- a/lib/transaction/pool.go +++ b/lib/transaction/pool.go @@ -18,15 +18,10 @@ package transaction import ( "sync" - "time" - "github.com/ChainSafe/gossamer/dot/metrics" "github.com/ChainSafe/gossamer/lib/common" ) -const collectTxMetricsTimeout = time.Second * 5 -const readyTransactionsMetrics = "gossamer/ready/transaction/metrics" - // Pool represents the transaction pool type Pool struct { transactions map[common.Hash]*ValidTransaction @@ -35,17 +30,9 @@ type Pool struct { // NewPool returns a new empty Pool func NewPool() *Pool { - p := &Pool{ + return &Pool{ transactions: make(map[common.Hash]*ValidTransaction), } - - go metrics.CollectGaugeMetrics( - collectTxMetricsTimeout, - readyTransactionsMetrics, - p, - ) - - return p } // Transactions returns all the transactions in the pool @@ -79,5 +66,7 @@ func (p *Pool) Remove(hash common.Hash) { delete(p.transactions, hash) } -// Update returns the total of valid transactions in the pool -func (p *Pool) Update() int64 { return int64(len(p.transactions)) } +// Len return the current length of the pool +func (p *Pool) Len() int { + return len(p.transactions) +} diff --git a/lib/transaction/pool_test.go b/lib/transaction/pool_test.go index eb86ebb116..8abbe4219a 100644 --- a/lib/transaction/pool_test.go +++ b/lib/transaction/pool_test.go @@ -3,12 +3,9 @@ package transaction import ( "sort" "testing" - "time" "github.com/ChainSafe/gossamer/lib/common" "github.com/stretchr/testify/require" - - ethmetrics "github.com/ethereum/go-ethereum/metrics" ) func TestPool(t *testing.T) { @@ -53,50 +50,3 @@ func TestPool(t *testing.T) { } require.Equal(t, 0, len(p.Transactions())) } - -func TestPoolCollectMetrics(t *testing.T) { - //reset metric - ethmetrics.Unregister(readyTransactionsMetrics) - - ethmetrics.Enabled = true - txmetrics := ethmetrics.GetOrRegisterGauge(readyTransactionsMetrics, nil) - - require.Equal(t, int64(0), txmetrics.Value()) - - validtx := []*ValidTransaction{ - { - Extrinsic: []byte("a"), - Validity: &Validity{Priority: 1}, - }, - { - Extrinsic: []byte("b"), - Validity: &Validity{Priority: 4}, - }, - { - Extrinsic: []byte("c"), - Validity: &Validity{Priority: 2}, - }, - { - Extrinsic: []byte("d"), - Validity: &Validity{Priority: 17}, - }, - { - Extrinsic: []byte("e"), - Validity: &Validity{Priority: 2}, - }, - } - - h := make([]common.Hash, len(validtx)) - p := NewPool() - for i, v := range validtx { - h[i] = p.Insert(v) - } - - time.Sleep(collectTxMetricsTimeout + time.Second) - require.Equal(t, int64(len(validtx)), txmetrics.Value()) - - p.Remove(h[0]) - - time.Sleep(collectTxMetricsTimeout + time.Second) - require.Equal(t, int64(len(validtx)-1), txmetrics.Value()) -} diff --git a/lib/transaction/priority_queue.go b/lib/transaction/priority_queue.go index d9de455f03..d900080bc6 100644 --- a/lib/transaction/priority_queue.go +++ b/lib/transaction/priority_queue.go @@ -21,13 +21,10 @@ import ( "errors" "sync" - "github.com/ChainSafe/gossamer/dot/metrics" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" ) -const readyPriorityQueueTransactions = "gossamer/ready/transaction/metrics" - // ErrTransactionExists is returned when trying to add a transaction to the queue that already exists var ErrTransactionExists = errors.New("transaction is already in queue") @@ -98,12 +95,6 @@ func NewPriorityQueue() *PriorityQueue { txs: make(map[common.Hash]*Item), } - go metrics.CollectGaugeMetrics( - collectTxMetricsTimeout, - readyPriorityQueueTransactions, - spq, - ) - heap.Init(&spq.pq) return spq } @@ -182,5 +173,7 @@ func (spq *PriorityQueue) Pending() []*ValidTransaction { return txns } -// Update returns the total of valid transactions in the priority queue -func (spq *PriorityQueue) Update() int64 { return int64(spq.pq.Len()) } +// Len return the current length of the queue +func (spq *PriorityQueue) Len() int { + return spq.pq.Len() +} diff --git a/lib/transaction/priority_queue_test.go b/lib/transaction/priority_queue_test.go index d8a08eb3fb..3f9e4e72a0 100644 --- a/lib/transaction/priority_queue_test.go +++ b/lib/transaction/priority_queue_test.go @@ -19,10 +19,6 @@ package transaction import ( "reflect" "testing" - "time" - - ethmetrics "github.com/ethereum/go-ethereum/metrics" - "github.com/stretchr/testify/require" ) func TestPriorityQueue(t *testing.T) { @@ -49,13 +45,6 @@ func TestPriorityQueue(t *testing.T) { }, } - //check metrics - ethmetrics.Unregister(readyPriorityQueueTransactions) - ethmetrics.Enabled = true - - priorityQueueM := ethmetrics.GetOrRegisterGauge(readyPriorityQueueTransactions, nil) - require.Equal(t, int64(0), priorityQueueM.Value()) - pq := NewPriorityQueue() expected := []int{3, 1, 2, 4, 0} @@ -63,10 +52,6 @@ func TestPriorityQueue(t *testing.T) { pq.Push(node) } - // wait for metrics to be collected - time.Sleep(collectTxMetricsTimeout + time.Second) - require.Equal(t, int64(len(tests)), priorityQueueM.Value()) - for i, exp := range expected { n := pq.Pop() if !reflect.DeepEqual(n, tests[exp]) { @@ -75,10 +60,6 @@ func TestPriorityQueue(t *testing.T) { t.Fatalf("Fail: iteration %d got %v expected %v", i, n, tests[exp]) } } - - // wait for metrics to be collected - time.Sleep(collectTxMetricsTimeout + time.Second) - require.Equal(t, int64(pq.pq.Len()), priorityQueueM.Value()) } func TestPriorityQueueAgain(t *testing.T) { From f390373ed7154cb92b0fdff617b3bd024d2682fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20J=C3=BAnior?= Date: Mon, 28 Jun 2021 14:07:02 -0400 Subject: [PATCH 07/12] chore: remove stop function and add comments to exported funcs --- dot/metrics/collector.go | 17 +++++++---------- dot/metrics/metrics.go | 1 + dot/state/service.go | 1 + 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/dot/metrics/collector.go b/dot/metrics/collector.go index 1998c424c5..bc8e813a9b 100644 --- a/dot/metrics/collector.go +++ b/dot/metrics/collector.go @@ -24,15 +24,19 @@ import ( ethmetrics "github.com/ethereum/go-ethereum/metrics" ) +// GaugeMetrics interface allows the exportation of many gauge metrics +// the implementer could exports type GaugeMetrics interface { CollectGauge() map[string]int64 } +// Collector struct controls the metrics and executes polling to extract the values type Collector struct { ctx context.Context gauges []GaugeMetrics } +// NewCollector creates a new Collector func NewCollector(ctx context.Context) *Collector { return &Collector{ ctx: ctx, @@ -40,21 +44,14 @@ func NewCollector(ctx context.Context) *Collector { } } +// Start will start one goroutine to collect all the gauges registereds and +// a separate goroutine to collect process metrics func (c *Collector) Start() { go c.startCollectProccessMetrics() go c.startCollectGauges() } -func (c *Collector) Stop() { - for _, g := range c.gauges { - m := g.CollectGauge() - - for label := range m { - ethmetrics.Unregister(label) - } - } -} - +// AddGauge adds a GaugeMetrics implementer on gauges list func (c *Collector) AddGauge(g GaugeMetrics) { c.gauges = append(c.gauges, g) } diff --git a/dot/metrics/metrics.go b/dot/metrics/metrics.go index e94f98157f..94c2067317 100644 --- a/dot/metrics/metrics.go +++ b/dot/metrics/metrics.go @@ -34,6 +34,7 @@ const ( refreshFreq = int64(Refresh / time.Second) ) +// PublishMetrics function will export the /metrics endpoint to prometheus process func PublishMetrics(address string) { ethmetrics.Enabled = true setupMetricsServer(address) diff --git a/dot/state/service.go b/dot/state/service.go index 73b7836aed..0bb239b35c 100644 --- a/dot/state/service.go +++ b/dot/state/service.go @@ -399,6 +399,7 @@ func (s *Service) Import(header *types.Header, t *trie.Trie, firstSlot uint64) e return s.db.Close() } +// CollectGauge exports 2 metrics related to valid transaction pool and queue func (s *Service) CollectGauge() map[string]int64 { return map[string]int64{ readyPoolTransactionsMetrics: int64(s.Transaction.pool.Len()), From 232ed98381f6a68adde4be6edcafc21f6da90239 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20J=C3=BAnior?= Date: Mon, 28 Jun 2021 15:07:47 -0400 Subject: [PATCH 08/12] chore: adjust tests --- dot/state/service_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dot/state/service_test.go b/dot/state/service_test.go index fd420396f7..1bde24891d 100644 --- a/dot/state/service_test.go +++ b/dot/state/service_test.go @@ -451,6 +451,6 @@ func TestStateServiceMetrics(t *testing.T) { serv.Transaction.queue.Pop() time.Sleep(time.Second + metrics.Refresh) - require.Equal(t, int64(2), gpool.Value()) - require.Equal(t, int64(2), gqueue.Value()) + require.Equal(t, int64(1), gpool.Value()) + require.Equal(t, int64(1), gqueue.Value()) } From b394fc295b5540f7864c7b253894f797bceb83f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20J=C3=BAnior?= Date: Tue, 29 Jun 2021 10:10:55 -0400 Subject: [PATCH 09/12] chore: make metrics.Start blocking and using wg to inner gorout --- dot/metrics/collector.go | 17 +++++++++++++++-- dot/node.go | 3 ++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/dot/metrics/collector.go b/dot/metrics/collector.go index bc8e813a9b..128c2e4d30 100644 --- a/dot/metrics/collector.go +++ b/dot/metrics/collector.go @@ -19,6 +19,7 @@ package metrics import ( "context" "runtime" + "sync" "time" ethmetrics "github.com/ethereum/go-ethereum/metrics" @@ -34,12 +35,14 @@ type GaugeMetrics interface { type Collector struct { ctx context.Context gauges []GaugeMetrics + wg sync.WaitGroup } // NewCollector creates a new Collector func NewCollector(ctx context.Context) *Collector { return &Collector{ ctx: ctx, + wg: sync.WaitGroup{}, gauges: make([]GaugeMetrics, 0), } } @@ -47,8 +50,12 @@ func NewCollector(ctx context.Context) *Collector { // Start will start one goroutine to collect all the gauges registereds and // a separate goroutine to collect process metrics func (c *Collector) Start() { + c.wg.Add(2) + go c.startCollectProccessMetrics() go c.startCollectGauges() + + c.wg.Wait() } // AddGauge adds a GaugeMetrics implementer on gauges list @@ -60,7 +67,10 @@ func (c *Collector) startCollectGauges() { ethmetrics.Enabled = true t := time.NewTicker(Refresh) - defer t.Stop() + defer func() { + t.Stop() + c.wg.Done() + }() for { select { @@ -104,7 +114,10 @@ func (c *Collector) startCollectProccessMetrics() { ) t := time.NewTicker(Refresh) - defer t.Stop() + defer func() { + t.Stop() + c.wg.Done() + }() for i := 1; ; i++ { select { diff --git a/dot/node.go b/dot/node.go index eb4ca8424c..101edc3b8e 100644 --- a/dot/node.go +++ b/dot/node.go @@ -332,7 +332,8 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node, if cfg.Global.PublishMetrics { c := metrics.NewCollector(context.Background()) c.AddGauge(stateSrvc) - c.Start() + + go c.Start() address := fmt.Sprintf("%s:%d", cfg.RPC.Host, cfg.Global.MetricsPort) log.Info("Enabling stand-alone metrics HTTP endpoint", "address", address) From 533bc6b70e5bc459b0cf3013cf53073a0e248869 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20J=C3=BAnior?= Date: Tue, 29 Jun 2021 10:11:21 -0400 Subject: [PATCH 10/12] chore: fix typo --- dot/metrics/collector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/metrics/collector.go b/dot/metrics/collector.go index 128c2e4d30..f656438dcc 100644 --- a/dot/metrics/collector.go +++ b/dot/metrics/collector.go @@ -47,7 +47,7 @@ func NewCollector(ctx context.Context) *Collector { } } -// Start will start one goroutine to collect all the gauges registereds and +// Start will start one goroutine to collect all the gauges registered and // a separate goroutine to collect process metrics func (c *Collector) Start() { c.wg.Add(2) From 63bb7abf489041e6c9f29887b66cdb0bbee65866 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20J=C3=BAnior?= Date: Wed, 30 Jun 2021 21:39:46 -0400 Subject: [PATCH 11/12] chore: fix metrics test --- dot/state/service_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/state/service_test.go b/dot/state/service_test.go index 1bde24891d..00e7258dcf 100644 --- a/dot/state/service_test.go +++ b/dot/state/service_test.go @@ -419,7 +419,7 @@ func TestStateServiceMetrics(t *testing.T) { m := metrics.NewCollector(context.Background()) m.AddGauge(serv) - m.Start() + go m.Start() vtxs := []*transaction.ValidTransaction{ { From 6930c63f9d124c68e4f00f9777dbb8737b971743 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20J=C3=BAnior?= Date: Thu, 1 Jul 2021 10:12:25 -0400 Subject: [PATCH 12/12] fix: solving data race condition on metrics tests --- dot/metrics/collector.go | 5 +---- lib/transaction/pool.go | 3 +++ lib/transaction/priority_queue.go | 3 +++ 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/dot/metrics/collector.go b/dot/metrics/collector.go index f656438dcc..eec1a93254 100644 --- a/dot/metrics/collector.go +++ b/dot/metrics/collector.go @@ -50,6 +50,7 @@ func NewCollector(ctx context.Context) *Collector { // Start will start one goroutine to collect all the gauges registered and // a separate goroutine to collect process metrics func (c *Collector) Start() { + ethmetrics.Enabled = true c.wg.Add(2) go c.startCollectProccessMetrics() @@ -64,8 +65,6 @@ func (c *Collector) AddGauge(g GaugeMetrics) { } func (c *Collector) startCollectGauges() { - ethmetrics.Enabled = true - t := time.NewTicker(Refresh) defer func() { t.Stop() @@ -90,8 +89,6 @@ func (c *Collector) startCollectGauges() { } func (c *Collector) startCollectProccessMetrics() { - ethmetrics.Enabled = true - cpuStats := make([]*ethmetrics.CPUStats, 2) memStats := make([]*runtime.MemStats, 2) for i := 0; i < len(memStats); i++ { diff --git a/lib/transaction/pool.go b/lib/transaction/pool.go index ae7fd38cda..1f9c3f8944 100644 --- a/lib/transaction/pool.go +++ b/lib/transaction/pool.go @@ -68,5 +68,8 @@ func (p *Pool) Remove(hash common.Hash) { // Len return the current length of the pool func (p *Pool) Len() int { + p.mu.Lock() + defer p.mu.Unlock() + return len(p.transactions) } diff --git a/lib/transaction/priority_queue.go b/lib/transaction/priority_queue.go index d900080bc6..538d72bc17 100644 --- a/lib/transaction/priority_queue.go +++ b/lib/transaction/priority_queue.go @@ -175,5 +175,8 @@ func (spq *PriorityQueue) Pending() []*ValidTransaction { // Len return the current length of the queue func (spq *PriorityQueue) Len() int { + spq.Lock() + defer spq.Unlock() + return spq.pq.Len() }