Skip to content

Commit

Permalink
Merge branch 'develop' into fix-creategvgmetrics
Browse files Browse the repository at this point in the history
  • Loading branch information
VM committed Nov 3, 2023
2 parents 014c24f + cf64217 commit dfc1124
Show file tree
Hide file tree
Showing 13 changed files with 127 additions and 25 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* @ruojunm @will-2012 @zjubfd @KeefeL
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## v1.0.4
This release contains 1 bugfix.

BUGFIXES
* [#1231](https://github.com/bnb-chain/greenfield-storage-provider/pull/1231) fix: fix: add config sp blacklist

## v1.0.3
This release contains 1 bugfix.

Expand Down
1 change: 1 addition & 0 deletions base/gfspconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,4 +251,5 @@ type ManagerConfig struct {
SubscribeSwapOutExitEventIntervalMillisecond uint `comment:"optional"`
SubscribeBucketMigrateEventIntervalMillisecond uint `comment:"optional"`
GVGPreferSPList []uint32 `comment:"optional"`
SPBlackList []uint32 `comment:"optional"`
}
9 changes: 8 additions & 1 deletion base/gfspvgmgr/sp_freeze_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

const (
DefaultFreezingPeriodForSP = 5 * time.Minute
DefaultFreezingPeriodForSP = 10 * time.Minute
ReleaseSPJobInterval = 1 * time.Minute
)

Expand Down Expand Up @@ -66,3 +66,10 @@ func (s *FreezeSPPool) ReleaseSP() {
return true
})
}

func (s *FreezeSPPool) ReleaseAllSP() {
s.Range(func(k interface{}, v interface{}) bool {
s.Delete(k)
return true
})
}
11 changes: 11 additions & 0 deletions base/gfspvgmgr/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package gfspvgmgr

import "github.com/bnb-chain/greenfield-storage-provider/core/vgmgr"

func NewIDSetFromList(list []uint32) vgmgr.IDSet {
set := make(map[uint32]struct{}, 0)
for _, v := range list {
set[v] = struct{}{}
}
return set
}
47 changes: 36 additions & 11 deletions base/gfspvgmgr/virtual_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (picker *FreeStorageSizeWeightPicker) pickIndex() (uint32, error) {
return 0, fmt.Errorf("failed to pick weighted random index")
}

