Skip to content

Commit

Permalink
Add mutex/locks to ctx reads/writes in node
Browse files Browse the repository at this point in the history
  • Loading branch information
Eric-Warehime committed May 5, 2022
1 parent b0c4e8a commit 4081315
Showing 1 changed file with 8 additions and 14 deletions.
22 changes: 8 additions & 14 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (status StatusReport) TimeSinceLastRound() time.Duration {

// AlgorandFullNode specifies and implements a full Algorand node.
type AlgorandFullNode struct {
mu deadlock.RWMutex
mu deadlock.Mutex
ctx context.Context
cancelCtx context.CancelFunc
config config.Local
Expand Down Expand Up @@ -407,11 +407,11 @@ func (node *AlgorandFullNode) startMonitoringRoutines() {

// PKI TODO: Remove this with #2596
// Periodically check for new participation keys
go node.checkForParticipationKeys()
go node.checkForParticipationKeys(node.ctx.Done())

go node.txPoolGaugeThread()
go node.txPoolGaugeThread(node.ctx.Done())
// Delete old participation keys
go node.oldKeyDeletionThread()
go node.oldKeyDeletionThread(node.ctx.Done())

// TODO re-enable with configuration flag post V1
//go logging.UsageLogThread(node.ctx, node.log, 100*time.Millisecond, nil)
Expand Down Expand Up @@ -782,7 +782,7 @@ func ensureParticipationDB(genesisDir string, log logging.Logger) (account.Parti
}

// Reload participation keys from disk periodically
func (node *AlgorandFullNode) checkForParticipationKeys() {
func (node *AlgorandFullNode) checkForParticipationKeys(done <-chan struct{}) {
defer node.monitoringRoutinesWaitGroup.Done()
ticker := time.NewTicker(node.config.ParticipationKeysRefreshInterval)
for {
Expand All @@ -792,7 +792,7 @@ func (node *AlgorandFullNode) checkForParticipationKeys() {
if err != nil {
node.log.Errorf("Could not refresh participation keys: %v", err)
}
case <-node.ctx.Done():
case <-done:
ticker.Stop()
return
}
Expand Down Expand Up @@ -1032,14 +1032,11 @@ func insertStateProofToRegistry(part account.PersistedParticipation, node *Algor

var txPoolGuage = metrics.MakeGauge(metrics.MetricName{Name: "algod_tx_pool_count", Description: "current number of available transactions in pool"})

func (node *AlgorandFullNode) txPoolGaugeThread() {
func (node *AlgorandFullNode) txPoolGaugeThread(done <-chan struct{}) {
defer node.monitoringRoutinesWaitGroup.Done()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for true {
node.mu.RLock()
done := node.ctx.Done()
node.mu.RUnlock()
select {
case <-ticker.C:
txPoolGuage.Set(float64(node.transactionPool.PendingCount()), nil)
Expand Down Expand Up @@ -1074,12 +1071,9 @@ func (node *AlgorandFullNode) OnNewBlock(block bookkeeping.Block, delta ledgerco
// oldKeyDeletionThread keeps deleting old participation keys.
// It runs in a separate thread so that, during catchup, we
// don't have to delete key for each block we received.
func (node *AlgorandFullNode) oldKeyDeletionThread() {
func (node *AlgorandFullNode) oldKeyDeletionThread(done <-chan struct{}) {
defer node.monitoringRoutinesWaitGroup.Done()
for {
node.mu.RLock()
done := node.ctx.Done()
node.mu.RUnlock()
select {
case <-done:
return
Expand Down

0 comments on commit 4081315

Please sign in to comment.