remove sync segmentLastExpire every time when assigning (milvus-io#25271) (milvus-io#25316)

Signed-off-by: MrPresent-Han <chun.han@zilliz.com>
MrPresent-Han committed Jul 23, 2023
1 parent 32827f5 commit f66aac5
Showing 7 changed files with 192 additions and 63 deletions.
7 changes: 6 additions & 1 deletion configs/milvus.yaml
@@ -315,7 +315,12 @@ dataCoord:
maxSize: 512 # Maximum size of a segment in MB
diskSegmentMaxSize: 2048 # Maximum size of a segment in MB for a collection that has a disk index
sealProportion: 0.23
assignmentExpiration: 2000 # The time of the assignment expiration in ms
# The expiration time of a segment assignment, in ms.
# Warning: this is an expert parameter and is closely related to data integrity. Do not change it without
# a specific goal and a solid understanding of the scenarios. If it must be changed, make sure the new
# value is larger than the value used before the restart; otherwise there is a high risk of data loss.
assignmentExpiration: 2000
maxLife: 86400 # The max lifetime of segment in seconds, 24*60*60
# If a segment didn't accept dml records in maxIdleTime and the size of segment is greater than
# minSizeFromIdleToSealed, Milvus will automatically seal it.
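
A minimal, hypothetical Go sketch (not part of this commit or of Milvus) of the restart rule the new comment above describes: the assignmentExpiration in effect after a restart must not be smaller than the value used before it. All names below are illustrative assumptions.

package main

import (
	"fmt"
	"time"
)

// checkAssignmentExpiration is a hypothetical guard: it compares the window used
// before a restart (recorded by an operator or a config snapshot) with the newly
// configured one, and rejects a shrinking window, since a recovered lastExpire
// could then fall before assignments that were already handed out.
func checkAssignmentExpiration(previous, current time.Duration) error {
	if current < previous {
		return fmt.Errorf("assignmentExpiration shrank from %s to %s across restart; risk of data loss", previous, current)
	}
	return nil
}

func main() {
	// Shrinking the default 2000 ms window to 1000 ms is rejected.
	if err := checkAssignmentExpiration(2000*time.Millisecond, 1000*time.Millisecond); err != nil {
		fmt.Println("config check failed:", err)
	}
}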
29 changes: 14 additions & 15 deletions internal/datacoord/meta.go
@@ -913,24 +913,13 @@ func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
curSegInfo := m.segments.GetSegment(segmentID)
if curSegInfo == nil {
// TODO: Error handling.
log.Warn("meta update: add allocation failed - segment not found",
zap.Int64("segmentID", segmentID))
log.Warn("meta update: add allocation failed - segment not found", zap.Int64("segmentID", segmentID))
return nil
}
// Persist segment updates first.
clonedSegment := curSegInfo.Clone(AddAllocation(allocation))
if clonedSegment != nil && isSegmentHealthy(clonedSegment) {
if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{clonedSegment.SegmentInfo}); err != nil {
log.Error("meta update: add allocation failed",
zap.Int64("segmentID", segmentID),
zap.Error(err))
return err
}
}
// Update in-memory meta.
// As we use the global segment lastExpire to guarantee data correctness after restart,
// there is no need to persist allocations to the meta store; only the in-memory allocation meta is updated.
m.segments.AddAllocation(segmentID, allocation)
log.Info("meta update: add allocation - complete",
zap.Int64("segmentID", segmentID))
log.Info("meta update: add allocation - complete", zap.Int64("segmentID", segmentID))
return nil
}
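
The shortened AddAllocation above relies on an invariant established by the rest of this commit: after a restart, every growing segment's lastExpire is reset to one freshly allocated timestamp, so individual allocations never need to be read back from the meta store. The following standalone sketch (hypothetical, not Milvus code) models that invariant.

package main

import "fmt"

// segment models only the field this commit cares about: the largest expiration
// timestamp handed out so far, kept in memory.
type segment struct {
	id         int64
	lastExpire uint64
}

// resetAfterRestart models maybeResetLastExpireForSegments further below: instead
// of replaying persisted allocations, every growing segment is stamped with one
// freshly allocated timestamp, which is >= any expiration granted before the
// restart (assuming a monotonic TSO and an unchanged assignmentExpiration).
func resetAfterRestart(growing []*segment, freshTS uint64) {
	for _, s := range growing {
		s.lastExpire = freshTS
	}
}

func main() {
	segs := []*segment{{id: 1, lastExpire: 100}, {id: 2, lastExpire: 120}}
	resetAfterRestart(segs, 200) // 200 stands in for the newly allocated global ts
	fmt.Println(segs[0].lastExpire, segs[1].lastExpire) // 200 200
}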

@@ -950,6 +939,16 @@ func (m *meta) SetCurrentRows(segmentID UniqueID, rows int64) {
m.segments.SetCurrentRows(segmentID, rows)
}

// SetLastExpire sets the lastExpire time for a segment
// Note that lastExpire does not need to be persisted to the KV meta store
func (m *meta) SetLastExpire(segmentID UniqueID, lastExpire uint64) {
m.Lock()
defer m.Unlock()
clonedSegment := m.segments.GetSegment(segmentID).Clone()
clonedSegment.LastExpireTime = lastExpire
m.segments.SetSegment(segmentID, clonedSegment)
}

// SetLastFlushTime set LastFlushTime for segment with provided `segmentID`
// Note that lastFlushTime is not persisted in KV store
func (m *meta) SetLastFlushTime(segmentID UniqueID, t time.Time) {
43 changes: 32 additions & 11 deletions internal/datacoord/segment_manager.go
@@ -29,8 +29,8 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

@@ -113,7 +113,6 @@ type SegmentManager struct {
segmentSealPolicies []segmentSealPolicy
channelSealPolicies []channelSealPolicy
flushPolicy flushPolicy
rcc types.RootCoord
}

type allocHelper struct {
@@ -198,7 +197,7 @@ func defaultFlushPolicy() flushPolicy {
}

// newSegmentManager should be the only way to retrieve SegmentManager.
func newSegmentManager(meta *meta, allocator allocator, rcc types.RootCoord, opts ...allocOption) *SegmentManager {
func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) (*SegmentManager, error) {
manager := &SegmentManager{
meta: meta,
allocator: allocator,
@@ -209,13 +208,15 @@ func newSegmentManager(meta *meta, allocator allocator, rcc types.RootCoord, opt
segmentSealPolicies: defaultSegmentSealPolicy(), // default only segment size policy
channelSealPolicies: []channelSealPolicy{}, // no default channel seal policy
flushPolicy: defaultFlushPolicy(),
rcc: rcc,
}
for _, opt := range opts {
opt.apply(manager)
}
manager.loadSegmentsFromMeta()
return manager
if err := manager.maybeResetLastExpireForSegments(); err != nil {
return nil, err
}
return manager, nil
}
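
Because newSegmentManager now returns an error, its callers must handle the failure path. A hedged sketch of the caller-side change (the Server receiver and its fields are assumptions for illustration, not code from this commit):

// Hypothetical caller inside the datacoord package; field names are illustrative.
func (s *Server) createSegmentManager() error {
	manager, err := newSegmentManager(s.meta, s.allocator)
	if err != nil {
		// Abort startup rather than run with an unknown global lastExpire.
		return err
	}
	s.segmentManager = manager
	return nil
}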

// loadSegmentsFromMeta generates the corresponding segment status for each segment from meta
@@ -228,6 +229,32 @@ func (s *SegmentManager) loadSegmentsFromMeta() {
s.segments = segmentsID
}

func (s *SegmentManager) maybeResetLastExpireForSegments() error {
// for all growing segments, lastExpire needs to be reset to a freshly allocated timestamp
if len(s.segments) > 0 {
var latestTs uint64
allocateErr := retry.Do(context.Background(), func() error {
ts, tryErr := s.genExpireTs(context.Background(), false)
log.Warn("failed to get ts from rootCoord for globalLastExpire", zap.Error(tryErr))
if tryErr != nil {
return tryErr
}
latestTs = ts
return nil
}, retry.Attempts(Params.DataCoordCfg.AllocLatestExpireAttempt.GetAsUint()), retry.Sleep(200*time.Millisecond))
if allocateErr != nil {
log.Warn("cannot allocate latest lastExpire from rootCoord", zap.Error(allocateErr))
return errors.New("global max expire ts is unavailable for segment manager")
}
for _, sID := range s.segments {
if segment := s.meta.GetSegment(sID); segment != nil && segment.GetState() == commonpb.SegmentState_Growing {
s.meta.SetLastExpire(sID, latestTs)
}
}
}
return nil
}

// AllocSegment allocates segments for the requested collection, partition, channel, and row count
func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID,
partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error) {
@@ -308,12 +335,6 @@ func (s *SegmentManager) allocSegmentForImport(ctx context.Context, collectionID
if err != nil {
return nil, err
}
// ReportImport with the new segment so RootCoord can add segment ref lock onto it.
// TODO: This is a hack and will be removed once the whole ImportManager is migrated from RootCoord to DataCoord.
if s.rcc == nil {
log.Error("RootCoord client not set")
return nil, errors.New("RootCoord client not set")
}

allocation.ExpireTime = expireTs
allocation.SegmentID = segment.GetID()
