Skip to content

Commit

Permalink
Merge acbf545 into 172962a
Browse files Browse the repository at this point in the history
  • Loading branch information
sreekanth-cb committed Mar 25, 2020
2 parents 172962a + acbf545 commit c41b949
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 43 deletions.
2 changes: 2 additions & 0 deletions index/scorch/introducer.go
Expand Up @@ -312,6 +312,8 @@ func (s *Scorch) introducePersist(persist *persistIntroduction) {
close(persist.applied)
}

// The introducer should definitely handle the segmentMerge.notify
// channel before exiting the introduceMerge.
func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
atomic.AddUint64(&s.stats.TotIntroduceMergeBeg, 1)
defer atomic.AddUint64(&s.stats.TotIntroduceMergeEnd, 1)
Expand Down
59 changes: 20 additions & 39 deletions index/scorch/merge.go
Expand Up @@ -157,22 +157,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
atomic.AddUint64(&s.stats.TotFileMergePlanTasks, uint64(len(resultMergePlan.Tasks)))

// process tasks in serial for now
var notifications []chan *IndexSnapshot
var filenames []string
// clean up any pending notifications from introducer on exit
// from an index closure.
defer func() {
for _, notification := range notifications {
select {
case newSnapshot := <-notification:
atomic.AddUint64(&s.stats.TotFileMergeIntroductionsDone, 1)
if newSnapshot != nil {
_ = newSnapshot.DecRef()
}
default:
}
}
}()

for _, task := range resultMergePlan.Tasks {
if len(task.Segments) == 0 {
Expand Down Expand Up @@ -232,7 +217,6 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
s.unmarkIneligibleForRemoval(filename)
atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)
if err == segment.ErrClosed {
// handle any pending index snapshot introduction notifications on exit
return err
}
return fmt.Errorf("merging failed: %v", err)
Expand Down Expand Up @@ -262,35 +246,33 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
old: oldMap,
oldNewDocNums: oldNewDocNums,
new: seg,
notify: make(chan *IndexSnapshot, 1),
notify: make(chan *IndexSnapshot),
}
notifications = append(notifications, sm.notify)

// give it to the introducer
select {
case <-s.closeCh:
// handle any pending index snapshot introduction notifications on exit
_ = seg.Close()
return segment.ErrClosed
case s.merges <- sm:
atomic.AddUint64(&s.stats.TotFileMergeIntroductions, 1)
}

atomic.AddUint64(&s.stats.TotFileMergePlanTasksDone, 1)
}

for _, notification := range notifications {
select {
case <-s.closeCh:
// handle any pending index snapshot introduction notifications on exit
atomic.AddUint64(&s.stats.TotFileMergeIntroductionsSkipped, 1)
return segment.ErrClosed
case newSnapshot := <-notification:
atomic.AddUint64(&s.stats.TotFileMergeIntroductionsDone, 1)
if newSnapshot != nil {
_ = newSnapshot.DecRef()
}
introStartTime := time.Now()
// it is safe to blockingly wait for the merge introduction
// here as the introducer is bound to handle the notify channel.
newSnapshot := <-sm.notify
introTime := uint64(time.Since(introStartTime))
atomic.AddUint64(&s.stats.TotFileMergeZapIntroductionTime, introTime)
if atomic.LoadUint64(&s.stats.MaxFileMergeZapIntroductionTime) < introTime {
atomic.StoreUint64(&s.stats.MaxFileMergeZapIntroductionTime, introTime)
}
atomic.AddUint64(&s.stats.TotFileMergeIntroductionsDone, 1)
if newSnapshot != nil {
_ = newSnapshot.DecRef()
}

atomic.AddUint64(&s.stats.TotFileMergePlanTasksDone, 1)
}

// once all the newly merged segment introductions are done,
Expand Down Expand Up @@ -362,7 +344,7 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
old: make(map[uint64]*SegmentSnapshot),
oldNewDocNums: make(map[uint64][]uint64),
new: seg,
notify: make(chan *IndexSnapshot, 1),
notify: make(chan *IndexSnapshot),
}

for i, idx := range sbsIndexes {
Expand All @@ -378,14 +360,13 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
case s.merges <- sm:
}

select { // wait for introduction to complete
case <-s.closeCh:
return nil, 0, segment.ErrClosed
case newSnapshot := <-sm.notify:
// blockingly wait for the introduction to complete
newSnapshot := <-sm.notify
if newSnapshot != nil {
atomic.AddUint64(&s.stats.TotMemMergeSegments, uint64(len(sbs)))
atomic.AddUint64(&s.stats.TotMemMergeDone, 1)
return newSnapshot, newSegmentID, nil
}
return newSnapshot, newSegmentID, nil
}

func (s *Scorch) ReportBytesWritten(bytesWritten uint64) {
Expand Down
10 changes: 6 additions & 4 deletions index/scorch/stats.go
Expand Up @@ -98,10 +98,12 @@ type Stats struct {
TotFileSegmentsAtRoot uint64
TotFileMergeWrittenBytes uint64

TotFileMergeZapBeg uint64
TotFileMergeZapEnd uint64
TotFileMergeZapTime uint64
MaxFileMergeZapTime uint64
TotFileMergeZapBeg uint64
TotFileMergeZapEnd uint64
TotFileMergeZapTime uint64
MaxFileMergeZapTime uint64
TotFileMergeZapIntroductionTime uint64
MaxFileMergeZapIntroductionTime uint64

TotFileMergeIntroductions uint64
TotFileMergeIntroductionsDone uint64
Expand Down

0 comments on commit c41b949

Please sign in to comment.