Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lib/transactions) ready transactions metrics #1656

Merged
merged 19 commits into from
Jul 2, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
681cfaf
pool ready transaction metrics
EclesioMeloJunior Jun 24, 2021
5f62d09
chore: add priority queue metrics
EclesioMeloJunior Jun 24, 2021
365b8d7
Merge branch 'development' into eclesio/ready-txs-metrics
EclesioMeloJunior Jun 24, 2021
34b7c33
chore: fix lint
EclesioMeloJunior Jun 24, 2021
cc20c9f
chore: remove println
EclesioMeloJunior Jun 24, 2021
f864c3e
Merge branch 'development' into eclesio/ready-txs-metrics
EclesioMeloJunior Jun 24, 2021
4f15bd1
chore: add copyright comments
EclesioMeloJunior Jun 24, 2021
8227b2f
chore: resolve comments
EclesioMeloJunior Jun 28, 2021
38a7ef7
Merge branch 'eclesio/ready-txs-metrics' of github.com:ChainSafe/goss…
EclesioMeloJunior Jun 28, 2021
f390373
chore: remove stop function and add comments to exported funcs
EclesioMeloJunior Jun 28, 2021
232ed98
chore: adjust tests
EclesioMeloJunior Jun 28, 2021
c7e9fb4
Merge branch 'development' into eclesio/ready-txs-metrics
EclesioMeloJunior Jun 29, 2021
b394fc2
chore: make metrics.Start blocking and using wg to inner gorout
EclesioMeloJunior Jun 29, 2021
533bc6b
chore: fix typo
EclesioMeloJunior Jun 29, 2021
2f695b1
Merge branch 'eclesio/ready-txs-metrics' of github.com:ChainSafe/goss…
EclesioMeloJunior Jun 29, 2021
8faa274
Merge branch 'development' into eclesio/ready-txs-metrics
EclesioMeloJunior Jun 30, 2021
63bb7ab
chore: fix metrics test
EclesioMeloJunior Jul 1, 2021
6930c63
fix: solving data race condition on metrics tests
EclesioMeloJunior Jul 1, 2021
e9bb260
Merge branch 'development' into eclesio/ready-txs-metrics
EclesioMeloJunior Jul 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions dot/metrics/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package metrics

import (
"time"

ethmetrics "github.com/ethereum/go-ethereum/metrics"
)

// GaugeCollector abstracts the Update function to collectors
// just implement the collection
type GaugeCollector interface {
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
Update() int64
}

