Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: network topology package in scheduler #2380

Merged
merged 1 commit into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
53 changes: 27 additions & 26 deletions scheduler/networktopology/mocks/network_topology_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

104 changes: 39 additions & 65 deletions scheduler/networktopology/network_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package networktopology

import (
"context"
"strconv"
"time"

"github.com/go-redis/redis/v8"

Expand All @@ -30,28 +30,33 @@ import (
"d7y.io/dragonfly/v2/scheduler/storage"
)

const (
// contextTimeout is the timeout of redis invoke.
contextTimeout = 2 * time.Minute
)

// NetworkTopology is an interface for network topology.
type NetworkTopology interface {
// ProbedCount is the number of times the host has been probed.
ProbedCount(hostID string) int64

// LoadDestHosts returns destination hosts for source host.
LoadDestHosts(hostID string) ([]string, bool)
// LoadDestHostIDs loads destination host ids by source host id.
LoadDestHostIDs(string) ([]string, error)

// DeleteHost deletes host.
DeleteHost(hostID string) error
DeleteHost(string) error

// StoreProbe stores probe between two hosts.
StoreProbe(src, dest string, probe *Probe) bool
// ProbedCount is the number of times the host has been probed.
ProbedCount(string) (uint64, error)

// LoadProbes loads probes by source host id and destination host id.
LoadProbes(string, string) Probes
}

// networkTopology is an implementation of network topology.
type networkTopology struct {
// Redis universal client interface.
rdb redis.UniversalClient

// Scheduler config.
config *config.Config
// config is the network topology config.
config config.NetworkTopologyConfig

// Resource interface.
resource resource.Resource
Expand All @@ -61,7 +66,7 @@ type networkTopology struct {
}

// New network topology interface.
func NewNetworkTopology(cfg *config.Config, rdb redis.UniversalClient, resource resource.Resource, storage storage.Storage) (NetworkTopology, error) {
func NewNetworkTopology(cfg config.NetworkTopologyConfig, rdb redis.UniversalClient, resource resource.Resource, storage storage.Storage) (NetworkTopology, error) {
return &networkTopology{
config: cfg,
rdb: rdb,
Expand All @@ -70,79 +75,48 @@ func NewNetworkTopology(cfg *config.Config, rdb redis.UniversalClient, resource
}, nil
}

// ProbedCount is the number of times the host has been probed.
func (n *networkTopology) ProbedCount(hostID string) int64 {
value, err := n.rdb.Get(context.Background(), pkgredis.MakeProbedCountKeyInScheduler(hostID)).Result()
if err != nil {
return 0
}

probedCount, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return 0
}

return probedCount
}

// LoadDestHosts returns destination hosts for source host.
func (n *networkTopology) LoadDestHosts(hostID string) ([]string, bool) {
key := pkgredis.MakeNetworkTopologyKeyInScheduler(hostID, "*")
keys, err := n.rdb.Keys(context.Background(), key).Result()
if err != nil {
return []string{}, false
}
// LoadDestHostIDs loads destination host ids by source host id.
func (nt *networkTopology) LoadDestHostIDs(hostID string) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()

destHosts := make([]string, 0)
for _, k := range keys {
destHosts = append(destHosts, k[len(key)-1:])
}

return destHosts, true
return nt.rdb.Keys(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(hostID, "*")).Result()
}

// DeleteHost deletes host.
func (n *networkTopology) DeleteHost(hostID string) error {
// Delete network topology.
if err := n.rdb.Del(context.Background(), pkgredis.MakeNetworkTopologyKeyInScheduler(hostID, "*")).Err(); err != nil {
func (nt *networkTopology) DeleteHost(hostID string) error {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()

if err := nt.rdb.Del(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(hostID, "*")).Err(); err != nil {
return err
}

// Delete probes sent by the host.
if err := n.rdb.Del(context.Background(), pkgredis.MakeProbesKeyInScheduler(hostID, "*")).Err(); err != nil {
if err := nt.rdb.Del(ctx, pkgredis.MakeProbesKeyInScheduler(hostID, "*")).Err(); err != nil {
return err
}

// Delete probes sent to the host, and return the number of probes deleted for updating probed count.
count, err := n.rdb.Del(context.Background(), pkgredis.MakeProbesKeyInScheduler("*", hostID)).Result()
count, err := nt.rdb.Del(ctx, pkgredis.MakeProbesKeyInScheduler("*", hostID)).Result()
if err != nil {
return err
}

// Decrease probed count of host.
if err = n.rdb.DecrBy(context.Background(), pkgredis.MakeProbedCountKeyInScheduler(hostID), count).Err(); err != nil {
if err = nt.rdb.DecrBy(ctx, pkgredis.MakeProbedCountKeyInScheduler(hostID), count).Err(); err != nil {
return err
}

return nil
}

// StoreProbe stores probe between two hosts.
func (n *networkTopology) StoreProbe(src, dest string, probe *Probe) bool {
probes := NewProbes(n.rdb, n.config.NetworkTopology.Probe.QueueLength, src, dest)
if err := probes.Enqueue(probe); err != nil {
return false
}

// Update probe count.
if err := n.rdb.Incr(context.Background(), pkgredis.MakeProbedCountKeyInScheduler(src)).Err(); err != nil {
return false
}
// ProbedCount is the number of times the host has been probed.
func (nt *networkTopology) ProbedCount(hostID string) (uint64, error) {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()

// Update probed count.
if err := n.rdb.Incr(context.Background(), pkgredis.MakeProbedCountKeyInScheduler(dest)).Err(); err != nil {
return false
}
return nt.rdb.Get(ctx, pkgredis.MakeProbedCountKeyInScheduler(hostID)).Uint64()
}

return true
// LoadProbes loads probes by source host id and destination host id.
func (nt *networkTopology) LoadProbes(srcHostID, destHostID string) Probes {
return NewProbes(nt.config.Probe, nt.rdb, srcHostID, destHostID)
}
40 changes: 20 additions & 20 deletions scheduler/networktopology/probes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"time"

"github.com/go-redis/redis/v8"

pkgredis "d7y.io/dragonfly/v2/pkg/redis"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/resource"
)

Expand Down Expand Up @@ -81,32 +81,32 @@ type Probes interface {
}

type probes struct {
// config is the probe config.
config config.ProbeConfig

// Redis universal client interface.
rdb redis.UniversalClient

// limit is the length limit of probe queue.
limit int

// src is the source host id.
src string
srcHostID string

// dest is the destination host id.
dest string
destHostID string
}

// NewProbes creates a probes interface.
func NewProbes(rdb redis.UniversalClient, limit int, src string, dest string) Probes {
func NewProbes(cfg config.ProbeConfig, rdb redis.UniversalClient, srcHostID string, destHostID string) Probes {
return &probes{
rdb: rdb,
limit: limit,
src: src,
dest: dest,
config: cfg,
rdb: rdb,
srcHostID: srcHostID,
destHostID: destHostID,
}
}

// Peek returns the oldest probe without removing it.
func (p *probes) Peek() (*Probe, bool) {
str, err := p.rdb.LIndex(context.Background(), pkgredis.MakeProbesKeyInScheduler(p.src, p.dest), 0).Result()
str, err := p.rdb.LIndex(context.Background(), pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID), 0).Result()
if err != nil {
return nil, false
}
Expand All @@ -122,13 +122,13 @@ func (p *probes) Peek() (*Probe, bool) {
// Enqueue enqueues probe into the queue.
func (p *probes) Enqueue(probe *Probe) error {
length := p.Length()
if length == int64(p.limit) {
if length == int64(p.config.QueueLength) {
if _, ok := p.Dequeue(); !ok {
return errors.New("remove the oldest probe error")
}
}

probesKey := pkgredis.MakeProbesKeyInScheduler(p.src, p.dest)
probesKey := pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID)
data, err := json.Marshal(probe)
if err != nil {
return err
Expand All @@ -138,7 +138,7 @@ func (p *probes) Enqueue(probe *Probe) error {
return err
}

networkTopologyKey := pkgredis.MakeNetworkTopologyKeyInScheduler(p.src, p.dest)
networkTopologyKey := pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID)
if length == 0 {
if _, err := p.rdb.Pipelined(context.Background(), func(rdb redis.Pipeliner) error {
rdb.HSet(context.Background(), networkTopologyKey, "averageRTT", probe.RTT.Nanoseconds())
Expand Down Expand Up @@ -180,7 +180,7 @@ func (p *probes) Enqueue(probe *Probe) error {

// Dequeue removes and returns the oldest probe.
func (p *probes) Dequeue() (*Probe, bool) {
str, err := p.rdb.LPop(context.Background(), pkgredis.MakeProbesKeyInScheduler(p.src, p.dest)).Result()
str, err := p.rdb.LPop(context.Background(), pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID)).Result()
if err != nil {
return nil, false
}
Expand All @@ -195,7 +195,7 @@ func (p *probes) Dequeue() (*Probe, bool) {

// Length gets the length of probes.
func (p *probes) Length() int64 {
length, err := p.rdb.LLen(context.Background(), pkgredis.MakeProbesKeyInScheduler(p.src, p.dest)).Result()
length, err := p.rdb.LLen(context.Background(), pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID)).Result()
if err != nil {
return 0
}
Expand All @@ -205,7 +205,7 @@ func (p *probes) Length() int64 {

// CreatedAt is the creation time of probes.
func (p *probes) CreatedAt() time.Time {
value, err := p.rdb.HGet(context.Background(), pkgredis.MakeNetworkTopologyKeyInScheduler(p.src, p.dest), "createdAt").Result()
value, err := p.rdb.HGet(context.Background(), pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "createdAt").Result()
if err != nil {
return time.Time{}
}
Expand All @@ -220,7 +220,7 @@ func (p *probes) CreatedAt() time.Time {

// UpdatedAt is the updated time to store probe.
func (p *probes) UpdatedAt() time.Time {
value, err := p.rdb.HGet(context.Background(), fmt.Sprintf("network-topology:%s:%s", p.src, p.dest), "updatedAt").Result()
value, err := p.rdb.HGet(context.Background(), pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "updatedAt").Result()
if err != nil {
return time.Time{}
}
Expand All @@ -235,7 +235,7 @@ func (p *probes) UpdatedAt() time.Time {

// AverageRTT is the average round-trip time of probes.
func (p *probes) AverageRTT() time.Duration {
value, err := p.rdb.HGet(context.Background(), fmt.Sprintf("network-topology:%s:%s", p.src, p.dest), "averageRTT").Result()
value, err := p.rdb.HGet(context.Background(), pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "averageRTT").Result()
if err != nil {
return time.Duration(0)
}
Expand Down
2 changes: 1 addition & 1 deletion scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err

// Initialize network topology service.
if cfg.NetworkTopology.Enable {
s.networkTopology, err = networktopology.NewNetworkTopology(cfg, rdb, resource, s.storage)
s.networkTopology, err = networktopology.NewNetworkTopology(cfg.NetworkTopology, rdb, resource, s.storage)
if err != nil {
return nil, err
}
Expand Down