Skip to content

Commit

Permalink
fix(embedded/store): fix indexing data race
Browse files Browse the repository at this point in the history
Signed-off-by: Jeronimo Irazabal <jeronimo.irazabal@gmail.com>
  • Loading branch information
jeroiraz committed May 11, 2021
1 parent 68745cc commit 683b23c
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 9 deletions.
13 changes: 5 additions & 8 deletions embedded/store/indexer.go
Expand Up @@ -203,22 +203,19 @@ func (idx *indexer) CompactIndex() (err error) {
func (idx *indexer) stop() {
idx.stateCond.L.Lock()
idx.state = stopped
close(idx.cancellation)
idx.stateCond.L.Unlock()
idx.stateCond.Signal()

close(idx.cancellation)

idx.store.notify(Info, true, "Indexing gracefully stopped at '%s'", idx.store.path)
}

func (idx *indexer) resume() {
idx.stateCond.L.Lock()
idx.state = running
idx.stateCond.L.Unlock()

idx.cancellation = make(chan struct{})

go idx.doIndexing()
go idx.doIndexing(idx.cancellation)
idx.stateCond.L.Unlock()

idx.store.notify(Info, true, "Indexing in progress at '%s'", idx.store.path)
}
Expand Down Expand Up @@ -278,15 +275,15 @@ func (idx *indexer) Pause() {
idx.stateCond.L.Unlock()
}

func (idx *indexer) doIndexing() {
func (idx *indexer) doIndexing(cancellation <-chan struct{}) {
for {
lastIndexedTx := idx.index.Ts()

if idx.wHub != nil {
idx.wHub.DoneUpto(lastIndexedTx)
}

err := idx.store.wHub.WaitFor(lastIndexedTx+1, idx.cancellation)
err := idx.store.wHub.WaitFor(lastIndexedTx+1, cancellation)
if err == watchers.ErrCancellationRequested || err == watchers.ErrAlreadyClosed {
return
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/database/all_ops_test.go
Expand Up @@ -92,7 +92,7 @@ func TestConcurrentCompactIndex(t *testing.T) {
case <-ticker.C:
{
err := compactIndex(db, cleanUpTimeout)
if err != nil {
if err != nil && err != store.ErrAlreadyClosed {
panic(err)
}
}
Expand Down

0 comments on commit 683b23c

Please sign in to comment.