Skip to content

Commit

Permalink
fix: Prevent dispatcher merging if curTs is 0 (milvus-io#34562)
Browse files Browse the repository at this point in the history
When the main dispatcher has not yet consumed data, curTs is 0. During
this time, merging dispatchers should not be allowed; otherwise, the
data of the solo dispatcher will be skipped.

issue: milvus-io#34255

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
  • Loading branch information
bigsheeper committed Jul 12, 2024
1 parent d3d1920 commit 5fc4838
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pkg/mq/msgdispatcher/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (c *dispatcherManager) tryMerge() {

c.mu.Lock()
defer c.mu.Unlock()
if c.mainDispatcher == nil {
if c.mainDispatcher == nil || c.mainDispatcher.CurTs() == 0 {
return
}
candidates := make(map[string]struct{})
Expand Down
12 changes: 12 additions & 0 deletions pkg/mq/msgdispatcher/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ func TestManager(t *testing.T) {
_, err = c.Add(ctx, "mock_vchannel_2", nil, mqwrapper.SubscriptionPositionUnknown)
assert.NoError(t, err)
assert.Equal(t, 3, c.Num())
c.(*dispatcherManager).mainDispatcher.curTs.Store(1000)
c.(*dispatcherManager).mu.RLock()
for _, d := range c.(*dispatcherManager).soloDispatchers {
d.curTs.Store(1000)
}
c.(*dispatcherManager).mu.RUnlock()

c.(*dispatcherManager).tryMerge()
assert.Equal(t, 1, c.Num())
Expand All @@ -96,6 +102,12 @@ func TestManager(t *testing.T) {
_, err = c.Add(ctx, "mock_vchannel_2", nil, mqwrapper.SubscriptionPositionUnknown)
assert.NoError(t, err)
assert.Equal(t, 3, c.Num())
c.(*dispatcherManager).mainDispatcher.curTs.Store(1000)
c.(*dispatcherManager).mu.RLock()
for _, d := range c.(*dispatcherManager).soloDispatchers {
d.curTs.Store(1000)
}
c.(*dispatcherManager).mu.RUnlock()

CheckPeriod = 10 * time.Millisecond
go c.Run()
Expand Down

0 comments on commit 5fc4838

Please sign in to comment.