Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ledger: rearrange blockqueue start/stop #4964

Merged
merged 9 commits into from Jan 19, 2023
21 changes: 0 additions & 21 deletions ledger/archival_test.go
Expand Up @@ -29,8 +29,6 @@ import (

"github.com/stretchr/testify/require"

"github.com/algorand/go-deadlock"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
Expand Down Expand Up @@ -192,13 +190,6 @@ func TestArchivalRestart(t *testing.T) {

// Start in archival mode, add 2K blocks, restart, ensure all blocks are there

// disable deadlock checking code
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
deadlockDisable := deadlock.Opts.Disable
deadlock.Opts.Disable = true
defer func() {
deadlock.Opts.Disable = deadlockDisable
}()

dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64())
dbPrefix := filepath.Join(t.TempDir(), dbName)

Expand Down Expand Up @@ -339,13 +330,6 @@ func TestArchivalCreatables(t *testing.T) {
// restart, ensure all assets are there in index unless they were
// deleted

// disable deadlock checking code
deadlockDisable := deadlock.Opts.Disable
deadlock.Opts.Disable = true
defer func() {
deadlock.Opts.Disable = deadlockDisable
}()

dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64())
dbPrefix := filepath.Join(t.TempDir(), dbName)

Expand Down Expand Up @@ -691,11 +675,6 @@ func TestArchivalFromNonArchival(t *testing.T) {
partitiontest.PartitionTest(t)

// Start in non-archival mode, add 2K blocks, restart in archival mode ensure only genesis block is there
deadlockDisable := deadlock.Opts.Disable
deadlock.Opts.Disable = true
defer func() {
deadlock.Opts.Disable = deadlockDisable
}()

dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64())
dbPrefix := filepath.Join(t.TempDir(), dbName)
Expand Down
71 changes: 44 additions & 27 deletions ledger/blockqueue.go
Expand Up @@ -52,55 +52,72 @@ type blockQueue struct {
closed chan struct{}
}

func bqInit(l *Ledger) (*blockQueue, error) {
// newBlockQueue constructs a blockQueue for the given ledger and loads the
// round of the latest block committed to the block database, so the queue
// knows where on-disk storage left off.
func newBlockQueue(l *Ledger) (*blockQueue, error) {
	bq := &blockQueue{}
	bq.cond = sync.NewCond(&bq.mu)
	bq.l = l
	bq.running = true
	bq.closed = make(chan struct{})
	ledgerBlockqInitCount.Inc(nil)
	start := time.Now()
	// Read the latest committed block round inside a read transaction.
	err := bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
		var err0 error
		bq.lastCommitted, err0 = blockdb.BlockLatest(tx)
		return err0
	})
	ledgerBlockqInitMicros.AddMicrosecondsSince(start, nil)
	if err != nil {
		return nil, err
	}

	// Launch the background goroutine that flushes queued blocks to disk.
	go bq.syncer()
	return bq, nil
}

