Skip to content

Commit

Permalink
Add LogPosition and CollectionVersion
Browse files Browse the repository at this point in the history
  • Loading branch information
Ishiihara committed Apr 29, 2024
1 parent 2ff4cc4 commit 12559eb
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 74 deletions.
14 changes: 8 additions & 6 deletions go/pkg/coordinator/grpc/proto_model_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,14 @@ func convertSegmentToProto(segment *model.Segment) *coordinatorpb.Segment {
}
}
segmentpb := &coordinatorpb.Segment{
Id: segment.ID.String(),
Type: segment.Type,
Scope: segmentSceope,
Collection: nil,
Metadata: nil,
FilePaths: filePaths,
Id: segment.ID.String(),
Type: segment.Type,
Scope: segmentSceope,
Collection: nil,
Metadata: nil,
FilePaths: filePaths,
LogPosition: segment.LogPosition,
CollectionVersion: segment.CollectionVersion,
}

collectionID := segment.CollectionID
Expand Down
20 changes: 14 additions & 6 deletions go/pkg/metastore/coordinator/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,11 +471,13 @@ func (tc *Catalog) GetSegments(ctx context.Context, segmentID types.UniqueID, se
segments := make([]*model.Segment, 0, len(segmentAndMetadataList))
for _, segmentAndMetadata := range segmentAndMetadataList {
segment := &model.Segment{
ID: types.MustParse(segmentAndMetadata.Segment.ID),
Type: segmentAndMetadata.Segment.Type,
Scope: segmentAndMetadata.Segment.Scope,
Ts: segmentAndMetadata.Segment.Ts,
FilePaths: segmentAndMetadata.Segment.FilePaths,
ID: types.MustParse(segmentAndMetadata.Segment.ID),
Type: segmentAndMetadata.Segment.Type,
Scope: segmentAndMetadata.Segment.Scope,
Ts: segmentAndMetadata.Segment.Ts,
FilePaths: segmentAndMetadata.Segment.FilePaths,
LogPosition: segmentAndMetadata.Segment.LogPosition,
CollectionVersion: segmentAndMetadata.Segment.CollectionVersion,
}

if segmentAndMetadata.Segment.CollectionID != nil {
Expand Down Expand Up @@ -628,9 +630,15 @@ func (tc *Catalog) FlushCollectionCompaction(ctx context.Context, flushCollectio
if err != nil {
return err
}
// update segment log position and version, this assumes that the log position and collection version are already successfully updated in the collection
collectionVersion, err = tc.metaDomain.SegmentDb(txCtx).UpdateLogPositionAndVersion(flushCollectionCompaction.ID.String(), flushCollectionCompaction.LogPosition, collectionVersion)
if err != nil {
return err
}

flushCollectionInfo.CollectionVersion = collectionVersion

// update tenant last compaction time
// Update tenant last compaction time
// TODO: add a system configuration to disable
// since this might cause resource contention if one tenant has a lot of collection compactions at the same time
lastCompactionTime := time.Now().Unix()
Expand Down
51 changes: 34 additions & 17 deletions go/pkg/metastore/db/dao/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,13 @@ func (s *segmentDb) Insert(in *dbmodel.Segment) error {
return err
}
return nil

return nil
}

func (s *segmentDb) GetSegments(id types.UniqueID, segmentType *string, scope *string, collectionID types.UniqueID) ([]*dbmodel.SegmentAndMetadata, error) {
var segments []*dbmodel.SegmentAndMetadata

query := s.db.Table("segments").
Select("segments.id, segments.collection_id, segments.type, segments.scope, segments.file_paths, segment_metadata.key, segment_metadata.str_value, segment_metadata.int_value, segment_metadata.float_value").
Select("segments.id, segments.collection_id, segments.type, segments.scope, segments.file_paths, segments.log_position, segments.collection_version, segment_metadata.key, segment_metadata.str_value, segment_metadata.int_value, segment_metadata.float_value").
Joins("LEFT JOIN segment_metadata ON segments.id = segment_metadata.segment_id").
Order("segments.id")

Expand Down Expand Up @@ -86,18 +84,20 @@ func (s *segmentDb) GetSegments(id types.UniqueID, segmentType *string, scope *s

for rows.Next() {
var (
segmentID string
collectionID sql.NullString
segmentType string
scope string
filePathsJson string
key sql.NullString
strValue sql.NullString
intValue sql.NullInt64
floatValue sql.NullFloat64
segmentID string
collectionID sql.NullString
segmentType string
scope string
filePathsJson string
logPosition int64
collectionVersion int32
key sql.NullString
strValue sql.NullString
intValue sql.NullInt64
floatValue sql.NullFloat64
)

err := rows.Scan(&segmentID, &collectionID, &segmentType, &scope, &filePathsJson, &key, &strValue, &intValue, &floatValue)
err := rows.Scan(&segmentID, &collectionID, &segmentType, &scope, &filePathsJson, &logPosition, &collectionVersion, &key, &strValue, &intValue, &floatValue)
if err != nil {
log.Error("scan segment failed", zap.Error(err))
}
Expand All @@ -112,10 +112,12 @@ func (s *segmentDb) GetSegments(id types.UniqueID, segmentType *string, scope *s
}
currentSegment = &dbmodel.SegmentAndMetadata{
Segment: &dbmodel.Segment{
ID: segmentID,
Type: segmentType,
Scope: scope,
FilePaths: filePaths,
ID: segmentID,
Type: segmentType,
Scope: scope,
FilePaths: filePaths,
LogPosition: logPosition,
CollectionVersion: collectionVersion,
},
SegmentMetadata: metadata,
}
Expand Down Expand Up @@ -207,3 +209,18 @@ func (s *segmentDb) RegisterFilePaths(flushSegmentCompactions []*model.FlushSegm
}
return nil
}

func (s *segmentDb) UpdateLogPositionAndVersion(collectionID string, logPosition int64, currentCollectionVersion int32) (int32, error) {
// This assumes that the log position and collection version are already successfully updated in the collection.
err := s.db.Model(&dbmodel.Segment{}).
Where("collection_id = ?", collectionID).
Updates(map[string]interface{}{
"log_position": logPosition,
"collection_version": currentCollectionVersion,
}).Error
if err != nil {
log.Error("update log position and version failed", zap.Error(err))
return 0, err
}
return currentCollectionVersion, nil
}
26 changes: 1 addition & 25 deletions go/pkg/metastore/db/dbmodel/mocks/ICollectionDb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 1 addition & 13 deletions go/pkg/metastore/db/dbmodel/mocks/ICollectionMetadataDb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions go/pkg/metastore/db/dbmodel/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@ type ISegmentDb interface {
Update(*UpdateSegment) error
DeleteAll() error
RegisterFilePaths(flushSegmentCompactions []*model.FlushSegmentCompaction) error
UpdateLogPositionAndVersion(collectionID string, logPosition int64, currentCollectionVersion int32) (int32, error)
}
16 changes: 9 additions & 7 deletions go/pkg/model/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
)

type Segment struct {
ID types.UniqueID
Type string
Scope string
CollectionID types.UniqueID
Metadata *SegmentMetadata[SegmentMetadataValueType]
Ts types.Timestamp
FilePaths map[string][]string
ID types.UniqueID
Type string
Scope string
CollectionID types.UniqueID
Metadata *SegmentMetadata[SegmentMetadataValueType]
Ts types.Timestamp
FilePaths map[string][]string
LogPosition int64
CollectionVersion int32
}

type CreateSegment struct {
Expand Down

0 comments on commit 12559eb

Please sign in to comment.