From 5fc4838891d6a2537aac230b5fb032c49330407b Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Thu, 11 Jul 2024 00:02:50 +0800 Subject: [PATCH] fix: Prevent dispatcher merging if curTs is 0 (#34562) When the main dispatcher has not yet consumed data, curTs is 0. During this time, merging dispatchers should not be allowed; otherwise, the data of the solo dispatcher will be skipped. issue: https://github.com/milvus-io/milvus/issues/34255 --------- Signed-off-by: bigsheeper --- pkg/mq/msgdispatcher/manager.go | 2 +- pkg/mq/msgdispatcher/manager_test.go | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/pkg/mq/msgdispatcher/manager.go b/pkg/mq/msgdispatcher/manager.go index ecd3d079aeb3..ddaa1c642777 100644 --- a/pkg/mq/msgdispatcher/manager.go +++ b/pkg/mq/msgdispatcher/manager.go @@ -183,7 +183,7 @@ func (c *dispatcherManager) tryMerge() { c.mu.Lock() defer c.mu.Unlock() - if c.mainDispatcher == nil { + if c.mainDispatcher == nil || c.mainDispatcher.CurTs() == 0 { return } candidates := make(map[string]struct{}) diff --git a/pkg/mq/msgdispatcher/manager_test.go b/pkg/mq/msgdispatcher/manager_test.go index 621767dd6e42..79ee3399a6b8 100644 --- a/pkg/mq/msgdispatcher/manager_test.go +++ b/pkg/mq/msgdispatcher/manager_test.go @@ -71,6 +71,12 @@ func TestManager(t *testing.T) { _, err = c.Add(ctx, "mock_vchannel_2", nil, mqwrapper.SubscriptionPositionUnknown) assert.NoError(t, err) assert.Equal(t, 3, c.Num()) + c.(*dispatcherManager).mainDispatcher.curTs.Store(1000) + c.(*dispatcherManager).mu.RLock() + for _, d := range c.(*dispatcherManager).soloDispatchers { + d.curTs.Store(1000) + } + c.(*dispatcherManager).mu.RUnlock() c.(*dispatcherManager).tryMerge() assert.Equal(t, 1, c.Num()) @@ -96,6 +102,12 @@ func TestManager(t *testing.T) { _, err = c.Add(ctx, "mock_vchannel_2", nil, mqwrapper.SubscriptionPositionUnknown) assert.NoError(t, err) assert.Equal(t, 3, c.Num()) + c.(*dispatcherManager).mainDispatcher.curTs.Store(1000) + c.(*dispatcherManager).mu.RLock() + for _, d := range c.(*dispatcherManager).soloDispatchers { + d.curTs.Store(1000) + } + c.(*dispatcherManager).mu.RUnlock() CheckPeriod = 10 * time.Millisecond go c.Run()