Skip to content

Commit

Permalink
feat: searcher calculates cluster type (#1729)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Oct 9, 2022
1 parent 86a6030 commit f606a4f
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 44 deletions.
71 changes: 42 additions & 29 deletions manager/searcher/searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,58 +32,62 @@ import (
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/manager/model"
"d7y.io/dragonfly/v2/pkg/math"
"d7y.io/dragonfly/v2/pkg/types"
)

const (
// Condition security domain key
// Condition security domain key.
ConditionSecurityDomain = "security_domain"

// Condition IDC key
// Condition IDC key.
ConditionIDC = "idc"

// Condition netTopology key
// Condition netTopology key.
ConditionNetTopology = "net_topology"

// Condition location key
// Condition location key.
ConditionLocation = "location"
)

const (
// SecurityDomain affinity weight
// clusterTypeWeight cluster type weight.
clusterTypeWeight float64 = 0.03

// SecurityDomain affinity weight.
securityDomainAffinityWeight float64 = 0.4

// IDC affinity weight
// IDC affinity weight.
idcAffinityWeight float64 = 0.3

// NetTopology affinity weight
// NetTopology affinity weight.
netTopologyAffinityWeight = 0.2

// Location affinity weight
locationAffinityWeight = 0.1
// Location affinity weight.
locationAffinityWeight = 0.07
)

const (
// Maximum score
// Maximum score.
maxScore float64 = 1.0

// Minimum score
// Minimum score.
minScore = 0
)

const (
// Maximum number of elements
// Maximum number of elements.
maxElementLen = 5
)

// Scheduler cluster scopes
// Scheduler cluster scopes.
type Scopes struct {
IDC string `mapstructure:"idc"`
Location string `mapstructure:"location"`
NetTopology string `mapstructure:"net_topology"`
}

type Searcher interface {
// FindSchedulerClusters finds scheduler clusters that best matches the evaluation
// FindSchedulerClusters finds scheduler clusters that best matches the evaluation.
FindSchedulerClusters(context.Context, []model.SchedulerCluster, *managerv1.ListSchedulersRequest) ([]model.SchedulerCluster, error)
}

Expand All @@ -100,7 +104,7 @@ func New(pluginDir string) Searcher {
return s
}

// FindSchedulerClusters finds scheduler clusters that best matches the evaluation
// FindSchedulerClusters finds scheduler clusters that best matches the evaluation.
func (s *searcher) FindSchedulerClusters(ctx context.Context, schedulerClusters []model.SchedulerCluster, client *managerv1.ListSchedulersRequest) ([]model.SchedulerCluster, error) {
conditions := client.HostInfo
if len(conditions) <= 0 {
Expand Down Expand Up @@ -130,14 +134,14 @@ func (s *searcher) FindSchedulerClusters(ctx context.Context, schedulerClusters
return false
}

return Evaluate(conditions, si, clusters[i].SecurityGroup.SecurityRules) > Evaluate(conditions, sj, clusters[j].SecurityGroup.SecurityRules)
return Evaluate(conditions, si, clusters[i]) > Evaluate(conditions, sj, clusters[j])
},
)

return clusters, nil
}

