-
Notifications
You must be signed in to change notification settings - Fork 37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: bucket migration state improvement #1250
Conversation
41bd8e5
to
9e915e4
Compare
9e915e4
to
2decd3c
Compare
8560aa1
to
29c40d6
Compare
8a68460
to
578d0e0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
NotifyPostMigrateBucket(ctx context.Context, bmInfo *gfsptask.GfSpBucketMigrationInfo) error | ||
GetMigrationBucketState(ctx context.Context, bucketID uint64) (*gfspserver.MigrateBucketProgressMeta, error) | ||
NotifyPostMigrateBucketAndRecoupQuota(ctx context.Context, bmInfo *gfsptask.GfSpBucketMigrationInfo) (*gfsptask.GfSpBucketQuotaInfo, error) | ||
NotifyPreMigrateBucketAndDeductQuota(ctx context.Context, bucketID uint64) (*gfsptask.GfSpBucketQuotaInfo, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please try to keep a consistent style.
@@ -127,6 +128,7 @@ type MetadataAPI interface { | |||
ListGlobalVirtualGroupsByBucket(ctx context.Context, bucketID uint64, opts ...grpc.DialOption) ([]*virtualgrouptypes.GlobalVirtualGroup, error) | |||
ListGlobalVirtualGroupsBySecondarySP(ctx context.Context, spID uint32, opts ...grpc.DialOption) ([]*virtualgrouptypes.GlobalVirtualGroup, error) | |||
ListMigrateBucketEvents(ctx context.Context, blockID uint64, spID uint32, opts ...grpc.DialOption) ([]*types.ListMigrateBucketEvents, error) | |||
ListCompleteMigrationBucketEvents(ctx context.Context, blockID uint64, srcSpID uint32, opts ...grpc.DialOption) ([]*storagetypes.EventCompleteMigrationBucket, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to distinguish the difference between ListMigrateBucketEvents and ListCompleteMigrationBucketEvents through naming?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The semantics of the interfaces are currently a bit different:
- ListMigrateBucketEvents returns all events between (0-height), while ListCompleteMigrationBucketEvents returns the current block height.
- ListMigrateBucketEvents does not return CompleteMigrationBucketEvent. ListCompleteMigrationBucketEvents is mainly used for the src SP to listen for CompleteMigrationBucketEvent.
- ListMigrateBucketEvents has an internal table, EventMigrationBucket, using (dst_primary_sp_id). There is no src_sp_id here, making the implementation a bit more complex.
Considering the above reasons, ListCompleteMigrationBucketEvents has been implemented."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- src sp , dest sp comment
base/types/gfsptask/migrate_gvg.go
Outdated
@@ -373,3 +373,38 @@ func (m *GfSpGCBucketMigrationTask) GetBucketID() uint64 { | |||
func (m *GfSpGCBucketMigrationTask) SetBucketID(bucketID uint64) { | |||
m.BucketId = bucketID | |||
} | |||
|
|||
func (m *GfSpGCBucketMigrationTask) SetLastGCObjectID(lastGCObjectID uint64) { | |||
m.LastGcObjectId = lastGCObjectID |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gc or GC? keep a consistent style.
// used by persist bucket migration progress and meta | ||
type MigrateBucketProgressTable struct { | ||
BucketID uint64 `gorm:"primary_key"` | ||
SubscribedBlockHeight uint64 `gorm:"primary_key"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the meaning of the two primary keys in gorm?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
100 -> xx init, migrate bucket process
+++ 101 102;
103 -> migrate finished;
add more comment;
@@ -119,8 +119,9 @@ type MigrateGVGUnitMeta struct { | |||
SrcSPID uint32 | |||
DestSPID uint32 | |||
LastMigratedObjectID uint64 | |||
MigrateStatus int // scheduler assign unit status. | |||
RetryTime int // | |||
MigrateStatus int // scheduler assign unit status. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Status or state, pls distinguish semantics and be consistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the migrate gvg task 's migrate status
@@ -384,13 +384,16 @@ func (e *ExecuteModular) gcMetaReadRecord(ctx context.Context, task coretask.GCM | |||
} | |||
|
|||
func (e *ExecuteModular) HandleGCBucketMigrationBucket(ctx context.Context, task coretask.GCBucketMigrationTask) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whether to support continuing gc from the last breakpoint.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- parallel
- resumable gc, breakpoint
err = e.gcWorker.deleteObjectSegmentsAndIntegrity(ctx, objectInfo) | ||
log.CtxInfow(ctx, "succeed to delete objects by gvg and bucket for gc", "object", objectInfo, "error", err) | ||
for { | ||
if objectList, err = e.baseApp.GfSpClient().ListObjectsByGVGAndBucketForGC(ctx, gvg.GetId(), bucketID, startAfter, limit); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not find out startAfter
is updated in this for loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
log.CtxErrorw(reqCtx.Context(), "failed to pre migrate bucket, the bucket may already notified", "bucket_id", | ||
bucketID, "error", err) | ||
// if the bucket has already pre notified ignore the error | ||
if strings.Contains(err.Error(), "the bucket has already notified") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the err can be nil
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, if the quota has been deducted, the preMigrateBucket should be return true;
@@ -526,32 +555,68 @@ func (s *BucketMigrateScheduler) doneMigrateBucket(bucketID uint64) error { | |||
s.deleteExecutePlanByBucketID(bucketID) | |||
executePlan.stopSPSchedule() | |||
err = s.manager.baseApp.GfSpDB().DeleteMigrateGVGUnitsByBucketID(bucketID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would this records stay in DB if skipping?
if err = UpdateBucketMigrationProgress(s.manager.baseApp, bucketID, storetypes.BucketMigrationState_MIGRATION_FINISHED); err != nil { | ||
return | ||
} | ||
log.CtxInfow(ctx, "succeed to confirm complete events", "EventMigrationBucket", event, "error", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no err
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deleted
modular/manager/migrate_service.go
Outdated
// get bucket quota and check | ||
if bucketSize, err = m.getBucketTotalSize(ctx, bucketID); err != nil { | ||
log.Errorf("failed to get bucket total object size", "bucket_id", bucketID, "error", err) | ||
return latestQuota, err | ||
} | ||
|
||
if bmInfo.GetFinished() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems this useless if
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deleted, this is the old bucket migration gc trigger way.
base/gfspclient/gater.go
Outdated
@@ -243,7 +245,7 @@ func (s *GfSpClient) QueryLatestBucketQuota(ctx context.Context, endpoint string | |||
return gfsptask.GfSpBucketQuotaInfo{}, fmt.Errorf("failed to query latest bucket quota, bucket(%s), status_code(%d), endpoint(%s)", queryMsg, resp.StatusCode, endpoint) | |||
} | |||
|
|||
signedMsg, err := hex.DecodeString(resp.Header.Get(GnfdSignedApprovalMsgHeader)) | |||
signedMsg, err := hex.DecodeString(resp.Header.Get(GnfdQuotaInfoHeader)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
signedMsg
may be misleading, it can be optimized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
core/spdb/entity.go
Outdated
type MigrateBucketProgressMeta struct { | ||
BucketID uint64 // as primary key | ||
SubscribedBlockHeight uint64 | ||
MigrationState int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
migrateState?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
Description
Optimizations for bucket migration:
Feat 1: Persistence of Quota State
In the previous setup, the source storage provider (Src SP) utilized an in-memory map (
ManageModular::migratingBuckets
) to keep track of whether the quota for bucket migration had been deducted. However, this approach presented challenges in scenarios where the source storage provider experienced an outage and needed to restart, making it difficult to maintain accurate state. To address this issue, the in-memory map has been replaced with a persistent MySQL table.Abstract some logic from preMigrateBucketHandler and postMigrateBucketHandler into Manager::NotifyPreMigrateBucketAndDeductQuota & Manager::NotifyPostMigrateBucketAndRecoupQuota.
feat 2: Optimization of state persistence in DB for each phase of bucket migration
MigrateBucketProgressTable
table to record the state and meta of bucket migration. Additionally, encapsulation of interfaces for modifying state and meta.feat 3: Bucket Migration Progress Query
In both
MigrateGVGUnitMeta
(core/spdb/entity.go) andMigrateGVGTable
, add a field namedMigratedBytesSize
.MigratedBytesSize
from the task. When reporting and callingupdateMigrateGVGStatus
during the update, useUpdateMigrateGVGMigratedBytesSize
to update theMigratedBytesSize
field.listExecutePlan
.Rationale
Example
NA
Changes
Notable changes:
Notification from Destination SP to Source SP - PreMigrateBucket:
In the processing flow (preMigrateBucketHandler) at the source SP upon receiving notification:
MigrateBucketTable
for theMigrateStatus
status. If the quota has already been deducted, return "ok" directly.DeductQuotaForBucketMigrate
to deduct the quota.NotifyDonePreMigrateBucket
to update theMigrateStatus
status in theMigrateBucketTable
to "PreNotified."Notification from Destination SP to Source SP - PostMigrateBucket:
PostMigrateBucket
can only be invoked once.Potential Impacts