From 06449e9fc4e290896dae10776e217ab374284665 Mon Sep 17 00:00:00 2001 From: Aditi Ahuja <48997495+metonymic-smokey@users.noreply.github.com> Date: Thu, 16 May 2024 09:56:36 +0530 Subject: [PATCH] MB-60945 - Add scorch event for pre merge check (#1971) This PR adds an event to check whether a merge can proceed based on criteria set by the caller(eg. cbft). If not, it's aborted and the merger loop continues its next iteration. Also includes minor refactor to the function signature of the event callback registry to accommodate the new event type. --- index/scorch/event.go | 7 ++++++- index/scorch/event_test.go | 3 ++- index/scorch/merge.go | 11 +++++++++++ index/scorch/merge_test.go | 3 ++- index/scorch/scorch.go | 8 +++++--- 5 files changed, 26 insertions(+), 6 deletions(-) diff --git a/index/scorch/event.go b/index/scorch/event.go index 31c9e80c9..0f653ccf4 100644 --- a/index/scorch/event.go +++ b/index/scorch/event.go @@ -22,7 +22,8 @@ var RegistryAsyncErrorCallbacks = map[string]func(error, string){} // RegistryEventCallbacks should be treated as read-only after // process init()'ialization. -var RegistryEventCallbacks = map[string]func(Event){} +// In the event of not having a callback, these return true. +var RegistryEventCallbacks = map[string]func(Event) bool{} // Event represents the information provided in an OnEvent() callback. type Event struct { @@ -62,3 +63,7 @@ var EventKindMergeTaskIntroductionStart = EventKind(7) // EventKindMergeTaskIntroduction is fired when the merger has completed // the introduction of merged segment from a single merge task. var EventKindMergeTaskIntroduction = EventKind(8) + +// EventKindPreMergeCheck is fired before the merge begins to check if +// the caller should proceed with the merge. +var EventKindPreMergeCheck = EventKind(9) diff --git a/index/scorch/event_test.go b/index/scorch/event_test.go index ed1dd4d95..be163a9e4 100644 --- a/index/scorch/event_test.go +++ b/index/scorch/event_test.go @@ -35,10 +35,11 @@ func TestEventBatchIntroductionStart(t *testing.T) { }() var count int - RegistryEventCallbacks["test"] = func(e Event) { + RegistryEventCallbacks["test"] = func(e Event) bool { if e.Kind == EventKindBatchIntroductionStart { count++ } + return true } ourConfig := make(map[string]interface{}, len(testConfig)) diff --git a/index/scorch/merge.go b/index/scorch/merge.go index 339ec5969..b74504ca1 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -72,6 +72,17 @@ OUTER: ctrlMsg = ctrlMsgDflt } if ctrlMsg != nil { + continueMerge := s.fireEvent(EventKindPreMergeCheck, 0) + // The default, if there's no handler, is to continue the merge. + if !continueMerge { + // If it's decided that this merge can't take place now, + // begin the merge process all over again. + // Retry instead of blocking/waiting here since a long wait + // can result in more segments introduced i.e. s.root will + // be updated. + continue OUTER + } + startTime := time.Now() // lets get started diff --git a/index/scorch/merge_test.go b/index/scorch/merge_test.go index b3783c6c3..ec7e68eac 100644 --- a/index/scorch/merge_test.go +++ b/index/scorch/merge_test.go @@ -41,7 +41,7 @@ func TestObsoleteSegmentMergeIntroduction(t *testing.T) { mergeIntroStart.Add(1) mergeIntroComplete.Add(1) var segIntroCompleted int - RegistryEventCallbacks["test"] = func(e Event) { + RegistryEventCallbacks["test"] = func(e Event) bool { if e.Kind == EventKindBatchIntroduction { segIntroCompleted++ if segIntroCompleted == 3 { @@ -61,6 +61,7 @@ func TestObsoleteSegmentMergeIntroduction(t *testing.T) { mergeIntroComplete.Done() } + return true } ourConfig := make(map[string]interface{}, len(testConfig)) diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 7d691ff4f..7966d844d 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -77,7 +77,7 @@ type Scorch struct { rootBolt *bolt.DB asyncTasks sync.WaitGroup - onEvent func(event Event) + onEvent func(event Event) bool onAsyncError func(err error, path string) forceMergeRequestCh chan *mergerCtrl @@ -184,12 +184,14 @@ func (s *Scorch) NumEventsBlocking() uint64 { return eventsStarted - eventsCompleted } -func (s *Scorch) fireEvent(kind EventKind, dur time.Duration) { +func (s *Scorch) fireEvent(kind EventKind, dur time.Duration) bool { + res := true if s.onEvent != nil { atomic.AddUint64(&s.stats.TotEventTriggerStarted, 1) - s.onEvent(Event{Kind: kind, Scorch: s, Duration: dur}) + res = s.onEvent(Event{Kind: kind, Scorch: s, Duration: dur}) atomic.AddUint64(&s.stats.TotEventTriggerCompleted, 1) } + return res } func (s *Scorch) fireAsyncError(err error) {