Skip to content

Commit

Permalink
MB-60945 - Add scorch event for pre merge check (#1971)
Browse files Browse the repository at this point in the history
This PR adds an event to check whether a merge can proceed based on
criteria set by the caller(eg. cbft). If not, it's aborted and the
merger loop continues its next iteration.
Also includes minor refactor to the function signature of the event
callback registry to accommodate the new event type.
  • Loading branch information
metonymic-smokey committed May 16, 2024
1 parent 5700950 commit 06449e9
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 6 deletions.
7 changes: 6 additions & 1 deletion index/scorch/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ var RegistryAsyncErrorCallbacks = map[string]func(error, string){}

// RegistryEventCallbacks should be treated as read-only after
// process init()'ialization.
var RegistryEventCallbacks = map[string]func(Event){}
// In the event of not having a callback, these return true.
var RegistryEventCallbacks = map[string]func(Event) bool{}

// Event represents the information provided in an OnEvent() callback.
type Event struct {
Expand Down Expand Up @@ -62,3 +63,7 @@ var EventKindMergeTaskIntroductionStart = EventKind(7)
// EventKindMergeTaskIntroduction is fired when the merger has completed
// the introduction of merged segment from a single merge task.
var EventKindMergeTaskIntroduction = EventKind(8)

// EventKindPreMergeCheck is fired before the merge begins to check if
// the caller should proceed with the merge.
var EventKindPreMergeCheck = EventKind(9)
3 changes: 2 additions & 1 deletion index/scorch/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ func TestEventBatchIntroductionStart(t *testing.T) {
}()

var count int
RegistryEventCallbacks["test"] = func(e Event) {
RegistryEventCallbacks["test"] = func(e Event) bool {
if e.Kind == EventKindBatchIntroductionStart {
count++
}
return true
}

ourConfig := make(map[string]interface{}, len(testConfig))
Expand Down
11 changes: 11 additions & 0 deletions index/scorch/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ OUTER:
ctrlMsg = ctrlMsgDflt
}
if ctrlMsg != nil {
continueMerge := s.fireEvent(EventKindPreMergeCheck, 0)
// The default, if there's no handler, is to continue the merge.
if !continueMerge {
// If it's decided that this merge can't take place now,
// begin the merge process all over again.
// Retry instead of blocking/waiting here since a long wait
// can result in more segments introduced i.e. s.root will
// be updated.
continue OUTER
}

startTime := time.Now()

// lets get started
Expand Down
3 changes: 2 additions & 1 deletion index/scorch/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestObsoleteSegmentMergeIntroduction(t *testing.T) {
mergeIntroStart.Add(1)
mergeIntroComplete.Add(1)
var segIntroCompleted int
RegistryEventCallbacks["test"] = func(e Event) {
RegistryEventCallbacks["test"] = func(e Event) bool {
if e.Kind == EventKindBatchIntroduction {
segIntroCompleted++
if segIntroCompleted == 3 {
Expand All @@ -61,6 +61,7 @@ func TestObsoleteSegmentMergeIntroduction(t *testing.T) {
mergeIntroComplete.Done()

}
return true
}

ourConfig := make(map[string]interface{}, len(testConfig))
Expand Down
8 changes: 5 additions & 3 deletions index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type Scorch struct {
rootBolt *bolt.DB
asyncTasks sync.WaitGroup

onEvent func(event Event)
onEvent func(event Event) bool
onAsyncError func(err error, path string)

forceMergeRequestCh chan *mergerCtrl
Expand Down Expand Up @@ -184,12 +184,14 @@ func (s *Scorch) NumEventsBlocking() uint64 {
return eventsStarted - eventsCompleted
}

func (s *Scorch) fireEvent(kind EventKind, dur time.Duration) {
func (s *Scorch) fireEvent(kind EventKind, dur time.Duration) bool {
res := true
if s.onEvent != nil {
atomic.AddUint64(&s.stats.TotEventTriggerStarted, 1)
s.onEvent(Event{Kind: kind, Scorch: s, Duration: dur})
res = s.onEvent(Event{Kind: kind, Scorch: s, Duration: dur})
atomic.AddUint64(&s.stats.TotEventTriggerCompleted, 1)
}
return res
}

func (s *Scorch) fireAsyncError(err error) {
Expand Down

0 comments on commit 06449e9

Please sign in to comment.