// Filter the scheduler clusters that dfdaemon can be used
// Filter the scheduler clusters that dfdaemon can be used.
func FilterSchedulerClusters(conditions map[string]string, schedulerClusters []model.SchedulerCluster) []model.SchedulerCluster {
var clusters []model.SchedulerCluster
securityDomain := conditions[ConditionSecurityDomain]
Expand Down Expand Up @@ -178,29 +182,38 @@ func FilterSchedulerClusters(conditions map[string]string, schedulerClusters []m
return clusters
}

// Evaluate the degree of matching between scheduler cluster and dfdaemon
func Evaluate(conditions map[string]string, scopes Scopes, securityRules []model.SecurityRule) float64 {
return securityDomainAffinityWeight*calculateSecurityDomainAffinityScore(conditions[ConditionSecurityDomain], securityRules) +
// Evaluate the degree of matching between scheduler cluster and dfdaemon.
func Evaluate(conditions map[string]string, scopes Scopes, cluster model.SchedulerCluster) float64 {
return clusterTypeWeight*calculateClusterTypeScore(cluster) +
securityDomainAffinityWeight*calculateSecurityDomainAffinityScore(conditions[ConditionSecurityDomain], cluster.SecurityGroup.SecurityRules) +
idcAffinityWeight*calculateIDCAffinityScore(conditions[ConditionIDC], scopes.IDC) +
locationAffinityWeight*calculateMultiElementAffinityScore(conditions[ConditionLocation], scopes.Location) +
netTopologyAffinityWeight*calculateMultiElementAffinityScore(conditions[ConditionNetTopology], scopes.NetTopology)
}

// calculateSecurityDomainAffinityScore 0.0~1.0 larger and better
// calculateClusterTypeScore 0.0~1.0 larger and better.
func calculateClusterTypeScore(cluster model.SchedulerCluster) float64 {
if cluster.IsDefault {
return maxScore
}

return minScore
}

// calculateSecurityDomainAffinityScore 0.0~1.0 larger and better.
func calculateSecurityDomainAffinityScore(securityDomain string, securityRules []model.SecurityRule) float64 {
if securityDomain == "" {
return minScore
}

if len(securityRules) == 0 {
return minScore

}

return maxScore
}

// calculateIDCAffinityScore 0.0~1.0 larger and better
// calculateIDCAffinityScore 0.0~1.0 larger and better.
func calculateIDCAffinityScore(dst, src string) float64 {
if dst == "" || src == "" {
return minScore
Expand All @@ -213,7 +226,7 @@ func calculateIDCAffinityScore(dst, src string) float64 {
// Dst has only one element, src has multiple elements separated by "|".
// When dst element matches one of the multiple elements of src,
// it gets the max score of idc.
srcElements := strings.Split(src, "|")
srcElements := strings.Split(src, types.AffinitySeparator)
for _, srcElement := range srcElements {
if strings.EqualFold(dst, srcElement) {
return maxScore
Expand All @@ -223,7 +236,7 @@ func calculateIDCAffinityScore(dst, src string) float64 {
return minScore
}

// calculateMultiElementAffinityScore 0.0~1.0 larger and better
// calculateMultiElementAffinityScore 0.0~1.0 larger and better.
func calculateMultiElementAffinityScore(dst, src string) float64 {
if dst == "" || src == "" {
return minScore
Expand All @@ -233,13 +246,13 @@ func calculateMultiElementAffinityScore(dst, src string) float64 {
return maxScore
}

// Calculate the number of multi-element matches divided by "|"
// Calculate the number of multi-element matches divided by "|".
var score, elementLen int
dstElements := strings.Split(dst, "|")
srcElements := strings.Split(src, "|")
dstElements := strings.Split(dst, types.AffinitySeparator)
srcElements := strings.Split(src, types.AffinitySeparator)
elementLen = math.Min(len(dstElements), len(srcElements))

// Maximum element length is 5
// Maximum element length is 5.
if elementLen > maxElementLen {
elementLen = maxElementLen
}
Expand Down
45 changes: 39 additions & 6 deletions manager/searcher/searcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,22 @@ func TestSchedulerCluster(t *testing.T) {
},
IsDefault: true,
},
{
Name: "baz",
Schedulers: []model.Scheduler{
{
HostName: "baz",
State: "active",
},
},
},
},
conditions: map[string]string{"security_domain": "domain-1"},
expect: func(t *testing.T, data []model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data[0].Name, "bar")
assert.Equal(len(data), 1)
assert.Equal(data[1].Name, "baz")
assert.Equal(len(data), 2)
},
},
{
Expand Down Expand Up @@ -710,6 +720,28 @@ func TestSchedulerCluster(t *testing.T) {
},
},
},
{
Name: "bae",
Scopes: map[string]any{
"idc": "IDC-1",
"location": "LOCATION-2",
"net_topology": "NET_TOPOLOGY-1|NET_TOPOLOGY-2",
},
SecurityGroup: model.SecurityGroup{
SecurityRules: []model.SecurityRule{
{
Domain: "DOMAIN-2",
},
},
},
Schedulers: []model.Scheduler{
{
HostName: "bae",
State: "active",
},
},
IsDefault: true,
},
},
conditions: map[string]string{
"security_domain": "domain-1",
Expand All @@ -719,11 +751,12 @@ func TestSchedulerCluster(t *testing.T) {
},
expect: func(t *testing.T, data []model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data[0].Name, "bar")
assert.Equal(data[1].Name, "foo")
assert.Equal(data[2].Name, "baz")
assert.Equal(data[3].Name, "bax")
assert.Equal(len(data), 4)
assert.Equal(data[0].Name, "bae")
assert.Equal(data[1].Name, "bar")
assert.Equal(data[2].Name, "foo")
assert.Equal(data[3].Name, "baz")
assert.Equal(data[4].Name, "bax")
assert.Equal(len(data), 5)
},
},
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/types/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,8 @@ const (
// DfdaemonMetricsName is name of dfdaemon metrics.
DfdaemonMetricsName = "dfdaemon"
)

const (
// AffinitySeparator is separator of affinity.
AffinitySeparator = "|"
)
15 changes: 8 additions & 7 deletions scheduler/scheduler/evaluator/evaluator_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/math"
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/resource"
)

Expand All @@ -34,8 +35,8 @@ const (
// Free load weight.
freeLoadWeight = 0.2

// Host type affinity weight.
hostTypeAffinityWeight = 0.2
// Host type weight.
hostTypeWeight = 0.2

// IDC affinity weight.
idcAffinityWeight = 0.15
Expand Down Expand Up @@ -86,7 +87,7 @@ func (eb *evaluatorBase) Evaluate(parent *resource.Peer, child *resource.Peer, t

return finishedPieceWeight*calculatePieceScore(parent, child, totalPieceCount) +
freeLoadWeight*calculateFreeLoadScore(parent.Host) +
hostTypeAffinityWeight*calculateHostTypeAffinityScore(parent) +
hostTypeWeight*calculateHostTypeScore(parent) +
idcAffinityWeight*calculateIDCAffinityScore(parent.Host, child.Host) +
netTopologyAffinityWeight*calculateMultiElementAffinityScore(parent.Host.NetTopology, child.Host.NetTopology) +
locationAffinityWeight*calculateMultiElementAffinityScore(parent.Host.Location, child.Host.Location)
Expand Down Expand Up @@ -119,8 +120,8 @@ func calculateFreeLoadScore(host *resource.Host) float64 {
return minScore
}

// calculateHostTypeAffinityScore 0.0~1.0 larger and better.
func calculateHostTypeAffinityScore(peer *resource.Peer) float64 {
// calculateHostTypeScore 0.0~1.0 larger and better.
func calculateHostTypeScore(peer *resource.Peer) float64 {
// When the task is downloaded for the first time,
// peer will be scheduled to seed peer first,
// otherwise it will be scheduled to dfdaemon first.
Expand Down Expand Up @@ -157,8 +158,8 @@ func calculateMultiElementAffinityScore(dst, src string) float64 {

// Calculate the number of multi-element matches divided by "|".
var score, elementLen int
dstElements := strings.Split(dst, "|")
srcElements := strings.Split(src, "|")
dstElements := strings.Split(dst, types.AffinitySeparator)
srcElements := strings.Split(src, types.AffinitySeparator)
elementLen = math.Min(len(dstElements), len(srcElements))

// Maximum element length is 5.
Expand Down
4 changes: 2 additions & 2 deletions scheduler/scheduler/evaluator/evaluator_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func TestEvaluatorBase_calculateFreeLoadScore(t *testing.T) {
}
}

func TestEvaluatorBase_calculateHostTypeAffinityScore(t *testing.T) {
func TestEvaluatorBase_calculateHostTypeScore(t *testing.T) {
tests := []struct {
name string
mock func(peer *resource.Peer)
Expand Down Expand Up @@ -359,7 +359,7 @@ func TestEvaluatorBase_calculateHostTypeAffinityScore(t *testing.T) {
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
tc.mock(peer)
tc.expect(t, calculateHostTypeAffinityScore(peer))
tc.expect(t, calculateHostTypeScore(peer))
})
}
}
Expand Down

0 comments on commit f606a4f

Please sign in to comment.