From 12559eb4cbf4c02d48027fd1e7fc2f8fabd1f7cc Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Fri, 26 Apr 2024 19:58:16 -0700 Subject: [PATCH] Add LogPosition and CollectionVersion --- .../coordinator/grpc/proto_model_convert.go | 14 ++--- go/pkg/metastore/coordinator/table_catalog.go | 20 +++++--- go/pkg/metastore/db/dao/segment.go | 51 ++++++++++++------- .../db/dbmodel/mocks/ICollectionDb.go | 26 +--------- .../db/dbmodel/mocks/ICollectionMetadataDb.go | 14 +---- go/pkg/metastore/db/dbmodel/segment.go | 1 + go/pkg/model/segment.go | 16 +++--- 7 files changed, 68 insertions(+), 74 deletions(-) diff --git a/go/pkg/coordinator/grpc/proto_model_convert.go b/go/pkg/coordinator/grpc/proto_model_convert.go index cc7fbb12fc..3edac51aa6 100644 --- a/go/pkg/coordinator/grpc/proto_model_convert.go +++ b/go/pkg/coordinator/grpc/proto_model_convert.go @@ -153,12 +153,14 @@ func convertSegmentToProto(segment *model.Segment) *coordinatorpb.Segment { } } segmentpb := &coordinatorpb.Segment{ - Id: segment.ID.String(), - Type: segment.Type, - Scope: segmentSceope, - Collection: nil, - Metadata: nil, - FilePaths: filePaths, + Id: segment.ID.String(), + Type: segment.Type, + Scope: segmentSceope, + Collection: nil, + Metadata: nil, + FilePaths: filePaths, + LogPosition: segment.LogPosition, + CollectionVersion: segment.CollectionVersion, } collectionID := segment.CollectionID diff --git a/go/pkg/metastore/coordinator/table_catalog.go b/go/pkg/metastore/coordinator/table_catalog.go index 5ed9ed1127..1ad51351cb 100644 --- a/go/pkg/metastore/coordinator/table_catalog.go +++ b/go/pkg/metastore/coordinator/table_catalog.go @@ -471,11 +471,13 @@ func (tc *Catalog) GetSegments(ctx context.Context, segmentID types.UniqueID, se segments := make([]*model.Segment, 0, len(segmentAndMetadataList)) for _, segmentAndMetadata := range segmentAndMetadataList { segment := &model.Segment{ - ID: types.MustParse(segmentAndMetadata.Segment.ID), - Type: segmentAndMetadata.Segment.Type, - Scope: segmentAndMetadata.Segment.Scope, - Ts: segmentAndMetadata.Segment.Ts, - FilePaths: segmentAndMetadata.Segment.FilePaths, + ID: types.MustParse(segmentAndMetadata.Segment.ID), + Type: segmentAndMetadata.Segment.Type, + Scope: segmentAndMetadata.Segment.Scope, + Ts: segmentAndMetadata.Segment.Ts, + FilePaths: segmentAndMetadata.Segment.FilePaths, + LogPosition: segmentAndMetadata.Segment.LogPosition, + CollectionVersion: segmentAndMetadata.Segment.CollectionVersion, } if segmentAndMetadata.Segment.CollectionID != nil { @@ -628,9 +630,15 @@ func (tc *Catalog) FlushCollectionCompaction(ctx context.Context, flushCollectio if err != nil { return err } + // update segment log position and version, this assumes that the log position and collection version are already successfully updated in the collection + collectionVersion, err = tc.metaDomain.SegmentDb(txCtx).UpdateLogPositionAndVersion(flushCollectionCompaction.ID.String(), flushCollectionCompaction.LogPosition, collectionVersion) + if err != nil { + return err + } + flushCollectionInfo.CollectionVersion = collectionVersion - // update tenant last compaction time + // Update tenant last compaction time // TODO: add a system configuration to disable // since this might cause resource contention if one tenant has a lot of collection compactions at the same time lastCompactionTime := time.Now().Unix() diff --git a/go/pkg/metastore/db/dao/segment.go b/go/pkg/metastore/db/dao/segment.go index 670cddb826..bf0506eacb 100644 --- a/go/pkg/metastore/db/dao/segment.go +++ b/go/pkg/metastore/db/dao/segment.go @@ -48,15 +48,13 @@ func (s *segmentDb) Insert(in *dbmodel.Segment) error { return err } return nil - - return nil } func (s *segmentDb) GetSegments(id types.UniqueID, segmentType *string, scope *string, collectionID types.UniqueID) ([]*dbmodel.SegmentAndMetadata, error) { var segments []*dbmodel.SegmentAndMetadata query := s.db.Table("segments"). - Select("segments.id, segments.collection_id, segments.type, segments.scope, segments.file_paths, segment_metadata.key, segment_metadata.str_value, segment_metadata.int_value, segment_metadata.float_value"). + Select("segments.id, segments.collection_id, segments.type, segments.scope, segments.file_paths, segments.log_position, segments.collection_version, segment_metadata.key, segment_metadata.str_value, segment_metadata.int_value, segment_metadata.float_value"). Joins("LEFT JOIN segment_metadata ON segments.id = segment_metadata.segment_id"). Order("segments.id") @@ -86,18 +84,20 @@ func (s *segmentDb) GetSegments(id types.UniqueID, segmentType *string, scope *s for rows.Next() { var ( - segmentID string - collectionID sql.NullString - segmentType string - scope string - filePathsJson string - key sql.NullString - strValue sql.NullString - intValue sql.NullInt64 - floatValue sql.NullFloat64 + segmentID string + collectionID sql.NullString + segmentType string + scope string + filePathsJson string + logPosition int64 + collectionVersion int32 + key sql.NullString + strValue sql.NullString + intValue sql.NullInt64 + floatValue sql.NullFloat64 ) - err := rows.Scan(&segmentID, &collectionID, &segmentType, &scope, &filePathsJson, &key, &strValue, &intValue, &floatValue) + err := rows.Scan(&segmentID, &collectionID, &segmentType, &scope, &filePathsJson, &logPosition, &collectionVersion, &key, &strValue, &intValue, &floatValue) if err != nil { log.Error("scan segment failed", zap.Error(err)) } @@ -112,10 +112,12 @@ func (s *segmentDb) GetSegments(id types.UniqueID, segmentType *string, scope *s } currentSegment = &dbmodel.SegmentAndMetadata{ Segment: &dbmodel.Segment{ - ID: segmentID, - Type: segmentType, - Scope: scope, - FilePaths: filePaths, + ID: segmentID, + Type: segmentType, + Scope: scope, + FilePaths: filePaths, + LogPosition: logPosition, + CollectionVersion: collectionVersion, }, SegmentMetadata: metadata, } @@ -207,3 +209,18 @@ func (s *segmentDb) RegisterFilePaths(flushSegmentCompactions []*model.FlushSegm } return nil } + +func (s *segmentDb) UpdateLogPositionAndVersion(collectionID string, logPosition int64, currentCollectionVersion int32) (int32, error) { + // This assumes that the log position and collection version are already successfully updated in the collection. + err := s.db.Model(&dbmodel.Segment{}). + Where("collection_id = ?", collectionID). + Updates(map[string]interface{}{ + "log_position": logPosition, + "collection_version": currentCollectionVersion, + }).Error + if err != nil { + log.Error("update log position and version failed", zap.Error(err)) + return 0, err + } + return currentCollectionVersion, nil +} diff --git a/go/pkg/metastore/db/dbmodel/mocks/ICollectionDb.go b/go/pkg/metastore/db/dbmodel/mocks/ICollectionDb.go index 83561df29d..d4cd18c0d6 100644 --- a/go/pkg/metastore/db/dbmodel/mocks/ICollectionDb.go +++ b/go/pkg/metastore/db/dbmodel/mocks/ICollectionDb.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.2. DO NOT EDIT. +// Code generated by mockery v2.33.3. DO NOT EDIT. package mocks @@ -16,10 +16,6 @@ type ICollectionDb struct { func (_m *ICollectionDb) DeleteAll() error { ret := _m.Called() - if len(ret) == 0 { - panic("no return value specified for DeleteAll") - } - var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -34,10 +30,6 @@ func (_m *ICollectionDb) DeleteAll() error { func (_m *ICollectionDb) DeleteCollectionByID(collectionID string) (int, error) { ret := _m.Called(collectionID) - if len(ret) == 0 { - panic("no return value specified for DeleteCollectionByID") - } - var r0 int var r1 error if rf, ok := ret.Get(0).(func(string) (int, error)); ok { @@ -62,10 +54,6 @@ func (_m *ICollectionDb) DeleteCollectionByID(collectionID string) (int, error) func (_m *ICollectionDb) GetCollections(collectionID *string, collectionName *string, tenantID string, databaseName string, limit *int32, offset *int32) ([]*dbmodel.CollectionAndMetadata, error) { ret := _m.Called(collectionID, collectionName, tenantID, databaseName, limit, offset) - if len(ret) == 0 { - panic("no return value specified for GetCollections") - } - var r0 []*dbmodel.CollectionAndMetadata var r1 error if rf, ok := ret.Get(0).(func(*string, *string, string, string, *int32, *int32) ([]*dbmodel.CollectionAndMetadata, error)); ok { @@ -92,10 +80,6 @@ func (_m *ICollectionDb) GetCollections(collectionID *string, collectionName *st func (_m *ICollectionDb) Insert(in *dbmodel.Collection) error { ret := _m.Called(in) - if len(ret) == 0 { - panic("no return value specified for Insert") - } - var r0 error if rf, ok := ret.Get(0).(func(*dbmodel.Collection) error); ok { r0 = rf(in) @@ -110,10 +94,6 @@ func (_m *ICollectionDb) Insert(in *dbmodel.Collection) error { func (_m *ICollectionDb) Update(in *dbmodel.Collection) error { ret := _m.Called(in) - if len(ret) == 0 { - panic("no return value specified for Update") - } - var r0 error if rf, ok := ret.Get(0).(func(*dbmodel.Collection) error); ok { r0 = rf(in) @@ -128,10 +108,6 @@ func (_m *ICollectionDb) Update(in *dbmodel.Collection) error { func (_m *ICollectionDb) UpdateLogPositionAndVersion(collectionID string, logPosition int64, currentCollectionVersion int32) (int32, error) { ret := _m.Called(collectionID, logPosition, currentCollectionVersion) - if len(ret) == 0 { - panic("no return value specified for UpdateLogPositionAndVersion") - } - var r0 int32 var r1 error if rf, ok := ret.Get(0).(func(string, int64, int32) (int32, error)); ok { diff --git a/go/pkg/metastore/db/dbmodel/mocks/ICollectionMetadataDb.go b/go/pkg/metastore/db/dbmodel/mocks/ICollectionMetadataDb.go index d89e09536d..5103142647 100644 --- a/go/pkg/metastore/db/dbmodel/mocks/ICollectionMetadataDb.go +++ b/go/pkg/metastore/db/dbmodel/mocks/ICollectionMetadataDb.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.0. DO NOT EDIT. +// Code generated by mockery v2.33.3. DO NOT EDIT. package mocks @@ -16,10 +16,6 @@ type ICollectionMetadataDb struct { func (_m *ICollectionMetadataDb) DeleteAll() error { ret := _m.Called() - if len(ret) == 0 { - panic("no return value specified for DeleteAll") - } - var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -34,10 +30,6 @@ func (_m *ICollectionMetadataDb) DeleteAll() error { func (_m *ICollectionMetadataDb) DeleteByCollectionID(collectionID string) (int, error) { ret := _m.Called(collectionID) - if len(ret) == 0 { - panic("no return value specified for DeleteByCollectionID") - } - var r0 int var r1 error if rf, ok := ret.Get(0).(func(string) (int, error)); ok { @@ -62,10 +54,6 @@ func (_m *ICollectionMetadataDb) DeleteByCollectionID(collectionID string) (int, func (_m *ICollectionMetadataDb) Insert(in []*dbmodel.CollectionMetadata) error { ret := _m.Called(in) - if len(ret) == 0 { - panic("no return value specified for Insert") - } - var r0 error if rf, ok := ret.Get(0).(func([]*dbmodel.CollectionMetadata) error); ok { r0 = rf(in) diff --git a/go/pkg/metastore/db/dbmodel/segment.go b/go/pkg/metastore/db/dbmodel/segment.go index e15cc25b69..b1b1c9f143 100644 --- a/go/pkg/metastore/db/dbmodel/segment.go +++ b/go/pkg/metastore/db/dbmodel/segment.go @@ -49,4 +49,5 @@ type ISegmentDb interface { Update(*UpdateSegment) error DeleteAll() error RegisterFilePaths(flushSegmentCompactions []*model.FlushSegmentCompaction) error + UpdateLogPositionAndVersion(collectionID string, logPosition int64, currentCollectionVersion int32) (int32, error) } diff --git a/go/pkg/model/segment.go b/go/pkg/model/segment.go index 5e30c96df1..b61cec5b6a 100644 --- a/go/pkg/model/segment.go +++ b/go/pkg/model/segment.go @@ -5,13 +5,15 @@ import ( ) type Segment struct { - ID types.UniqueID - Type string - Scope string - CollectionID types.UniqueID - Metadata *SegmentMetadata[SegmentMetadataValueType] - Ts types.Timestamp - FilePaths map[string][]string + ID types.UniqueID + Type string + Scope string + CollectionID types.UniqueID + Metadata *SegmentMetadata[SegmentMetadataValueType] + Ts types.Timestamp + FilePaths map[string][]string + LogPosition int64 + CollectionVersion int32 } type CreateSegment struct {