func (bq *blockQueue) close() {
func (bq *blockQueue) start() error {
bq.mu.Lock()
defer func() {
bq.mu.Unlock()
// we want to block here until the sync go routine is done.
// it's not (just) for the sake of a complete cleanup, but rather
// to ensure that the sync goroutine isn't busy in a notifyCommit
// call which might be blocked inside one of the trackers.
<-bq.closed
}()
defer bq.mu.Unlock()

if !bq.running {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if !bq.running {
if bq.running {
// log a warning, we should not be restarting running block queue
return
}

if bq.closed != nil {
// a previous close() is still waiting on a previous syncer() to finish
oldsyncer := bq.closed
bq.mu.Unlock()
<-oldsyncer
bq.mu.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A test for this?

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If bq.closed is not set to nil, this can be simplified:

Suggested change
if bq.closed != nil {
// a previous close() is still waiting on a previous syncer() to finish
oldsyncer := bq.closed
bq.mu.Unlock()
<-oldsyncer
bq.mu.Lock()
}
// a previous close() is still waiting on a previous syncer() to finish
bq.mu.Unlock()
<-bq.closed
bq.mu.Lock()

bq.running = true
bq.closed = make(chan struct{})
ledgerBlockqInitCount.Inc(nil)
start := time.Now()
err := bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
var err0 error
bq.lastCommitted, err0 = blockdb.BlockLatest(tx)
return err0
})
ledgerBlockqInitMicros.AddMicrosecondsSince(start, nil)
if err != nil {
return err
}

go bq.syncer()
}
return nil
}

// stop signals the syncer goroutine to exit and blocks until it has fully
// terminated. It is safe to call when the queue is not running.
func (bq *blockQueue) stop() {
	bq.mu.Lock()
	// Capture the close channel under the lock: the syncer goroutine may
	// replace or clear bq.closed once it observes !running.
	closechan := bq.closed
	if bq.running {
		bq.running = false
		// Wake the syncer out of its cond.Wait so it can notice !running.
		bq.cond.Broadcast()
	}

	bq.mu.Unlock()

	// we want to block here until the sync go routine is done.
	// it's not (just) for the sake of a complete cleanup, but rather
	// to ensure that the sync goroutine isn't busy in a notifyCommit
	// call which might be blocked inside one of the trackers.
	if closechan != nil {
		<-closechan
	}
}

func (bq *blockQueue) syncer() {
defer close(bq.closed)
bq.mu.Lock()
for {
for bq.running && len(bq.q) == 0 {
bq.cond.Wait()
}

if !bq.running {
close(bq.closed)
bq.closed = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is setting this to nil necessary? Keep it closed, and replace it with a new one when restarted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

receiving from nil blocks forever, and creating a chan and immediately closing it feels silly, so I like the nil checking

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on my suggestion, I expect that none of what you are arguing should happen.
I must be missing something — can you please point out to me when:

  1. bq.closed will be nil
  2. where bq.closed will be created and immediately closed

bq.mu.Unlock()
return
}
Expand Down
4 changes: 2 additions & 2 deletions ledger/double_test.go
Expand Up @@ -152,8 +152,8 @@ func (dl *DoubleLedger) fundedApp(sender basics.Address, amount uint64, source s
}

// reloadLedgers reloads both the generator and validator ledgers,
// failing the test immediately if either reload returns an error.
// Note: this PR removes the exported Ledger.ReloadLedger wrapper, so the
// unexported reloadLedger is called directly (same package).
func (dl *DoubleLedger) reloadLedgers() {
	require.NoError(dl.t, dl.generator.reloadLedger())
	require.NoError(dl.t, dl.validator.reloadLedger())
}

func checkBlock(t *testing.T, checkLedger *Ledger, vb *ledgercore.ValidatedBlock) {
Expand Down
21 changes: 9 additions & 12 deletions ledger/ledger.go
Expand Up @@ -160,6 +160,11 @@ func OpenLedger(
l.genesisAccounts = make(map[basics.Address]basics.AccountData)
}

l.blockQ, err = newBlockQueue(l)
if err != nil {
return nil, err
}

err = l.reloadLedger()
if err != nil {
return nil, err
Expand All @@ -168,19 +173,12 @@ func OpenLedger(
return l, nil
}

// ReloadLedger is exported for the benefit of tests in the internal
// package. Revisit this when we rename / restructure that thing
func (l *Ledger) ReloadLedger() error {
return l.reloadLedger()
}

func (l *Ledger) reloadLedger() error {
// similar to the Close function, we want to start by closing the blockQ first. The
// blockQ is having a sync goroutine which indirectly calls other trackers. We want to eliminate that go-routine first,
// and follow up by taking the trackers lock.
if l.blockQ != nil {
l.blockQ.close()
l.blockQ = nil
l.blockQ.stop()
}

// take the trackers lock. This would ensure that no other goroutine is using the trackers.
Expand All @@ -192,9 +190,9 @@ func (l *Ledger) reloadLedger() error {

// init block queue
var err error
l.blockQ, err = bqInit(l)
err = l.blockQ.start()
if err != nil {
err = fmt.Errorf("reloadLedger.bqInit %v", err)
err = fmt.Errorf("reloadLedger.blockQ.start %v", err)
return err
}

Expand Down Expand Up @@ -381,8 +379,7 @@ func (l *Ledger) Close() {
// we shut the the blockqueue first, since it's sync goroutine dispatches calls
// back to the trackers.
if l.blockQ != nil {
l.blockQ.close()
l.blockQ = nil
l.blockQ.stop()
}

// take the trackers lock. This would ensure that no other goroutine is using the trackers.
Expand Down
2 changes: 1 addition & 1 deletion ledger/ledger_test.go
Expand Up @@ -1557,7 +1557,7 @@ func TestWaitLedgerReload(t *testing.T) {
waitRound := l.Latest() + 1
waitChannel := l.Wait(waitRound)

err = l.ReloadLedger()
err = l.reloadLedger()
a.NoError(err)
triggerTrackerFlush(t, l, genesisInitState)

Expand Down