// CollectGaugeMetrics receives an timeout, label and a gauge collector
// and acquire the metrics timeout by timeout to a ethereum metrics gauge
func CollectGaugeMetrics(timeout time.Duration, label string, c GaugeCollector) {
t := time.NewTicker(timeout)
defer t.Stop()

collectGauge(label, c)

for range t.C {
collectGauge(label, c)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should there be a context here? otherwise I don't see where this is stopped

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added context on all loops that get metrics

}

func collectGauge(label string, c GaugeCollector) {
ethmetrics.Enabled = true
pooltx := ethmetrics.GetOrRegisterGauge(label, nil)
pooltx.Update(c.Update())
}
18 changes: 17 additions & 1 deletion lib/transaction/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ package transaction

import (
"sync"
"time"

"github.com/ChainSafe/gossamer/dot/metrics"
"github.com/ChainSafe/gossamer/lib/common"
)

const collectTxMetricsTimeout = time.Second * 5
const readyTransactionsMetrics = "gossamer/ready/transaction/metrics"

// Pool represents the transaction pool
type Pool struct {
transactions map[common.Hash]*ValidTransaction
Expand All @@ -14,9 +19,17 @@ type Pool struct {

// NewPool returns a new empty Pool
func NewPool() *Pool {
return &Pool{
p := &Pool{
transactions: make(map[common.Hash]*ValidTransaction),
}

go metrics.CollectGaugeMetrics(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be nicer if this was called outside of the NewPool constructor, and we could have some polling process to iterate over all the GaugeCollector implementations, rather than having multiple goroutines each polling their own gauge.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it! I've implemented a Collector, that acts like a harvester, which has an AddGauge and a Start methods, then the AddGauge will "register" all the GaugeMetrics implementations so when Start is called (when the node starts) it triggers a goroutine that does polling in all the registereds implementations.

I think with this way we can remove all the spread goroutines and centralize in one in the metrics package.

collectTxMetricsTimeout,
readyTransactionsMetrics,
p,
)

return p
}

// Transactions returns all the transactions in the pool
Expand Down Expand Up @@ -49,3 +62,6 @@ func (p *Pool) Remove(hash common.Hash) {
defer p.mu.Unlock()
delete(p.transactions, hash)
}

// Update returns the total of valid transactions in the pool
func (p *Pool) Update() int64 { return int64(len(p.transactions)) }
50 changes: 50 additions & 0 deletions lib/transaction/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package transaction
import (
"sort"
"testing"
"time"

"github.com/ChainSafe/gossamer/lib/common"
"github.com/stretchr/testify/require"

ethmetrics "github.com/ethereum/go-ethereum/metrics"
)

func TestPool(t *testing.T) {
Expand Down Expand Up @@ -50,3 +53,50 @@ func TestPool(t *testing.T) {
}
require.Equal(t, 0, len(p.Transactions()))
}

func TestPoolCollectMetrics(t *testing.T) {
//reset metric
ethmetrics.Unregister(readyTransactionsMetrics)

ethmetrics.Enabled = true
txmetrics := ethmetrics.GetOrRegisterGauge(readyTransactionsMetrics, nil)

require.Equal(t, int64(0), txmetrics.Value())

validtx := []*ValidTransaction{
{
Extrinsic: []byte("a"),
Validity: &Validity{Priority: 1},
},
{
Extrinsic: []byte("b"),
Validity: &Validity{Priority: 4},
},
{
Extrinsic: []byte("c"),
Validity: &Validity{Priority: 2},
},
{
Extrinsic: []byte("d"),
Validity: &Validity{Priority: 17},
},
{
Extrinsic: []byte("e"),
Validity: &Validity{Priority: 2},
},
}

h := make([]common.Hash, len(validtx))
p := NewPool()
for i, v := range validtx {
h[i] = p.Insert(v)
}

time.Sleep(collectTxMetricsTimeout + time.Second)
require.Equal(t, int64(len(validtx)), txmetrics.Value())

p.Remove(h[0])

time.Sleep(collectTxMetricsTimeout + time.Second)
require.Equal(t, int64(len(validtx)-1), txmetrics.Value())
}
13 changes: 13 additions & 0 deletions lib/transaction/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import (
"errors"
"sync"

"github.com/ChainSafe/gossamer/dot/metrics"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
)

const readyPriorityQueueTransactions = "gossamer/ready/transaction/metrics"

// ErrTransactionExists is returned when trying to add a transaction to the queue that already exists
var ErrTransactionExists = errors.New("transaction is already in queue")

Expand Down Expand Up @@ -94,6 +97,13 @@ func NewPriorityQueue() *PriorityQueue {
pq: make(priorityQueue, 0),
txs: make(map[common.Hash]*Item),
}

go metrics.CollectGaugeMetrics(
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
collectTxMetricsTimeout,
readyPriorityQueueTransactions,
spq,
)

heap.Init(&spq.pq)
return spq
}
Expand Down Expand Up @@ -171,3 +181,6 @@ func (spq *PriorityQueue) Pending() []*ValidTransaction {
}
return txns
}

// Update returns the total of valid transactions in the priority queue
func (spq *PriorityQueue) Update() int64 { return int64(spq.pq.Len()) }
19 changes: 19 additions & 0 deletions lib/transaction/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ package transaction
import (
"reflect"
"testing"
"time"

ethmetrics "github.com/ethereum/go-ethereum/metrics"
"github.com/stretchr/testify/require"
)

func TestPriorityQueue(t *testing.T) {
Expand All @@ -45,13 +49,24 @@ func TestPriorityQueue(t *testing.T) {
},
}

//check metrics
ethmetrics.Unregister(readyPriorityQueueTransactions)
ethmetrics.Enabled = true

priorityQueueM := ethmetrics.GetOrRegisterGauge(readyPriorityQueueTransactions, nil)
require.Equal(t, int64(0), priorityQueueM.Value())

pq := NewPriorityQueue()
expected := []int{3, 1, 2, 4, 0}

for _, node := range tests {
pq.Push(node)
}

// wait for metrics to be collected
time.Sleep(collectTxMetricsTimeout + time.Second)
require.Equal(t, int64(len(tests)), priorityQueueM.Value())

for i, exp := range expected {
n := pq.Pop()
if !reflect.DeepEqual(n, tests[exp]) {
Expand All @@ -60,6 +75,10 @@ func TestPriorityQueue(t *testing.T) {
t.Fatalf("Fail: iteration %d got %v expected %v", i, n, tests[exp])
}
}

// wait for metrics to be collected
time.Sleep(collectTxMetricsTimeout + time.Second)
require.Equal(t, int64(pq.pq.Len()), priorityQueueM.Value())
}

func TestPriorityQueueAgain(t *testing.T) {
Expand Down