Skip to content

Commit

Permalink
Merge pull request #1255 from abhinavdangeti/scorch-persister-cb
Browse files Browse the repository at this point in the history
Preserve persistedCallbacks of an unpersisted snapshot until needed
  • Loading branch information
abhinavdangeti committed Jul 9, 2019
2 parents f9e4034 + 732150d commit 4a7e6fb
Showing 1 changed file with 17 additions and 0 deletions.
17 changes: 17 additions & 0 deletions index/scorch/persister.go
Expand Up @@ -90,6 +90,9 @@ func (s *Scorch) persisterLoop() {
var persistWatchers []*epochWatcher
var lastPersistedEpoch, lastMergedEpoch uint64
var ew *epochWatcher

var unpersistedCallbacks []index.BatchCallback

po, err := s.parsePersisterOptions()
if err != nil {
s.fireAsyncError(fmt.Errorf("persisterOptions json parsing err: %v", err))
Expand Down Expand Up @@ -149,11 +152,25 @@ OUTER:
_ = ourSnapshot.DecRef()
break OUTER
}

// save this current snapshot's persistedCallbacks, to invoke during
// the retry attempt
unpersistedCallbacks = append(unpersistedCallbacks, ourPersistedCallbacks...)

s.fireAsyncError(fmt.Errorf("got err persisting snapshot: %v", err))
_ = ourSnapshot.DecRef()
atomic.AddUint64(&s.stats.TotPersistLoopErr, 1)
continue OUTER
}

if unpersistedCallbacks != nil {
// in the event of this being a retry attempt for persisting a snapshot
// that had earlier failed, prepend the persistedCallbacks associated
// with earlier segment(s) to the latest persistedCallbacks
ourPersistedCallbacks = append(unpersistedCallbacks, ourPersistedCallbacks...)
unpersistedCallbacks = nil
}

for i := range ourPersistedCallbacks {
ourPersistedCallbacks[i](err)
}
Expand Down

0 comments on commit 4a7e6fb

Please sign in to comment.