diff --git a/internal/metastore/catalog.go b/internal/metastore/catalog.go index 2c285394dc09..026b6a95b99e 100644 --- a/internal/metastore/catalog.go +++ b/internal/metastore/catalog.go @@ -154,7 +154,7 @@ type QueryCoordCatalog interface { RemoveResourceGroup(rgName string) error GetResourceGroups() ([]*querypb.ResourceGroup, error) - SaveCollectionTarget(target *querypb.CollectionTarget) error + SaveCollectionTargets(target ...*querypb.CollectionTarget) error RemoveCollectionTarget(collectionID int64) error GetCollectionTargets() (map[int64]*querypb.CollectionTarget, error) } diff --git a/internal/metastore/kv/querycoord/kv_catalog.go b/internal/metastore/kv/querycoord/kv_catalog.go index 2ac1fa6039be..22dcc7f98444 100644 --- a/internal/metastore/kv/querycoord/kv_catalog.go +++ b/internal/metastore/kv/querycoord/kv_catalog.go @@ -241,16 +241,21 @@ func (s Catalog) ReleaseReplica(collection, replica int64) error { return s.cli.Remove(key) } -func (s Catalog) SaveCollectionTarget(target *querypb.CollectionTarget) error { - k := encodeCollectionTargetKey(target.GetCollectionID()) - v, err := proto.Marshal(target) - if err != nil { - return err +func (s Catalog) SaveCollectionTargets(targets ...*querypb.CollectionTarget) error { + kvs := make(map[string]string) + for _, target := range targets { + k := encodeCollectionTargetKey(target.GetCollectionID()) + v, err := proto.Marshal(target) + if err != nil { + return err + } + var compressed bytes.Buffer + compressor.ZstdCompress(bytes.NewReader(v), io.Writer(&compressed), zstd.WithEncoderLevel(zstd.SpeedBetterCompression)) + kvs[k] = compressed.String() } + // to reduce the target size, we do compress before write to etcd - var compressed bytes.Buffer - compressor.ZstdCompress(bytes.NewReader(v), io.Writer(&compressed), zstd.WithEncoderLevel(zstd.SpeedBetterCompression)) - err = s.cli.Save(k, compressed.String()) + err := s.cli.MultiSave(kvs) if err != nil { return err } diff --git a/internal/metastore/kv/querycoord/kv_catalog_test.go b/internal/metastore/kv/querycoord/kv_catalog_test.go index 349e94539f56..8b4bba050f43 100644 --- a/internal/metastore/kv/querycoord/kv_catalog_test.go +++ b/internal/metastore/kv/querycoord/kv_catalog_test.go @@ -203,22 +203,22 @@ func (suite *CatalogTestSuite) TestResourceGroup() { } func (suite *CatalogTestSuite) TestCollectionTarget() { - suite.catalog.SaveCollectionTarget(&querypb.CollectionTarget{ + suite.catalog.SaveCollectionTargets(&querypb.CollectionTarget{ CollectionID: 1, Version: 1, - }) - suite.catalog.SaveCollectionTarget(&querypb.CollectionTarget{ - CollectionID: 2, - Version: 2, - }) - suite.catalog.SaveCollectionTarget(&querypb.CollectionTarget{ - CollectionID: 3, - Version: 3, - }) - suite.catalog.SaveCollectionTarget(&querypb.CollectionTarget{ - CollectionID: 1, - Version: 4, - }) + }, + &querypb.CollectionTarget{ + CollectionID: 2, + Version: 2, + }, + &querypb.CollectionTarget{ + CollectionID: 3, + Version: 3, + }, + &querypb.CollectionTarget{ + CollectionID: 1, + Version: 4, + }) suite.catalog.RemoveCollectionTarget(2) targets, err := suite.catalog.GetCollectionTargets() @@ -230,18 +230,18 @@ func (suite *CatalogTestSuite) TestCollectionTarget() { // test access meta store failed mockStore := mocks.NewMetaKv(suite.T()) mockErr := errors.New("failed to access etcd") - mockStore.EXPECT().Save(mock.Anything, mock.Anything).Return(mockErr) + mockStore.EXPECT().MultiSave(mock.Anything).Return(mockErr) mockStore.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, mockErr) suite.catalog.cli = mockStore - err = suite.catalog.SaveCollectionTarget(&querypb.CollectionTarget{}) + err = suite.catalog.SaveCollectionTargets(&querypb.CollectionTarget{}) suite.ErrorIs(err, mockErr) _, err = suite.catalog.GetCollectionTargets() suite.ErrorIs(err, mockErr) // test invalid message - err = suite.catalog.SaveCollectionTarget(nil) + err = suite.catalog.SaveCollectionTargets(nil) suite.Error(err) } diff --git a/internal/metastore/mocks/mock_querycoord_catalog.go b/internal/metastore/mocks/mock_querycoord_catalog.go index 98dfaaf9d7c7..06e12d69378b 100644 --- a/internal/metastore/mocks/mock_querycoord_catalog.go +++ b/internal/metastore/mocks/mock_querycoord_catalog.go @@ -610,13 +610,19 @@ func (_c *QueryCoordCatalog_SaveCollection_Call) RunAndReturn(run func(*querypb. return _c } -// SaveCollectionTarget provides a mock function with given fields: target -func (_m *QueryCoordCatalog) SaveCollectionTarget(target *querypb.CollectionTarget) error { - ret := _m.Called(target) +// SaveCollectionTargets provides a mock function with given fields: target +func (_m *QueryCoordCatalog) SaveCollectionTargets(target ...*querypb.CollectionTarget) error { + _va := make([]interface{}, len(target)) + for _i := range target { + _va[_i] = target[_i] + } + var _ca []interface{} + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) var r0 error - if rf, ok := ret.Get(0).(func(*querypb.CollectionTarget) error); ok { - r0 = rf(target) + if rf, ok := ret.Get(0).(func(...*querypb.CollectionTarget) error); ok { + r0 = rf(target...) } else { r0 = ret.Error(0) } @@ -624,30 +630,37 @@ func (_m *QueryCoordCatalog) SaveCollectionTarget(target *querypb.CollectionTarg return r0 } -// QueryCoordCatalog_SaveCollectionTarget_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveCollectionTarget' -type QueryCoordCatalog_SaveCollectionTarget_Call struct { +// QueryCoordCatalog_SaveCollectionTargets_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveCollectionTargets' +type QueryCoordCatalog_SaveCollectionTargets_Call struct { *mock.Call } -// SaveCollectionTarget is a helper method to define mock.On call -// - target *querypb.CollectionTarget -func (_e *QueryCoordCatalog_Expecter) SaveCollectionTarget(target interface{}) *QueryCoordCatalog_SaveCollectionTarget_Call { - return &QueryCoordCatalog_SaveCollectionTarget_Call{Call: _e.mock.On("SaveCollectionTarget", target)} +// SaveCollectionTargets is a helper method to define mock.On call +// - target ...*querypb.CollectionTarget +func (_e *QueryCoordCatalog_Expecter) SaveCollectionTargets(target ...interface{}) *QueryCoordCatalog_SaveCollectionTargets_Call { + return &QueryCoordCatalog_SaveCollectionTargets_Call{Call: _e.mock.On("SaveCollectionTargets", + append([]interface{}{}, target...)...)} } -func (_c *QueryCoordCatalog_SaveCollectionTarget_Call) Run(run func(target *querypb.CollectionTarget)) *QueryCoordCatalog_SaveCollectionTarget_Call { +func (_c *QueryCoordCatalog_SaveCollectionTargets_Call) Run(run func(target ...*querypb.CollectionTarget)) *QueryCoordCatalog_SaveCollectionTargets_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*querypb.CollectionTarget)) + variadicArgs := make([]*querypb.CollectionTarget, len(args)-0) + for i, a := range args[0:] { + if a != nil { + variadicArgs[i] = a.(*querypb.CollectionTarget) + } + } + run(variadicArgs...) }) return _c } -func (_c *QueryCoordCatalog_SaveCollectionTarget_Call) Return(_a0 error) *QueryCoordCatalog_SaveCollectionTarget_Call { +func (_c *QueryCoordCatalog_SaveCollectionTargets_Call) Return(_a0 error) *QueryCoordCatalog_SaveCollectionTargets_Call { _c.Call.Return(_a0) return _c } -func (_c *QueryCoordCatalog_SaveCollectionTarget_Call) RunAndReturn(run func(*querypb.CollectionTarget) error) *QueryCoordCatalog_SaveCollectionTarget_Call { +func (_c *QueryCoordCatalog_SaveCollectionTargets_Call) RunAndReturn(run func(...*querypb.CollectionTarget) error) *QueryCoordCatalog_SaveCollectionTargets_Call { _c.Call.Return(run) return _c } diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 5e633da881d2..6c353ef5a29f 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -19,6 +19,7 @@ package meta import ( "context" "fmt" + "runtime" "sync" "github.com/cockroachdb/errors" @@ -27,8 +28,10 @@ import ( "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" @@ -583,13 +586,38 @@ func (mgr *TargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog) mgr.rwMutex.Lock() defer mgr.rwMutex.Unlock() if mgr.current != nil { + // use pool here to control maximal writer used by save target + pool := conc.NewPool[any](runtime.GOMAXPROCS(0) * 2) + // use batch write in case of the number of collections is large + batchSize := 16 + var wg sync.WaitGroup + submit := func(tasks []typeutil.Pair[int64, *querypb.CollectionTarget]) { + wg.Add(1) + pool.Submit(func() (any, error) { + defer wg.Done() + ids := lo.Map(tasks, func(p typeutil.Pair[int64, *querypb.CollectionTarget], _ int) int64 { return p.A }) + if err := catalog.SaveCollectionTargets(lo.Map(tasks, func(p typeutil.Pair[int64, *querypb.CollectionTarget], _ int) *querypb.CollectionTarget { + return p.B + })...); err != nil { + log.Warn("failed to save current target for collection", zap.Int64s("collectionIDs", ids), zap.Error(err)) + } else { + log.Info("succeed to save current target for collection", zap.Int64s("collectionIDs", ids)) + } + return nil, nil + }) + } + tasks := make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize) for id, target := range mgr.current.collectionTargetMap { - if err := catalog.SaveCollectionTarget(target.toPbMsg()); err != nil { - log.Warn("failed to save current target for collection", zap.Int64("collectionID", id), zap.Error(err)) - } else { - log.Warn("succeed to save current target for collection", zap.Int64("collectionID", id)) + tasks = append(tasks, typeutil.NewPair(id, target.toPbMsg())) + if len(tasks) >= batchSize { + submit(tasks) + tasks = make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize) } } + if len(tasks) > 0 { + submit(tasks) + } + wg.Wait() } }