Skip to content

Commit

Permalink
feat: re-implementation of sp exit (#1279)
Browse files Browse the repository at this point in the history
* feat: SP exit recover scheduler
---------

Co-authored-by: Alexgao001 <alex.g@nodereal.io>
  • Loading branch information
constwz and alexgao001 committed Dec 26, 2023
1 parent 5d53b63 commit 1818b8b
Show file tree
Hide file tree
Showing 51 changed files with 6,709 additions and 1,617 deletions.
6 changes: 6 additions & 0 deletions base/gfspapp/app_options.go
Expand Up @@ -153,6 +153,12 @@ const (
SignerFailureGfSpBucketMigrateInfo = "signer_gfsp_bucket_migrate_info_failure"
SignerSuccessRejectMigrateBucket = "signer_reject_migrate_bucket_success"
SignerFailureRejectMigrateBucket = "signer_reject_migrate_bucket_failure"
SignerSuccessSwapIn = "signer_swap_in_success"
SignerFailureSwapIn = "signer_swap_in_failure"
SignerSuccessCompleteSwapIn = "signer_complete_swap_in_success"
SignerFailureCompleteSwapIn = "signer_complete_swap_in_failure"
SignerSuccessCancelSwapIn = "signer_cancel_swap_in_success"
SignerFailureCancelSwapIn = "signer_cancel_swap_in_failure"

SignerSuccessDeposit = "signer_deposit_success"
SignerFailureDeposit = "signer_deposit_failure"
Expand Down
20 changes: 20 additions & 0 deletions base/gfspapp/manage_server.go
Expand Up @@ -411,3 +411,23 @@ func (g *GfSpBaseApp) GfSpResetRecoveryFailedList(ctx context.Context, _ *gfspse
RecoveryFailedList: recoveryFailedList,
}, nil
}

func (g *GfSpBaseApp) GfSpTriggerRecoverForSuccessorSP(ctx context.Context, req *gfspserver.GfSpTriggerRecoverForSuccessorSPRequest) (
*gfspserver.GfSpTriggerRecoverForSuccessorSPResponse, error) {
err := g.manager.TriggerRecoverForSuccessorSP(ctx, req.GetVgfId(), req.GetGvgId(), req.ReplicateIndex)
if err != nil {
return nil, err
}
return &gfspserver.GfSpTriggerRecoverForSuccessorSPResponse{}, nil
}

func (g *GfSpBaseApp) GfSpQueryRecoverProcess(ctx context.Context, req *gfspserver.GfSpQueryRecoverProcessRequest) (*gfspserver.GfSpQueryRecoverProcessResponse, error) {
gvgStats, flag, err := g.manager.QueryRecoverProcess(ctx, req.GetVgfId(), req.GetGvgId())
if err != nil {
return nil, err
}
return &gfspserver.GfSpQueryRecoverProcessResponse{
RecoverProcesses: gvgStats,
Executing: flag,
}, nil
}
30 changes: 30 additions & 0 deletions base/gfspapp/sign_server.go
Expand Up @@ -291,6 +291,36 @@ func (g *GfSpBaseApp) GfSpSign(ctx context.Context, req *gfspserver.GfSpSignRequ
metrics.ReqCounter.WithLabelValues(SignerSuccessRejectMigrateBucket).Inc()
metrics.ReqTime.WithLabelValues(SignerSuccessRejectMigrateBucket).Observe(time.Since(startTime).Seconds())
}
case *gfspserver.GfSpSignRequest_ReserveSwapIn:
txHash, err = g.signer.ReserveSwapIn(ctx, t.ReserveSwapIn)
if err != nil {
log.CtxErrorw(ctx, "failed to reserve swap in", "error", err)
metrics.ReqCounter.WithLabelValues(SignerFailureSwapIn).Inc()
metrics.ReqTime.WithLabelValues(SignerFailureSwapIn).Observe(time.Since(startTime).Seconds())
} else {
metrics.ReqCounter.WithLabelValues(SignerSuccessSwapIn).Inc()
metrics.ReqTime.WithLabelValues(SignerSuccessSwapIn).Observe(time.Since(startTime).Seconds())
}
case *gfspserver.GfSpSignRequest_CompleteSwapIn:
txHash, err = g.signer.CompleteSwapIn(ctx, t.CompleteSwapIn)
if err != nil {
log.CtxErrorw(ctx, "failed to complete swap in", "error", err)
metrics.ReqCounter.WithLabelValues(SignerFailureCompleteSwapIn).Inc()
metrics.ReqTime.WithLabelValues(SignerFailureCompleteSwapIn).Observe(time.Since(startTime).Seconds())
} else {
metrics.ReqCounter.WithLabelValues(SignerSuccessCompleteSwapIn).Inc()
metrics.ReqTime.WithLabelValues(SignerSuccessCompleteSwapIn).Observe(time.Since(startTime).Seconds())
}
case *gfspserver.GfSpSignRequest_CancelSwapIn:
txHash, err = g.signer.CancelSwapIn(ctx, t.CancelSwapIn)
if err != nil {
log.CtxErrorw(ctx, "failed to cancel swap in", "error", err)
metrics.ReqCounter.WithLabelValues(SignerFailureCancelSwapIn).Inc()
metrics.ReqTime.WithLabelValues(SignerFailureCancelSwapIn).Observe(time.Since(startTime).Seconds())
} else {
metrics.ReqCounter.WithLabelValues(SignerSuccessCancelSwapIn).Inc()
metrics.ReqTime.WithLabelValues(SignerSuccessCancelSwapIn).Observe(time.Since(startTime).Seconds())
}
case *gfspserver.GfSpSignRequest_Deposit:
txHash, err = g.signer.Deposit(ctx, t.Deposit)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions base/gfspclient/helper_test.go
Expand Up @@ -236,6 +236,14 @@ func (mockDownloaderServer) GfSpDeductQuotaForBucketMigrate(ctx context.Context,

type mockManagerServer struct{}

func (s mockManagerServer) GfSpTriggerRecoverForSuccessorSP(ctx context.Context, request *gfspserver.GfSpTriggerRecoverForSuccessorSPRequest) (*gfspserver.GfSpTriggerRecoverForSuccessorSPResponse, error) {
return &gfspserver.GfSpTriggerRecoverForSuccessorSPResponse{Err: ErrExceptionsStream}, nil
}

func (s mockManagerServer) GfSpQueryRecoverProcess(ctx context.Context, request *gfspserver.GfSpQueryRecoverProcessRequest) (*gfspserver.GfSpQueryRecoverProcessResponse, error) {
return &gfspserver.GfSpQueryRecoverProcessResponse{Err: ErrExceptionsStream}, nil
}

func (mockManagerServer) GfSpBeginTask(ctx context.Context, req *gfspserver.GfSpBeginTaskRequest) (
*gfspserver.GfSpBeginTaskResponse, error) {
switch req.Request.(type) {
Expand Down
2 changes: 2 additions & 0 deletions base/gfspclient/interface.go
Expand Up @@ -194,6 +194,8 @@ type SignerAPI interface {
SignMigrateGVG(ctx context.Context, task *gfsptask.GfSpMigrateGVGTask) ([]byte, error)
SignBucketMigrationInfo(ctx context.Context, task *gfsptask.GfSpBucketMigrationInfo) ([]byte, error)
RejectMigrateBucket(ctx context.Context, rejectMigrateBucket *storagetypes.MsgRejectMigrateBucket) (string, error)
ReserveSwapIn(ctx context.Context, reserveSwapIn *virtualgrouptypes.MsgReserveSwapIn) (string, error)
CompleteSwapIn(ctx context.Context, completeSwpIn *virtualgrouptypes.MsgCompleteSwapIn) (string, error)
Deposit(ctx context.Context, deposit *virtualgrouptypes.MsgDeposit) (string, error)
DeleteGlobalVirtualGroup(ctx context.Context, deleteGVG *virtualgrouptypes.MsgDeleteGlobalVirtualGroup) (string, error)
}
Expand Down

0 comments on commit 1818b8b

Please sign in to comment.