func (vgfm *virtualGroupFamilyManager) pickVirtualGroupFamily(filter *vgmgr.PickVGFFilter) (*vgmgr.VirtualGroupFamilyMeta, error) {
func (vgfm *virtualGroupFamilyManager) pickVirtualGroupFamily(filter *vgmgr.PickVGFFilter, filterByGvgList *vgmgr.PickVGFByGVGFilter) (*vgmgr.VirtualGroupFamilyMeta, error) {
var (
picker FreeStorageSizeWeightPicker
familyID uint32
Expand All @@ -93,6 +93,9 @@ func (vgfm *virtualGroupFamilyManager) pickVirtualGroupFamily(filter *vgmgr.Pick
if filter != nil && !filter.Check(id) {
continue
}
if !filterByGvgList.Check(f.GVGMap) {
continue
}
picker.addVirtualGroupFamily(f)
}

Expand All @@ -103,7 +106,7 @@ func (vgfm *virtualGroupFamilyManager) pickVirtualGroupFamily(filter *vgmgr.Pick
return vgfm.vgfIDToVgf[familyID], nil
}

func (vgfm *virtualGroupFamilyManager) pickGlobalVirtualGroup(vgfID uint32, filter vgmgr.ExcludeFilter) (*vgmgr.GlobalVirtualGroupMeta, error) {
func (vgfm *virtualGroupFamilyManager) pickGlobalVirtualGroup(vgfID uint32, filter, excludeGVGsFilter vgmgr.ExcludeFilter) (*vgmgr.GlobalVirtualGroupMeta, error) {
var (
picker FreeStorageSizeWeightPicker
globalVirtualGroupID uint32
Expand All @@ -117,6 +120,9 @@ func (vgfm *virtualGroupFamilyManager) pickGlobalVirtualGroup(vgfID uint32, filt
if filter != nil && filter.Apply(g.ID) {
continue
}
if excludeGVGsFilter != nil && excludeGVGsFilter.Apply(g.ID) {
continue
}
picker.addGlobalVirtualGroup(g)
}

Expand Down Expand Up @@ -164,14 +170,17 @@ type spManager struct {
otherSPs []*sptypes.StorageProvider
}

func (sm *spManager) generateVirtualGroupMeta(genPolicy vgmgr.GenerateGVGSecondarySPsPolicy, filter vgmgr.ExcludeFilter) (*vgmgr.GlobalVirtualGroupMeta, error) {
func (sm *spManager) generateVirtualGroupMeta(genPolicy vgmgr.GenerateGVGSecondarySPsPolicy, filter, excludeSPsFilter vgmgr.ExcludeFilter) (*vgmgr.GlobalVirtualGroupMeta, error) {
for _, sp := range sm.otherSPs {
if !sp.IsInService() {
continue
}
if filter != nil && filter.Apply(sp.Id) {
continue
}
if excludeSPsFilter != nil && excludeSPsFilter.Apply(sp.Id) {
continue
}
genPolicy.AddCandidateSP(sp.GetId())
}
secondarySPIDs, err := genPolicy.GenerateGVGSecondarySPs()
Expand Down Expand Up @@ -350,22 +359,22 @@ func (vgm *virtualGroupManager) refreshMetaByChain() {

// PickVirtualGroupFamily pick a virtual group family(If failed to pick,
// new VGF will be automatically created on the chain) in get create bucket approval workflow.
func (vgm *virtualGroupManager) PickVirtualGroupFamily() (*vgmgr.VirtualGroupFamilyMeta, error) {
func (vgm *virtualGroupManager) PickVirtualGroupFamily(filterByGvgList *vgmgr.PickVGFByGVGFilter) (*vgmgr.VirtualGroupFamilyMeta, error) {
filter, err := vgm.genVgfFilter()
if err != nil {
return nil, err
}
vgm.mutex.RLock()
defer vgm.mutex.RUnlock()
return vgm.vgfManager.pickVirtualGroupFamily(filter)
return vgm.vgfManager.pickVirtualGroupFamily(filter, filterByGvgList)
}

// PickGlobalVirtualGroup picks a global virtual group(If failed to pick,
// new GVG will be created by primary SP) in replicate/seal object workflow.
func (vgm *virtualGroupManager) PickGlobalVirtualGroup(vgfID uint32) (*vgmgr.GlobalVirtualGroupMeta, error) {
func (vgm *virtualGroupManager) PickGlobalVirtualGroup(vgfID uint32, excludeGVGsFilter vgmgr.ExcludeFilter) (*vgmgr.GlobalVirtualGroupMeta, error) {
vgm.mutex.RLock()
defer vgm.mutex.RUnlock()
return vgm.vgfManager.pickGlobalVirtualGroup(vgfID, vgmgr.NewExcludeIDFilter(vgm.freezeSPPool.GetFreezeGVGsInFamily(vgfID)))
return vgm.vgfManager.pickGlobalVirtualGroup(vgfID, vgmgr.NewExcludeIDFilter(vgm.freezeSPPool.GetFreezeGVGsInFamily(vgfID)), excludeGVGsFilter)
}

// PickGlobalVirtualGroupForBucketMigrate picks a global virtual group(If failed to pick,
Expand All @@ -382,10 +391,10 @@ func (vgm *virtualGroupManager) PickGlobalVirtualGroupForBucketMigrate(filter vg

// PickMigrateDestGlobalVirtualGroup picks a global virtual group(If failed to pick,
// new GVG will be created by primary SP) in replicate/seal object workflow.
func (vgm *virtualGroupManager) PickMigrateDestGlobalVirtualGroup(vgfID uint32) (*vgmgr.GlobalVirtualGroupMeta, error) {
func (vgm *virtualGroupManager) PickMigrateDestGlobalVirtualGroup(vgfID uint32, excludeGVGsFilter vgmgr.ExcludeFilter) (*vgmgr.GlobalVirtualGroupMeta, error) {
vgm.mutex.RLock()
defer vgm.mutex.RUnlock()
return vgm.vgfManager.pickGlobalVirtualGroup(vgfID, vgmgr.NewExcludeIDFilter(vgm.freezeSPPool.GetFreezeGVGsInFamily(vgfID)))
return vgm.vgfManager.pickGlobalVirtualGroup(vgfID, vgmgr.NewExcludeIDFilter(vgm.freezeSPPool.GetFreezeGVGsInFamily(vgfID)), excludeGVGsFilter)
}

// ForceRefreshMeta is used to query metadata service and refresh the virtual group manager meta.
Expand All @@ -399,10 +408,10 @@ func (vgm *virtualGroupManager) ForceRefreshMeta() error {
}

// GenerateGlobalVirtualGroupMeta is used to generate a gvg meta.
func (vgm *virtualGroupManager) GenerateGlobalVirtualGroupMeta(genPolicy vgmgr.GenerateGVGSecondarySPsPolicy) (*vgmgr.GlobalVirtualGroupMeta, error) {
func (vgm *virtualGroupManager) GenerateGlobalVirtualGroupMeta(genPolicy vgmgr.GenerateGVGSecondarySPsPolicy, excludeSPsFilter vgmgr.ExcludeFilter) (*vgmgr.GlobalVirtualGroupMeta, error) {
vgm.mutex.RLock()
defer vgm.mutex.RUnlock()
return vgm.spManager.generateVirtualGroupMeta(genPolicy, vgmgr.NewExcludeIDFilter(vgm.freezeSPPool.GetFreezeSPIDs()))
return vgm.spManager.generateVirtualGroupMeta(genPolicy, vgmgr.NewExcludeIDFilter(vgm.freezeSPPool.GetFreezeSPIDs()), excludeSPsFilter)
}

// PickSPByFilter is used to pick sp by filter check.
Expand Down Expand Up @@ -455,5 +464,21 @@ func (vgm *virtualGroupManager) releaseSPAndGVGLoop() {
ticker := time.NewTicker(ReleaseSPJobInterval)
for range ticker.C {
vgm.freezeSPPool.ReleaseSP()

vgm.mutex.RLock()
aliveSP := make([]*sptypes.StorageProvider, 0)
for _, sp := range vgm.spManager.otherSPs {
if sp.IsInService() {
aliveSP = append(aliveSP, sp)
}
}
vgm.mutex.RUnlock()
params, err := vgm.chainClient.QueryStorageParamsByTimestamp(context.Background(), time.Now().Unix())
if err != nil {
continue
}
if len(aliveSP)-len(vgm.freezeSPPool.GetFreezeSPIDs()) < int(params.GetRedundantDataChunkNum()+params.GetRedundantParityChunkNum()) {
vgm.freezeSPPool.ReleaseAllSP()
}
}
}
11 changes: 8 additions & 3 deletions base/gfspvgmgr/virtual_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,23 @@ func Test_pickIndex(t *testing.T) {
}

func Test_pickVirtualGroupFamilySuccess(t *testing.T) {
gvg := make(map[uint32]*corevgmgr.GlobalVirtualGroupMeta)
gvg[1] = &corevgmgr.GlobalVirtualGroupMeta{
ID: 1,
FamilyID: 1,
}
vgfm := &virtualGroupFamilyManager{vgfIDToVgf: map[uint32]*corevgmgr.VirtualGroupFamilyMeta{
1: {FamilyUsedStorageSize: 1, FamilyStakingStorageSize: 100}}}
1: {FamilyUsedStorageSize: 1, FamilyStakingStorageSize: 100, GVGMap: gvg}}}
filter := corevgmgr.NewPickVGFFilter([]uint32{1, 2})
result, err := vgfm.pickVirtualGroupFamily(filter)
result, err := vgfm.pickVirtualGroupFamily(filter, corevgmgr.NewPickVGFByGVGFilter([]uint32{2, 3}))
assert.Nil(t, err)
assert.Nil(t, result)
}

func Test_pickVirtualGroupFamilyFailure(t *testing.T) {
vgfm := &virtualGroupFamilyManager{vgfIDToVgf: map[uint32]*corevgmgr.VirtualGroupFamilyMeta{}}
filter := &corevgmgr.PickVGFFilter{AvailableVgfIDSet: map[uint32]struct{}{}}
result, err := vgfm.pickVirtualGroupFamily(filter)
result, err := vgfm.pickVirtualGroupFamily(filter, corevgmgr.NewPickVGFByGVGFilter([]uint32{1, 2}))
assert.Equal(t, ErrFailedPickVGF, err)
assert.Nil(t, result)
}
32 changes: 29 additions & 3 deletions core/vgmgr/virtual_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,32 @@ func (p *PickVGFFilter) Check(vgfID uint32) bool {
return ok
}

type PickVGFByGVGFilter struct {
BlackListSPs map[uint32]struct{}
}

func NewPickVGFByGVGFilter(blackListSPs []uint32) *PickVGFByGVGFilter {
idSet := make(map[uint32]struct{})
for _, id := range blackListSPs {
idSet[id] = struct{}{}
}
return &PickVGFByGVGFilter{BlackListSPs: idSet}
}

// Check checks if a family has at least 1 valid GVG(gvg doest not contain blacklist Sp)
func (p *PickVGFByGVGFilter) Check(gvgs map[uint32]*GlobalVirtualGroupMeta) bool {
validCount := len(gvgs)
for _, gvg := range gvgs {
for _, sspID := range gvg.SecondarySPIDs {
if _, ok := p.BlackListSPs[sspID]; ok {
validCount--
break
}
}
}
return validCount > 0
}

// GVGPickFilter is used to check sp pick condition.
type GVGPickFilter interface {
// CheckFamily returns true when match pick request condition.
Expand Down Expand Up @@ -97,18 +123,18 @@ func (f *ExcludeIDFilter) Apply(id uint32) bool {
type VirtualGroupManager interface {
// PickVirtualGroupFamily pick a virtual group family(If failed to pick,
// new VGF will be automatically created on the chain) in get create bucket approval workflow.
PickVirtualGroupFamily() (*VirtualGroupFamilyMeta, error)
PickVirtualGroupFamily(filterByGvgList *PickVGFByGVGFilter) (*VirtualGroupFamilyMeta, error)
// PickGlobalVirtualGroup picks a global virtual group(If failed to pick,
// new GVG will be created by primary SP) in replicate/seal object workflow.
PickGlobalVirtualGroup(vgfID uint32) (*GlobalVirtualGroupMeta, error)
PickGlobalVirtualGroup(vgfID uint32, excludeGVGsFilter ExcludeFilter) (*GlobalVirtualGroupMeta, error)
// PickGlobalVirtualGroupForBucketMigrate picks a global virtual group(If failed to pick,
// new GVG will be created by primary SP) in replicate/seal object workflow.
PickGlobalVirtualGroupForBucketMigrate(filter GVGPickFilter) (*GlobalVirtualGroupMeta, error)
// ForceRefreshMeta is used to query metadata service and refresh the virtual group manager meta.
// if pick func returns ErrStaledMetadata, the caller need force refresh from metadata and retry pick.
ForceRefreshMeta() error
// GenerateGlobalVirtualGroupMeta is used to generate a new global virtual group meta, the caller need send a tx to chain.
GenerateGlobalVirtualGroupMeta(genPolicy GenerateGVGSecondarySPsPolicy) (*GlobalVirtualGroupMeta, error)
GenerateGlobalVirtualGroupMeta(genPolicy GenerateGVGSecondarySPsPolicy, excludeSPsFilter ExcludeFilter) (*GlobalVirtualGroupMeta, error)
// PickSPByFilter picks sp which is match pick filter condition.
PickSPByFilter(filter SPPickFilter) (*sptypes.StorageProvider, error)
// QuerySPByID returns sp proto.
Expand Down
2 changes: 2 additions & 0 deletions docs/run-book/config/config_template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -351,3 +351,5 @@ SubscribeSwapOutExitEventIntervalSec = 0
SubscribeBucketMigrateEventIntervalSec = 0
# optional
GVGPreferSPList = []
# optional
SPBlackList = []
2 changes: 1 addition & 1 deletion modular/manager/bucket_migrate_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ func calculateStakingSizeStrategy(manager *ManageModular) (denom string, amount
return "", sdkmath.ZeroInt(), err
}

gvgMeta, err := manager.virtualGroupManager.GenerateGlobalVirtualGroupMeta(NewGenerateGVGSecondarySPsPolicyByPrefer(params, manager.gvgPreferSPList))
gvgMeta, err := manager.virtualGroupManager.GenerateGlobalVirtualGroupMeta(NewGenerateGVGSecondarySPsPolicyByPrefer(params, manager.gvgPreferSPList), vgmgr.NewExcludeIDFilter(gfspvgmgr.NewIDSetFromList(manager.spBlackList)))
if err != nil {
return "", sdkmath.ZeroInt(), err
}
Expand Down
12 changes: 6 additions & 6 deletions modular/manager/manage_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
"golang.org/x/exp/slices"

"github.com/bnb-chain/greenfield-storage-provider/base/gfspvgmgr"
"github.com/bnb-chain/greenfield-storage-provider/base/types/gfsperrors"
"github.com/bnb-chain/greenfield-storage-provider/base/types/gfspserver"
"github.com/bnb-chain/greenfield-storage-provider/base/types/gfsptask"
Expand Down Expand Up @@ -774,15 +775,14 @@ func (m *ManageModular) PickVirtualGroupFamily(ctx context.Context, task task.Ap
err error
vgf *vgmgr.VirtualGroupFamilyMeta
)

if vgf, err = m.virtualGroupManager.PickVirtualGroupFamily(); err != nil {
if vgf, err = m.virtualGroupManager.PickVirtualGroupFamily(vgmgr.NewPickVGFByGVGFilter(m.spBlackList)); err != nil {
// create a new gvg, and retry pick.
if err = m.createGlobalVirtualGroup(0, nil); err != nil {
log.CtxErrorw(ctx, "failed to create global virtual group", "task_info", task.Info(), "error", err)
return 0, err
}
m.virtualGroupManager.ForceRefreshMeta()
if vgf, err = m.virtualGroupManager.PickVirtualGroupFamily(); err != nil {
if vgf, err = m.virtualGroupManager.PickVirtualGroupFamily(vgmgr.NewPickVGFByGVGFilter(m.spBlackList)); err != nil {
log.CtxErrorw(ctx, "failed to pick vgf", "task_info", task.Info(), "error", err)
return 0, err
}
Expand Down Expand Up @@ -838,7 +838,7 @@ func (m *ManageModular) createGlobalVirtualGroup(vgfID uint32, params *storagety
return err
}
}
gvgMeta, err := m.virtualGroupManager.GenerateGlobalVirtualGroupMeta(NewGenerateGVGSecondarySPsPolicyByPrefer(params, m.gvgPreferSPList))
gvgMeta, err := m.virtualGroupManager.GenerateGlobalVirtualGroupMeta(NewGenerateGVGSecondarySPsPolicyByPrefer(params, m.gvgPreferSPList), vgmgr.NewExcludeIDFilter(gfspvgmgr.NewIDSetFromList(m.spBlackList)))
if err != nil {
return err
}
Expand All @@ -865,14 +865,14 @@ func (m *ManageModular) pickGlobalVirtualGroup(ctx context.Context, vgfID uint32
gvg *vgmgr.GlobalVirtualGroupMeta
)

if gvg, err = m.virtualGroupManager.PickGlobalVirtualGroup(vgfID); err != nil {
if gvg, err = m.virtualGroupManager.PickGlobalVirtualGroup(vgfID, vgmgr.NewExcludeIDFilter(m.gvgBlackList)); err != nil {
// create a new gvg, and retry pick.
if err = m.createGlobalVirtualGroup(vgfID, param); err != nil {
log.CtxErrorw(ctx, "failed to create global virtual group", "vgf_id", vgfID, "error", err)
return gvg, err
}
m.virtualGroupManager.ForceRefreshMeta()
if gvg, err = m.virtualGroupManager.PickGlobalVirtualGroup(vgfID); err != nil {
if gvg, err = m.virtualGroupManager.PickGlobalVirtualGroup(vgfID, vgmgr.NewExcludeIDFilter(m.gvgBlackList)); err != nil {
log.CtxErrorw(ctx, "failed to pick gvg", "vgf_id", vgfID, "error", err)
return gvg, err
}
Expand Down
16 changes: 16 additions & 0 deletions modular/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type ManageModular struct {

recoveryFailedList []string
recoveryTaskMap map[string]string
spBlackList []uint32
gvgBlackList vgmgr.IDSet
}

func (m *ManageModular) Name() string {
Expand Down Expand Up @@ -142,6 +144,20 @@ func (m *ManageModular) Start(ctx context.Context) error {
if err = m.LoadTaskFromDB(); err != nil {
return err
}
m.gvgBlackList = make(map[uint32]struct{}, 0)

if len(m.spBlackList) > 0 {
for _, sspID := range m.spBlackList {
sspJoinGVGs, err := m.baseApp.GfSpClient().ListGlobalVirtualGroupsBySecondarySP(ctx, sspID)
if err != nil {
log.Errorw("failed to list GVGs by secondary sp", "spID", sspID, "error", err)
return err
}
for _, gvg := range sspJoinGVGs {
m.gvgBlackList[gvg.Id] = struct{}{}
}
}
}

go m.delayStartMigrateScheduler()
go m.eventLoop(ctx)
Expand Down
2 changes: 2 additions & 0 deletions modular/manager/manager_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,5 +243,7 @@ func DefaultManagerOptions(manager *ManageModular, cfg *gfspconfig.GfSpConfig) (
manager.gvgPreferSPList = cfg.Manager.GVGPreferSPList
manager.recoveryTaskMap = make(map[string]string)

manager.spBlackList = cfg.Manager.SPBlackList

return nil
}

0 comments on commit dfc1124

Please sign in to comment.